
RocketMQ: Retry Message Logic

不修边幅的创客 2020-09-27


1

What happens when message consumption fails?

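When consuming ordinary messages, the consumer uses concurrent consumption through ConsumeMessageConcurrentlyService by default. We will skip the rest of its logic and only look at what a consumer thread does after it finishes consuming messages from a MessageQueue. That logic lives in ConsumeRequest, an inner class of ConsumeMessageConcurrentlyService and the task executed by the concurrent consumer threads. Here we focus only on how the consumption result is handled: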

@Override
public void run() {
    // Consumption status returned by the listener
    ConsumeConcurrentlyStatus status = null;

    ...

    try {
        // Restore the original topic of messages that were re-delivered from the retry topic
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);

        // The actual business consumption logic
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
    }

    ...

    if (!processQueue.isDropped()) {
        // Handle the result according to the consumption status
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgTreeMap={}, msgs={}",
            messageQueue, processQueue.getMsgTreeMap(), msgs);
    }
}



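Note the resetRetryTopic call in the code above. A message that failed earlier is re-delivered from the consumer group's retry topic. The name of that topic is a fixed prefix plus the group name, "%RETRY%"+consumerGroup, and by default it has only one queue on each broker. When the broker moves a failed message into the retry topic, it saves the message's original topic in the PROPERTY_RETRY_TOPIC property. Before a batch is handed to the listener, resetRetryTopic checks each message. If the message came from this group's retry topic and carries that property, its topic is set back to the original one, so business code always sees the real topic. The code is as follows: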

public void resetRetryTopic(final List<MessageExt> msgs) {
    // Retry topic of the current consumer group: %RETRY%+consumerGroup
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        // Original topic, saved by the broker when the message was sent back for retry
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            // Put the original topic back on the message
            msg.setTopic(retryTopic);
        }
    }
}

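The following minimal, self-contained sketch shows the naming convention and the topic restore described above. SimpleMsg is a hypothetical stand-in for MessageExt. The property key "RETRY_TOPIC" is assumed to match MessageConst.PROPERTY_RETRY_TOPIC. The prefixes %RETRY% and %DLQ% are the ones MixAll uses for retry and dead-letter topics.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MessageExt: just a topic and a property map.
class SimpleMsg {
    String topic;
    final Map<String, String> properties = new HashMap<>();

    SimpleMsg(String topic) {
        this.topic = topic;
    }
}

class RetryTopicSketch {
    // Prefixes used by MixAll for retry and dead-letter topics.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    // Assumption: same key as MessageConst.PROPERTY_RETRY_TOPIC.
    static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    // Same check as resetRetryTopic: only messages read from this group's retry topic
    // that carry the saved original topic get it back.
    static void resetRetryTopic(List<SimpleMsg> msgs, String consumerGroup) {
        String groupTopic = retryTopic(consumerGroup);
        for (SimpleMsg msg : msgs) {
            String originalTopic = msg.properties.get(PROPERTY_RETRY_TOPIC);
            if (originalTopic != null && groupTopic.equals(msg.topic)) {
                msg.topic = originalTopic;
            }
        }
    }

    public static void main(String[] args) {
        SimpleMsg retried = new SimpleMsg(retryTopic("order_group"));
        retried.properties.put(PROPERTY_RETRY_TOPIC, "OrderTopic"); // saved by the broker
        SimpleMsg fresh = new SimpleMsg("OrderTopic");
        resetRetryTopic(Arrays.asList(retried, fresh), "order_group");
        System.out.println(retried.topic + ", " + fresh.topic); // OrderTopic, OrderTopic
    }
}
```

Only messages that came from this group's retry topic are touched. A message read from the real topic, or from another group's retry topic, keeps its topic.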


The processConsumeResult method handles the consumption result:

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest) {
    // ackIndex is the index of the last successfully consumed message in the batch
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    // Evaluate the consumption status
    switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        // Statistics: number of succeeded and failed messages
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
            consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
            consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        // The whole batch counts as failed
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
            consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size());
        break;
    default:
        break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        // Broadcasting mode: failed messages are only logged and dropped, never retried
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        // Everything after ackIndex failed and has to be consumed again
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            // Send the failed message back to the broker so that it is re-consumed later
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                // Sending back failed: bump the retry count and retry locally below.
                // (The broker moves a message to the DLQ once its retry count reaches the maximum, 16 by default.)
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            // Consume the messages that could not be sent back again locally, after a delay
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
    }

    // Remove this batch from the ProcessQueue now that it has been handled.
    // offset is the smallest offset still in processQueue.msgTreeMap after the removal
    // (or the largest offset + 1 if the map is now empty); it becomes the queue's new consume offset
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
            offset, true);
    }
}

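To make the ackIndex arithmetic concrete, the sketch below computes which positions in a batch processConsumeResult treats as failed. It is a simplification that uses a hypothetical Status enum. The starting value Integer.MAX_VALUE is assumed to mirror the default ackIndex of ConsumeConcurrentlyContext, which is why a plain CONSUME_SUCCESS acknowledges the whole batch.

```java
import java.util.ArrayList;
import java.util.List;

class AckIndexSketch {
    // Hypothetical subset of ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns the batch positions that processConsumeResult treats as failed.
    static List<Integer> failedIndexes(Status status, int ackIndex, int batchSize) {
        if (status == Status.RECONSUME_LATER) {
            ackIndex = -1;                 // the whole batch failed
        } else if (ackIndex >= batchSize) {
            ackIndex = batchSize - 1;      // clamp: every message succeeded
        }
        // ok = ackIndex + 1, failed = batchSize - ok
        List<Integer> failed = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            failed.add(i);
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println(failedIndexes(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, 4)); // []
        System.out.println(failedIndexes(Status.CONSUME_SUCCESS, 1, 4));                 // [2, 3]
        System.out.println(failedIndexes(Status.RECONSUME_LATER, Integer.MAX_VALUE, 4)); // [0, 1, 2, 3]
    }
}
```

A listener can therefore acknowledge part of a batch by calling context.setAckIndex(i) and returning CONSUME_SUCCESS. Only the messages after index i are then treated as failed.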

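Here is a brief walkthrough of the code above. ackIndex is the index of the last successfully consumed message, so ackIndex + 1 messages succeeded and msgs.size() - (ackIndex + 1) failed. The loop for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) handles the failed messages. If sending a message back fails, the loop also increments that message's retry count locally. So how are failed messages handled? They are sent back to the broker in this.sendMessageBack(msg, context). Note that a failed message is no longer consumed from its real topic. The broker writes it into the group's retry topic, and it is re-consumed only from there (covered below). The code is: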

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();
    try {
        // Ask the broker to re-deliver this failed message later
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue()
            .getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(),
            e);
    }

    return false;
}



On the broker side, a failed message sent back by the consumer is handled by the SendMessageProcessor class:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext = null;
    switch (request.getCode()) {
    case RequestCode.CONSUMER_SEND_MSG_BACK:
        // Handle a message whose consumption failed
        return this.consumerSendMsgBack(ctx, request);
    default:
        SendMessageRequestHeader requestHeader = parseRequestHeader(request);
        if (requestHeader == null) {
            return null;
        }
        // Message trace: record the message arriving at the broker
        mqtraceContext = buildMsgContext(ctx, requestHeader);
        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
        final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        // Message trace: record the successfully sent message
        this.executeSendMessageHookAfter(response, mqtraceContext);
        return response;
    }
}



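consumerSendMsgBack runs a series of checks on the message. It then puts the message into the retry topic with a delay level and records its retry count. The more retries a message has, the higher its delay level. Once the retry count reaches the maximum, retrying stops and the message goes to the dead-letter queue (DLQ). Consuming messages from the DLQ requires manual intervention. After that, the message follows the normal message path (written to disk, indexed, and so on). The only difference is that its topic is now the retry topic. Note that because a delay level is set, the message is processed as a delayed message. The code is: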

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {

    ...

    // Retry topic: %RETRY%+consumerGroup
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt =
        Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

    // Check whether the topic exists; if not, create it and register the retry topic through the broker
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,                                    // retry topic
            subscriptionGroupConfig.getRetryQueueNums(), // number of MessageQueues in the retry topic, 1 by default
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }

    ...

    // Build the message: on the first retry, remember the original topic
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    // Delay level requested by the client (0 lets the broker decide)
    int delayLevel = requestHeader.getDelayLevel();

    // Dead-letter handling: every retry is counted; once the count reaches the maximum (16),
    // stop retrying and move the message to the dead-letter queue
    if (msgExt.getReconsumeTimes() >= subscriptionGroupConfig.getRetryMaxTimes()
        || delayLevel < 0) {
        // DLQ topic: %DLQ%+consumerGroup
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        // Create the DLQ topic and register it
        topicConfig =
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0 // dead-letter messages need no sync and no correction
            );
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
    }
    // Otherwise keep retrying
    else {
        // If the broker decides, start at level 3 (10 s delay) and go one level higher per retry
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        // Setting a delay level is what makes this a delayed message
        msgExt.setDelayTimeLevel(delayLevel);
    }

    // Rebuild the message for the new topic (retry or DLQ); in the retry case it is a delayed message
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    ...

}

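The retry-or-dead-letter branch above boils down to a small pure function. The sketch below reproduces only that decision, using hypothetical names (RetryDecisionSketch, Decision). The maximum of 16 retries follows the text. In the real broker, that value comes from subscriptionGroupConfig.getRetryMaxTimes().

```java
class RetryDecisionSketch {
    // Maximum retry count used in the article (subscriptionGroupConfig.getRetryMaxTimes()).
    static final int RETRY_MAX_TIMES = 16;

    // Hypothetical result holder: target topic plus the delay level to store (0 = no delay).
    static final class Decision {
        final String topic;
        final int delayLevel;

        Decision(String topic, int delayLevel) {
            this.topic = topic;
            this.delayLevel = delayLevel;
        }
    }

    // Mirrors the branch in consumerSendMsgBack: DLQ once the retry budget is used up
    // (or the client asked for delayLevel < 0), otherwise the retry topic with a delay level.
    static Decision decide(String group, int reconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= RETRY_MAX_TIMES || requestedDelayLevel < 0) {
            return new Decision("%DLQ%" + group, 0);
        }
        int delayLevel = requestedDelayLevel;
        if (delayLevel == 0) {
            // Broker decides: level 3 for the first retry, one level higher per later retry.
            delayLevel = 3 + reconsumeTimes;
        }
        return new Decision("%RETRY%" + group, delayLevel);
    }

    public static void main(String[] args) {
        for (int times : new int[] {0, 1, 15, 16}) {
            Decision d = decide("order_group", times, 0);
            System.out.println(times + " -> " + d.topic + " level " + d.delayLevel);
        }
    }
}
```

When the broker chooses the level, reconsumeTimes 0 through 15 map to levels 3 through 18. Once a message has been retried 16 times and fails again, it lands in %DLQ%order_group.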


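What happens next? Because the message carries a delay level, it is a delayed message. The only difference is that its real destination is the retry topic. For how delayed messages are processed, see "RocketMQ Delayed Consumption Logic". Under the delayed-message logic, the message is eventually delivered to its real destination, the retry topic. That raises the next question: how do consumers consume the messages in the retry topic? Read on.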
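
The delayed-consumption article covers this in detail. As a rough, hypothetical sketch, the broker handles a message with a delay level in three steps. It parks the message in the internal topic SCHEDULE_TOPIC_XXXX, using one queue per level (queueId = level - 1). It keeps the real destination in properties. It restores that destination when the delay expires. The level-to-delay table is the broker's default messageDelayLevel setting. The property keys "REAL_TOPIC" and "REAL_QID" are assumed to match MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, and Msg is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

class DelayLevelSketch {
    // Broker default for messageDelayLevel; level N is the N-th entry (1-based).
    static final String[] LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // Converts a delay level to milliseconds, e.g. level 3 -> "10s" -> 10000.
    static long delayMillis(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("delay level out of range: " + level);
        }
        String spec = LEVELS[level - 1];
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return value * 1_000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalArgumentException("unknown unit in " + spec);
        }
    }

    // Hypothetical simplified message: destination topic, queue id and properties.
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> props = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // On store: remember the real destination, then park the message in the schedule
    // topic, in the queue reserved for its delay level.
    static void parkForDelay(Msg msg, int delayLevel) {
        msg.props.put("REAL_TOPIC", msg.topic);
        msg.props.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = "SCHEDULE_TOPIC_XXXX";
        msg.queueId = delayLevel - 1;
    }

    // When the delay expires: deliver to the saved destination (here, the retry topic).
    static void restoreWhenDue(Msg msg) {
        msg.topic = msg.props.get("REAL_TOPIC");
        msg.queueId = Integer.parseInt(msg.props.get("REAL_QID"));
    }

    public static void main(String[] args) {
        Msg m = new Msg("%RETRY%order_group", 0);
        parkForDelay(m, 3);
        System.out.println(m.topic + "/" + m.queueId + " for " + delayMillis(3) + " ms");
        restoreWhenDue(m);
        System.out.println(m.topic + "/" + m.queueId);
    }
}
```

Under this table, a first retry (level 3) waits about 10 seconds before the message reappears in the retry topic. The last retry (level 18) waits about 2 hours.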


2

How are messages in the retry topic consumed?

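When a message expires in the delay topic SCHEDULE_TOPIC_XXXX, it is finally delivered to the retry topic. So we need to understand how messages in the retry topic get consumed.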

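As mentioned above, when the broker handles a failed message, it calls this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(). This call creates the group's retry topic and reports it to the NameServer. The code is: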

public TopicConfig createTopicInSendMessageBackMethod(
        final String topic,
        final int clientDefaultTopicQueueNums,
        final int perm,
        final int topicSysFlag) {
    // If the topic already exists, return it directly
    TopicConfig topicConfig = this.topicConfigTable.get(topic);
    if (topicConfig != null)
        return topicConfig;

    boolean createNew = false;

    try {
        // Otherwise create the topic under the lock
        if (this.lockTopicConfigTable.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                // Re-check: another thread may have created it in the meantime
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;

                topicConfig = new TopicConfig(topic);
                topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
                topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);

                log.info("create new topic {}", topicConfig);
                this.topicConfigTable.put(topic, topicConfig);
                createNew = true;
                this.dataVersion.nextVersion();
                this.persist();
            } finally {
                this.lockTopicConfigTable.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageBackMethod exception", e);
    }
    // If a topic was created, report all of this broker's topic information to the NameServer again
    if (createNew) {
        this.brokerController.registerBrokerAll(false, true);
    }

    return topicConfig;
}

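createTopicInSendMessageBackMethod uses double-checked creation. It first does a lock-free lookup. It then re-checks and inserts under a lock acquired with a timeout. Only the thread that actually created the topic notifies the NameServer. The following is a minimal sketch of that pattern with hypothetical names. The counter stands in for registerBrokerAll, and the 3000 ms timeout is illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TopicTableSketch {
    // Hypothetical minimal topic config.
    static final class TopicConfig {
        final String name;
        final int queueNums;

        TopicConfig(String name, int queueNums) {
            this.name = name;
            this.queueNums = queueNums;
        }
    }

    private final ConcurrentHashMap<String, TopicConfig> table = new ConcurrentHashMap<>();
    private final Lock lock = new ReentrantLock();
    // Stand-in for registerBrokerAll(...): counts reports to the NameServer.
    final AtomicInteger registrations = new AtomicInteger();

    TopicConfig getOrCreate(String topic, int queueNums) {
        TopicConfig config = table.get(topic); // fast path without the lock
        if (config != null) {
            return config;
        }
        boolean createdNew = false;
        try {
            if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) { // illustrative timeout
                try {
                    config = table.get(topic); // re-check: another thread may have won
                    if (config != null) {
                        return config;
                    }
                    config = new TopicConfig(topic, queueNums);
                    table.put(topic, config);
                    createdNew = true;
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // give up; the caller sees null
        }
        if (createdNew) {
            registrations.incrementAndGet(); // only the creating thread reports, outside the lock
        }
        return config; // null if the lock could not be acquired in time
    }

    public static void main(String[] args) throws InterruptedException {
        TopicTableSketch sketch = new TopicTableSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> sketch.getOrCreate("%RETRY%order_group", 1));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(sketch.registrations.get()); // 1
    }
}
```

However many threads race on the same retry topic, exactly one creates it, so only one registration is triggered. As in the original, the NameServer report happens after the lock has been released.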


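Once the retry topic is registered with the NameServer, consumers can fetch its route from the NameServer and consume it. But how does a consumer end up consuming the retry topic?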

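When the consumer starts, DefaultMQPushConsumerImpl.start() is executed. This method mainly initializes the objects the consumer needs, and one of its steps is a call to this.copySubscription(). In copySubscription, if the MessageModel is CLUSTERING, the consumer group's retry topic is added to the consumer's subscriptions. The code is: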

private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData =
                    FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                //TODO subString ?
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }

        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            break;
        case CLUSTERING:
            // Clustering mode: get the retry topic of the current consumer group
            final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
            // Build a subscription that matches all messages (SUB_ALL)
            SubscriptionData subscriptionData =
                FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic, SubscriptionData.SUB_ALL);
            // Add it to RebalanceImpl's subscription map (a ConcurrentHashMap)
            this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
            break;
        default:
            break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

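In effect, copySubscription gives the consumer one extra subscription in CLUSTERING mode. The simplified sketch below uses plain maps instead of SubscriptionData. The class and method names are hypothetical, and "*" is the value of SubscriptionData.SUB_ALL.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SubscriptionSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // topic -> subscription expression, built the way copySubscription fills subscriptionInner:
    // the user's subscriptions, plus the group's retry topic with "*" in CLUSTERING mode.
    static Map<String, String> buildSubscriptions(String group, MessageModel model,
                                                  Map<String, String> userSubs) {
        Map<String, String> table = new LinkedHashMap<>(userSubs);
        if (model == MessageModel.CLUSTERING) {
            table.put("%RETRY%" + group, "*"); // "*" = SubscriptionData.SUB_ALL
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> userSubs = new LinkedHashMap<>();
        userSubs.put("OrderTopic", "TagA || TagB");
        System.out.println(buildSubscriptions("order_group", MessageModel.CLUSTERING, userSubs));
        // {OrderTopic=TagA || TagB, %RETRY%order_group=*}
        System.out.println(buildSubscriptions("order_group", MessageModel.BROADCASTING, userSubs));
        // {OrderTopic=TagA || TagB}
    }
}
```

The retry topic is subscribed with "*" because everything in %RETRY%group already belongs to this group. Tag filtering was applied when the messages were first consumed from the real topic.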


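RebalanceImpl load-balances message queues across the consumers of a group. Setting the rest aside, we only look at how the retry topic is assigned to consumers, mainly in RebalanceImpl.doRebalance(). This method balances the real topics and the retry topic in the same way: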

public void doRebalance() {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // The main logic lives in rebalanceByTopic()
                this.rebalanceByTopic(topic);
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    // After rebalancing, drop the cached queues (and their messages) of topics this consumer no longer subscribes to
    this.truncateMessageQueueNotMyTopic();
}

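Inside rebalanceByTopic, an allocation strategy splits the queues among consumers. The default strategy for DefaultMQPushConsumer is AllocateMessageQueueAveragely. The sketch below is modeled on that averaging logic, with strings standing in for MessageQueue objects and client IDs. It assumes both lists are already sorted the same way on every consumer. That shared ordering is what lets each consumer compute its own share without coordination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AverageAllocationSketch {
    // Modeled on AllocateMessageQueueAveragely: contiguous blocks, the first
    // (queues % consumers) consumers get one extra queue.
    static <T> List<T> allocate(List<T> mqAll, List<String> cidAll, String currentCid) {
        List<T> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size()
            ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex); // may be <= 0: nothing left
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two brokers, each holding the group's single retry queue.
        List<String> retryQueues = Arrays.asList("broker-a/%RETRY%order_group/0", "broker-b/%RETRY%order_group/0");
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(retryQueues, consumers, cid));
        }
    }
}
```

Because the retry topic has only one queue per broker, three consumers on a two-broker cluster leave one consumer with no retry queue at all. By contrast, the eight queues of a real topic split 3/3/2 among the same consumers.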

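Every topic in the subscription map above, the retry topic included, is spread evenly across the consumers in the current consumerGroup. Note that each consumerGroup's retry topic has only one MessageQueue on each broker. That queue receives all of the group's failed messages on that broker, as shown in the figure below: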


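From there, the messages are consumed as usual. Across the previous three articles, including "RocketMQ Ordered Consumption Logic" and "RocketMQ Delayed Consumption Logic", we have seen RocketMQ shuttle messages back and forth between real topics, the retry topic, the delay topic and the dead-letter queue. It is a remarkably clever design.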

In Yuxin's (玉鑫) words: swapping topics is one of RocketMQ's favorite tricks.

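This article is reposted from 不修边幅的创客. If you believe it infringes your rights, please email contact@modb.pro with supporting evidence. Once the claim is verified, 墨天轮 (modb.pro) will promptly remove the content.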
