1
延迟消息定义及发送 复制
延迟消息,你称他为定时消息也行,就是说这条消息发送到broker之后,不会立即被消费,而是在等待一个预设好的特定时间后把消息投递到Topic。
这样的延时场景,RocketMQ也是考虑到了, 生产者该如何定义延迟消息呢 ?借用官方的例子说明一下,我们的重点是消费者如何处理。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}复制
生产者要做的就是设置这个DELAY属性,broker接受到这个延迟消息后如何操作呢?
2
Broker对延迟消息的处理 复制
Broker中,处理生产者消息的类叫:SendMessageProcessor,在它的processRequest(ctx,request)对生产者发送的消息进行处理,如下:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext = null;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
//消费者消费失败,发送给broker,重新消费
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader==null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
//默认处理生产者发送的消息
final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}复制
this.sendMessage(ctx,request,mqtraceContext,requestHeader)broker主要通过该方法对生产者发送的消息进行处理(注意:这里只关心延迟,其它逻辑不做讲解),跟踪该方法进入到MessageStore.putMessage(msgInner),在进入CommitLog.putMessage(msg),在putMessage(ms)方法中将看到对延迟消息对处理,代码如下:
//存储消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);复制
//存储消息
PutMessageResult result = this.commitLog.putMessage(msg);复制
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...
//生产端不设置,sysFlay为0
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TransactionNotType//
|| tranType == MessageSysFlag.TransactionCommitType) {
//从消息中获取延迟级别,如果延迟级别大于0,说明是延迟消息,进入如下逻辑
if (msg.getDelayTimeLevel() > 0) {
//延迟级别不能超过最大值,这里是16
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService()
.getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService()
.getMaxDelayLevel());
}
//获取延迟队列:SCHEDULE_TOPIC_XXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//延迟队列每个级别的queueId = 级别,也就是说:延迟队列有16个queue,对对应的queueId放对应的延迟消息
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
tagsCode =
this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(
msg.getDelayTimeLevel(), msg.getStoreTimestamp());
//备份一下真实的Topoic和queueId,因为消息到时间,最终还是丢给真实队列发送
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//设置消息的topic为延迟队列和延迟队列的queueId
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
}复制
通过上面的代码可以看到,broker接收到延迟消息后,会把消息放到延迟队列SCHEDULE_TOPIC_XXX这个预设好的固定队列中,queueId为当前消息的延迟级别,比如:int delayLevel=3,则queueId=3(注意:无论哪个topic,只要消息的延迟级别一样,就会放到对应的queueId中,只是在消息到时间后,分发到自己的真实队列中),使用MessageAccessor.putProperty记录真实的Topic和queueId到消息体中。
接下来就该看延迟队列:SCHEDULE_TOPIC_XXX是如何工作的,在broker启动之前需要把延迟队列级别配置到配置文件中,如图所示:
broker在启动时,会同时启动一个定时任务,这个任务叫:ScheduleMessageService,该类单独处理每个级别的延迟消息,其start()方法在broker启动时也会启动,代码如下:
//每个level对应的延时时间
private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
//延时计算到了哪里
private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
//定时器
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
//存储顶层对象
private final DefaultMessageStore defaultMessageStore;
// 最大值
private int maxDelayLevel;
public void start() {
// 为每个延时队列增加定时器
for (Integer level : this.delayLevelTable.keySet()) {
Long timeDelay = this.delayLevelTable.get(level);
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//为当前延迟级别添加定时任务
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 定时将延时进度刷盘
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
}
catch (Exception e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}复制
DeliverDelayedMessageTimerTask继承TimerTask,并重写run方法,代码如下:
@Override
public void run() {
try {
//计算延迟消息是否到时,把到时的消息投递到真实的topic(注意消息体中已经记录了真实的topic)
this.executeOnTimeup();
}
catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
//异常了,重新提交定时任务,100ms后重新执行
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}复制
this.executeOnTimeUp大概逻辑就是:上次调度执行消费的最大queueOffset作为当前本次调度的起始queueOffset,并依次往后计算所有到期的消息,如何计算当前消息是否到期呢? 如果当前时间-消息的生成时间 >= level对应的等待时间,则表示当前消息已经到时,可以投递,投递的逻辑:消息体中去除延迟设置,同时把消息的真实topic和queueId取出,并设置到msg的topic和queueId,交给CommitLog.putMessage即可,代码如下:
public void executeOnTimeup() {
//获取当前延迟级别的未被消费的消息
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
//如果不为空
if (cq != null) {
//根据当前起始queueOffset获取未消费的消息,计算延迟时间
SelectMapedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
// 队列里存储的tagsCode实际是一个时间点
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
//计算延迟了多久
long countdown = deliverTimestamp - now;
// 时间到了,该投递
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//把延迟消息取出
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//重新投递
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
// 成功
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
}
// 失败
else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
}
catch (Exception e) {
/*
* XXX: warn and notify me
* msgExt里面的内容不完整
* ,如没有REAL_QID,REAL_TOPIC之类的
* ,导致数据无法正常的投递到正确的消费队列,所以暂时先直接跳过该条消息
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
}
// 时候未到,继续定时
else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
finally {
// 必须释放资源
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
/*
* 索引文件被删除,定时任务中记录的offset已经被删除,会导致从该位置中取不到数据,
* 这里直接纠正下一次定时任务的offset为当前定时任务队列的最小值
*/
long cqMinOffset = cq.getMinOffsetInQuque();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
//注意这里:100ms继续往定时器中放任务,如此往复,直到broker停止。。。
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}复制
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
//延迟时间到了,去除延迟级别
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
// 恢复真实的Topic
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 恢复真实的QueueId
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
}复制
注意:每个level的调度周期都是100ms,当一个周期执行完成后,通过如下代码开启下一个周期:
//注意:100ms继续往定时器中放任务,如此往复,直到broker停止。。。
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);复制
延迟消息到期后,投递到真实的topic中,剩下的流程就正常了,只要真实的Topic中的消息没有堆积,消费者就能在延迟一定的时间后,获取到延迟消息。