暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kubernetes Kafka集群

83

背景

公司系统使用到了Kafka作为消息中间件,该中间件单独部署在云上的服务器中,业务系统部署在Kubernetes集群中,为了保证消息中间件的可用性以及统一整个公司的基础设施,我们决定将Kafka迁移至 Kubernetes集群中。

准备工作

将Kafka集群迁移到Kubernetes集群中,我们需要做好以下准备:

  1. 集群和Topic等要易于维护和管理,配置变更集群自动更新,这里我们会用到Operator

  2. 要做好监控和预警手段,防止集群长时间不可用或者消息长时间堆积等问题。

结合公司的实际情况,不太适合重复造轮子,做Operator的开发,因此我们就去开源市场寻找轮子,在市场上我们找到了两个Operator:

  1. KOperator

  2. Strimzi Operator

Strimzi Operator提供了全面Kafka集群管理方案,支持自动化部署扩展监控等功能,而且提供了对 Kafka生态的支持(比如Kafka Connect、Kafka MirrorMaker),开源社区活跃,最终我们选择了Strimzi Operator作为Kubernetes Kafka集群的解决方案。

资源准备

为了做资源的隔离,我们会将Strimzi Operator 和Kafka部署在单独的节点和Namepace下,因此需要执行两个动作:

  1. 准备好节点,现在云供应商都会提供节点池的概念,就是一组节点的集合,我们在配置节点池的时候可以配置节点标签和污点

对于节点标签,我们会配置node.kubernetes.io/component=kafka,对于污点我们设置app.kubernetes.io/component=kafka:NoSchedule

  1. kubectl describe node $nodename

复制

通过节点的详细信息,我们可以看到节点被打上了相的标签和污点。

  2.第二步就是创建Kubernetes Namespace

  1. kubectl create namespace kafka

复制

部署Strimzi Operator

我们这里部署的版本是0.41.0的Operator:

  1. wget "https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.43.0/strimzi-cluster-operator-0.43.0.yaml"

复制
  1. strimzi-cluster-operator-0.43.0.yaml下载完成以后,先不要急于部署,我们需要对其中的Deployment 做一些改动

  1. apiVersion: apps/v1

  2. kind: Deployment

  3. metadata:

  4. name: strimzi-cluster-operator

  5. labels:

  6. app: strimzi

  7. spec:

  8. template:

  9. spec:

  10. .....

  11. tolerations:

  12. - key: "app.kubernetes.io/component"

  13. operator: "Equal"

  14. value: "kafka"

  15. effect: "NoSchedule"

  16. affinity:

  17. nodeAffinity:

  18. requiredDuringSchedulingIgnoredDuringExecution:

  19. nodeSelectorTerms:

  20. - matchExpressions:

  21. - key: node.kubernetes.io/component

  22. operator: In

  23. values:

  24. - kafka

复制

主要的改动就是我们需要配置tolerations和affinity,这个就是为了让Operator的Pod调度到特定的节点上。

  1. 将cluster-operator中namespace中使用下面的命令替换成我们的kafka:

  1. #linux

  2. sed -i 's/namespace: .*/namespace: kafka/' strimzi-cluster-operator-0.43.0.yaml


  3. #macos

  4. sed -i '''s/namespace: .*/namespace: kafka/' strimzi-cluster-operator-0.43.0.yaml

复制

2.文件都准备好以后,可以使用下面的命令部署Operator:

  1. kubectl apply -f strimzi-cluster-operator-0.43.0.yaml-n kafka

复制

准备KafkaNodePool CRD的配置

在我们部署的Kafka集群中我们使用的KRaft,KafkaNodePool Rule有三种:

  • Controller:基于Raft协议负责管理集群的元数据和状态

  • Broker:负责处理消息,接收和保存消息

  • Dual-role:双重角色,既当Controller又当Broker

这里我们不使用双重角色,我们分别准备Controller 和Broker。

  1. apiVersion: kafka.strimzi.io/v1beta2

  2. kind: KafkaNodePool

  3. metadata:

  4. name: business-kafka-np

  5. labels:

  6. strimzi.io/cluster: business-kafka

  7. spec:

  8. replicas: 3

  9. roles:

  10. - controller

  11. - brokers

  12. jvmOptions:

  13. -Xms: 500m

  14. -Xmx: 2048m

  15. resources:

  16. requests:

  17. memory: 500Mi

  18. cpu: "10m"

  19. limits:

  20. memory: 3Gi

  21. cpu: "2000m"

  22. storage:

  23. type: jbod

  24. volumes:

  25. - id: 0

  26. type: persistent-claim

  27. size: 50Gi

  28. kraftMetadata: shared

  29. deleteClaim: false

复制
  • label中strimzi.io/cluster: business-kafka该标签表明该Controller隶属于business-kafka

  • spec.replicas:指定了controller和 broker的数量

  • spec.roles:指定了KafkaNodePool的角色,这里我们同时指定了controller和broker

  • spec.jvmOptions:指定了controller和broker jvm启动时的参数

  • spec.resources:指定了controller和broker的pod请求和资源限制

  • spec.storage:指定了controller和broker的存储

  • spec.storage.type: 指定了storage的类型,这里是指定了jbod,jbod允许将数据存储到多个卷上来提高性能和可靠性

  • spec.storage.volumes.id:为卷指定唯一标识符

  • spec.storage.volumes.type:指定卷类型为持久化声明

  • spec.storage.volumes.size:指定卷的初始化大小

  • spec.storage.volumes.kraftMetadata:指定该卷用于共享KRaft的元数据

  • spec.storage.volumes.deleteClaim:用于指定controller被删除时,卷不会被删除,这样可以防止数据丢失

准备Kafka CRD的配置

  1. apiVersion: kafka.strimzi.io/v1beta2

  2. kind: Kafka

  3. metadata:

  4. name: business-kafka

  5. annotations:

  6. strimzi.io/node-pools: enabled

  7. strimzi.io/kraft: enabled

  8. spec:

  9. kafka:

  10. version: 3.8.0

  11. metadataVersion: 3.8-IV0

  12. template:

  13. pod:

  14. tolerations:

  15. - key: "app.kubernetes.io/component"

  16. operator: "Equal"

  17. value: "kafka"

  18. effect: "NoSchedule"

  19. affinity:

  20. nodeAffinity:

  21. requiredDuringSchedulingIgnoredDuringExecution:

  22. nodeSelectorTerms:

  23. - matchExpressions:

  24. - key: node.kubernetes.io/component

  25. operator: In

  26. values:

  27. - kafka

  28. readinessProbe:

  29. initialDelaySeconds: 15

  30. timeoutSeconds: 5

  31. livenessProbe:

  32. initialDelaySeconds: 15

  33. timeoutSeconds: 5

  34. listeners:

  35. - name: plain

  36. port: 9092

  37. type: internal

  38. tls: false

  39. - name: external

  40. port: 9094

  41. type: loadbalancer

  42. tls: false

  43. configuration:

  44. bootstrap:

  45. annotations:

  46. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-1"

  47. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"

  48. brokers:

  49. - broker: 0

  50. annotations:

  51. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-2"

  52. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"

  53. - broker: 1

  54. annotations:

  55. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-3"

  56. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"

  57. - broker: 2

  58. annotations:

  59. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-4"

  60. service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"

  61. config:

  62. auto.create.topics.enable: "true"

  63. num.partitions: 3

  64. message.max.bytes: 10485760

  65. offsets.topic.replication.factor: 3

  66. transaction.state.log.replication.factor: 3

  67. transaction.state.log.min.isr: 2

  68. default.replication.factor: 3

  69. min.insync.replicas: 2

  70. inter.broker.protocol.version: "3.8"

  71. entityOperator:

  72. topicOperator: {}

  73. userOperator: {}

复制
  • spec.metadata.annotations.strimzi.io/node-pools:该值设置为true,表示我们使用 KafkaNodePool来部署我们的controller和broker

  • spec.metadata.annotations.strimzi.io/kraft:该值设置为true,表示我们使用Kraft 来管理Kafka的原数据信息

  • spec.kafka.version和spec.kafka.metadataVersion:指定Kafka和Kafka元数据的版本

  • spec.kafka.template.pod.tolerations和spec.kafka.template.pod.affinity:指定了Kafka Pod的污点容忍和亲和性,这样可以保证我们的Kafka被调度到我们希望的节点之上,避免和业务应用混部在一台节点上。

  • spec.kafka.readinessProbe和spec.kafka.livenessProbe:指定健康监测的配置

  • spec.kafka.listeners:指定Kafka的监听器,第一个用于k8s内部通信,端口是9092,无 TLS,第二个我们借助的是云供应商的负载均衡器(建议使用内网的)来打通集群外的应用对kafka 的访问,端口是 9094,具体的annotations因供应商不同可能参数不通,大家可以自行查阅问题,bootstrap和broke列表负载均衡器我是提前在云供应商平台创建好的,大家也可以根据需要自行复用之前的,但是注意一定不要把原来的配置覆盖了,一定要在测试环境测试。

  • spec.kafka.config:指定了Kafka的配置,由于我们之前的Kafka集群允许自动创建Topic,所以我这里添加了auto.create.topics.enable: true的配置。

  • spec.kafka.entityOperator:定义实体操作器,可以用来管理Topic和用户,这里使用{},采用默认的配置创建entityOperator,容器组里面会包含topic-operator和user-operator

启动Kafka

通过上述配置,我们就可以拉起一个Kafka集群,使用以下命令:

  1. kubectl apply -f business-kafka.yaml -n kafka

复制

通过上图就可以看出我们的所有容器组都被正常运行起来

配置磁盘自动扩容

随着Kafka集群的运行,数据也会越来越多,50Gi的初始化大小很明显不够用,因此我们还需要配置 Volume自动扩容策略:

  1. apiVersion: storage.alibabacloud.com/v1alpha1

  2. kind: StorageAutoScalerPolicy

  3. metadata:

  4. name: business-kafka-policy

  5. spec:

  6. actions:

  7. - name: action1

  8. params:

  9. limits: 350Gi

  10. scale: 50Gi

  11. type: volume-expand

  12. conditions:

  13. - key: volume-capacity-used-percentage

  14. name: condition1

  15. operator: Gt

  16. values:

  17. - '90'

  18. namespaces:

  19. - kafka

  20. pvcSelector:

  21. matchLabels:

  22. strimzi.io/cluster: business-kafka

复制

这个需要根据云供应商的CRD进行配置,我们这里使用的是阿里云:

  • spec.actions.actions:用于指定扩容的操作,就是如何扩容,这里我们配置了每次扩容50G,但是上限被设置成了350Gi,这个大家可以根据自己的业务量来灵活配置

  • spec.conditions:指定了扩容的条件,这里我们配置的是当卷的使用率超过90%时触发扩容操作

  • spec.namespaces:指定该自动扩容策略作用的namespace,我们指定为kafka

  • spec.pvcSelector:指定了PVC选择器,我们这里只关心business-kafka的pvc

自动扩容策略如果要生效,PVC所使用的StorageClass的参数allowVolumeExpansion必须为 true,如下图:

后续

Kafka集群虽然已经run起来了,但是还少了一双可以替我们监测他的眼睛,就是监控和告警,关于监控和告警我们下篇文章再讲,通过将Kafka迁移到Kubernetes集群,我们可以更加高效自动化的维护Kafka集群,进行自动故障转移和恢复,提高了可用性和稳定性。


文章转载自程序员修炼笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论