
RocketMQ Source Code Series 9: Message Retry and Filtering

Yi Lin's Blog, 2019-10-22

In the previous article we briefly walked through the overall message consumption flow. This time we focus on message retry and message filtering. Before diving in, consider two questions:

1. When message consumption fails, what does RocketMQ do?

2. Does message filtering happen on the broker side or on the consumer side?

1. Message Retry

1.1. A simple message consumption example

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
// set the subscription
consumer.subscribe("TopicTest", "tagA");
// set where to start consuming
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// wrong time format 2017_0422_221800
// set the consume-from timestamp
consumer.setConsumeTimestamp("20181109221800");
// register the message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// start the consumer
consumer.start();


The above is the very simple piece of code we normally write when using a consumer. Now let's step into its start method.

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        // the consumer has just been created
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // check the configuration
            this.checkConfig();
            // copy the subscriptions
            this.copySubscription();
            // ... part of the code omitted


private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
// build the SubscriptionData
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
// cache the subscription in the map maintained by rebalance
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}

if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
// clustering mode
case CLUSTERING:
// derive a retry topic from the consumer group
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
// build the SubscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
// cache the retry-topic subscription in the map maintained by rebalance
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}


The key point here is that when the consumer starts, the map maintained by rebalance ends up holding subscriptions for two topics: the topic we subscribed to ourselves, and a second one used for message retry. These subscriptions are eventually sent to every broker via the heartbeat. As for what rebalance actually is, see the previous article; in short, every 20 seconds it builds PullRequests per topic, which are used to pull messages. So at this point two topics exist for our group, it is just that the retry topic holds no messages yet.
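As a side note, the retry topic (and the dead-letter topic we will meet later) is derived purely from the consumer group name. Below is a minimal sketch of that naming convention, assuming the prefixes defined in MixAll ("%RETRY%" and "%DLQ%"); the class name is just illustrative.

// Hedged sketch of the retry/DLQ topic naming convention, assuming MixAll's prefixes.
public class RetryTopicNaming {
    private static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    private static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";

    static String getRetryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    static String getDLQTopic(String consumerGroup) {
        return DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    public static void main(String[] args) {
        // for the consumer group used in the example above
        System.out.println(getRetryTopic("CID_JODIE_1")); // %RETRY%CID_JODIE_1
        System.out.println(getDLQTopic("CID_JODIE_1"));   // %DLQ%CID_JODIE_1
    }
}

So every consumer group automatically carries a %RETRY%&lt;group&gt; subscription, which is why the retry messages produced later can be pulled back by the same group.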

For the heartbeat itself, take a look at MQClientInstance.startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);


The above is the scheduled task that sends the subscription heartbeat to all brokers in the cluster: each broker in the cluster receives the client's HeartbeatData. HeartbeatData is the heartbeat payload of each client and contains the following:

// client ID
private String clientID;
// producer info
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// consumer info (ConsumerData):
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();

public class ConsumerData {
private String groupName;
private ConsumeType consumeType;
private MessageModel messageModel;
private ConsumeFromWhere consumeFromWhere;
// the subscriptions
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
private boolean unitMode;


1.2. Consumer-side handling of a failed consumption

ConsumeMessageConcurrentlyService.processConsumeResult

 public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;

switch (status) {
// consumption succeeded
case CONSUME_SUCCESS:
// cap ackIndex at the batch size
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// consumption failed
case RECONSUME_LATER:

ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {

case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
// get the message that needs to be sent back
MessageExt msg = consumeRequest.getMsgs().get(i);
// send it back to the broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// send-back failed: bump the reconsume count and keep the message for a local retry
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

// remove these messages from the ProcessQueue cache (a TreeMap, i.e. a red-black tree)
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// update the consume offset whether consumption succeeded or failed
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}



The above is the consumer-side handling of a failed consumption: every message after ackIndex is sent back to the broker, as sketched below.
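To make the ackIndex bookkeeping concrete, here is a small hedged sketch. It is not RocketMQ code, just the arithmetic from processConsumeResult above; the method name and parameters are made up for illustration.

// Sketch of the ackIndex arithmetic in processConsumeResult above:
// CONSUME_SUCCESS caps ackIndex at batchSize - 1, so the loop sends nothing back;
// RECONSUME_LATER sets ackIndex to -1, so the whole batch is sent back.
static void sketchSendBackRange(int batchSize, boolean success) {
    int ackIndex = success ? batchSize - 1 : -1;
    for (int i = ackIndex + 1; i < batchSize; i++) {
        System.out.println("send message " + i + " back to the broker");
    }
}

With the default consumeMessageBatchMaxSize of 1, a RECONSUME_LATER therefore sends just that one message back. Now let's step into sendMessageBack and see whether it differs from the ordinary message sending analyzed before.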

    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
// get the delay level (defaults to 0)
int delayLevel = context.getDelayLevelWhenNextConsume();

// Wrap topic with namespace before sending back message.
// set the topic (with namespace)
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
// send the message back
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}

return false;
}


One thing to pay attention to above is delayLevel, which drives RocketMQ's retry scheduling. Let's take a quick look at RocketMQ's delay levels; a rough impression is enough for now, and we will come back to them later.
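Below is a hedged sketch of the broker's default delay levels (the stock messageDelayLevel setting in MessageStoreConfig, assuming it has not been changed) together with how a listener can ask for a specific level on the next retry. It reuses the consumer from the first example; the level choice and the listener body are purely illustrative.

// Default delay levels on the broker (levels 1..18), assuming the stock messageDelayLevel setting:
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            // ... business logic ...
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // optionally ask the broker to use level 4 (30s) for the next retry;
            // left at the default of 0, the broker picks 3 + reconsumeTimes, as we will see below
            context.setDelayLevelWhenNextConsume(4);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});

With that in mind, let's keep following sendMessageBack.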

   public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
// resolve the broker address
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
// send the send-back request
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
// if the send-back fails, fall back to sending it as an ordinary message to the retry topic

log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}


The process above is fairly simple. Let's keep going and see how the RocketMQ broker actually handles this kind of message.

1.3. Broker-side handling of a failed consumption

SendMessageProcessor.processRequest

  public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException
{
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);


That dispatch is straightforward, so let's go straight into consumerSendMsgBack.

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// decode the request header
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// get the namespace
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());

// run the consume-message hooks
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

this.executeConsumeMessageHookAfter(context);
}

// look up the subscription group config by consumer group
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());

// if it cannot be found, report that the subscription group does not exist
// this is also an important point: if consumers in the same group use inconsistent subscriptions, they overwrite each other on the broker, so a consumer that meant to pull topicA may find the broker holding topicB's subscription and hit the error below
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
// check whether the broker is writable
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
// number of retry queues
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
// the retry topic
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// pick a queue id
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

// topic config for the retry topic (created if it does not exist yet)
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
// check whether the topic is writable
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
// fetch the previously failed message from the commit log by offset
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
// save the original topic as a property
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

// defaults to 0
int delayLevel = requestHeader.getDelayLevel();
// maximum reconsume times
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// after too many retries the message goes to the dead-letter queue
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
// first retry: delayLevel = 3 + reconsumeTimes
delayLevel = 3 + msgExt.getReconsumeTimes();
}

msgExt.setDelayTimeLevel(delayLevel);
}

// rebuild a new message object
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// the retry topic (or DLQ topic)
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

// persist it
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}

this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);

return response;
default:
break;
}

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
// return the response
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}


A quick summary: the broker first fetches the subscription group config. If the message's reconsume count has reached the maximum, it is routed to the dead-letter queue; otherwise the original failed message is fetched from the commit log by offset, a new message object is built with a number of properties copied from the old one, and its topic becomes the retry topic. The message is then persisted and a response is returned.
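Combining the delayLevel = 3 + reconsumeTimes rule above with the default delay-level table gives the effective retry schedule. Below is a hedged sketch that assumes the default 18 levels and a default maxReconsumeTimes of 16 for concurrently consuming push consumers; the class name is illustrative.

// Hedged sketch: the delay before the Nth redelivery, assuming the default
// messageDelayLevel table and delayLevel = 3 + reconsumeTimes.
public class RetrySchedule {
    static final String[] DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    public static void main(String[] args) {
        int maxReconsumeTimes = 16; // assumed default for concurrent push consumers
        for (int reconsumeTimes = 0; reconsumeTimes < maxReconsumeTimes; reconsumeTimes++) {
            int delayLevel = 3 + reconsumeTimes; // first retry -> level 3 -> 10s
            System.out.printf("retry #%d waits %s%n", reconsumeTimes + 1, DEFAULT_LEVELS[delayLevel - 1]);
        }
        // once reconsumeTimes reaches maxReconsumeTimes, the message goes to %DLQ%<group> instead
    }
}

So the first redelivery happens roughly 10 seconds after the send-back, the second after 30 seconds, and so on, after which the message lands in the DLQ topic.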

Let's continue and look at how the message store actually handles this kind of message, and whether it is the same as for an ordinary message.

CommitLog.putMessage

 public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
||
tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery: this is where retry (delayed) messages are handled
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// reset the topic to SCHEDULE_TOPIC
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// map the delay level to a queue id; our level was 3, and stepping in shows that maps to queue 2
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
// back up the real topic (the retry topic)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
// back up the real queue id
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// reset the topic to the schedule topic
msg.setTopic(topic);
// set the queue id (2 in our case)
msg.setQueueId(queueId);
}
}

// ... remaining code omitted; see the earlier article on message sending


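The delayLevel2QueueId call above is just a fixed mapping from delay level to queue index. A minimal sketch of that convention, consistent with the comment's "level 3 ends up in queue 2" (the schedule topic name is assumed to be SCHEDULE_TOPIC_XXXX):

// delay level N is stored in queue N - 1 of the schedule topic (SCHEDULE_TOPIC_XXXX),
// which is why a delayLevel of 3 lands in queue 2
static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}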

After the message is actually put, the corresponding consumeQueue entry and index are built.

At this point our retry message has been written to a schedule topic, so how are the messages in that topic processed?

1.4. Handling of the schedule topic

Entry point: ScheduleMessageService.executeOnTimeup, which runs as a timer task.

 public void executeOnTimeup() {

// find the consumeQueue for the schedule topic and queue id
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;

if (cq != null) {
// get the mapped buffer for this offset
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// CQ_STORE_UNIT_SIZE (default 20): each consumeQueue entry is exactly 20 bytes long
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// physical offset in the commit log
long offsetPy = bufferCQ.getByteBuffer().getLong();
//size
int sizePy = bufferCQ.getByteBuffer().getInt();
//hashCode
long tagsCode = bufferCQ.getByteBuffer().getLong();

// extended attributes; we do not use them here, so skip
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);

// get the message's store timestamp
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);

// here tagsCode is actually the time at which the message should be delivered
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// current time
long now = System.currentTimeMillis();

// correct the delivery timestamp if needed
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// the next offset
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

// time remaining until delivery (<= 0 means it is due)
long countdown = deliverTimestamp - now;

// the delivery time has been reached
if (countdown <= 0) {
// fetch the message by physical offset; note this is the rebuilt message object
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);

if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

// write the message back; from here on it can be consumed again
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);

// write succeeded
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());

// write failed: wait a while and then scan this queue again
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);

// update the delay-queue offset
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/* XXX: warn and notify me */

log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// not due yet: re-schedule the task to run once the delivery time arrives
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for

// no more entries for now: scan this queue again after a short delay
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {

bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {

long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}


Read the code above alongside the comments and it is not particularly complicated. Essentially it reads entries from the schedule topic's consumeQueue, finds the corresponding message, and uses the message's store time plus the delay-level time as the delivery time. That is compared with the current time: if the current time has passed it, the message goes through the normal write path again (with its real topic restored) and can be consumed; if not, the task re-schedules itself to check the entry again later. In both cases the delay-queue offset is updated.

That completes the analysis of the whole retry flow: the consumer sends the failed message back, the broker rewrites it onto the schedule topic with a delay level, the schedule service redelivers it to the %RETRY% topic once the delay expires, and the consumer group pulls it again.

2. Message Filtering

RocketMQ provides several filtering mechanisms; this article covers tag-based filtering.

When talking about tag filtering, we have to talk about how the subscription is built, because the tags live in the subscription data.

FilterAPI

// subString is the tag expression
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);

if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// split the tag expression on "||"
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// keep the original tag value in the subscription's tagsSet
subscriptionData.getTagsSet().add(trimString);
// keep the tag's hashCode in the codeSet
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}

return subscriptionData;
}

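To make the split-and-hash step concrete, here is a hedged sketch of roughly what buildSubscriptionData yields for a two-tag subscription; the group and topic are the ones from the earlier example, and tagB is an extra illustrative tag. Note the method declares throws Exception.

// roughly, subscribing with "tagA || tagB" produces a SubscriptionData where:
//   subString = "tagA || tagB"
//   tagsSet   = [tagA, tagB]                               // original strings, used by the consumer
//   codeSet   = ["tagA".hashCode(), "tagB".hashCode()]     // hashCodes, used by the broker
SubscriptionData data = FilterAPI.buildSubscriptionData("CID_JODIE_1", "TopicTest", "tagA || tagB");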

This is where the tag subscription is built, and it is fairly clear. Next, let's see how RocketMQ actually performs message filtering.

2.1. Filter in the broker

Entry point: DefaultMessageStore.getMessage

When fetching messages, there is this piece of code:

// filtering based on the consumeQueue entry (the tag hashCode)
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}

continue;
}
// fetch the message from the commit log
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}

nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}

// filtering based on the commit log content
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}



Here we only analyze the consumeQueue-based filtering; let's look at its matching method.

DefaultMessageFilter

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}

if (subscriptionData.isClassFilterMode()) {
return true;
}

return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}


The method above should look familiar: it simply compares the tag's hashCode (the tagsCode stored in the consumeQueue) against the subscription's codeSet.
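Since only the hashCode is stored in the consumeQueue entry, different tags can collide, so a broker-side match is not conclusive. A quick illustration (these two strings are chosen purely because they collide in Java):

// two different strings with the same hashCode: the broker-side tag filter alone
// cannot tell them apart, which is why the consumer re-checks the original tag string
System.out.println("Aa".hashCode()); // 2112
System.out.println("BB".hashCode()); // 2112

That is exactly what the consumer-side filtering in the next section takes care of.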

2.2. Filter in the consumer

DefaultMQPushConsumerImpl.pullMessage

In the callback invoked after a pull succeeds, there is this piece of code:


PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// process the pull result
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);



Let's step in and take a look.

 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {

PullResultExt pullResultExt = (PullResultExt) pullResult;
// record which broker to pull from next
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// decode the messages
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
// iterate over the messages
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// filter again, this time against the original tag string
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
// run the filter-message hooks
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}

for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
// store a few extra properties
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}

pullResultExt.setMsgFoundList(msgListFilterAgain);
}

pullResultExt.setMessageBinary(null);

return pullResult;
}


Here, after decoding the messages, we iterate over them, filter by the original tag value, and finally set a few extra properties.

Filtering is done on both sides in order to keep the consumeQueue entries fixed-length: that lets us access the consumeQueue like an array, and from the fields of an entry (which are effectively an index) we can easily locate the corresponding message in the commitLog.
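As a reference, here is a hedged sketch of the fixed 20-byte consumeQueue entry implied by the reads we saw in executeOnTimeup (getLong for the commitLog offset, getInt for the size, getLong for the tag hashCode), and why that allows array-style addressing; the class and method names are illustrative.

import java.nio.ByteBuffer;

// Hedged sketch of a consumeQueue entry: 8 + 4 + 8 = 20 bytes (CQ_STORE_UNIT_SIZE).
public class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // where the full message sits in the commitLog
        buf.putInt(msgSize);          // how many bytes to read from there
        buf.putLong(tagsCode);        // tag hashCode, used by the broker-side filter
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // because every entry is exactly 20 bytes, the Nth entry of a queue is at byte N * 20,
        // so the consumeQueue can be addressed like an array
        long logicalIndex = 5;
        System.out.println(logicalIndex * CQ_STORE_UNIT_SIZE); // 100
    }
}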

3. Summary

At this point I hope you have your own answers to the two questions raised at the beginning. If there are any mistakes in this article, please point them out. Thank you.

Note:

This article draws on related blog posts by Fei Hongjian; the Jianshu link is attached:

https://www.jianshu.com/u/81e2cd2747f1


