点击蓝字关注我们
大家好,我是杰哥
前两篇RocketMq与Kafka选型(一)和RocketMq与Kafka选型(二)中,主要对两者的前几个方面分别进行了分析对比。今天,我们再接着看看消息中心的重复消息场景与处理方式,以及顺序消息
01.两者的相同点
02.部署架构不同
03.工作流程不同
04.日志存储方式不同
05.保证消息顺序消息的方法是否相同
06.消息重复机制不同
07.是否支持延时消息
08.消息过滤方式不同
09.消息失败支持重试吗?
10.事务不同
11.是否支持回溯消费?
12.高可用机制不同
13.性能不同?
14.社区活跃度
15.其他方面不同
一 比较
两个方面
01.消息重复场景及解决方式
1)kafka的解决方式
kafka的0.11.0.0版本引入了idempotent producer机制,即kafka生产者的幂等特性。Kafka会通过消息的编号为分区中的每条消息去重,即只保留最开始接收到的那条消息,后续再来的消息均会被丢掉。这样就避免了一条消息发送到broker多次存储的问题
通过设置producer端的参数enable.idempotent为true即可开启这个机制
当然,前面提到了这个编号是在单个分区下可以实现消息发送过程中的幂等性,它的主键是<PID, Topic, Partition>+消息编号(即生产者对应同一个主题的同一个分区下的消息可以保证是不会重复的)。
但是在多分区的情况下,我们需要保证原子性的写入多个分区,也就是说,发送到多个分区的消息要么全部成功,要么全部回滚,要怎么办呢?
这时候就需要使用事务,在producer端设置transcational.id为一个指定字符串就可以保证啦~
2)RocketMq的解决方式
不解决~
2、消费消息
消费者在消费消息时,需要完成两个步骤:读消息和记录offset的值
那么在消费过程中就会出现以下两种情况:
最多一次:consumer先读消息,记录offset,最后再处理消息


两个消息中心的解决方式:
这种情况的确不可避免,需要在消费端实现幂等性。可以考虑将两者放到一个事务里,即读完消息并记录了offset值以后才去提交事务,这样不仅会避免消息重复的情况,还会有效地避免消息丢失的情况
总结
总得来说,造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。正常情况下出现重复消息的概率其实很小。
如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以Kafka和RocketMq都选择不再消息系统处理重复消息,只是通过消费端进行消息的去重操作
01.顺序消息实现
分别需要在发送消息和消费消息时分别进行控制
1 发送消息
总得来说,Kafka和RocketMQ的发送机制几乎一样,均是将消息发送给主题topic,而主题则实际由一个个的分区组成。因此他们实现顺序消息的方法也类似,要实现发送消息有顺序,需要分别保证以下两个方面
1)保证同步发送
RocketMq和Kafka均有三种发送方式
同步发送
异步发送
oneWay发送
其中,Kafka的异步发送也支持回调函数处理
为了保证消息的严格有序,需要使用同步方式进行发送
2)保证消息发送在同一partition中
发送消息时,保证同一key分发到同一partition上面。因为每个partition是固定分配给某个消费者线程进行消费的,所以对于在同一个分区的消息来说,是严格有序的
2 消费消息
Kafka保证,1个partition只能被1个consumer消费。因此kafka可以支持消息的顺序消费,但是一台Broker宕机后,就会产生消息乱序
首先我们需要知道,RocketMq支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费,因此若需要实现集群消费模式下消息的顺序消费,就需要指定并锁定一个队列进行消费
RocketMQ的顺序消费机制,我们可以通过ConsumeMessageOrderlyService类中的ConsumeRequest内部类的run()方法的执行逻辑来查看
1)判断可否正常消费
由于前面代码已经执行了对队列的加锁操作。确保同一时间只有一个线程消费指定的MessageQueue,因此下面进入是否可以消费的判断
如果是广播模式,直接进入消费;如果是集群模式,则判断processQueue被锁定且锁未超时

若不是广播模式,并且当前的队列并没有加上锁,就进入else处理,提交延迟100s后重新尝试消费的请求

2)顺序消费机制
对于RocketMQ来说,它的顺序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费。Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费

3)消费成功之后的处理
消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费
总结:相对来说,RocketMq在对于顺序消息上更有保障
二 总结
总而言之

往期精彩回顾

SpringCloud篇章






