暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

一文搞懂kafka的broker工作全流程

阿斌Java之路 2022-07-02
1868

工作流程

kafka设计

我们都知道kafka能堆积非常大的数据,一台服务器,肯定是放不下的。由此出现的集群的概念,集群不仅可以让消息负载均衡,还能提高消息存取的吞吐量。kafka集群中,会有多台broker,每台broker分别在不同的机器上。

img

为了提高吞吐量,每个topic也会都多个分区,同时为了保持可靠性,每个分区还会有多个副本。这些分区副本被均匀的散落在每个broker上,其中每个分区副本中有一个副本为leader,其他的为follower。

img

zookeeper作用

zookeeper在kafka中扮演了重要的角色,kafka使用zookeeper进行元数据管理,保存broker注册信息,包括主题(Topic)、分区(Partition)信息等,选择分区leader。

img

broker leader选举

这里需要先明确一个概念leader选举,因为kafka中涉及多处选举机制,容易搞混,kafka由三个方面会涉及到选举:

  • broker(控制器)选leader
  • 分区多副本选leader
  • 消费者选leader

在kafka集群中由很多的broker(也叫做控制器),但是他们之间需要选举出一个leader,其他的都是follower。broker的leader有很重要的作用,诸如:创建、删除主题、增加分区并分配leader分区;集群broker管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍),分区leader选举。

每个broker都有唯一的brokerId,他们在启动后会去竞争注册zookeeper上的Controller结点,谁先抢到,谁就是broker leader。而其他broker会监听该结点事件,以便后续leader下线后触发重新选举。

img

副本机制

  • Replica :副本,同一分区的不同副本保存的是相同的消息,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • Leader :每个分区的多个副本中的"主副本",生产者以及消费者只与 Leader 交互
  • Follower :每个分区的多个副本中的"从副本",负责实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。
img
  • AR:分区中的所有 Replica 统称为 AR
  • ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR
  • OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR
  • LEO:每个副本都有内部的LEO,代表当前队列消息的最后一条偏移量offset。
  • HW:高水位,代表所有ISR中的LEO最低的那个offset,也是消费者可见的最大消息offset。

follower故障流程

如果follower落后leader过多,体现在落后时间 repca.lag.time.max.ms  ,或者落后偏移量repca.lag.max.messages(由于kafka生成速度不好界定,后面取消了该参数),follower就会被移除ISR队列,等待该队列LEO追上HW,才会重新加入ISR中。

leader故障流程

也是一样会先从ISR队列中被踢出,然后从ISR中选出一个新的Leader来;此时为了保证多个副本之间的数据一致性,其他的follower会先将各自的log文件中高于HW的部分截取掉,然后从新的leader同步数据(由此可知这只能保证副本之间数据一致性,并不能保证数据不丢失或者不重复)。体现了设置ACK-all的重要性。

分区自动调整

一般情况下,我们的分区都是平衡散落在broker的,随着一些broker故障,会慢慢出现leader集中在某台broker上的情况,这时候就需要分区平衡。

为了解决上述问题kafka出现了自动平衡的机制。kafka提供了下面几个参数进行控制:

  • auto.leader.rebalance.enable
    :自动leader parition平衡,默认是true
  • leader.imbalance.per.broker.percentage
    :每个broker允许的不平衡的leader的比率,默认是10%,如果超过这个值,控制器将会触发leader的平衡
  • leader.imbalance.check.interval.seconds
    :检查leader负载是否平衡的时间间隔,默认是300秒
  • 但是在生产环境中是不开启这个自动平衡,因为触发leader partition的自动平衡会损耗性能,或者可以将触发自动平和的参数leader.imbalance.per.broker.percentage
    的值调大点。

我们也可以通过修改配置,然后手动触发分区的再平衡

文件存储

存储结构

在kafka中主题(Topic)是一个逻辑上的概念,分区(partition)是物理上的存在的。每个partition对应一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端。为防止log文件过大导致数据定位效率低下,kafka采用了分片和索引机制,将每个partition分为多个segment,每个segment默认1G( log.segment.bytes
),每个segment包括.index文件、**.log文件和.timeindex**等文件。这些文件位于文件夹下,该文件命名规则为:topic名称+分区号。

img

当log文件写入4k(这里可以通过log.index.interval.bytes
设置)数据,就会写入一条索引信息到index文件中,这样的index索引文件就是一个稀疏索引,它并不会每条日志都建立索引信息。

当kafka查询一条offset对应实际消息时,可以通过index进行二分查找,获取最近的低位offset,然后从低位offset对应的position开始,从实际的log文件中开始往后查找对应的消息。

时间戳索引文件
,它的作用是可以查询某一个时间段内的消息,它的数据结构是:时间戳(8byte)+ 相对offset(4byte),如果要使用这个索引文件,先要通过时间范围找到对应的offset,然后再去找对应的index文件找到position信息,最后在遍历log文件,这个过程也是需要用到index索引文件的。

日志清理

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略。

  1. 日志删除(delete) :按照一定的保留策略直接删除不符合条件的日志分段。
  2. 日志压缩(compact) :针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

我们可以通过修改broker端参数 log.cleanup.policy
 来进行配置

日志删除

kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  • log.retention.hours
    :最低优先级小时,默认7天
  • log.retention.minutes
    :分钟
  • log.retention.ms
    :最高优先级毫秒
  • log.retention.check.interval.ms
    :负责设置检查周期,默认5分钟
  • file.delete.delay.ms
    :延迟执行删除时间
  • log.retention.bytes
    :当设置为-1时表示运行保留日志最大值(相当于关闭);当设置为1G时,表示日志文件最大值

具体的保留日志策略有三种:

基于时间策略

日志删除任务会周期检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志段文件集合;这里需要注意log.retention参数的优先级:log.retention.ms > log.retention.minutes > log.retention.hours
,默认只会配置log.retention.hours参数,值为168即为7天。

删除过期的日志段文件,并不是简单的根据日志段文件的修改时间计算,而是要根据该日志段中最大的时间戳来计算的,首先要查询该日志分段所对应的时间戳索引文件,查找该时间戳索引文件的最后一条索引数据,如果时间戳大于0就取值,否则才会使用最近修改时间。

在删除的时候先从Log对象所维护的日志段的跳跃表中移除要删除的日志段,用来确保已经没有线程来读取这些日志段;接着将日志段所对应的所有文件,包括索引文件都添加上**.deleted的后缀;最后交给一个以delete-file命名的延迟任务来删除这些以.deleted为后缀的文件,默认是1分钟执行一次,可以通过file.delete.delay.ms**来配置。

基于日志大小策略

日志删除任务会周期性检查当前日志大小是否超过设定的阈值(log.retention.bytes,默认是-1,表示无穷大),就从第一个日志分段中寻找可删除的日志段文件集合。如果超过阈值,

基于日志起始偏移量

该策略判断依据是日志段的下一个日志段的起始偏移量 baseOffset是否小于等于 logStartOffset,如果是,则可以删除此日志分段。这里说一下logStartOffset,一般情况下,日志文件的起始偏移量 logStartOffset等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset的值可以通过 DeleteRecordsRequest请求、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操作进行修改。

日志压缩

日志压缩对于有相同key的不同value值,只保留最后一个版本。如果应用只关心 key对应的最新 value值,则可以开启 Kafka相应的日志清理功能,Kafka会定期将相同 key的消息进行合并,只保留最新的 value值。

kafka高效读写原因

kafka之所以可以快速读写的原因如下:

  1. kafka是分布式集群,采用分区方式,并行操作
  2. 读取数据采用稀疏索引,可以快速定位消费数据
  3. 顺序写磁盘
  4. 页缓冲和零拷贝

页缓存与零拷贝

kafka高效读写的原因很大一部分取决于页缓存零拷贝

页缓存

在 Kafka 中,大量使用了 PageCache
, 这也是 Kafka 能实现高吞吐的重要因素之一。

首先看一下读操作,当一个进程要去读取磁盘上的文件内容时,操作系统会先查看要读取的数据页是否缓冲在PageCache
中,如果存在则直接返回要读取的数据,这就减少了对于磁盘 I/O的 操作;但是如果没有查到,操作系统会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程,就和使用redis缓冲是一个道理。

接着写操作和读操作是一样的,如果一个进程需要将数据写入磁盘,操作系统会检查数据页是否在PageCache 中已经存在,如果不存在就在 PageCache中添加相应的数据页,接着将数据写入对应的数据页。另外被修改过后的数据页也就变成了脏页,操作系统会在适当时间将脏页中的数据写入磁盘,以保持数据的一致性。

具体的刷盘机制可以通过 log.flush.interval messages
log.flush .interval .ms
等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器 掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。一般并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这 种严重影响性能的行为来保障 。

零拷贝

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数,通常使用在IO读写过程中。常规应用程序IO过程如下图,会经过四次拷贝:

  1. 数据从磁盘经过DMA到内核的Read Buffer;
  2. 内核态的Read Buffer到用户态应用层的Buffer
  3. 用户态的Buffer到内核态的Socket Buffer
  4. Socket Buffer到网卡的NIC Buffer

从上面的流程可以知道内核态和用户态之间的拷贝相当于执行两次无用的操作,之间切换也会花费很多资源;当数据从磁盘经过DMA 拷贝到内核缓存(页缓存)后,为了减少CPU拷贝的性能损耗,操作系统会将该内核缓存与用户层进行共享,减少一次CPU copy过程,同时用户层的读写也会直接访问该共享存储,本身由用户层到Socket缓存的数据拷贝过程也变成了从内核到内核的CPU拷贝过程,更加的快速,这就是零拷贝,IO流程如下图。

甚至如果我们的消息存在页缓存PageCache
中,还避免了硬盘到内核的拷贝过程,更加一步提升了消息的吞吐量。

img

Java的JDK NIO中方法transferTo()方法就能够实现零拷贝操作,这个实现依赖于操作系统底层的sendFile()实现的

参考

broker原理




END



后台回复关键词 打卡 获取今日推荐资料

微信8.0新增了一万的好友数,之前没加上好友的可以加一下我的个人微信,再晚又满了,一起抱团取暖结伴内卷。



扫码拉群,学习打卡,交流经验


每周一读




文章转载自阿斌Java之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论