一、问题描述
某日由于Kafka生产环境一个节点出现坏盘,在下班后进行停机更换,更换完成后启动该节点Broker3,某项目组反馈其TopicXXX的第一个分区在十几分钟内无法生产和消费,报NotLeaderForPartition的错误,但是其他分区在该时间段内均可正常生产和消费,出现问题的分区优先级副本是在Broker3上。
二、原因分析
当天晚上唯一的变更就是 Broker3 的停机和拉起,而发生问题的分区优先级副本在 Broker3 上,在日志中我们发现发生问题的时间段内发生了多次Leader副本切换的日志,因此该问题原因定位在 Broker 拉起后副本重新选举造成的问题。
由于生产配置的参数 auto.leader.rebalance.enable 为 true,Controller就会每五分钟检查一下集群不平衡的状态,如果 Broker 上非优先级副本占全部副本数的 10%,就会进行优先级副本选举,在 Broker3 拉起后会发生 Leader 副本 回 切 , 回 切 的 过 程 主 要 是 Controller 发 起 优 先 级 副 本 选 举 , 分 区 TopicXXX-1 的 Leader 副本由 Broker8 变成 Broker3(修改 ZK 信息),然后 Controller 告知所有 Broker 副本变成 3,在 Broker 收到该请求后更新本地的元数据缓存。回切过程中客户端被阻塞,会报类似 NotLeaderForPartition 的异常, 但是按正常情况,该回切应该 1 分钟内可以完成,不应该会阻塞客户端 15 分钟这么久。接下来我们分析下当天晚上的日志:
| 步骤 | 时间点 | Controller日志 | TopicXXX-1 Leader(以ZK为准) | Broker3上缓存的元数据(标红部分表示和ZK上不一致) |
|---|---|---|---|---|
| 1 | 17:32:46 | Broker3 | ||
| 2 | 17:57:54 | Broker3加回 | LeaderEpoch=426 Leader=8 | LeaderEpoch=426 Leader=8 |
| 3 | 18:01:18 | Broker3 | LeaderEpoch=427 Leader=8 | LeaderEpoch=426 Leader=8 |
| 4 | 18:01:29 | Broker3加回 | LeaderEpoch=427 Leader=8 | 但是Broker3其实没有收到这个请求LeaderEpoch=426 Leader=8 |
| 5 | 18:02:00 | 副本回切 | LeaderEpoch=428 Leader=3日志里显示completed preferred replica leader election. New leader is 3.但是Broker3其实没有收到这个请求 | LeaderEpoch=426 Leader=8 |
| 6 | 18:02:18 | Broker3 | LeaderEpoch=429 Leader=8 | LeaderEpoch=426 Leader=8 |
| 7 | 18:02:35 | Broker3加回 | LeaderEpoch=429 Leader=8 | LeaderEpoch=426 Leader=8 |
| 8 | 18:07:00 | 副本回切 | LeaderEpoch=430 Leader=3同上次18:02一样,Broker3其实没收到该请求 | LeaderEpoch=426 Leader=8 |
| 9 | 18:15:32 | Broker3 | LeaderEpoch=431 Leader=8 | LeaderEpoch=426 Leader=8 |
| 10 | 18:15:33 | Broker3加回 | LeaderEpoch=432 Leader=8 | LeaderEpoch=426 Leader=8 |
| 11 | 18:17:00 | 副本回切 | LeaderEpoch=433 Leader=3同前,Broker3其实没收到该请求 | LeaderEpoch=426 Leader=8 |
| 12 | 18:27:28 | Broker3 | LeaderEpoch=434 Leader=8 | LeaderEpoch=426 Leader=8 |
| 13 | 18:27:29 | Broker3加回 | LeaderEpoch=434 Leader=8 | LeaderEpoch=426 Leader=8 |
| 14 | 18:32:00 | 副本回切 | LeaderEpoch=435 Leader=3 | LeaderEpoch=426 Leader=8 |
| 15 | 18:38:39 | 无变化 | LeaderEpoch=435 Leader=3 | Broker3刚接到LeaderEpoch=434 Leader=8的更新请求LeaderEpoch=434 Leader=8 |
根据上述日志,我们发现,Broker3在拉起后,发生了4次失联(与ZK的会话超时),同时叠加3次副本回切,这期间除Broker3外,其他Broker均能正常收到Controller发出的LeaderAndIsr和UpdateMetadata Request,而Broker3在18:02之后没有收到LeaderEpoch从427到433的更新请求,一直到18:38分才收到LeaderEpoch=434的信息,在18:54分收到LeaderEpoch=435信息。
在18:32-18:54之间,Broker3上缓存的Leader是指向Broker8,而其他Broker上缓存的Leader是指向Broker3,假设客户端从Broker3拿到元数据后往Leader Broker8发送数据,而Broker8认为自己不是Leader,就会给客户端返回NotLeaderForPartition的错误;而假设客户端从Broker8上拿到元数据后往Leader Broker3发送数据,而Broker3认为自己不是Leader,同样会给客户端返回错误,那么这段时间内客户端是找不到Leader的,一直到18:54分,Broker3和大家同步了元数据后,客户端才恢复正常。这种异常会发生在上述表格中多处,只要是Broker3保存的元数据和其他Broker不一致,就会有这个问题。
也就是说,这次问题主要是Broker3和其他Broker保存的元数据不一致导致的,而Broker3在18:02分之后为什么没有收到来自Controller的多个元数据更新请求,我们在日志中发现Broker3上大量的“there is no open connection”的warning信息。
根据监控数据来看,从18:00至18:10,Broker3上TCP连接数暴涨到将近3万,这大量的连接是从哪里来的呢,正常运行的情况下单个Broker只有1万个连接?这就要从客户端与服务器的连接原理来分析,从在18:02分左右副本回切,客户端从其他Broker上得知分区Leader在Broker3上,从而连接Broker3拉取元数据或者数据,但是Broker3上缓存的Leader不是自己(拉起后同步数据负载较高,未及时处理Controller发来的请求),返回给客户端NotLeaderForPartition的错误,客户端会再次连Broker3(客户端优先连已连过的Broker)强制刷新获取元数据,大量客户端请求叠加就造成了网络风暴。另外,在Kafka1.1版本上,Broker处理请求的时候,是不区分请求的优先级的,对于Controller发过来的更新元数据的请求和客户端发过来的数据请求都是放在同一个队列里的,大量客户端的请求排在队列里Broker3的请求队列里,导致Broker3无法及时处理Controller发来的更新元数据的请求。因此造成了Broker3和其他服务器上保存的元数据不一致。
总的来说,该生产问题主要是Broker3拉起后同步数据负载较大,再叠加18:02分副本回切未及时更新元数据,导致Broker3和其他Broker缓存的元数据不一致,而目前Kafka的设计是无法优先处理来自Controller的更新类控制请求,导致集群内在较长一段时间窗口范围内各Broker元数据处于不一致的状态,从而造成部分分区上生产或消费较长时间的阻塞。
三、优化方案
1.关闭优先级副本回切
当优先级副本回切开启时,auto.leader.rebalance.enable设置的是默认值true,Controller就会每五分钟(默认)检查一下集群不平衡的状态,如果Broker上非优先级副本占全部副本数的10%,那么会进行重新平衡。该参数设置为true的好处是,在Broker挂了之后,分布其上的leader副本就会自动切换到其他活着的broker上,在挂掉的Broker重启之后,集群会将他之前的leader副本再切换回来,从而维持整个集群的Leader副本均衡,不需要运维人员手工维护。
但是每做一次副本选举代价较高(Controller选举新Leader,大量更新ZK,广播给其他Broker,老Leader切换成Follower去新的Leader同步数据,客户端发现Leader切换后发起新的元数据请求等),会阻塞客户端。尤其是在分区数多,客户端多的高负载集群上,副本切换的代价更高。
当关闭优先级副本回切后,Broker3在拉起过程中,上面所有的分区一直都是follower,客户端不会往Broker3上写入或消费,不会出现新建会话激增的情况,可以避免像生产问题的出现,在Broker3拉起同步数据后,我们在业务低峰期用Confluent提供的kafka-preferred-replica-election脚本进行副本回切,保持整个集群的Leader副本均衡。
2.增大请求队列大小
在问题分析过程中,我们发现Broker3没有收到来自Controller的控制请求,而目前每个Broker上Leader副本有1000多个,假设在极端时刻,客户端在同一时间往所有的Leader副本发起请求,那么目前请求队列的大小(queued.max.request=500)不够缓存这么多请求,因此我们将该请求队列的大小调大至1000。
3.增加请求队列的相关监控
在本次问题中,Broker3未及时处理来自Controller的请求,我们发现监控里也缺少Broker上请求相关的监控,因此我们增加了以下监控项便于以后问题分析和调优:
| 监控项 | 说明 | 具体JMX指标 |
|---|---|---|
| Producer请求失败数 | Broker上每秒产生的Produce请求失败的数量 | kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec |
| Fetch请求失败数 | Broker上每秒产生的Fetch请求失败的数量 | kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec |
| 请求队列大小 | Broker上RequestChannel中请求个数 | kafka.network:type=RequestChannel,name=RequestQueueSize |
| 响应队列大小 | 响应队列大小 | kafka.network:type=RequestChannel,name=ResponseQueueSize |
| 请求处理时间 | 请求被处理的总时间 | kafka.network:type=RequestMetrics,name=TotalTimeMs,request=(Produce|FetchConsumer|FetchFollower).* |
| 请求排队时间 | 请求排队的时间 | kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request=(Produce|FetchConsumer|FetchFollower).* |
| 响应排队时间 | 请求在响应队列里排队时间 | kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request=(Produce|FetchConsumer|FetchFollower).* |
4.Kafka版本升级
目前Kafka Broker对所有发过来的请求都是一视同仁的,不会区别对待。不管是用于生产消费的PRODUCE和FETCH请求,还是Controller端发送的LeaderAndIsr/StopReplica/UpdateMetadata请求。这种处理逻辑会造成Kafka繁忙时由于无法及时处理Controller发来的控制类请求,集群中Broker的元数据缓存信息不一致的时间窗口较长(比如我们的生产上出现了连续22分钟的不一致)。在社区提出的KIP-291中,将请求分为两类,控制类请求和数据类请求,Controller类控制请求的处理优先级要高于数据类请求,该想法在Kafka 2.2中已经实现。在我们遇到的场景中,假如将请求优先级分类,Broker3将尽快更新自己的状态和元数据缓存,和其他Broker上状态保持一致,这样会缩短不一致的窗口。
参考资料:
关于Kafka区分请求处理优先级的讨论https://mp.weixin.qq.com/s/Dk3ME8gCGjNBZ3mlxSfJgg
四、总结
该问题产生的原因是集群节点上元数据不一致, 事后我们关闭了优先级副本回切,调大了请求队列大小,增加了相关监控(升级Kafka版本将在后续进行),在实施这些措施后,目前在生产该问题未复现。
作者介绍:
文乔:2012年硕士毕业后加入民生银行生产运营部系统管理中心,天眼日志平台主要参与人,目前在开源工具组负责Flume、Kafka的源码研究和工具开发等相关工作。




