
RocketMQ: Delayed Consumption Logic

不修边幅的创客 2020-09-26


1

Defining and Sending Delayed Messages

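A delayed message (you can also call it a scheduled message) is one that is not consumed immediately after it reaches the broker. Instead, the broker waits for a preset period and only then delivers the message to its topic.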

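RocketMQ supports this scenario out of the box. How does a producer define a delayed message? The official example below shows how; our real focus, though, is what happens before the consumer finally gets the message.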

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send delayed messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Start the producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // Delay level 3: the message is delivered 10s later (only a fixed set of
            // delays is supported for now; see delayTimeLevel)
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shut down the producer
        producer.shutdown();
    }
}


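The setDelayTimeLevel(int level) method on the Message object sets a delay level. The available levels come from the broker setting messageDelayLevel, whose default in the RocketMQ source is 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, 18 levels in total. (A broker can configure a different list, for example one that adds 40m, 50m, or 6h.) The value passed in must be an int greater than 0 and is the 1-based position in that list: with the default list, 1 means a 1s delay, 3 means 10s, 18 means 2h, and so on. Once a delay level is set, the message is a delayed message.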
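To make the level numbering concrete, here is a minimal sketch of how a messageDelayLevel string could be turned into a level-to-milliseconds table. DelayLevelSketch and its parse method are hypothetical names used for illustration only; they are not RocketMQ's own classes, and the unit letters (s, m, h, d) are an assumption about the accepted format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a RocketMQ class: maps each 1-based delay level
// to its delay in milliseconds.
class DelayLevelSketch {
    static Map<Integer, Long> parse(String levelString) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            char unit = part.charAt(part.length() - 1);
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + part);
            }
            // Levels start at 1, so the first entry of the list is level 1
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
            parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println("levels: " + table.size());
        System.out.println("level 3 -> " + table.get(3) + " ms");
    }
}
```

With the default list this yields 18 levels, and level 3 maps to 10000 ms, which matches the setDelayTimeLevel(3) call in the producer example.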

All the producer has to do is set this DELAY property. What does the broker do once it receives such a message?


2

How the Broker Handles Delayed Messages

In the broker, the class that handles messages from producers is SendMessageProcessor. Its processRequest(ctx, request) method processes each message a producer sends:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    SendMessageContext mqtraceContext = null;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            // A consumer failed to consume the message and sends it back to the broker for retry
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            // Default path: handle a message sent by a producer
            final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}



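The broker handles a producer's message mainly in this.sendMessage(ctx, request, mqtraceContext, requestHeader). (Only the delay logic is covered here; the rest is skipped.) Following that method leads to MessageStore.putMessage(msgInner) and from there to CommitLog.putMessage(msg), which is where delayed messages get their special treatment: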

// Store the message (inside sendMessage)
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

// Store the message (inside MessageStore.putMessage)
PutMessageResult result = this.commitLog.putMessage(msg);



public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    ...
    // sysFlag is 0 unless the producer sets it
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TransactionNotType
            || tranType == MessageSysFlag.TransactionCommitType) {
        // Read the delay level; a level greater than 0 marks a delayed message
        if (msg.getDelayTimeLevel() > 0) {
            // The level may not exceed the highest configured level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService()
                    .getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService()
                        .getMaxDelayLevel());
            }
            // Redirect to the schedule topic: SCHEDULE_TOPIC_XXXX
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // Each delay level has its own queue; delayLevel2QueueId maps the level
            // to that queue's id (delayLevel - 1 in the RocketMQ source)
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // For delayed messages, tagsCode holds the delivery timestamp
            tagsCode =
                    this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(
                            msg.getDelayTimeLevel(), msg.getStoreTimestamp());

            // Back up the real topic and queueId; once the delay expires the message
            // is handed back to its real queue
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
                    String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // Point the message at the schedule topic and its queue
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    ...
}



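As the code shows, when the broker receives a delayed message it stores it in the fixed, predefined schedule topic SCHEDULE_TOPIC_XXXX, in the queue that belongs to the message's delay level. In the RocketMQ source, delayLevel2QueueId returns delayLevel - 1, so with int delayLevel = 3 the message goes to queueId = 2. (Note: whatever the original topic, messages with the same delay level land in the same queue; they are only dispatched to their own real queues once their time is up.) MessageAccessor.putProperty records the real topic and queueId in the message properties.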
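The redirect and the later restore can be modelled in a few lines. This is only a toy sketch: Msg, park, and restore are hypothetical names, the property keys DELAY, REAL_TOPIC, and REAL_QID are the ones mentioned in this article, and the queue id is assumed to be level - 1 as in RocketMQ's delayLevel2QueueId.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how a delayed message is parked in the schedule topic and
// later restored; none of these types are RocketMQ classes.
class DelayRedirectSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Broker side, on arrival: remember the real destination, then redirect.
    static void park(Msg msg, int delayLevel, int maxLevel) {
        int level = Math.min(delayLevel, maxLevel); // clamp to the highest level
        msg.properties.put("DELAY", String.valueOf(level));
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = level - 1; // assumed mapping: one queue per level
    }

    // Broker side, when the delay expires: drop the delay and restore the destination.
    static void restore(Msg msg) {
        msg.properties.remove("DELAY");
        msg.topic = msg.properties.get("REAL_TOPIC");
        msg.queueId = Integer.parseInt(msg.properties.get("REAL_QID"));
    }
}
```

Because the delay property is removed before the message is stored again, the second pass through putMessage treats it as an ordinary message and does not park it a second time.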

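Next, how does the schedule topic SCHEDULE_TOPIC_XXXX do its work? The delay levels are read from the broker's configuration file (the messageDelayLevel entry), so they have to be in place before the broker starts.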

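When the broker starts, it also starts a scheduling service called ScheduleMessageService, which handles the delayed messages of each level separately. Its start() method runs as part of broker startup: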

// Delay (in ms) for each level
private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);
// How far delivery has progressed for each level
private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);
// Timer
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
// Top-level store object
private final DefaultMessageStore defaultMessageStore;
// Highest configured delay level
private int maxDelayLevel;

public void start() {
    // Schedule one timer task per delay queue
    for (Integer level : this.delayLevelTable.keySet()) {
        Long timeDelay = this.delayLevelTable.get(level);
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            // Register the timer task for this delay level
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    // Periodically persist the delivery progress to disk
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            }
            catch (Exception e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}


DeliverDelayedMessageTimerTask extends TimerTask and overrides run():

@Override
public void run() 
{
    try {
        // Check which delayed messages are due and deliver them to their real topic
        // (the real topic is already recorded in the message properties)
        this.executeOnTimeup();
    }
    catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        // On failure, resubmit the task so it runs again after DELAY_FOR_A_PERIOD
        // (10s in the RocketMQ source)
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}



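In outline, this.executeOnTimeup works like this: the largest queueOffset processed by the previous run becomes the starting queueOffset of the current run, and the task walks forward from there through every message that is due. How does it decide whether a message is due? The delivery time (the store timestamp plus the delay for the level) was written into the tagsCode of the consume queue entry when the message was stored; if that time is not later than the current time, the message is due and can be delivered. Delivery means clearing the delay setting from the message, reading the real topic and queueId back from its properties, setting them on the message, and handing it to putMessage again, which ends up in CommitLog.putMessage: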

public void executeOnTimeup() {
// Get the consume queue for the current delay level
ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;
// The queue exists
if (cq != null) {
    // Read the entries from the starting queueOffset onward and check their delivery times
    SelectMapedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
        try {
            long nextOffset = offset;
            int i = 0;
            for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
                long offsetPy = bufferCQ.getByteBuffer().getLong();
                int sizePy = bufferCQ.getByteBuffer().getInt();
                long tagsCode = bufferCQ.getByteBuffer().getLong();

                // The tagsCode stored in this queue is actually a point in time
                long now = System.currentTimeMillis();
                long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
                // Time remaining until delivery
                long countdown = deliverTimestamp - now;
                // Due: deliver it
                if (countdown <= 0) {
                    MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                    if (msgExt != null) {
                        try {
                            // Rebuild the original message from the delayed one
                            MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                            // Put it back into the store
                            PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);
                            // Success
                            if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                continue;
                            }
                            // Failure
                            else {
                                // XXX: warn and notify me
                                log.error(
                                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                    msgExt.getTopic(), msgExt.getMsgId());
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                        nextOffset), DELAY_FOR_A_PERIOD);
                                ScheduleMessageService.this.updateOffset(this.delayLevel,
                                    nextOffset);
                                return;
                            }
                        }
                        catch (Exception e) {
                            /*
                             * XXX: warn and notify me
                             * The content of msgExt is incomplete, e.g. REAL_QID or
                             * REAL_TOPIC is missing, so the message cannot be delivered
                             * to the correct consume queue; skip it for now.
                             */

                            log.error(
                                "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                        }
                    }
                }
                // Not due yet: schedule the next run for when it is
                else {
                    ScheduleMessageService.this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                        countdown);
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                }
            } // end of for

            nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
            return;
        }
        finally {
            // The buffer must always be released
            bufferCQ.release();
        }
    } // end of if (bufferCQ != null)
    else {
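        /*
         * The index file has been deleted, so the offset recorded by the timer task no
         * longer exists and nothing can be read from it. Correct the offset of the next
         * timer task to the minimum offset of this queue.
         */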

        long cqMinOffset = cq.getMinOffsetInQuque();
        if (offset < cqMinOffset) {
            failScheduleOffset = cqMinOffset;
            log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
        }
    }
} // end of if (cq != null)

// Note: the task puts itself back on the timer after 100ms, over and over, until the broker stops
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
    failScheduleOffset), DELAY_FOR_A_WHILE);
}
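The scanning loop above can be reproduced on a plain ByteBuffer. The sketch below assumes what the loop itself shows: each consume queue entry is an 8-byte commit log offset, a 4-byte size, and an 8-byte tagsCode that holds the delivery timestamp, 20 bytes in all. ConsumeQueueScanSketch, encode, and dueOffsets are hypothetical names, and the real method also reschedules itself and persists offsets, which is left out here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy scan over consume-queue-style entries; not a RocketMQ class.
class ConsumeQueueScanSketch {
    static final int UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode)

    // Builds a buffer from {commitLogOffset, size, deliverTimestamp} triples.
    static ByteBuffer encode(long[][] entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries.length * UNIT_SIZE);
        for (long[] e : entries) {
            buf.putLong(e[0]);
            buf.putInt((int) e[1]);
            buf.putLong(e[2]);
        }
        buf.flip();
        return buf;
    }

    // Returns the commit log offsets of entries that are due at time 'now'.
    // Scanning stops at the first entry that is not due: within one level every
    // message has the same delay, so later entries are due later as well.
    static List<Long> dueOffsets(ByteBuffer buf, long now) {
        List<Long> due = new ArrayList<>();
        int size = buf.remaining();
        for (int i = 0; i < size; i += UNIT_SIZE) {
            long commitLogOffset = buf.getLong();
            int msgSize = buf.getInt(); // would be used to read the message body
            long deliverTimestamp = buf.getLong();
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                break; // the real task reschedules itself 'countdown' ms later
            }
            due.add(commitLogOffset);
        }
        return due;
    }
}
```

For entries with delivery times 1000, 2000, and 9000, calling dueOffsets at time 2500 returns the offsets of the first two entries and leaves the third for a later run.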

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());

    TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
    long tagsCodeValue =
            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

    msgInner.setWaitStoreMsgOK(false);
    // The delay has expired: clear the delay level
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    // Restore the real topic
    msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

    // Restore the real queueId
    String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    int queueId = Integer.parseInt(queueIdStr);
    msgInner.setQueueId(queueId);

    return msgInner;
}
}
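Returning to the timing check in executeOnTimeup, the arithmetic can be written out on its own. This is a sketch under assumptions: DeliverTimeSketch and its methods are hypothetical, and the clamping rule in correct (a delivery time further away than one full delay from now is treated as due immediately) is my reading of what correctDeliverTimestamp is for, not a quotation of it.

```java
// Toy version of the delivery-time arithmetic; not a RocketMQ class.
class DeliverTimeSketch {
    // Delivery time written into tagsCode when the message is stored.
    static long computeDeliverTimestamp(long storeTimestamp, long delayMillis) {
        return storeTimestamp + delayMillis;
    }

    // Assumed sanity check: a timestamp later than now + delay cannot be right
    // for this level, so fall back to delivering now.
    static long correct(long now, long deliverTimestamp, long delayMillis) {
        long maxTimestamp = now + delayMillis;
        return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
    }

    // Remaining wait in ms; zero or negative means the message is due.
    static long countdown(long now, long deliverTimestamp, long delayMillis) {
        return correct(now, deliverTimestamp, delayMillis) - now;
    }
}
```

For a message stored at t = 1000 with a 10000 ms delay, the delivery time is 11000; at now = 5000 the countdown is 6000, and at now = 11000 it reaches 0 and the message is delivered.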



Note: every level is polled on a 100ms cycle. When one cycle finishes, the following code starts the next one:

// Note: the task puts itself back on the timer after 100ms, over and over, until the broker stops
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
    failScheduleOffset), DELAY_FOR_A_WHILE);
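The same self-rescheduling pattern can be tried in isolation with java.util.Timer. In this minimal sketch, ReschedulingTimerSketch, runRounds, and PollTask are hypothetical names, and the task stops after a fixed number of rounds only so that the example terminates; the broker's task keeps going until shutdown.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// One-shot timer tasks that re-submit themselves, instead of a fixed-rate schedule.
class ReschedulingTimerSketch {
    // Runs 'rounds' polling cycles spaced 'periodMillis' apart and returns how
    // many cycles ran, or -1 if they did not finish within five seconds.
    static int runRounds(int rounds, long periodMillis) {
        Timer timer = new Timer("SketchTimerThread", true);
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);

        class PollTask extends TimerTask {
            @Override
            public void run() {
                executed.incrementAndGet(); // stands in for one executeOnTimeup pass
                done.countDown();
                if (done.getCount() > 0) {
                    // Each pass schedules the next one itself
                    timer.schedule(new PollTask(), periodMillis);
                }
            }
        }

        timer.schedule(new PollTask(), periodMillis);
        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? executed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            timer.cancel();
        }
    }
}
```

Scheduling each pass from inside the previous one means a slow pass simply delays the next; passes never overlap or pile up the way fixed-rate executions can.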



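Once a delayed message is due, it is delivered to its real topic and the rest of the flow is ordinary message consumption. As long as messages are not piling up in the real topic, the consumer receives the delayed message after roughly the configured delay.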

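This article is reposted from 不修边幅的创客. If you believe it infringes your rights, send an email with supporting evidence to contact@modb.pro; once verified, 墨天轮 will remove the content immediately.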
