背景
公司系统使用到了Kafka作为消息中间件,该中间件单独部署在云上的服务器中,业务系统部署在Kubernetes集群中,为了保证消息中间件的可用性以及统一整个公司的基础设施,我们决定将Kafka迁移至 Kubernetes集群中。
准备工作
将Kafka集群迁移到Kubernetes集群中,我们需要做好以下准备:
集群和Topic等要易于维护和管理,配置变更集群自动更新,这里我们会用到Operator
要做好监控和预警手段,防止集群长时间不可用或者消息长时间堆积等问题。
结合公司的实际情况,不太适合重复造轮子,做Operator的开发,因此我们就去开源市场寻找轮子,在市场上我们找到了两个Operator:
KOperator
Strimzi Operator
Strimzi Operator提供了全面Kafka集群管理方案,支持自动化部署扩展监控等功能,而且提供了对 Kafka生态的支持(比如Kafka Connect、Kafka MirrorMaker),开源社区活跃,最终我们选择了Strimzi Operator作为Kubernetes Kafka集群的解决方案。
资源准备
为了做资源的隔离,我们会将Strimzi Operator 和Kafka部署在单独的节点和Namepace下,因此需要执行两个动作:
准备好节点,现在云供应商都会提供节点池的概念,就是一组节点的集合,我们在配置节点池的时候可以配置节点标签和污点
对于节点标签,我们会配置node.kubernetes.io/component=kafka,对于污点我们设置app.kubernetes.io/component=kafka:NoSchedule
kubectl describe node $nodename
复制
通过节点的详细信息,我们可以看到节点被打上了相的标签和污点。
2.第二步就是创建Kubernetes Namespace
kubectl create namespace kafka
复制
部署Strimzi Operator
我们这里部署的版本是0.41.0的Operator:
wget "https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.43.0/strimzi-cluster-operator-0.43.0.yaml"
复制
strimzi-cluster-operator-0.43.0.yaml下载完成以后,先不要急于部署,我们需要对其中的Deployment 做一些改动:
apiVersion: apps/v1
kind: Deployment
metadata:
name: strimzi-cluster-operator
labels:
app: strimzi
spec:
template:
spec:
.....
tolerations:
- key: "app.kubernetes.io/component"
operator: "Equal"
value: "kafka"
effect: "NoSchedule"
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node.kubernetes.io/component
operator: In
values:
- kafka
复制
主要的改动就是我们需要配置tolerations和affinity,这个就是为了让Operator的Pod调度到特定的节点上。
将cluster-operator中namespace中使用下面的命令替换成我们的kafka:
#linux
sed -i 's/namespace: .*/namespace: kafka/' strimzi-cluster-operator-0.43.0.yaml
#macos
sed -i '''s/namespace: .*/namespace: kafka/' strimzi-cluster-operator-0.43.0.yaml
复制
2.文件都准备好以后,可以使用下面的命令部署Operator:
kubectl apply -f strimzi-cluster-operator-0.43.0.yaml-n kafka
复制
准备KafkaNodePool CRD的配置
在我们部署的Kafka集群中我们使用的KRaft,KafkaNodePool Rule有三种:
Controller:基于Raft协议负责管理集群的元数据和状态
Broker:负责处理消息,接收和保存消息
Dual-role:双重角色,既当Controller又当Broker
这里我们不使用双重角色,我们分别准备Controller 和Broker。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: business-kafka-np
labels:
strimzi.io/cluster: business-kafka
spec:
replicas: 3
roles:
- controller
- brokers
jvmOptions:
-Xms: 500m
-Xmx: 2048m
resources:
requests:
memory: 500Mi
cpu: "10m"
limits:
memory: 3Gi
cpu: "2000m"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 50Gi
kraftMetadata: shared
deleteClaim: false
复制
label中strimzi.io/cluster: business-kafka该标签表明该Controller隶属于business-kafka
spec.replicas:指定了controller和 broker的数量
spec.roles:指定了KafkaNodePool的角色,这里我们同时指定了controller和broker
spec.jvmOptions:指定了controller和broker jvm启动时的参数
spec.resources:指定了controller和broker的pod请求和资源限制
spec.storage:指定了controller和broker的存储
spec.storage.type: 指定了storage的类型,这里是指定了jbod,jbod允许将数据存储到多个卷上来提高性能和可靠性
spec.storage.volumes.id:为卷指定唯一标识符
spec.storage.volumes.type:指定卷类型为持久化声明
spec.storage.volumes.size:指定卷的初始化大小
spec.storage.volumes.kraftMetadata:指定该卷用于共享KRaft的元数据
spec.storage.volumes.deleteClaim:用于指定controller被删除时,卷不会被删除,这样可以防止数据丢失
准备Kafka CRD的配置
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: business-kafka
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.8.0
metadataVersion: 3.8-IV0
template:
pod:
tolerations:
- key: "app.kubernetes.io/component"
operator: "Equal"
value: "kafka"
effect: "NoSchedule"
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node.kubernetes.io/component
operator: In
values:
- kafka
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: external
port: 9094
type: loadbalancer
tls: false
configuration:
bootstrap:
annotations:
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-1"
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"
brokers:
- broker: 0
annotations:
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-2"
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"
- broker: 1
annotations:
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-3"
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"
- broker: 2
annotations:
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id: "lb-4"
service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners: "true"
config:
auto.create.topics.enable: "true"
num.partitions: 3
message.max.bytes: 10485760
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.8"
entityOperator:
topicOperator: {}
userOperator: {}
复制
spec.metadata.annotations.strimzi.io/node-pools:该值设置为true,表示我们使用 KafkaNodePool来部署我们的controller和broker
spec.metadata.annotations.strimzi.io/kraft:该值设置为true,表示我们使用Kraft 来管理Kafka的原数据信息
spec.kafka.version和spec.kafka.metadataVersion:指定Kafka和Kafka元数据的版本
spec.kafka.template.pod.tolerations和spec.kafka.template.pod.affinity:指定了Kafka Pod的污点容忍和亲和性,这样可以保证我们的Kafka被调度到我们希望的节点之上,避免和业务应用混部在一台节点上。
spec.kafka.readinessProbe和spec.kafka.livenessProbe:指定健康监测的配置
spec.kafka.listeners:指定Kafka的监听器,第一个用于k8s内部通信,端口是9092,无 TLS,第二个我们借助的是云供应商的负载均衡器(建议使用内网的)来打通集群外的应用对kafka 的访问,端口是 9094,具体的annotations因供应商不同可能参数不通,大家可以自行查阅问题,bootstrap和broke列表负载均衡器我是提前在云供应商平台创建好的,大家也可以根据需要自行复用之前的,但是注意一定不要把原来的配置覆盖了,一定要在测试环境测试。
spec.kafka.config:指定了Kafka的配置,由于我们之前的Kafka集群允许自动创建Topic,所以我这里添加了auto.create.topics.enable: true的配置。
spec.kafka.entityOperator:定义实体操作器,可以用来管理Topic和用户,这里使用{},采用默认的配置创建entityOperator,容器组里面会包含topic-operator和user-operator
启动Kafka
通过上述配置,我们就可以拉起一个Kafka集群,使用以下命令:
kubectl apply -f business-kafka.yaml -n kafka
复制
通过上图就可以看出我们的所有容器组都被正常运行起来。
配置磁盘自动扩容
随着Kafka集群的运行,数据也会越来越多,50Gi的初始化大小很明显不够用,因此我们还需要配置 Volume自动扩容策略:
apiVersion: storage.alibabacloud.com/v1alpha1
kind: StorageAutoScalerPolicy
metadata:
name: business-kafka-policy
spec:
actions:
- name: action1
params:
limits: 350Gi
scale: 50Gi
type: volume-expand
conditions:
- key: volume-capacity-used-percentage
name: condition1
operator: Gt
values:
- '90'
namespaces:
- kafka
pvcSelector:
matchLabels:
strimzi.io/cluster: business-kafka
复制
这个需要根据云供应商的CRD进行配置,我们这里使用的是阿里云:
spec.actions.actions:用于指定扩容的操作,就是如何扩容,这里我们配置了每次扩容50G,但是上限被设置成了350Gi,这个大家可以根据自己的业务量来灵活配置
spec.conditions:指定了扩容的条件,这里我们配置的是当卷的使用率超过90%时触发扩容操作
spec.namespaces:指定该自动扩容策略作用的namespace,我们指定为kafka
spec.pvcSelector:指定了PVC选择器,我们这里只关心business-kafka的pvc
自动扩容策略如果要生效,PVC所使用的StorageClass的参数allowVolumeExpansion必须为 true,如下图:
后续
Kafka集群虽然已经run起来了,但是还少了一双可以替我们监测他的眼睛,就是监控和告警,关于监控和告警我们下篇文章再讲,通过将Kafka迁移到Kubernetes集群,我们可以更加高效自动化的维护Kafka集群,进行自动故障转移和恢复,提高了可用性和稳定性。