问题描述:
1.RocketMQ集群(V4.9.1)部署方式为一主一从,并且是同步刷盘同步复制
2.当slave节点的文件系统readonly时,在master节点上执行benchmark下的producer直到Send TPS为0,此时以下程序再往集群中同步发送数据,producer端在等待设置的超时时间(超时时间设置从秒级到分钟级)后catch RemotingTooMuchRequestException,具体的报错信息如下:
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:690)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1398)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:349)复制
producer发送数据到集群的程序如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("Test",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, 30 * 1000);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}复制
原因分析:
首先我们需要清楚producer与broker之间的交互过程:
1.producer会发送消息给master
2.master会将消息追加到commitlog、刷盘、主从同步
当slave节点的文件系统处于readonly状态下,此时会影响主从同步数据,那么原因究竟是什么呢?这里需要回顾下master和slave之间数据同步复制的过程:在RocketMQ中实现主从同步复制的是GroupTransferService,GroupTransferService的作用是判断commitlog同步复制是否完成。在GroupTransferService中有两个队列分别是requestsWrite和requestsRead,在master处理producer的写请求过程的最后一步会提交GroupCommitRequest请求到requestsWrite,接着GroupTransferService会交换requestsWrite和requestsRead并遍历requestsRead中每个请求,然后判断push2SlaveMaxOffset是否大于等于请求的nextOffset(也就是master和slave之间数据同步是否完成,因为push2SlaveMaxOffset记录是所有slave节点中已完成的数据同步的最大偏移量),这里需要注意两点:
1.GroupTransferService是个单线程
2.在判断主从之间数据同步是否完成时最多会判断6次,其中最后5次判断是每一秒判断一次(syncFlushTimeout默认是5秒)
3.当在完成6次判断后还没有完成数据同步时返回给producer的sendStatus是FLUSH_SLAVE_TIMEOUT
现在我们来接着分析,当slave的文件系统处于readonly时,slave节点的刷盘会受到影响,所以此时GroupTransferService会对每个请求判断6次,也就是每个请求会花费5秒的时间,此时在master节点上执行benchmark下的producer脚本,短时间内会使GroupTransferService的队列中堆积大量的请求(这里请求的数量大概是8000+),当Send TPS为0时,此时再执行上面的示例程序,由于producer端设置同步发送的超时时间远远小于master端5秒*GroupCommitRequest数量,所以在producer端会触发超时,由于是同步发送,所以其发送次数最多是2次,在进行第二次发送时由于已经超时,所以会抛出RemotingTooMuchRequestException异常
关键代码展示:
1.producer端发送消息的sendDefaultImpl方法


2.master端GroupTransferService处理逻辑
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead = new LinkedList<>();
}
}复制
解决方案:
在master端提交GroupCommitRequest前会进行一个判断,具体如下:
1.当前master节点上是否有slave连接
2.slave与master之间commitlog offset差距是否小于haSlaveFallbehindMax(默认是256M)
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
public boolean isSlaveOK(final long masterPutWhere) {
boolean result = this.connectionCount.get() > 0;
result =
result
&& ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
.getMessageStoreConfig().getHaSlaveFallbehindMax());
return result;
}复制
当上述两个条件都满足的情况下才会提交GroupCommitRequest请求,所以可以通过在broker端适当调小haSlaveFallbehindMax的值,这样做有两个好处:
1.在slave的文件系统readonly时,GroupTransferService的队列中不会堆积大量的请求
2.producer端不会出现RemotingTooMuchRequestException异常而是会收到SLAVE_NOT_AVAILABLE的sendStatus
作者简介
孙玺,中国民生银行信息科技部开源软件支持组工程师,目前主要负责RocketMQ源码研究和工具开发等相关工作。