1
消息消费失败了?
消费者在消费普通消息时,默认采用的是ConsumeMessageConcurrentlyService并发消费。其它的逻辑这里不展开,只看消费线程在消费完MessageQueue的一批消息之后的处理逻辑。可以查看ConsumeMessageConcurrentlyService的内部类ConsumeRequest,它就是并发线程要处理的任务,我们只看其中对消费结果的处理:
// Task body run by a consumer thread for one batch of messages.
// This is an excerpt: the "..." / "...." lines mark code elided by the article.
@Override
public void run() {
// Consumption status returned by the listener; the catch block shown below does not assign it
ConsumeConcurrentlyStatus status = null;
...
try {
// Before consuming, let resetRetryTopic adjust the topic of messages that carry a retry-topic property
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
// The actual consumption: hand a read-only view of the batch to the listener
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
RemotingHelper.exceptionSimpleDesc(e),//
ConsumeMessageConcurrentlyService.this.consumerGroup,//
msgs,//
messageQueue);
}
....
if (!processQueue.isDropped()) {
// Handle the consume result according to the status
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
else {
// The process queue was dropped: the result is not processed, only logged
log.warn(
"processQueue is dropped without process consume result. messageQueue={}, msgTreeMap={}, msgs={}",
new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
}
}复制
注意上面代码中,有个重置retry_topic的逻辑。消息消费失败后会被发回broker,并投递到当前消费组的重试队列中(重试队列的topic由固定前缀和当前consumerGroup构成,即"%RETRY%"+consumerGroup;一个重试队列在一个broker中只有一个queue),而消息原始的topic则保存在消息的RETRY_TOPIC属性里。因此在真正消费之前,如果消息来自当前消费组的重试队列并且带有该属性,就先把消息的topic还原成原始topic,代码如下:
public void resetRetryTopic(final List<MessageExt> msgs) {
//获取当前消费者group的重试队列
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
//重置retry topic到消息体
msg.setTopic(retryTopic);
}
}
}复制
processConsumeResult方法对消费结果进行处理,代码如下:
public void processConsumeResult(//
final ConsumeConcurrentlyStatus status, //
final ConsumeConcurrentlyContext context, //
final ConsumeRequest consumeRequest//
) {
//ackIndex记录被应答的消息数,表示成功消费的
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
//消费状态判断
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
//消费成功/失败数量统计
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//除去OK的,剩下的就是失败的,需要重新消费
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消息重新发送到broker,当前消费失败的消息需要重新消费,
boolean result = this.sendMessageBack(msg, context);
if (!result) {
//这里会记录重试次数,如果超过16次,消息会被放进死信队列
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//消费完毕后从ProcessQueue中清除这批消息
//offset为清除这批偏移量后processQueue.msgTreeMap中最小的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
offset, true);
}
}复制
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
try {
//请求broker,消费失败的消息需要重新消费
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue()
.getBrokerName());
return true;
}
catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(),
e);
}
return false;
}复制
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext = null;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
//处理消费失败的消息
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader==null) {
return null;
}
// 消息轨迹:记录到达 broker 的消息
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
// 消息轨迹:记录发送成功的消息
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}复制
// Broker-side handling of a message a consumer failed to consume: the message is
// rebuilt under the group's retry topic (or its dead-letter topic).
// This is an excerpt: the "..." lines mark code elided by the article.
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
...
// Retry topic of the consumer group (the article describes it as %RETRY% + consumerGroup)
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// Pick a random queue id within the group's retry queue count
int queueIdInt =
Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// Look up the retry topic, creating it if it does not exist yet
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
newTopic,// retry topic
subscriptionGroupConfig.getRetryQueueNums(), // queue count of the retry topic (the article states the default is 1 — TODO confirm)
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
...
// Build the message: remember the current topic in PROPERTY_RETRY_TOPIC if that property is not set yet
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
// Delay level requested by the client
int delayLevel = requestHeader.getDelayLevel();
// Dead-letter handling: once reconsumeTimes reaches the group's retryMaxTimes, or the
// requested delay level is negative, the message goes to the dead-letter topic instead of
// being retried (the article states the maximum is 16 — TODO confirm)
if (msgExt.getReconsumeTimes() >= subscriptionGroupConfig.getRetryMaxTimes()//
|| delayLevel < 0) {
// Dead-letter topic of the consumer group
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
// Look up the dead-letter topic, creating it if it does not exist yet
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic, //
DLQ_NUMS_PER_GROUP,//
PermName.PERM_WRITE, 0 // dead-letter messages need no sync and no correction
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
}
// Otherwise keep retrying
else {
// A requested level of 0 means the level is derived here as 3 + reconsumeTimes
// (the article states level 3 corresponds to a 10s delay — TODO confirm)
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
// Store the delay level on the message
// NOTE(review): per the article, a message with a delay level is handled as a delayed message — confirm
msgExt.setDelayTimeLevel(delayLevel);
}
// Rebuild the message for storage; its topic is newTopic (retry or dead-letter topic)
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
// Each trip through this method increments the reconsume counter
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
...
}复制
程序走到这里,接下来该如何处理呢?因为消息中被设置了延迟级别,所以它就是延时消息,只是这里的真实队列是retry_topic(重试队列)。延时消息如何处理见:RocketMQ之延时消费逻辑。按照延时消息的处理逻辑,这条消息到期后最终会被发送到真实队列retry_topic中。那有人又要问了,重试队列里的消息如何被消费者消费呢?看下面
2
重试队列中的消息如何消费?
因为消息在延时队列:SCHEDULE_TOPIC_XXX中到期后,最终还是会被送到重试队列中,所以我们就必须知道,重试队列中的消息是如何被消费的。
上面我讲到一点,broker在处理消费失败的消息时,会通过this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod()方法创建当前重试队列,并把它上报到nameserver。代码如下:
public TopicConfig createTopicInSendMessageBackMethod(//
final String topic, //
final int clientDefaultTopicQueueNums,//
final int perm,//
final int topicSysFlag) {
//如果Topic存在,直接放回
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
boolean createNew = false;
try {
//否则加锁创建Topic
if (this.lockTopicConfigTable.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(topic, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
this.persist();
}
finally {
this.lockTopicConfigTable.unlock();
}
}
}
catch (InterruptedException e) {
log.error("createTopicInSendMessageBackMethod exception", e);
}
//如果新建了Topic,把当前所有的Topic信息都上报一次到nameserver
if (createNew) {
this.brokerController.registerBrokerAll(false, true);
}
return topicConfig;
}复制
把retry_topic(重试队列)注册到nameserver,消费者就有机会从nameserver获取到重试队列并消费,那消费者如何消费重试队列呢?
在消费者启动时,会执行DefaultMQPushConsumerImpl.start()方法,该方法主要是初始化一些消费者必须的对象,其中会调用this.copySubscription()。在该方法中,如果当前MessageModel为CLUSTERING(集群模式),则会把当前消费者的重试队列也加入订阅。代码如下:
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
//TODO subString ?
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
//如果是集群模式,获取当前消费者group的重试队列
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
//构建订阅信息
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
//并放到RebalanceImpl的订阅信息ConcurrentHashMap中
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}复制
RebalanceImpl主要作用是对消费者做负载均衡,其它的不讲了,只讲重试队列是如何分配给消费者消费的,主要看:RebalanceImpl.doRebalance(),该方法对真实的Topic的消费做负载均衡,也对重试队列进行负载均衡,代码如下:
public void doRebalance() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//主要的逻辑都是在rebalanceByTopic()中实现的
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//做完rebalance后,检查是否有的queue已经不归自己负责消费,是的话就释放缓存message的queue
this.truncateMessageQueueNotMyTopic();
}复制
对于上述Map中的Topic订阅信息,每个Topic(包括重试队列)都要均匀的分配给当前consumerGroup下的消费者,这里注意:每个consumerGroup的重试队列在每个broker上只会有一个MessageQueue,用于接收所有的当前broker失败的消息,如下图所示:
然后就正常消费了,通过前面三篇文章:RocketMQ之顺序消费逻辑 RocketMQ之延时消费逻辑 ,发现RocketMQ的真实Topic、重试队列、延时队列和死信队列来回穿插,巧妙之极~
用玉鑫的话就是:偷换topic是RocketMQ的常用伎俩~