Kafka 笔记
极客时间
分区与主题
分区的作用就是提供负载均衡的能力,实现系统的高伸缩性(Scalability)
不同的分区能够被放置到不同节点的机器上
不同的分布式系统对分区的叫法也不尽相同
比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上
Kafka 允许为每条消息定义消息键,简称为 Key
一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面
由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
消息压缩
Producer 端压缩、Broker 端保持、Consumer 端解压缩
引申【零拷贝】
Kafka 支持的压缩算法:
GZIP、Snappy、LZ4 和 zstd
在吞吐量方面:
LZ4 > Snappy > zstd 和 GZIP
在压缩比方面,zstd > LZ4 > GZIP > Snappy
避免消息丢失
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)
防止 Consumer 端的消息丢失,就要维持先消费消息(阅读),再更新位移(书签)的顺序
如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
消息交付可靠性保障
最多一次(at most once):
消息可能会丢失,但绝不会被重复发送
至少一次(at least once):
消息不会丢失,但有可能被重复发送
精确一次(exactly once):
消息不会丢失,也不会被重复发送
Kafka 默认提供的交付可靠性保障是第二种,即至少一次
幂等 Idempotence
“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的
在计算机领域中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变
kafka 消息幂等性 [props.put(“enable.idempotence”, ture)]
背后是 Broker 端对消息唯一性做的判断
注意这个机制只能保证单分区上的幂等性,同时也只能实现单会话上的幂等性
避免重复消费
使用 producer.send(msg, callback) 保证消息不会丢失
消息不被重复消费这件事可以交由消费者做
每个消息有一个 uuid【譬如雪花算法】,有一张表存着这些 uuid 且是唯一键
消费消息时首先向表中插入 uuid,报错即说明重复消息
鉴于 uuid 自增的特性,可以定期删除表中积攒的 uuid 以保证效率
获取 uuid 有网络消耗怎么办?
那就把所有请求参数拼起来 md5 编码下作为唯一 key 插入表中
同时多记录个插入时间,后边根据这个插入时间去定期清除
并发很高怎么办?
用 Redis 来搞,设置一个key,给个过期时间【分布式锁的套路】
总之都是基于一个事实,这种重复的消息都是在一个短暂的时间段里
消费者组
consumer group 下可以有一个或多个 consumer instance
consumer instance 可以是一个进程,也可以是一个线程
组内多个消费者共享一个公共的 ID,即 group ID
组内的所有消费者协调在一起来消费订阅主题的所有分区
每个分区只能由同一个消费组内的一个 consumer 来消费
简单说
一个消费者只能属于一个消费者组
消费者组订阅的主题下的一个分区只能被其中的一个消费者消费
不同消费者组中的消费者可以消费同一个 topic
广播
同一个 topic,每个消费者组都可以拿到相同的全部数据
负载均衡
同一个分区内的消息只能被同一个组中的一个消费者消费
当消费者数量多于分区数量时,多余的消费者空闲(不能消费数据)
当消费者少于分区数时候,有的消费者对应多个分区去消费
位移 offset
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)
因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的
即 Consumer 需要为分配给它的每个分区提交各自的位移数据
储存
config/server.properties 中的 log.dirs 中配置,如 log.dirs=/tmp/kafka-logs
目录
- ,里面存储若干个 segment 相关文件 一个 segment 从 0 开始,segment 是 20 位数字字符长度,没有数字用 0 填充
之后 segment 文件名为上一个 segment 文件最后一条消息的 offset
例如:
xx.index ,xx.log ,xx.timeindex,xx 为 segment 编号
log 文件
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1641297918172 size: 117 magic: 2 compresscodec: NONE crc: 2304349257 isvalid: true
复制
index 文件
offset: 0 position: 0
复制
timeindex 文件
timestamp: 0 offset: 0
复制