rocket作为开源消息中间件的佼佼者,其性能及稳定性得到业界普遍的认可;rocketmq的数据安全设计一共可分为两种类型:同步刷盘、异步刷盘。同步刷盘,每一条消息的发送都确认写入磁盘才确保消息发送成功。异步刷盘,按照预先定义的逻辑,间隔时间进行写入磁盘,允许部分数据丢失。二种方式各有优缺点,同步刷盘做到了数据的绝对安全,但是牺牲了部分性能为代价,异步刷盘做到了数据的相对安全,同时保证了数据发送接收的效率。
测试场景(异步刷盘)
Mq消息存储的文件定义在 storePathRootDir 路径下,默认的存储的第一个文件为
$storePathRootDir/commitlog/00000000000000000000,默认大小为1g
每一条消息(消息体)在刷盘时都将写入这个文件
假定有一个消息生产者,每隔1秒生产一条消息,当发送了n条消息后,假定rocketmq 发生了crash,此时检查消息的完整性
消息发送参考官网的例子 可以同步发送,也可以异步发送 public class AsyncProducer { public static void main(String[] args) throws Exception { // Initialize a producer and set the Producer group name DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Set the address of NameServer producer.setNamesrvAddr("localhost:9876"); // Start Producer producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side. Message msg = new Message("TopicTest", "TagA", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // Send a message asynchronously, the result is returned to the client by callback producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // Close the producer once it is no longer in use producer.shutdown(); } } |
可以看到一共发送了9条消息 /home/rocketmq/4.7.1/bin/mqadmin topicStatus -n 192.168.40.132:9876 -t TopicTest RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Broker Name #QID #Min Offset #Max Offset #Last Updated broker-a 0 0 2 2022-xx-xx 17:34:47,897 broker-a 1 0 3 2022-xx-xx 17:34:48,895 broker-a 2 0 2 2022-xx-xx 17:34:45,893 broker-a 3 0 2 2022-xx-xx 17:34:46,893 |
但这9条消息只有第一条的消息是写入了磁盘,其余消息内容都是丢失的 $ sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B845BC0000 -t TopicTest -n 192.168.40.132:9876 sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B849AC0001 -t TopicTest -n 192.168.40.132:9876 sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B84D950002 -t TopicTest -n 192.168.40.132:9876 sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B8517D0003 -t TopicTest -n 192.168.40.132:9876RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. Topic: TopicTest Tags: [TagA] Keys: [null] Queue ID: 1 Queue Offset: 0 CommitLog Offset: 0 Reconsume Times: 0 Born Timestamp: 2022-xx-xx 17:34:41,596 Store Timestamp: 2022-xx-xx 17:34:40,899 Born Host: 192.168.40.1:52415 Store Host: 192.168.40.132:10911 System Flag: 0 Properties: {UNIQ_KEY=C0A801072CE8659E0BFD46B845BC0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA} Message Body Path: /tmp/rocketmq/msgbodys/C0A801072CE8659E0BFD46B845BC0000 sh bin/mqadmin queryMsgByUniqueKey -i C0A801072CE8659E0BFD46B8517D0003 -t TopicTest -n 192.168.40.132:9876 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. org.apache.rocketmq.tools.command.SubCommandException: QueryMsgByUniqueKeySubCommand command failed at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.execute(QueryMsgByUniqueKeySubCommand.java:220) at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:139) at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:90) Caused by: org.apache.rocketmq.client.exception.MQClientException: CODE: 208 DESC: query message by key finished, but no message. For more information, please visit the url, http://rocketmq.apache.org/docs/faq/ at org.apache.rocketmq.client.impl.MQAdminImpl.queryMessage(MQAdminImpl.java:435) at org.apache.rocketmq.client.impl.MQAdminImpl.queryMessageByUniqKey(MQAdminImpl.java:280) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.viewMessage(DefaultMQAdminExtImpl.java:324) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.viewMessage(DefaultMQAdminExt.java:497) at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.queryById(QueryMsgByUniqueKeySubCommand.java:63) at org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.execute(QueryMsgByUniqueKeySubCommand.java:217) ... 2 more |
分析过程
当收到第一条消息后,触发了mq的写盘动作,此后每隔flushIntervalCommitLog 默认500毫秒进行一次刷盘(而且保证存储在内存的消息大小达到最少flushCommitLogLeastPages 默认16k),间隔一秒后的消息由于并没有达到16k,所以没有触发写盘,如果到了 flushCommitLogThoroughInterval(间隔多久未刷盘,会强制刷盘) 默认10秒,会统一一次刷盘。
至此大概消息丢弃的原因就明显了,在第一次刷盘后,中间如果发送的消息大小没有超过16k,在这10秒时间内(没到10秒)发生了crash事件,那么此部分消息将丢失。
选择合适的架构
在极端情况下,允许少量消息丢失,同时兼顾mq的性能,建议采用异步刷盘方式。
消息不容许丢失,哪怕就在非常短暂时间内,可能就在几秒内丢失数据都不可容忍,那么建议采用同步刷盘,性能会有一定的损耗。