这篇文章谈一谈Kafka的位移管理。Kafka位移是指消息的偏移量,即每条消息在分区内的消息"序号"。不同于Consumer端的消费者位移,代表的是消费者当前要消费的下一条消息的消息位移。每一个消费者,都会对应一个消费者位移,或者叫消费进度,而Kafka位移管理,管理的正是消费者的消费位移。
比如,P1存了10条消息,消息位移分别是0,1,2...,9,Consumer1负责消费P1分区的消息,Consumer1需要向P1汇报它的消费进度,比如Consumer1消费完前三条数据(消息位移是0,1,2)之后,此时Consumer1的消费者位移就是3,指向下一条消息的消息位移,如果此时消费者重启,那么重新消费时就会从消息位移3处开始消费,而不会从头开始。Consumer1向P1上报Offset这个过程叫做位移提交。位移提交又分为自动位移提交和手动位移提交两种。
自动提交
自动提交,就是Consumer定时自动向Broker提交位移,对于用户来说完全是无感知的,也不用对此做任何操作。而手动提交,则是指用户需要手动提交位移。Consumer 端参数 enable.auto.commit用来控制是否启用自动移交位移,为true代表自动提交,false则代表需要手动管理位移,默认值是 true。如果启用了自动提交,那么Consumer端还有个参数是和它搭配使用的,auto.commit.interval.ms,自动提交位移间隔,它的默认值是 5 秒,代表Kafka 每5秒会自动提交一次位移。
手动提交
手动提交,就是用户手动提交位移。和自动提交相反, 手动提交需要设置enable.auto.commit=false,然后用户自己来管理分区位移,手动提交又可以分为同步提交和异步提交。
同步提交
同步提交,即阻塞提交,调用org.apache.kafka.clients.consumer.KafkaConsumer#commitSync手动提交位移,同步提交会阻塞直到位移被成功提交才会返回。以下是同步提交源代码
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
this.invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty()) {
return true;
} else {
do {
if (this.coordinatorUnknown() && !this.ensureCoordinatorReady(timer)) {
return false;
}
RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
this.client.poll(future, timer);
this.invokeCompletedOffsetCommitCallbacks();
if (future.succeeded()) {
if (this.interceptors != null) {
this.interceptors.onCommit(offsets);
}
return true;
}
if (future.failed() && !future.isRetriable()){
throw future.exception();
}
timer.sleep(this.rebalanceConfig.retryBacoffMs);
} while(timer.notExpired());
return false;
}
}
复制
在超时时间内,会一直等待提交完成,不设置超时时间时,默认值为60000ms。
异步提交
异步提交,顾名思义就是异步提交位移,通过注册回调的方式接受提交结果。以下是异步提交源代码
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? this.defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
public void onSuccess(Void value) {
if (ConsumerCoordinator.this.interceptors != null) {
ConsumerCoordinator.this.interceptors.onCommit(offsets);
}
ConsumerCoordinator.this.completedOffsetCommits.add(new CosumerCoordinator.OffsetCommitCompletion(cb, offsets, (Exception)null));
}
public void onFailure(RuntimeException e) {
Exception commitException = e;
if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
ConsumerCoordinator.this.completedOffsetComits.add(new ConsumerCoordinator.OffsetCommitCompletion(cb, offsets, (Exception)commitException));
if (commitException instanceof FencedInstaceIdException) {
ConsumerCoordinator.this.asyncCommitFenced.set(true);
}
}
});
}
复制
相比于同步提交,异步提交可以以回调的方式得到提交结果而非阻塞式的。使用起来更为方便,但是异步提交默认是不重试的,同步提交会重试。
上面说了位移提交的方式,最后讨论一下位移提交的细节
可以简单认为Kafka位移管理是一个较大的HashMap,key为分区号,value为消费者位移,位移提交时, 只需要知道哪个分区提交的位移是多少, 就可以管理该分区的位移。实际上的位移管理数据结构要复杂的多,同一个分区可以被多个group消费,因此实际上还需要根据group来区分位移,而Kafka的位移信息实际上是写入了内部主题_consumer_offset(旧版本是zookeeper),但原理大致一致,便于理解,我们就把它当做这样的一个map。基于这种位移管理模式, 继续聊聊自动提交位移和手动提交位移可能会遇到的问题。
自动提交位移时,Consumer会在消费完消息后,定时像Broker上报位移,假设一段时间内,完全没有任何消息消费,Consumer还是不停向Broker上报重复位移数据,Broker会向位移主题写入同样的信息,为了避免磁盘过载,kafka会压缩这种重复数据,同样的信息只会保留最近一次的位移消息,因此自动提交位移一方面就是会重复提交无用的位移信息。另一个问题在于自动提交之后的下一个时间间隔内,如果Broker发生Reblance,那么消息极有可能会重复消费。假如,自动提交间隔是10s,此时消费者位移是500,提交完成之后的9s时候发生重平衡,此时已经消费了100条消息,已经消费到了600的位置,但是位移还没有提交,结果该分区被分配给了其它分区,那么Broker记录的消费者位移仍然是500,因此500-600段的位移就会被重复消费。(消费者重平衡是分区和消费者的一个重分配的过程,该过程每个主题的协调者会负责完成分区和消费者的分配,此过程类似FGC的STW,集群不提供服务)。手动提交便于人工管理位移,能够更好的应对各种场景。但是由于手动提交位移时,Broker只会无脑的接受提交的位移,无法判断对错,因此需要人工维护位移的准确性。避免重复或者丢消息。
最后,生产上只能尽量的保证不重复消费消息,但不绝对保证不出现重复消息,因此对于业务要求严格的场景,数据库做唯一性约束是一个不错的选择。