暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

rocketmq异步刷盘的少量消息丢失问题

原创 刘韬 云和恩墨 2022-11-15
2528

       rocket作为开源消息中间件的佼佼者,其性能及稳定性得到业界普遍的认可;rocketmq的数据安全设计一共可分为两种类型:同步刷盘、异步刷盘。同步刷盘,每一条消息的发送都确认写入磁盘才确保消息发送成功。异步刷盘,按照预先定义的逻辑,间隔时间进行写入磁盘,允许部分数据丢失。二种方式各有优缺点,同步刷盘做到了数据的绝对安全,但是牺牲了部分性能为代价,异步刷盘做到了数据的相对安全,同时保证了数据发送接收的效率。

测试场景(异步刷盘)

Mq消息存储的文件定义在 storePathRootDir 路径下,默认的存储的第一个文件为

$storePathRootDir/commitlog/00000000000000000000,默认大小为1g

每一条消息(消息体)在刷盘时都将写入这个文件

假定有一个消息生产者,每隔1秒生产一条消息,当发送了n条消息后,假定rocketmq 发生了crash,此时检查消息的完整性

消息发送参考官网的例子

可以同步发送,也可以异步发送

public class AsyncProducer {

  public static void main(String[] args) throws Exception {

    // Initialize a producer and set the Producer group name

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

    // Set the address of NameServer

    producer.setNamesrvAddr("localhost:9876");

    // Start Producer

    producer.start();

    producer.setRetryTimesWhenSendAsyncFailed(0);

    for (int i = 0; i < 100; i++) {

      final int index = i;

      // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.

      Message msg = new Message("TopicTest",

        "TagA",

        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

      // Send a message asynchronously, the result is returned to the client by callback

      producer.send(msg, new SendCallback() {

        @Override

        public void onSuccess(SendResult sendResult) {

          System.out.printf("%-10d OK %s %n", index,

            sendResult.getMsgId());

        }

        @Override

        public void onException(Throwable e) {

          System.out.printf("%-10d Exception %s %n", index, e);

          e.printStackTrace();

        }

      });

    }

    // Close the producer once it is no longer in use

    producer.shutdown();

  }

}

 

可以看到一共发送了9条消息

/home/rocketmq/4.7.1/bin/mqadmin topicStatus -n 192.168.40.132:9876 -t TopicTest

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).

RocketMQLog:WARN Please initialize the logger system properly.

#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated

broker-a                          0     0                     2                       2022-xx-xx 17:34:47,897

broker-a                          1     0                     3                       2022-xx-xx 17:34:48,895

broker-a                          2     0                     2                       2022-xx-xx 17:34:45,893

broker-a                          3     0                     2                       2022-xx-xx 17:34:46,893

 

但这9条消息只有第一条的消息是写入了磁盘,其余消息内容都是丢失的

$ sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B845BC0000  -t TopicTest -n 192.168.40.132:9876

sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B849AC0001  -t TopicTest -n 192.168.40.132:9876

sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B84D950002  -t TopicTest -n 192.168.40.132:9876

sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B8517D0003  -t TopicTest -n 192.168.40.132:9876RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).

RocketMQLog:WARN Please initialize the logger system properly.

 

Topic:               TopicTest

Tags:                [TagA]

Keys:                [null]

Queue ID:            1

Queue Offset:        0

CommitLog Offset:    0

Reconsume Times:     0

Born Timestamp:      2022-xx-xx 17:34:41,596

Store Timestamp:     2022-xx-xx 17:34:40,899

Born Host:           192.168.40.1:52415

Store Host:          192.168.40.132:10911

System Flag:         0

Properties:          {UNIQ_KEY=C0A801072CE8659E0BFD46B845BC0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}

Message Body Path:   /tmp/rocketmq/msgbodys/C0A801072CE8659E0BFD46B845BC0000

 

sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B8517D0003  -t TopicTest -n 192.168.40.132:9876

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).

RocketMQLog:WARN Please initialize the logger system properly.

org.apache.rocketmq.tools.command.SubCommandException: QueryMsgByUniqueKeySubCommand command failed

        at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.execute(QueryMsgByUniqueKeySubCommand.java:220)

        at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:139)

        at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:90)

Caused by: org.apache.rocketmq.client.exception.MQClientException: CODE: 208  DESC: query message by key finished, but no message.

For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

        at org.apache.rocketmq.client.impl.MQAdminImpl.queryMessage(MQAdminImpl.java:435)

        at org.apache.rocketmq.client.impl.MQAdminImpl.queryMessageByUniqKey(MQAdminImpl.java:280)

        at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.viewMessage(DefaultMQAdminExtImpl.java:324)

        at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.viewMessage(DefaultMQAdminExt.java:497)

        at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.queryById(QueryMsgByUniqueKeySubCommand.java:63)

        at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.execute(QueryMsgByUniqueKeySubCommand.java:217)

        ... 2 more

 

分析过程

 

当收到第一条消息后,触发了mq的写盘动作,此后每隔flushIntervalCommitLog 默认500毫秒进行一次刷盘(而且保证存储在内存的消息大小达到最少flushCommitLogLeastPages 默认16k),间隔一秒后的消息由于并没有达到16k,所以没有触发写盘,如果到了 flushCommitLogThoroughInterval(间隔多久未刷盘,会强制刷盘) 默认10秒,会统一一次刷盘。

至此大概消息丢弃的原因就明显了,在第一次刷盘后,中间如果发送的消息大小没有超过16k,在这10秒时间内(没到10秒)发生了crash事件,那么此部分消息将丢失。

选择合适的架构

在极端情况下,允许少量消息丢失,同时兼顾mq的性能,建议采用异步刷盘方式。

消息不容许丢失,哪怕就在非常短暂时间内,可能就在几秒内丢失数据都不可容忍,那么建议采用同步刷盘,性能会有一定的损耗。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论