
ISR的用途
ISR存在的目的是为了平衡在数据安全和处理延迟方面的需求。在大多数副本出现故障时通过它仍能提供一定的可用性,同时最大限度地减少无效副本或慢副本在延迟方面的影响。
我们可以通过调整replica.lag.time.max.ms的设置来达到我们的目的。如果一个follower在replica.lag.time.max.ms设定的时间范围内没有向leader发出fetch操作请求或者滞后于leader的最新的数据点超过replica.lag.time.max.ms所设定的时间,那么这个follower将被leader从ISR中剔除。缺省值是30秒。如果你觉得这个值比较长你可以缩小它。但是这也将导致follower会频繁地从ISR中被移除或加入。
ISR可以在一定程度上解决高延迟的问题,因为当acks=all时需保证ISR列表中的副本都提交数据才能认为副本已成功过提交数据,而实际上ISR列表中的副本数量是会变化的。这带来的风险是会降低数据的冗余度,极端情况下会减少到只剩下leader一个数据副本,在这种情况下即使设置acks=all也极难保证数据不丢失。如果你想避免这个情况,需要在topic的配置中通过设置min.insync.replicas来进行控制。
确保客户连接
客户端通常会通过多个broker来进行连接,这需要在生产端或消费端通过bootstrap.servers来进行设置。理想的情况是当一个broker节点失效的时候,客户端仍然可以通过剩下的broker节点建立新的连接。这些bootstrap servers可能并不一定是客户端所访问topic分区leader所在的broker,但是它连接的桥头堡,客户端通过向它询问从而得到具体写入或读取数据所需的分区leader在哪个broker节点上。
kafka客户端需要与分区leader副本所在的节点间建立直接连接。所以很难在其上使用负载均衡的技术,读写流量无法负载到其他的非leader节点上。bootstrap.servers对于客户端能够找到正确的节点进行通讯以及在发生故障切换后发现新的节点至关重要。
Kafka的共识架构
到目前为止我们还没有涉及到集群是如何发现一个broker节点失效的,同时集群是如何重新进行leader选举的。为了弄明白kafka是如何处理网络分区的故障,我们首先需要了解kafka的共识架构。
kafka集群中的每个节点都是与zookeeper集群共同部署的。Zookeeper是一种分布式共识服务,它允许分布式系统围绕某个给定状态达成共识。它本身是分布式的,并且对于一致性的要求要高于可用性的要求。只有在大多数的zookeeper节点都认可的情况下才接受一个读取或写入操作。
zookeeper负责存储kafka集群的状态:
topic的列表信息、分区信息、配置信息、当前分区的leader信息、首选副本信息等 集群的节点信息。每个broker节点都会向zookeeper集群定期发送心跳信息,如果在一定时间内zookeeper不能再接收到broker发送过来的心跳信息,那么zookeeper就会认为这个broker已经死掉或失效。 当控制节点失效时zookeeper触发控制节点切换,重新选举出新的控制节点
控制节点是kafka broker节点中其中的一个节点,它负责选举出每个复制分区的leader副本。zookeeper将向这些控制节点发送集群节点变化、topic变化信息,控制节点将根据这些信息作出响应。
举个例子,如果我们需要创建一个拥有10个分区的topic,同时副本数为3.控制节点需要为每个分区选举出leader副本,同时确保这些分区是均匀分布在所有broker节点中的。
对于每个分区来说:
需要更新zookeeper中的ISR和leader信息 向分区中的每个数据副本所在broker节点发送LeaderAndISRCommand,通知相关的ISR及leader信息
如果包含leader副本的那个broker失效,zookeeper将第一时间发现并向控制节点发出通知,触发其重新选举出新的leader节点。之后控制节点会首先更新zookeeper上的信息,然后通知所有分区副本节点当前leader已经发生改变。
分区的每个leader都负责维护ISR信息。它通过replica.lag.time.max.ms来确保ISR中每个成员是符合要求的。如果ISR发生改变,leader也将负责更新zookeeper上的相关信息。
zookeeper将时刻保持更新到最新状态信息,因此在发生故障切换时,新的leader将很容易获得领导权。

复制协议
弄明白kafka数据复制的相关细节有助于我们更好地理解数据是如何发生丢失的,从而尽量避免问题的发生
Fetch请求、Log End Offset (LEO)和the Highwater Mark (HW)
我们知道follower会定期向leader发送fetch请求。缺省间隔是500ms。也就是说每次都是follower主动从leader拉取数据,而不是leader向follower推送数据。
分区的leader和每个follower都会保存Log End Offset (LEO) 和 Highwater Mark (HW)信息。LEO是分区副本在本地的最新数据偏移量,HW是最新已提交数据的偏移量。需要记住的是,一条消息要被提交,它需要在ISR中的每个数据副本上都完成持久化操作。这就会出现LEO有可能会领先HW的情况。

当leader接收到一条数据时,它首先在本地进行持久化。之后follower会向leader发送fetch请求,告知自己的LEO。这时leader会从follower发送过来LEO所指示的位置开始打包发送数据,同时发送的还有当前的HW。当leader发现ISR中所有副本都已经持久化到一定的偏移量,那么它就会将HW推进到这个偏移量,如上图中的4号偏移。只有leader可以推进HW,同时也负责让所有follower在每次fetch请求中获知当前最新的HW值。这意味着follower在数据处理方面总是会落后于leader,在获知当前HW方面同样也是会落后的。对于消费者来说只能消费HW之前的数据。
需要注意的是,这里所说的持久化仅仅是数据已经写入内存,并非数据已经真实地写入磁盘。从性能的角度考虑,kafka是通过异步的方式将数据定期从内存刷入磁盘的。对kafka来说只要数据已经写入内存就可以返回写入确认信息,这样可以提高处理性能。因为kafka认为冗余的数据可以抵消短时间将已确认数据存储在内存中所带来的风险。
Leader的故障切换
当leader失效时,zookeeper会通知控制节点由其选出新的leader。新的leader将使用新的HW作为当前自己的LEO。所有的follower也将被告知新leader的信息。对于不同版本的kafka来说,follower将会有不同的处理方式:
根据已知的HW截断本地的日志,然后以新的偏移为开始向新leader发出fetch请求 先向leader发出一个请求,获知当前最新的HW,然后根据这个HW截断本地的日志。然后从这里开始定期发送fetch请求
之所以follower分区在leader选举完成后需要截断本地的日志:
当leader发生故障切换时,在ISR注册的第一个follower将会赢得选举,并成为新的leader。但是对于在ISR中的每个follower,并不是时刻与前任leader保持数据同步的。因此赢得选举的这个follower并不一定包含最新数据。kafka要确保数据副本间不存在差异。为了避免差异的产生,每个follower必须根据新leader的HW截断自身的数据。这也是如果你要强调保证数据一致性时,需要设置acks=all的另一个重要原因。 由于数据是周期性写入磁盘的。如果集群所有节点在同一时刻失效,那么对于不同的数据副本来说持久化到磁盘的数据也是不一致的。当这些节点再重新恢复的时候,极有可能出现新选举出来的leader,它的数据是落后于它follower的,这是由于相对于其他节点来说它在故障前最后一次调用fsync的时间点比较靠前所导致。
重新加入集群
就像leader的故障切换,数据副本节点在重新加入集群后会自动发现新的leader节点,同时将自己的本地日志截断到HW所指示的位置。在完成这些操作后,follower的最新数据是落后于leader的,这时它只要简单地定期向leader发出fetch请求,就可以从当前的LEO开始处理数据逐步追上leader。
新节点或者是重新加入集群的follower节点刚开始的时候并不在ISR列表中,也不参与到数据的提交操作过程。它们仅仅需要尽快地从leader同步数据,最终能够赶上leader,然后就可以被添加到ISR列表中。期间不会被阻塞,也不会丢弃原有数据。
网络分区
Kafka在经历网络分区时会出现很多复杂的场景。但是kafka从一开始就设计成一个集群来运行,在面临网络分区方面是有很多应对方式的。在这里网络分区是指将计算机网络划分为相对独立的子网,无论是设计上单独优化它们,还是由于网络设备的故障。分布式软件必须设计为可分区耐分割,即使在网络被分区后,它仍然正常工作。下面我们就列举一些网络分区的场景:
场景一:一个follower不能与leader通讯,但是仍然可以和zookeeper通讯 场景二:leader与任意一个follower都不能通讯,但是仍然可以和zookeeper通讯 场景三:一个follower能够与leader通讯,但是不能与zookeeper通讯 场景四:leader可以与任意一个follower通讯,但是不能与zookeeper通讯 场景五:一个follower完全与kafka其他节点及zookeeper失去联系 场景六:leader完全与kafka其他节点及zookeeper失去联系 场景七:kafka的控制节点不能与其他kafka节点进行联系 场景八:kafka的控制节点不能与zookeeper进行联系
对于以上场景都会展现出不同的结果:
场景一:一个follower不能与leader通讯,但是仍然可以和zookeeper通讯

网络分区将broker 3与broker 1和2隔离开来,但是仍然可以和zookeeper通讯。因此broker 3不再能够向leader发送fetch请求,在超过replica.lag.time.max.ms所设定的时间后将从ISR中被移除,同时也不能再参与到数据的提交操作中,broker 3上的数据与leader上最新数据的差距逐渐拉大。当网络分区问题得到解决后将能够继续发送fetch请求,在赶上leader的数据后将被重新加入到ISR中。zookeeper在这整个过程中都能够收到broker 3的心跳信息,因此认为这个节点工作正常。

这种情况没有出现脑裂,仅仅只是降低了数据冗余
场景二:leader与任意一个follower都不能通讯,但是仍然可以和zookeeper通讯

网络分区将leader与它的follower隔离开,但是leader仍然能够与zookeeper通讯。这与场景一类似,由于follower不能发送fetch请求,因此ISR将会缩减到只剩leader。同样,这也没有产生脑裂,仅仅是在网络分区问题解决前对于新数据丧失了数据冗余的保护。zookeeper在这整个过程中都能够收到broker 1的心跳信息,因此认为这个节点工作正常。

场景三:一个follower能够与leader通讯,但是不能与zookeeper通讯
网络分区导致一个follower与zookeeper失去联系,但仍然能够与leader通讯。结果是它仍然能够向leader发送fetch请求,同时仍然被保留在ISR中。zookeeper由于不能收到这个broker的心跳信息,因此认为它已经失效,但由于它仅仅是一个follower,因此没有更多的动作。

场景四:leader可以与任意一个follower通讯,但是不能与zookeeper通讯

网络分区导致leader与zookeeper失去联系,但是仍然能够与follower进行通讯

经过一段时间后zookeeper将原leader标记为失效,同时通知控制节点。控制节点将从剩余的follower中选举出新的leader。然而原leader仍然认为自己还是leader,同时接受acks=1的写入请求。由于新leader选举出来后,follower就不再向原leader发送fetch请求,因此原leader认为follower都已经失效同时将自己管理的ISR缩减到自身。但是由于它不能与zookeeper通讯,因此它不能将这一更新发布到zookeeper上,从这一刻开始它将拒绝更多的写入请求。
对于设置了acks=all的数据,由于刚开始ISR包含了所有副本节点,因此无法收集到所有节点的确认信息,写入是失败的。当原leader试图将这些节点从ISR中剔除时,它将失败,由此将完全拒绝所有数据的写入。
客户端将很快感知到leader已经发生切换,并将数据写往新的leader。网络分区问题解决后,原leader将从zookeeper获知到自己不再是leader,同时将自己的日志截断到新leader发生故障切换时自己保存的HW所指示的位置。这避免了日志的分叉,从这里开始向新的leader发出fetch请求同步数据。所有写往原leader而没有复制到新leader的数据都将丢失。也就是说,在存在两个leader期间,由原leader所确认写入的数据都将丢失。

场景五:一个follower完全与kafka其他节点及zookeeper失去联系
一个follower与kafka的其他broker节点及zookeeper都失去联系。这个时候它仅仅是从ISR列表中被移除,同时等待网络的恢复,在网络问题解决后可以重新加入集群再追赶上leader。由于其仅仅是一个失效的follower,zookeeper在将其标志为失效后没有后续动作。

场景六:leader完全与kafka其他节点及zookeeper失去联系
Leader完全与所有的follower,控制节点,zookeeper隔离,失去联系。在短时间内它仍然会接受acks=1的写入请求。

在replica.lag.time.max.ms超时后原leader仍然不能接收到任何fetch请求,它会试图将ISR缩减到自身,但是由于不能将这个变化更新到zookeeper,因此它不再接收新的写入请求。
与此同时,zookeeper将已经被隔离的broker标记为失效,通知控制节点重新选举新的leader。

原leader短时间内仍然会接收acks=1的写入请求,但很快就停止这么做。客户端每隔60秒将更新最新的元数据信息。他们将被告知新的leader已经选出,并开始将写入请求发送到新leader。

在网络分区发生后仍然写往原leader并获得确认的信息将丢失。在网络分区恢复后,原leader通过zookeeper发现自己已经不再是leader。同时将自己的日志截断到新leader发生故障切换时自己保存的HW所指示的位置,然后以follower的身份向新leader发出fetch请求。

这是一个当acks=1且min.insync.replicas=1时由于网络分区会导致短时间脑裂的案例。脑裂的问题在网络分区结束后会自动终结,原leader将认识到自身已经不是leader,所有的客户端也会发现leader已经发生切换并开始向新leader写入数据。在acks=1的情况下都会导致数据的丢失。
在网络分区前也存在其他不同的情形,follower由于缓慢而导致ISR已经缩减到leader自身。这时网络分区隔离了leader。新leader被选举出来,但是原leader仍然接收写请求,即使这时acks=all,因为这时的ISR仅有原leader自身。这之后写入的数据都将丢失。因此要避免ISR退化为自身仍然可以写入数据并返回成功的情况,至少需要设置min.insync.replicas = 2。
场景七:kafka的控制节点不能与其他kafka节点进行联系
通常情况下,如果控制节点发生网络隔离不能与其他的kafka节点进行通讯的结果是,就不能将leader的变化情况传达到各个节点。最坏的情况下,与场景六类似将出现短时间的脑裂。更多的情况是,在发生故障切换时将导致一个broker不能成为leader的候选者。
场景八:kafka的控制节点不能与zookeeper进行联系
如果控制节点与zookeeper失去联系,zookeeper将会把这个控制节点标记为失效,同时选举出新的控制节点。原控制节点会继续认为自己是控制节点,但由于不能从zookeeper接收到任何新的指令,因此不会进行进一步的操作。一旦网络分区问题解决它将不再成为一个控制节点,而转变为普通节点。
场景总结
我们可以看到在发生网络分区时,如果影响的是follower副本节点,不会导致数据丢失,仅仅是在网络分区期间会降低数据冗余度,失去冗余数据的保护。当然,如果有更多的节点失效也是会导致数据丢失的。
如果leader与zookeeper间发生网络分区,在acks=1的情况下会发生数据丢失。无法与Zookeeper通讯会导致我们出现两个leader的情况,从而形成短暂脑裂。我们可以通过设置acks=all来进行补救。
此外通过设置min.insync.replicas为2及以上,可以增加进一步的安全性,确保不会丢失数据,就像在场景六的变体中所描述的。可以有效避免即使在设置acks=all,但ISR已经退化为leader的情况下发生网络分区,单个leader仍然可以继续写入数据的问题。
数据丢失的场景回顾
我们在这里总结一下会导致kafka丢失数据的情况:
当acks=1时发生leader的故障切换 即使在设置acks=all的情况下,如果允许unclean的故障切换(被选举为新leader的相关follower已经不在ISR列表中) 当acks=1时leader与zookeeper发生网络分区,形成短暂脑裂 在ISR已经缩减到leader自身的情况下,leader与其他kafka节点及zookeeper都发生隔离,即使将acks设置为all,原leader也会接收写入请求,导致数据丢失。特别是当min.insync.replicas=1的时候。尤其是这时如果leader发生了不可修复的磁盘故障导致,leader上的数据丢失,在空数据恢复leader后会最终导致整个topic的数据丢失 数据分区的所有节点同时出现故障。由于数据仅仅在内存中写入即被确认,这些数据很有可能没有被写入到磁盘中。当数据节点恢复的时候将导致数据的丢失。
要避免上述情况,可以通过一些设置来一定程度上解决。Unclean的故障切换可以通过设置unclean.leader.election.enable=false来避免,或者是通过设置min.insync.replicas大于等于2确保数据冗余度至少是2来避免。在需要保证高可靠的环境中通常设置acks=all,同时min.insync.replicas大于1。但这样做的代价是:
增加数据处理延迟,性能降低 在ISR退化到只有leader的时候不能再写入数据,可用性降低 由于不能切换到unclean的节点,因此如果leader出现故障无法修复,即使有其他节点拥有部分数据,仍然不可用,可用性降低
作者简介
詹玉林,中国民生银行信息科技部开源软件支持组工程师




