暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ HA模式slave节点文件系统readonly时,producer端catch exception

民生运维人 2022-05-28
918

问题描述:

1.RocketMQ集群(V4.9.1)部署方式为一主一从,并且是同步刷盘同步复制

2.当slave节点的文件系统readonly时,在master节点上执行benchmark下的producer直到Send TPS为0,此时以下程序再往集群中同步发送数据,producer端在等待设置的超时时间(超时时间设置从秒级到分钟级)后catch RemotingTooMuchRequestException,具体的报错信息如下:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:690)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1398)
        at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:349)

复制

producer发送数据到集群的程序如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("Test",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, 30 * 1000);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

复制

原因分析:

首先我们需要清楚producer与broker之间的交互过程:

1.producer会发送消息给master

2.master会将消息追加到commitlog、刷盘、主从同步

当slave节点的文件系统处于readonly状态下,此时会影响主从同步数据,那么原因究竟是什么呢?这里需要回顾下master和slave之间数据同步复制的过程:在RocketMQ中实现主从同步复制的是GroupTransferService,GroupTransferService的作用是判断commitlog同步复制是否完成。在GroupTransferService中有两个队列分别是requestsWrite和requestsRead,在master处理producer的写请求过程的最后一步会提交GroupCommitRequest请求到requestsWrite,接着GroupTransferService会交换requestsWrite和requestsRead并遍历requestsRead中每个请求,然后判断push2SlaveMaxOffset是否大于等于请求的nextOffset(也就是master和slave之间数据同步是否完成,因为push2SlaveMaxOffset记录是所有slave节点中已完成的数据同步的最大偏移量),这里需要注意两点:

1.GroupTransferService是个单线程

2.在判断主从之间数据同步是否完成时最多会判断6次,其中最后5次判断是每一秒判断一次(syncFlushTimeout默认是5秒)

3.当在完成6次判断后还没有完成数据同步时返回给producer的sendStatus是FLUSH_SLAVE_TIMEOUT

现在我们来接着分析,当slave的文件系统处于readonly时,slave节点的刷盘会受到影响,所以此时GroupTransferService会对每个请求判断6次,也就是每个请求会花费5秒的时间,此时在master节点上执行benchmark下的producer脚本,短时间内会使GroupTransferService的队列中堆积大量的请求(这里请求的数量大概是8000+),当Send TPS为0时,此时再执行上面的示例程序,由于producer端设置同步发送的超时时间远远小于master端5秒*GroupCommitRequest数量,所以在producer端会触发超时,由于是同步发送,所以其发送次数最多是2次,在进行第二次发送时由于已经超时,所以会抛出RemotingTooMuchRequestException异常

关键代码展示: 

1.producer端发送消息的sendDefaultImpl方法

2.master端GroupTransferService处理逻辑

private void doWaitTransfer() {
            if (!this.requestsRead.isEmpty()) {
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                            + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                    while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                        this.notifyTransferObject.waitForRunning(1000);
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }

                    if (!transferOK) {
                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                    }

                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }

                this.requestsRead = new LinkedList<>();
            }
        }

复制

解决方案:

在master端提交GroupCommitRequest前会进行一个判断,具体如下:

1.当前master节点上是否有slave连接

2.slave与master之间commitlog offset差距是否小于haSlaveFallbehindMax(默认是256M)

public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                }
                else {
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

public boolean isSlaveOK(final long masterPutWhere) {
        boolean result = this.connectionCount.get() > 0;
        result =
            result
                && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
                .getMessageStoreConfig().getHaSlaveFallbehindMax());
        return result;
    }

复制

当上述两个条件都满足的情况下才会提交GroupCommitRequest请求,所以可以通过在broker端适当调小haSlaveFallbehindMax的值,这样做有两个好处:

1.在slave的文件系统readonly时,GroupTransferService的队列中不会堆积大量的请求

2.producer端不会出现RemotingTooMuchRequestException异常而是会收到SLAVE_NOT_AVAILABLE的sendStatus

作者简介

孙玺,中国民生银行信息科技部开源软件支持组工程师,目前主要负责RocketMQ源码研究和工具开发等相关工作。

文章转载自民生运维人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论