暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一文理解Kafka重复消费的原因和解决方案

全菜工程师小辉 2021-06-23
7948

如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。

在解释Kafka重复消费出现原因之前,列举一下Kafka中与消费者有关的几个重要配置参数。

  • enable.auto.commit
    :表示消费者会周期性自动提交消费的offset。默认值true。

  • auto.commit.interval.ms
    :在enable.auto.commit
    为true的情况下, 自动提交的间隔。默认值5秒。

  • max.poll.records
    :单次消费者拉取的最大数据条数,默认值500。

  • max.poll.interval.ms
    :表示若在阈值时间之内消费者没有消费完上一次poll的消息,consumer client会主动向coordinator发起LeaveGroup请求,触发Rebalance;然后consumer重新发送JoinGroup请求。

  • session.timeout.ms
    :group Coordinator检测consumer发生崩溃所需的时间。在这个时间内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。默认值10秒。

  • heartbeat.interval.ms
    :标识Consumer给Coordinator发一个心跳包的时间间隔。heartbeat.interval.ms
    越小,发的心跳包越多。默认值3秒。

Group Coordinator

基于Zookeeper的Rebalance存在不可避免的羊群效应和脑裂问题,如何不用Zookeeper来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决。因此Kafka0.9的版本重新设计了Consumer端,设计了Coordinator机制,大大减少了Zookeeper负载。

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

羊群效应:以Zookeeper为例,由于一个被watch的znode变化,导致大量的通知需要被发送,将会导致在这个通知期间的其他操作提交的延迟。

重复消费的原因

原因1:消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。

原因2:设置enable.auto.commit
为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。

原因3:消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms
时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。

重复消费的解决方法

  1. 提高消费者的处理速度。例如:对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的同时,根据实际场景可将max.poll.interval.ms
    值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records
    的值。

  2. 引入消息去重机制。例如:生成消息时,在消息中加入唯一标识符如消息id等。在消费端,可以保存最近的max.poll.records
    条消息id到redis或mysql表中,这样在消费消息时先通过查询去重后,再进行消息的处理。

  3. 保证消费者逻辑幂等。可以查看博客《一文理解如何实现接口的幂等性


文章转载自全菜工程师小辉,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论