作者 | richardluo(罗建军)
导读
公司业务发展使得RocketMQ的使用场景增多了,但是有一些消息状态依赖的场景没有考虑顺序。RocketMQ的顺序消息的原理是什么?只用过RocketMQ中间件,但顺序消息该如何使用?本文将为大家讲解RocketMQ顺序消息的原理以及在业务中的使用场景。
1. 顺序消息的业务使用场景
使用
RocketMQ
来传递带状态的订单消息使用
RocketMQ
来同步MySQL的 binlog 日志其他消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果的情况
2. 如何正确地使用顺序消息
2.1 消息生产端
在RocketMQ
的生产端,原生 API (DefaultMQProducer
) 提供了多种发送消息的方式,其中一种是按用户自定义选择队列进行发送,用户只需要实现MessageQueueSelector
接口即可,具体接口代码如下:
// 自定义选择队列接口
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
public class DefaultMQProducer{
// 同步发送方式
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout)
// 异步发送方式
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback)
}复制
同时RocketMQ
还提供了MessageQueueSelector
接口的实现类来给用户做选择
SelectMessageQueueByHash
:按参数arg
的hashCode
与可选队列的大小求余来选择发送队列
SelectMessageQueueByRandom
:在可选队列中随机选择一个MessageQueue
进行发送
在集团封装的RocketMQ
的 API 也有相应接口;注意集团发送消息有两个API接口MqPushService
和 RocketMqService
,而MqPushService
并没有发送顺序消息的接口,只有RocketMqService
有发送顺序消息的接口,实际RocketMqService
也是调用DefaultMQProducer
来发送消息,而 DefaultMQProducer
只是发送消息的 API,这里并没有具体的发送逻辑,DefaultMQProducerImpl
是发送的具体实现,MessageQueueSelector
接口的执行也是在这个类里面执行的, 具体代码如下:
//接口
@Deprecated
public interface RocketMqService {
// 发送顺序消息,这里要传MsgQueueSelector参数,如果没有传,那也不会按顺序发送消息的
SendResult sendOrderMessage(String topic, Map<String, Object> body, MsgQueueSelector selector, Object args);
}
//实现类 com.fenqile.mq.producer.server.impl.RocketMqImpl
@Lazy(true)
@Component
public class RocketMqImpl implements RocketMqService, InitializingBean {
private SendResult sendMessage(Message message, String topic, Map<String, Object> body, MsgQueueSelector selector, Object args) {
// 创建DefaultMQProducer的子类
initProducer();
// 其他逻辑省略
SendResult sendResult = null;
if (selector == null) {
sendResult = mqProducer.send(message, sendTimeOut);
} else {
sendResult = mqProducer.send(message, new MessageQueueSelectorImp(selector), args, sendTimeOut);
}
//省略其他异常判断
return sendResult;
}
}
// 具体的发送逻辑,和队列选择器的执行
class DefaultMQProducerImpl implements MQProducerInner {
// 如果有队列选择器,执行队列选择器,然后往选好的队列发送消息
private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout ) {
// 获取主题的路由信息,里面包好主题下的队列列表
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 获取主题下的队列
List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
// 选择具体的队列
MessageQueue mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
// 根据选好的队列投递消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
}
}
}复制
注意:如果你用了自定义队列的方式来发送消息,生产者的消息重投机制将无效,也就是配置的消息重投的参数
retryTimesWhenSendFailed
和retryTimesWhenSendAsyncFailed
将不生效
2.2 消息消费端
RocketMQ
原始API在消费端提供两种获取消息的方式,push 方式和pull 方式:
pull方式:客户端主动向broker拉取消息
push方式:broker有消息的时候推送到客户端(实际 push方式也是 consumer客户端起定时任务向broker 拉取消息的,来取过来的消息会放到consumer的本地消息池里)
2.2.1 pull 方式顺序消费
pull 方式的原始消费实例是DefaultMQPullConsumer
,它实现了MQPullConsumer
接口,拉取消息的API如下:
/**
* @param mq 从那个队列来取
* @param subExpression 根据tag过滤消息
* @param offset 从按个位置拉取
* @param maxNums 最大拉取数量
*/
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException
// MessageQueue对象,可以用当前接口的另外方法拿到,即
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException复制
pull 方式要先指定拉取的队列,也就是消费端要自己维护节点要拉取的队列,而一次拉取过来的消息是顺序的,这部分的顺序消费都需要业务自己处理,这里不展开讨论。
2.2.2 push 方式消费
push方式原生的消费实例是DefaultMQPushConsumer
, 它主要实现了MQPushConsumer
接口,接口 API 如下:
public interface MQPushConsumer extends MQConsumer {
// 并行消费方式
void registerMessageListener(final MessageListenerConcurrently messageListener);
// 串行消费方式
void registerMessageListener(final MessageListenerOrderly messageListener);
}复制
RocketMQ
提供两种注册监听事件,其中实现MessageListenerOrderly
的接口监听,可以保证顺序消费,集团封装了顺序消费的API,需要在消费配置参数的时候指定否是顺序消费
// 消费配置i信息
MqConsumeConfigInfo configInfo = new MqConsumeConfigInfo();
// 是否使用顺序消费
configInfo.setIsConsumeOrderly(true);
mqConsumeService.subscribe(group, destinations, mqMessageListener, configInfo);复制
实际也是调用原生的api 注册监听实现MessageListenerOrderly
接口,代码在 com.fenqile.mq.consumer.impl.RocketMQConsumeImpl
的实现上,具体代码如下
// com.fenqile.mq.consumer.impl.RocketMQConsumeImpl
// 创建RocketMQ消费客户端
private void createRocketMqConsumer(/*省略参数。。。*/) {
// 省略代码
if(!consumeConfig.isConsumeOrderly()) { // 并行消费注册
RocketMqMessageListener rmqListener = new RocketMqMessageListener(listener, consumerGroup);
rmqListener.setAppName(this.appName);
mqPushConsumer.registerMessageListener(rmqListener);
} else { // 串行消费注册
RocketMqOrderMessageListener rmqListener = new RocketMqOrderMessageListener(listener, consumerGroup);
rmqListener.setAppName(this.appName);
mqPushConsumer.registerMessageListener(rmqListener);
}
}
// 其中,RocketMqOrderMessageListener实现了MessageListenerOrderly接口
public class RocketMqOrderMessageListener extends AbstractMessageListener implements MessageListenerOrderly {
//省略代码
}复制
RocketMQ
通过上面简单的消费 API 就实现了顺序消费,那它是如何实现的呢?接下来看RocketMQ
的源码来看顺序消费的原理
3. RocketMQ 顺序消息的原理
在了解顺序消息的原理之前,我们先看下RocketMQ
的消息模型。
3.1 RocketMQ顺序消息模型
producer 向
topic
发送消息,同一个topic
下存在多个队列,RocketMQ
的生产者在默认发送消息时轮询选取其中一个队列进行发送,会导致消息分散到两个队列上。broker 上的消息只有在同一个队列中消息才是顺序读取的。
消费组消费消息时,每一个consumer 单独消费broker 上的一个队列,一般情况下一个consumer 一个进程,不同进程消费不能保证消费的顺序性。
同一个进程下,通常会配置多个线程消费,多个线程消费的情况下没办法保证消费顺序。
所以RocketMQ
默认情况下是不能保证消息的顺序的。
3.2 RocketMQ顺序消费实现原理
通过上面RocketMQ
的消息模型,要实现顺序消息,为了能记住它实现的原理,我们先换个思路想想,如果是我们自己来实现这个一个功能,应该如何实现呢?
3.2.1 生产端
先看消息生产者,因为broker 上同一个主题下的多个queue 互相之间是不能保证消息的顺序的,只有单个queue 上的消息才能保证顺序,那可以思考下它的实现方案如下:
topic下只配置一个queue,但这样消息量多的情况下,并发量能会大大降低,而且这样不能发挥分布式系统的优点;
多数情况下,一个topic下并不是所有消息都要保证顺序的,只是部分消息要保证顺序就可以了,所以只要能保证,这部分要保证顺序的消息放到同一个队列下就可以保证生产者发送的消息是有序的。实际
Rocketmq
也是提供这样实现的API;
生产者发送的消息体上设置消息的顺序,消息顺序的处理完全有消费端处理,但是在分布式系统下,消费端一般都有多个节点,要保证节点之间消费的消息是顺序,节点之间就要做到互相的感知,这样增加了消费端的复杂度;
这里应该还有其他的方案,但是作为通用的中间件来说,方案 2 应该是最好的方案,实际
RocketMQ
也是这么做的。
3.2.2 消费端
3.2.2.1 原理分析
根据生产者在RocketMQ
通用的实现法案,生产者已经把有顺序的消息放到同一个队列中,而消费端要做的事情,实际就是保证同一个队列下的消息能串行消费,队列在broker
服务端,消费处理在consumer
上,他们分别是在两个进程中;根据这个背景,可以将这个问题拆解成三部分
broker上要保证一个队列只有一个进程消费,即一个队列同一时间只有一个consumer消费;
broker给consumer的消息顺序应该是一致的,这个通过 RPC 传输,序列化后消息顺序不变,所以很容易实现;
consumer 上的队列消息要保证同一个时间只有一个线程消费。
通过问题的拆分,问题转换为同一个共享资源串行访问或者处理了,要解决这个问题,通常的做法都是访问资源的时候加锁,即broker上一个队列消息在被consumer访问的必须加锁,单个consumer端多线程并发处理消息的时候需要加锁;这里还需要考虑broker锁的异常情况,假如一个broker队列上的消息被consumer锁住了,万一consumer崩溃了,这个锁就释放不了,所以broker上的锁需要加上锁的时间。
实际上RocketMQ
消费端也就是照着上面的思路做:
broker 中在类
RebalanceLockManager
的静态变量mqLockTable
(变量类型为ConcurrentMap
) 中存储了以消费组 为key ,以ConcurrentMap
为value的消费者锁定信息,broker 接受请求后执行
RebalanceLockManager
的tryLockBatch
方法 ,tryLockBatch
执行顺序逻辑如下:
遍历请求锁定的队列
通过
mqLockTable
中的信息判断单个队列是否已经锁定,已经锁定加入锁定队列;没有锁定,加入未锁定队列。(主要判断逻辑是调用LockEntry
的isLocked
方法,判断clientId
是否是当前消费者ID,如果是就更新锁定时间,并加入锁定队列中;如果不是,再判断LockEntry
中当前时间和上次锁的同步时间是否大于60 秒,如果大于,就替换clientId
为当前请求的clientId
,并更新锁定时间,,并加入锁定队列;如果是小于60 秒,加入未锁定队列中;如果mqLockTable
不存在这个消费组也加入未锁定队列)判断未锁定队列是否为空,不为空,判断当前消费组是否在
mqLockTable
中,不存在就创建,启用RebalanceLockManager
的可重入锁,遍历未锁定队列执行第2 步
释放
RebalanceLockManager
的可重入锁,返回当前锁定的信息
consumer上顺序消费的类里面有一个定时任务,每隔20秒发送锁定请求
consumer上在获取到队列的消息时提交给消费线程池去处理,处理前必须获取到本地消息队列的锁。
下面是具体的RocketMQ
实现代码,部分代码省略
3.2.2.1 核心代码解析
RocketMQ
顺序消费客户端的源码,顺序消费的源码都是在ConsumeMessageOrderlyService
上
// 顺序消费核心逻辑
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
// 消费线程池
private final ThreadPoolExecutor consumeExecutor;
public ConsumeMessageOrderlyService(defaultMQPushConsumerImpl, messageListener){
// 构造函数里面主要是初始化消费线程 ,即初始化 consumeExecutor,我们初始化参数里面 消费的最小线程数和最大线程数就是在这里用的
}
// 启动方法
public void start() {
// 启动向服务器发送broker上队列锁定的定时任务,默认每隔20 秒执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); }
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
// 获取服务器上的消费队列锁
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
public void submitConsumeRequest( msgs, processQueue, messageQueue, dispathToConsume) {
//提交消费任务,从服务器上拉取到消息,提交消费任务,让线程池consumeExecutor里的线程执行消费任务
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
// 实际的消费任务
class ConsumeRequest implements Runnable {
// 本地消息队列
private final ProcessQueue processQueue;
// 当前消费队列的broker名称和broker下队列id 信息
private final MessageQueue messageQueue;
public void run(){
// ....校验代码省略
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// ....校验代码省略
// 从本地队列里面获取到一次消费的消息数据 consumeBatchSize 配置里面一次消费的消息数
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
// msgs 校验,执行小消费前的钩子方法 省略
try {
this.processQueue.getLockConsume().lock();
// 执行实际的消费逻辑,返回消费状态, 这里消费前调用 Collections.unmodifiableList ,保证不让用户修改消费消息的顺序
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
} finally {
this.processQueue.getLockConsume().unlock();
}
}
}
}
}复制
consumer 顺序消费,在启动的时候,会启动定时任务每隔20 秒向服务器发送锁定队列的命令,即上面的lockMQPeriodically
方法,这个方法实际调用 broker上的 org.apache.rocketmq.broker.processor.AdminBrokerProcessor
上lockBatchMQ
方法,即锁定broker上当前消费的队列,broker上队列的锁定信息都在RebalanceLockManager
保存着 ,即mqLockTable
这个ConcurrentHashMap
中,其中 group 为消费组名称,MessageQueue
保存队列信息,LockEntry
记录着锁定当前队列的 具体的consumer 客户端Id 和上次同步锁的时间,我们具体看下RebalanceLockManager
锁的维护代码:
/***
* 客户端请求,尝试锁定队列
* @param group 消费组名称
* @param mqs 尝试获取锁的队列(包括topic,queue ,broker信息)
* @param clientId,当前想获取锁的消费者id
* @return
*/
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
// 判断当前客户端是否已经锁定了当前队列,如果锁定了,直接加入锁定的set 中
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
} else {
notLockedMqs.add(mq);
}
}
// 没有锁定,尝试进行锁定
if (!notLockedMqs.isEmpty()) {
this.lock.lockInterruptibly();
try {
// 消费组不在所锁定map中,加入锁定的map中
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
}
// 如果已经锁定,更新最新的锁定时间,isLocked 实际的逻辑时判断lockEntry 中的clientId 和传参相同,并且在判断当前时间和上次锁定时间差是否大于60 秒,两个条件满足,为锁定成功
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
String oldClientId = lockEntry.getClientId();
// isExpired判断当前时间和上次锁定时间是否大于60 秒,大于60 秒,表示过期,可用进行锁定,替换clientId,更新锁定时间,加入到锁定队列
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
}
} finally {
this.lock.unlock();
}
}
return lockedMqs;
}复制
这部分代码并不复杂,有兴趣可以下载RocketMQ
的源码,在broker包的RebalanceLockManager
看看。
3.3 顺序消息原理总结
消息生产端需要保证有顺序的消息放到topic 的同一个队列下,这部分
RocketMQ
提供了相应的API。消费的时候,使用了两把锁,一把锁住broker上要消费的队列,一把锁在consumer 要处理的本地消息上,为了在两个进程上做到锁的一致,使用了一个定时任务来保证锁的同步。
broker上的锁为了防止已经获得锁的consumer异常导致同一消费组上的其他consumer消费不了这个队列上的消息,broker上的锁加上了过期机制。
在了解了一个中间件底层原理的时候,我们往往当时看了文档,知道了它的底层原理,但是时间久了很容易忘记,而且这样记忆是没有意义的,我们应该站在开发者的角度思考下,如果这个功能,在他们那个背景下我们自己实现,应该如何实现?然后再对照下我们自己实现方案和实际的实现方案,优劣在哪里(也有可能你的方案可能和开发者的方案是一样的)?如果是现在的背景下,方案是否还有改进?下次遇到业务上相同的场景,是否也可以按照这个方案实行?只有经过了这样的思考,我们才能加深印象,才能学到源码的精髓。
4. 顺序消息注意事项
实际项目中并不是所有情况都需要用到顺序消息,但这也是设计方案的时候容易忽略的一点
顺序消息是生产者和消费者配合协调作用的结果,但是消费端保证顺序消费,是保证不了顺序消息的
消费端并行方式消费,只设置一次拉取消息的数量为 1(即配置参数
consumeBatchSize
),是否可以实现顺序消费 ?这里实际是不能的,并发消费在消费端有多个线程同时消费,consumeBatchSize
只是一个线程一次拉取消息的数量,对顺序消费没有意义,这里大家有兴趣可以看ConsumeMessageConcurrentlyService
的代码,并发消费的逻辑都在哪里。
RocketMQ 的学习资料,网上有很多,但是最全的信息还是在官网,下载慢可以去码云,它的文档都在源码包的 docs 目录下,很全!
end
热门文章:
没有接口,如何进行RPC调用?
Spark Sql 调优详解,何止性能翻倍!
RAT建设
分布式存储剖析
lexin_common系列之CSV
深入浅出 Go 协程
合同平台治理
Nginx高效的原因
乐信基础框架系列--MQ