1
生产者保证消息的投递顺序 复制
顺序消息消费是指消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息的处理结果,比如:一个订单的顺序流程是:创建、付款、推送、完成。在处理时不可能乱了顺序。
一、默认情况下消息的投递方式
默认情况下,RocketMQ生产者生产的消息在所有的MessageQueue中轮询选择一个,进行投递,也就是说同一个生产者的消息被发送到不同的MessageQueue,在被消费者消费时,就无法保证消息被顺序消费,默认投递的MessageQueue的核心代码如下:
private SendResult sendDefaultImpl(//
Message msg,//
final CommunicationMode communicationMode,//
final SendCallback sendCallback, final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//轮询选择一个MessageQueue进行投递
MessageQueue tmpmq = topicPublishInfo.public class TopicPublishInfo {
//MessageQueue列表
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//记录当前消费实例发送消息 原子递增
private AtomicInteger sendWhichQueue = new AtomicInteger(0);
//....
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//lastBrokerName是上一次发送失败的broker,规避上次失败的broker,不再其上面发送
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
//原子性的递增
int index = this.sendWhichQueue.getAndIncrement();
//对MessageQueue列表求余
int pos = Math.abs(index) % this.messageQueueList.size();
//获取需要发送的MessageQueue
return this.messageQueueList.get(pos);
}
}
}复制
由于消费者是绑定不同的队列进行消费,如果生产者采用这种轮询的方式进行生产,受消费者实例的处理能力的影响,肯定就没办法控制消息消费的先后了。
二、生产端如何发送顺序消息
RocketMQ在设计的时候考虑了这种顺序消息的场景,所以在生产者的时候,允许按照某种规则对消息进行设置,在这种规则的驱动下,可以控制某一类消息按先后顺序进入某一个指定的MessageQueue,这样消费者就有能力按顺序处理消息。
MQProducer接口定义了如下接口,使用者需要自己实现MessageQueueSelector的select接口,即可实现消息按规则发送,代码如下:
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;复制
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}复制
public class Producer {
public static void main(String[] args) {
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
for (int i = 0; i < 100; i++) {
// 订单ID相同的消息要有序
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes());
//通过订单ID对MessageQueue进行Hash,即可把相同订单ID的消息发送到同一个MessageQueue
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
catch (MQClientException e) {
e.printStackTrace();
}
}
}复制
private SendResult sendSelectImpl(//
Message msg,//
MessageQueueSelector selector,//
Object arg,//
final CommunicationMode communicationMode,//
final SendCallback sendCallback, final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//调用selector.select获取MessageQueue进行发送
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
}复制
2
消费者保证消息的消费顺序 复制
很多人立刻想到,如果把消费端实例的线程池里面线程数设置成1,是不是就可以了呢,这个当然是没有问题的,如果是使用MessageListenerConcurrently,则需要把线程池改为单线程模式,但不是很专业.假如有一个消息消费失败,可能会阻塞后面的所有消息的消费。接下来讲一个消费者专业的处理方式。
消费端提供了专门处理顺序消息的方法,这个方法就要涉及到MessageListenerOrderly,针对上面的生产,我们使用该方法来消费的话,代码为:
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();复制
如果MessageListener的实现是MessageListenerOrderly,DefaultMQPushConsumerImpl里面的属性consumeMessageService为ConsumeMessageOrderlyService,所有的顺序消费的逻辑都在这个类中处理.当消费者启动时,会调用start方法,该方法会启动一个每隔20秒的定时任务,该任务是锁定所有的MessageQueue,代码如下:
public void start() {
// 启动定时lock队列服务
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
}
}复制
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
//这个方法会锁定相关broker下面的相关的messagequeue对应的processQueue,这个地方下面会用到
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}复制
继续调用RebalanceImpl.lockAll()方法对所有broker上的当前topic的MessageQueue进行锁定(所谓锁定,就是把MessageQueue对应的ProcessQueue的成员变量locked设置为true),如果锁定失败,将暂停当前MessageQueue的拉取,直到下个定时任务重新锁定。
public void lockAll() {
//根据当前负载的消息队列,按照 Broker分类存储在Map。
//负载的消息队列在RebalanceService时根据当前消费者数量与消息消费队列按照负载算法进行分配,然后尝试对该消息队列加锁,如果申请锁成功,则加入到待拉取任务中
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
//根据Broker获取主节点的地址。
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try {
//向Broker发送锁定消息队列请求,该方法会返回本次成功锁定的消息消费队列,关于Broker端消息队列锁定实现见下文详细分析。
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
findBrokerResult.getBrokerAddr(), requestBody, 1000);
//遍历本次成功锁定的队列来更新对应的ProcessQueue的locked状态,如果locked为false,则设置成true,并更新锁定时间。
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
//processQueue被锁定标识和锁定时间
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
//遍历mqs,如果消息队列未成功锁定,需要将ProcessQueue的locked状态为false,在该处理队列未被其他消费者锁定之前,该消息队列将暂停拉取消息。
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup,
mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}复制
为什么会有这个锁定呢 ? 顺序消费的时候使用,消费之前会判断一下ProcessQueue锁定时间是否超过阈值(默认30000ms),如果没有超时,代表还是持有锁。
先稍微讲一下消费逻辑,首先RebalancePushImpl.dispatchPullRequest方法把当前消费者实例负责的MessageQueue及对应的ProcessQueue封装成了pullRequest进行分发,会被分发到一个PullMessageService.pullRequestQueue的阻塞链表队列中,然后PullMessageService线程不停从该队列中获取请求(没有就阻塞)拉取消息,在DefaultMQPushConsumerImpl.pullMessage中执行真正的消费逻辑这里会不停的拉取(当然有限流),拉取到消息后被异步处理,但是请放心异步处理也会按顺序提交consumeOffset,不会造成Offset大的消息先消费的情况,这里我们异步处理类及方法:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
...
switch (pullResult.getPullStatus()) {
case FOUND:
...
//拉取到的消息,会被放到对应的processQueue中,processQueue会做相应的顺序消费控制
//这里注意:如果返回true,才可以往线程池添加消费请求
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//然后在把当前结果封装成ConsumeRequest放到消费者线程池中处理
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
}复制
ProcessQueue.putMessage是互斥的,不能同时写入,写入时会把消息转存到内部的TreeMap中,key是当前MessageQueue的queueOffset,value是MessageExt,并且是基于queueOffset排好序的消息,代码如下:
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
}
}
//msgCount记录为消费的消息
msgCount.addAndGet(validMsgCnt);
// 如果ProcessQueue有需要处理的消息(从上可知,如果msgs不为空那么msgTreeMap不为空)
// 如果consuming为false,将其设置为true,表示正在消费
// 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
if (!msgTreeMap.isEmpty() && !this.consuming) {
// 有消息,且为未消费状态,则顺序消费模式可以消费
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
// property为ConsumeQueue里最大的offset
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {// 当前消息的offset与最大消息的差值,相当于还有多少offset没有消费
this.msgAccCnt = accTotal;
}
}
}
}
finally {
this.lockTreeMap.writeLock().unlock();
}
}
catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}复制
@Override
public void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
final boolean dispathToConsume) {
if (dispathToConsume) {
//封装成ConsumeRequest,由线程池里的线程处理消息的消费
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}复制
ConsumeRequest的run方法执行真正的消费逻辑,一边介绍消费逻辑,一边看代码吧.
1、processQueue不能被dropped,表示被废弃了;
//如果消息队列状态为 dropped 为true,则停止本次消息消费。
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
return;
}复制
2、处理时,对当前MessageQueue加锁,也就是说,即使有多个线程处理同一个MessageQueue的消息,也只会有一个线程对MessageQueue这个对象进行处理,其它线程等待(注意:不同MessageQueue不会阻塞,可以并行消费),这里有人会想,如果这个线程释放了锁,下一个拿到锁对线程会不会先消费QueueOffset大的消息先消费呢 ? 答案是肯定不会,因为:下一个拿到锁对线程,也是对ProcessQueue同一个人对象里的消费进行处理,记得哈,是同一个ProcessQueue,并且内部的消息是通过QueueOffset排好序的.
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//当前messageQueue开始消费
}复制
3、如果当前ProcessQueue的锁还在,并且没有失效则可以消费;
if (MessageModel.BROADCASTING
.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
//循环对当前MessageQueue进行消费
}复制
4、for(boolean continueConsume = true;continueConsume)这个循环很有意思,表示当前MessageQueue是否可以持续消费,是否可以继续消费的标准有两个: a、是在consumeOffset的提交,如果提交成功,就可以继续消费,直到processQueue中msgTreeMap为空; b、consumeOffset没有正常提交.比如其它状态:SUSPEND_CURRENT_QUEUE_A_MOMENT
for (boolean continueConsume = true; continueConsume;) {
//循环消费processQueue中的消息
}复制
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
//重新消费该批消息
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
//停止当前messageQueue循环消费
continueConsume = false;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;复制
5、再次判断processQueue的状态,不能drop,锁定状态,锁没有过期;否则就重新把messagequeue和processqueue包装成consumerequest放到线程池,重新拉取消费.
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
this.processQueue, 10);复制
6、然后就调用当前锁定对象的processQueue.takeMessages获取消息,takeMessages会从msgTreeMap对象中按传入的consumeBatchSize参数,按QueueOffset的大小(从小到大)取数据进行消费,这个过程也是互斥的,不能同时进行,然后执行消费了,如下:
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
//....
//真正执行消息的消费
status =
messageListener.consumeMessage(Collections.unmodifiableList(msgs),
context);复制
// 从ProcessQueue中取出batchSize条消息
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
// 从treeMap中获取batchSize条数据,每次都返回offset最小的那条并移除
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
// 放到返回列表和一个临时用的treemapp中
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
}
else {
break;
}
}
}
// 取到消息了就会开始进行消费,如果没取到,则不需要消费,那么consuming设为false
if (result.isEmpty()) {
consuming = false;
}
}
finally {
this.lockTreeMap.writeLock().unlock();
}
}
catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}复制
7、ConsumeOrderlyStatus记录了消费执行的结果,根据不同的结果,要么进行consumeOffset提交、要么对消息重新消费,这里展示一个自动提交的(注意:手动提交需要业务自己把握ConsumeOrderlyStatus里四个的四个含义,根据实际的处理结果进行返回
continueConsume =
ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,
context, this);复制
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn(
"the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
//提交本次消费的consumeOffset
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
//重新消费该批消息
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
default:
break;
}
}复制
consumeOffset提交到broker,代码如下:
if (commitOffset >= 0) {
//consumeOffset提交到broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
commitOffset, false);
}复制
如果返回到是SUSPEND_CURRENT_QUEUE_A_MOMENT,这个里面会把本次消费的消息重新放回到ProcessQueue中的msgTreeMap然后由下个线程接着消费,为什么说是下个线程呢? 因为这里会把continueConsume设置为false,返回时,执行当前ConsumeRequest进行消费的线程就会完成消费,下一个获取当前MessageQueue的锁的线程接着处理当前的processQueue(不同MessageQueue对应的processQueue永远是同一个对象),代码如下:
//重新消费该批信息
//makeMessageToCosumeAgain在顺序消费客户端返回消息状态为SUSPEND_CURRENT_QUEUE_A_MOMENT时调用;
//将消息从msgTreeMapTemp移除,并将该批消息重新放入msgTreeMap。
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
}
finally {
this.lockTreeMap.writeLock().unlock();
}
}
catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}复制