背景概述
我们都知道Kafka创建topic使用kafka-topics.sh脚本,调用类
kafka.admin.TopicCommand(路径:kafka/core/src/main/scala/kafka/admin)
早期版本使用--zookeeper参数(目前已标记废弃,未删除,仍可使用)。
--zookeeper <String: hosts> DEPRECATED, The connection string for
the zookeeper connection in the form
host:port. Multiple hosts can be
given to allow fail-over.
后续版本建议使用--bootstrap-server
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to. In case of providing this, a
direct Zookeeper connection won't be
required.
两种参数分别对应于TopicService类的两个子类:ZookeeperTopicService、AdminClientTopicService,其实就是ZookeeperClient、AdminClient。
leader负载均衡
创建topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也均分在不同的broker上。
在不考虑机架信息的情况下:通过round-robin方式从broker列表的随机位置开始,分配每一个分区的第一个replica,然后以递增移位方式分配剩下的replicas,例如(源码示例):
在考虑机架信息的情况下:如果replica数大于或者等于rack数,将确保每个rack至少一个replica;否则,将确保每个rack最多分配到一个replica,例如(源码示例):
基于上述尽量均分分区、副本的原则,kafka存在优先副本(preferred replica)概念。每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"排在第一个的replicas叫"preferred replica",新建topic的"preferred replica"通常是leader。这样可以保证集群负载相对均衡。但是随着时间推移,broker宕机或者正常缩容停止,会导致leader转移、集群负载不均衡。
例如:一个分区有3个副本,优先级别分别为0,1,2,根据优先副本概念,0会作为leader ;如果0节点挂掉,1节点会成为leader。如果宕机节点恢复,leader并不会切换成0。但是我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。
可以通过kafka-preferred-replica-election.sh脚本触发leader负载均衡操作,也可以通过后台配置(auto.leader.rebalance.enable=true)自动触发。注意:这个自动触发机制,需要结合另外两个参数共同作用
auto.leader.rebalance.enable
Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.
注意:这里所说的负载均衡是leader层面的,更确切的说,leader只会在副本所在的节点切换以达到leader均衡的目的。leader或副本并不会自动迁移到原本不存在相应副本的节点。
题外话
抛开负载均衡不说,如果仅仅是为了达到优先副本的原则,除了上述leader均衡方法外,还有个另类操作方案:通过kafka-reassign-partitions.sh脚本将副本优先级由原来的0,1,2改成1,2,0或者1,0,2。比如1,2,0的话,reassignment-json-file如下:
#cat move.json
{
"partitions": [
{
"topic": "test",
"partition": 3,
"replicas": [
1,
2,
0
]
}]
}
执行副本重分配命令:
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file move.json --execute