大家好,这里是编程Cookbook。本文概要介绍消息队列的核心原理和实现,以及常见问题及其解决方案等。本文不会过多的扩展详细的消息队列系统,如RocketMQ、RabbitMQ、Kafka等,这些会在后续系列文章中详细介绍。

目录
消息队列的传递模式 概念介绍 具体系统实现 常见的消息队列系统 如何实现可靠传输? 如何保证消息不丢失? 如何处理消息重复问题? 如何保证消息有序性? 如何处理消息堆积? 总结
由于篇幅问题,本文分为上下两节,上节内容参考历史文章:《消息队列概要讲解(上)》
消息队列的传递模式
在消息队列的运行机制中,消费者从消息队列获取数据主要有两种方式:推(Push) 模式和 拉(Pull) 模式,它们在数据传递方式、实时性和资源消耗等方面有显著差异。
首先需要明确一下,推拉模式指的是Consumer和Broker之间的交互。Producer与Broker之间就是推的方式,即Producer将消息推送给Broker,而不是Broker主动去拉取消息。
如果需要Broker去拉取消息,那么Producer就必须在本地保存消息来等待Broker的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠Broker自身,还需要靠成千上万的Producer。
概念介绍
1. 推模式(Push)
在推模式中,消息队列主动将消息推送给消费者,消费者被动接收。
具体流程:
消息到达队列:生产者将消息发送到消息队列。 队列推送消息:消息队列根据消费者的订阅关系,将消息推送给对应的消费者。 消费者处理消息:消费者接收到消息后进行处理,处理完成后向队列发送确认(ACK)。 消息确认:队列收到确认后,将消息标记为已处理并删除(或存档)。
技术实现:
长连接:消费者与队列之间通常通过长连接(如WebSocket、TCP长连接)保持通信。 回调机制:消费者注册回调函数, 队列在消息到达时调用回调函数
。负载均衡:队列可以根据消费者的处理能力动态分配消息,避免某些消费者过载。
特点:
实时性高:消息到达队列后立即推送给消费者,适合对实时性要求高的场景。 消费者负载均衡:队列可以根据消费者负载动态分配消息,避免某些消费者过载。 资源消耗:消费者需随时准备接收消息,可能增加资源消耗,尤其在消息量大时。
优点:
低延迟:消息到达后立即推送,延迟低。 简化消费者逻辑:消费者无需主动请求消息,逻辑更简单。
缺点:
消费者压力大: Broker 主动推送消息,可能导致消费者处理不过来,造成消息积压或消费者崩溃。 难以控制速率: 消费者无法根据自身处理能力控制消息的接收速率。 负载不均衡: 如果多个消费者消费同一个队列,Broker 可能无法均衡分配消息,导致某些消费者负载过高。
解决方案:
限流机制: 在消费者端实现限流(如令牌桶或漏桶算法),控制消息处理速率。 RabbitMQ 支持 QoS(Quality of Service)
,通过basic.qos
方法可以限制未确认消息的数量。动态反馈: 消费者可以向 Broker 反馈自身的负载情况,Broker 根据反馈调整推送速率。 负载均衡: 使用多个消费者实例,并通过负载均衡器(如 RabbitMQ 的 Consistent Hashing Exchange
或 Kafka 的分区机制)分配消息。
适用场景:
实时消息推送,如即时通讯、实时监控等。 消费者处理能力强,能快速处理大量消息。
实际应用:
Kafka Consumer Group:Kafka支持推模式,消费者组中的消费者会自动接收分配的消息。 RabbitMQ:通过 Basic.Consume
方法,消费者可以注册一个回调函数来接收消息。
2. 拉模式(Pull)
在拉模式中,消费者主动从队列中拉取消息,队列不主动推送。
具体流程:
消息到达队列:生产者将消息发送到消息队列。 消费者拉取消息:消费者根据自己的处理能力,主动向队列请求消息。 队列返回消息:队列将消息返回给消费者。 消费者处理消息:消费者处理消息后,向队列发送确认(ACK)。 消息确认:队列收到确认后,将消息标记为已处理并删除(或存档)。
技术实现:
轮询机制:消费者定期向队列发送请求,检查是否有新消息。 批量拉取:消费者可以一次性拉取多条消息,减少网络开销。 长轮询:消费者发送请求后,队列在没有消息时保持连接,直到有新消息到达或超时。
特点:
按需拉取:消费者根据自身处理能力拉取消息,避免过载。 可控性强:消费者可以控制拉取频率和数量,灵活调整。 资源消耗:消费者按需拉取,资源消耗较低,但可能增加延迟。
优点:
消费者压力小:消费者按自身能力拉取消息,避免过载。 灵活性高:消费者可控制拉取节奏,适应不同处理能力。
缺点:
延迟较高: 消费者需要不断轮询 Broker 以拉取消息,可能导致消息处理的延迟。 网络开销大: 频繁的拉取请求会增加网络开销,尤其是在消息较少的情况下。 实现复杂: 需要消费者自己管理拉取逻辑(如拉取频率、批量拉取等),增加了开发复杂度。
解决方案:
长轮询(Long Polling): 消费者拉取消息时,Broker 在没有消息时保持连接,直到有新消息到达或超时。这种方式减少了无效的轮询请求。 Kafka 和 RocketMQ 都支持长轮询。 批量拉取: 消费者一次性拉取多条消息,减少拉取频率和网络开销。 Kafka 的 max.poll.records
参数可以控制每次拉取的消息数量。拉取频率优化: 根据消息的到达速率动态调整拉取频率。例如,消息多时增加拉取频率,消息少时降低拉取频率。
适用场景:
消费者处理能力有限,需控制消息处理速度。 允许一定延迟的场景,如日志处理、批量任务等。
实际应用:
Kafka:Kafka的消费者默认采用拉模式,消费者可以控制拉取消息的频率和数量。 RocketMQ:RocketMQ支持拉模式,消费者可以批量拉取消息。 Redis Streams:消费者可以通过 XREAD
命令主动拉取消息。
RocketMQ和Kafka利用“长轮询”实现拉模式。具体做法是消费者向Broker拉取消息时,有消息则Broker直接返回;无消息则暂时hold主请求,当对应队列或分区有新消息时,通过之前hold的请求返回消息,保证消息及时性,避免消费者频繁拉取动作。
3. 推拉模式对比
实时性 | ||
消费者负载 | ||
资源消耗 | ||
实现复杂度 | ||
适用场景 |
4. 实际应用中的选择
推模式:适用于实时性要求高的场景,如即时通讯、实时监控等。 拉模式:适用于消费者处理能力有限或允许延迟的场景,如日志处理、批量任务等。
5. 混合模式
某些系统结合推拉模式,如先推少量消息,消费者处理完后再拉取更多,兼顾实时性和负载均衡。
总结
推模式和拉模式各有优劣,选择取决于具体需求。推模式适合实时性要求高的场景,拉模式适合消费者处理能力有限或允许延迟的场景。
具体系统实现
RocketMQ、Kafka 和 RabbitMQ 是三种广泛使用的消息队列系统,它们在消息传递模式(推模式 Push 和拉模式 Pull)上有不同的设计和实现。以下是它们的详细对比:
1. RocketMQ
消息传递模式:
推模式(Push)的实现:RocketMQ 的消费者默认采用推模式,但其底层仍然是基于拉模式的封装。消费者会启动一个长轮询任务,定期向 Broker 拉取消息,从而实现类似推模式的效果。 拉模式(Pull)为主:消费者主动从 Broker 拉取消息。
特点:
长轮询机制:消费者拉取消息时,如果队列中没有消息,Broker 会保持连接并等待一段时间(可配置),直到有新消息到达或超时。 批量拉取:消费者可以一次性拉取多条消息,减少网络开销。 负载均衡:RocketMQ 支持消费者组的负载均衡,消息会均匀分配给组内的消费者。
适用场景:
高吞吐量、低延迟的场景,如电商订单系统、日志收集等。 需要灵活控制消息拉取频率的场景。
2. Kafka
消息传递模式:
拉模式(Pull)为主:Kafka 的消费者默认采用拉模式,消费者主动从 Broker 拉取消息。 推模式的模拟:Kafka 本身不支持推模式,但可以通过消费者的轮询机制实现类似推模式的效果。
特点:
批量拉取:消费者可以一次性拉取多条消息,减少网络开销。 分区消费:Kafka 的消息是按分区存储的,每个消费者可以消费一个或多个分区的消息。 高吞吐量:Kafka 的设计目标是高吞吐量,适合处理海量数据。 长轮询:消费者拉取消息时,如果分区中没有消息,消费者会等待一段时间(可配置),直到有新消息到达或超时。
适用场景:
大数据处理、日志收集、流式计算等场景。 需要高吞吐量和持久化存储的场景。
3. RabbitMQ
消息传递模式:
推模式(Push)为主:RabbitMQ 的消费者默认采用推模式,Broker 会主动将消息推送给消费者。 拉模式(Pull)的支持:RabbitMQ 也支持拉模式,消费者可以主动从队列中拉取消息,但这种方式使用较少。
特点:
实时性高:消息到达队列后立即推送给消费者,适合实时性要求高的场景。 ACK 机制:消费者处理完消息后需要发送确认(ACK),Broker 才会将消息标记为已处理。 负载均衡:RabbitMQ 支持多个消费者同时消费一个队列,消息会均匀分配给消费者。 灵活性高:支持多种消息模型(如点对点、发布/订阅),适合复杂的业务场景。
适用场景:
实时性要求高的场景,如即时通讯、任务队列等。 需要灵活消息路由和复杂业务逻辑的场景。
4. 对比总结
默认模式 | |||
推模式支持 | |||
拉模式支持 | |||
实时性 | |||
吞吐量 | |||
适用场景 | |||
负载均衡 | |||
消息模型 |
性能上他们有极大的区别,主要体现在吞吐量和消息延迟两方面,选择时候主要考虑他们的性能,具体如下:
吞吐量 Kafka:具有极高的吞吐量,每秒可以处理数十万条消息,非常适合大数据场景下的日志收集和流数据处理。 RocketMQ:吞吐量也很高,在金融、电商等场景下表现出色,能够满足大规模业务的需求。 RabbitMQ:吞吐量相对较低,每秒处理消息的数量在万级左右,但在消息处理的延迟方面表现较好。 消息延迟 RabbitMQ:消息延迟较低,能够快速响应消息的生产和消费,适用于对实时性要求较高的场景。 RocketMQ:消息延迟在毫秒级别,能够满足大多数业务的实时性需求。 Kafka:消息延迟相对较高,特别是在处理大量消息时,但可以通过调整配置来降低延迟。
5. 选择建议
RocketMQ:适合需要高吞吐量、延迟不大的场景,同时需要灵活控制消息拉取频率。 Kafka:适合大数据处理、日志收集和流式计算场景,尤其是需要高吞吐量、持久化存储、可以有延迟的场景。 RabbitMQ:适合实时性要求高、业务逻辑复杂的场景,尤其是需要灵活消息路由和多种消息模型的场景。
根据业务需求和系统特点选择合适的消息队列系统,可以更好地满足性能和功能要求。
常见的消息队列系统
RocketMQ:高性能、分布式的消息队列,适用于大规模系统。 RabbitMQ:基于 AMQP 协议的消息队列,以易用性和灵活性著称。 Kafka:高吞吐量的分布式流处理平台,适用于日志收集和实时数据处理。 ActiveMQ:开源的 JMS 实现,支持多种协议和消息模式。
如何实现可靠传输?
如何保证消息不丢失?
消息的生命周期可以分为三个阶段:生产消息、存储消息和消费消息。为了确保消息不丢失,每个阶段都需要采取相应的措施。
1. 生产消息阶段
目标:确保消息成功发送到Broker,避免因网络问题或Broker故障导致消息丢失。
处理Broker响应/确认机制:生产者发送消息后,Broker应返回确认(ACK),确保消息已成功接收。若未收到ACK,生产者可重试发送。
重试机制: 如果Broker返回写入失败等错误,生产者需要进行重试。当多次重试失败时,应触发报警并记录日志,以便人工干预。
2. 存储消息阶段
目标:确保消息在Broker中持久化存储,避免因Broker宕机或断电导致消息丢失。
消息持久化:MQ服务器应将消息持久化到磁盘,防止服务器崩溃时丢失数据。
多副本机制: 通过主从复制或集群模式,将消息复制到多个节点,确保单个节点故障时消息仍可用。
确认机制:MQ服务器在将消息写入磁盘或完成复制后,向生产者发送确认,确保消息已安全存储。
3. 消费消息阶段
目标:确保消息被消费者成功处理,避免因消费者宕机或处理失败导致消息丢失。
手动确认机制: 消费者应在真正完成业务逻辑处理后,再向Broker发送消费成功的确认(ACK)。如果在处理前就返回ACK,一旦消费者宕机,消息将丢失。
重试机制:如果消费者处理失败,Broker应重新投递消息。
死信队列:多次重试失败的消息可转入死信队列,供后续处理,避免消息丢失。
小结
确保消息不丢失需要生产者、Broker和消费者三方协同配合:
生产者:
妥善处理Broker的响应,异常时重试并报警。 Broker:
单机情况下,消息持久化后再返回响应。 集群情况下,消息写入多个副本后再返回响应。 消费者:
在业务逻辑处理完成后,再向Broker返回ACK。
如何处理消息重复问题?
在MQ(消息队列)中,消息重复是一个常见问题,通常由网络抖动、生产者重试或消费者处理失败后重试等原因引起。消息重复的处理主要在消费阶段,通过幂等性设计和唯一标识进行处理:
消息系统本身提供的去重机制:如 RocketMQ 消息的 Message ID 或业务唯一键(Message Key)实现去重。 幂等性设计:确保消费者处理消息时具备幂等性,即多次处理同一消息不会产生副作用。可通过在业务逻辑中检查消息是否已处理来实现。 唯一标识:为每条消息分配唯一ID(如UUID)或者使用消费位点(Offset),消费者在处理前检查该ID是否已处理过,通常借助数据库或缓存记录已处理的消息ID。
能否从生产者消除重复消息,即消费者仅接收一条消息? 答案是不能,因为消息至少得发到 Broker 上,确定 Broker 收到消息就得等 Broker 的响应,但可能存在 Broker 已写入但响应未收到,导致生产者重发,消息就重复了。同理,在Broker阶段去解决消费重复问题也存在同样的问题。
如何保证消息有序性?
在MQ(消息队列)中,保证消息的有序性是一个常见的需求,尤其是在需要严格按照消息发送顺序处理的场景中。以下是保证消息有序性的几种常见方法:
单分区/单队列模式
原理:将消息发送到同一个队列,并由单个消费者处理。由于队列是先进先出(FIFO)的,单个消费者可以保证消息按顺序处理。 优点:实现简单,天然保证顺序。 缺点:性能受限,无法利用多消费者并发处理的优势。
分区(Partition)或分片(Sharding)
原理:根据消息的某个关键属性(如订单ID、用户ID等)将消息分发到不同的分区或队列中,每个分区或队列由单独的消费者处理。这样可以保证同一分区内的消息有序。 实现: Kafka:通过消息的Key将消息分配到同一个Partition,Partition内部保证顺序。 RocketMQ:通过MessageQueue实现类似的分区机制。 优点:在分区级别保证顺序的同时,支持并发处理。 缺点:需要设计合理的分区策略,避免数据倾斜。
消息序号或版本号
原理:为每条消息分配一个递增的序号或版本号,消费者在处理消息时检查序号,确保按顺序处理。 实现: 生产者为每条消息生成一个全局唯一的递增序号。 消费者维护已处理的最大序号,丢弃或缓存乱序到达的消息,直到收到正确的下一条消息。 优点:灵活,适用于分布式场景。 缺点:实现复杂,需要额外的逻辑处理乱序消息。
事务消息
原理:通过事务机制保证消息的发送和处理顺序一致。 实现: 生产者发送消息时开启事务,确保消息按顺序发送。 消费者处理消息时也开启事务,确保按顺序处理。 优点:严格保证顺序。 缺点:性能开销较大,实现复杂。
消息系统提供的顺序保证
原理: 利用消息系统本身提供的顺序保证机制。 实现方式: Kafka:通过分区内的消息顺序保证。 RocketMQ:通过队列内的消息顺序保证。 RabbitMQ:通过单队列的消息顺序保证。 优点: 无需额外实现,直接利用消息系统功能。 缺点: 依赖消息系统的实现。
总结
具体选择取决于业务需求、性能要求和MQ的实现。例如,Kafka适合分区级别的顺序保证,而RocketMQ支持全局顺序消息。
全局有序:指的是整个消息队列中的所有消息都按照严格的顺序进行生产和消费,即消息从生产者发出的顺序和消费者接收的顺序完全一致。这种有序性要求非常严格,实现起来相对复杂,性能开销也较大。 局部有序:只需要保证特定分组内的消息有序即可。例如,对于同一个业务ID相关的消息,要求它们按照生产顺序被消费,而不同业务ID的消息之间则不要求严格的顺序。在实际应用中,局部有序更为常见。
RabbitMQ
使用单个队列实现近似全局有序:RabbitMQ本身不直接支持全局有序,但可以通过使用单个队列来近似实现全局有序。生产者将消息依次发送到同一个队列中,消费者从该队列中依次消费消息。由于队列是FIFO(先进先出)的数据结构,所以可以保证消息的顺序性。 使用多个队列实现局部有序:对于局部有序的需求,可以根据业务规则将消息路由到不同的队列中。例如,根据业务ID的哈希值对队列数量取模,将相同业务ID的消息发送到同一个队列中,每个队列由一个消费者进行消费。
Kafka
使用单个分区实现全局有序:Kafka的每个分区是有序的,生产者将消息发送到同一个分区中,消费者从该分区中按顺序消费消息,就可以保证消息的有序性。 自定义分区器实现局部有序:对于局部有序的需求,可以通过自定义分区器将具有相同业务特征的消息发送到同一个分区中「也是类似于消息路由」。例如,根据业务ID进行分区,确保相同业务ID的消息在同一个分区内有序。
RocketMQ
全局顺序消息:RocketMQ 把所有消息都发送到同一个队列,消费者按顺序消费该队列消息,符合全局有序的实现逻辑,能保证所有消息的生产和消费顺序一致。 分区顺序消息:按照业务规则把消息分配到不同队列,相同业务 ID 的消息进入同一队列,每个队列由一个消费者顺序消费,满足局部有序的要求,即特定业务 ID 分组内消息有序。
全局顺序:一般将所有消息发送到同一个队列或分区,并由单个消费者顺序处理。局部顺序:可以通过路由规则(如业务ID)将消息分发到不同的队列或分区,每个队列或分区内的消息保持顺序。
如何处理消息堆积?
消息堆积的常见原因
生产者生产速度过快:生产者发送消息的速度远远超过了消费者处理消息的速度,导致消息在队列中不断积压。例如,在电商大促期间,大量用户下单,订单消息的生产速度可能会瞬间飙升。 消费者处理能力不足:消费者由于自身性能瓶颈、资源限制或业务逻辑复杂等原因,无法及时处理接收到的消息。比如消费者所在服务器的 CPU、内存资源不足,或者消息处理逻辑包含复杂的数据库操作。 网络问题:生产者与消息队列之间、消息队列与消费者之间的网络连接不稳定,可能导致消息传输延迟或失败,进而引发消息堆积。 消费者故障:消费者出现异常bug或崩溃,无法正常消费消息,使得消息在队列中不断积累。
解决方案
消息队列(MQ)中出现消息堆积是一个常见的问题,可能由多种原因引起,处理消息堆积需要从扩展消费能力、优化消费逻辑、调整配置、监控告警等多方面入手,结合限流、降级、死信队列等手段,确保系统稳定运行。以下是一些处理消息堆积问题的常见方法:
监控和预警
原理:建立完善的监控系统,实时监控消息队列的状态,及时发现消息堆积问题并发出预警,以便及时采取措施进行处理。 常用工具 开源监控工具:如 Prometheus 和 Grafana 可以用于监控 MQ 的各项指标,如队列长度、消息生产和消费速率等。 MQ 自带监控功能:许多 MQ 系统都提供了自带的监控功能,例如 RabbitMQ 的管理界面可以查看队列的详细信息。
优化消费者处理逻辑(有bug及时解决)
原理:检查消费者的代码逻辑,找出可能导致消费缓慢的瓶颈,并进行优化,从而提高单个消费者的处理能力。 具体优化点 减少 I/O 操作:尽量减少磁盘 I/O、网络 I/O 等操作,例如将频繁的数据库查询合并为批量查询。 异步处理:对于一些耗时的操作,可以采用异步处理的方式,例如使用线程池或异步 I/O 库。
消息转移和清理
原理:将堆积的消息转移到其他队列或存储系统中进行处理,或者直接清理一些不必要的消息。 具体做法 消息转移:编写脚本将堆积的消息从一个队列转移到另一个队列或存储系统中,然后再进行处理。 消息清理:对于一些过期或无效的消息,可以直接进行清理,以释放队列的空间。例如,在 Redis 作为消息队列时,可以使用 LTRIM
命令清理队列中的旧消息。
临时扩容消费者
原理:增加消费者的数量可以提高消息的消费速度,从而缓解消息堆积的情况。 实现步骤 横向扩展:在分布式系统中,可以通过增加消费者实例的数量来提高整体的消费能力。例如,在使用 RabbitMQ 时,可以启动多个消费者进程或容器来并行消费消息。
控制生产者发送消息的效率
原理:如果生产者发送消息的速度过快,可能会导致消息堆积。可以通过控制生产者的发送频率或批量发送消息来缓解这个问题。 实现方法 限流:在生产者端实现限流机制,例如使用令牌桶算法或漏桶算法,控制消息的发送速率。 批量发送:将多个消息打包成一个批量消息发送,减少网络开销。例如,在 Kafka 中可以使用批量生产者。
增加消息队列的容量
原理:如果消息队列的容量不足,可能会导致消息堆积。可以通过增加队列的存储容量来解决这个问题。 具体操作 调整配置:对于一些 MQ 系统,可以通过调整配置参数来增加队列的容量。例如,在 RabbitMQ 中可以调整队列的最大长度参数。 分布式存储:使用分布式存储系统来扩展消息队列的容量,例如使用 Kafka 的分区机制将消息分散存储在多个节点上。
总结
消息队列是一种强大的工具,用于在分布式系统中实现异步通信、解耦和流量削峰。它通过将消息存储在队列中,允许生产者和消费者独立工作,从而提高系统的可靠性、扩展性和性能。选择合适的消息队列系统(如 RocketMQ、RabbitMQ、Kafka 等)可以更好地满足业务需求。
热门文章回顾
欢迎订阅我们的Cookbook知识合集,目前《MySQL数据库》、《Redis数据库》和 《Go语言》已基本更新完成,欢迎点击下方图片进入合集目录。


