
【作者】孙玺,中国民生银行信息科技部开源软件支持组工程师,目前主要负责RocketMQ源码研究和工具开发等相关工作。
RocketMQ是阿里巴巴开源的分布式消息中间件,它具有低延迟、高性能、高可靠性、万亿级容量和灵活的扩展性。本篇文章介绍了其存储文件和存储整体架构,并从源码角度分析了消息写入流程以及消息刷盘。
1.RocketMQ存储文件
Rocketmq存储路径为${ROCKET_HOME}/store,主要存储以下文件:
commitlog
消息存储目录
consumequeue
消息消费队列存储目录
index
消息索引文件存储目录
checkpoint
文件检查点,存储commitlog、consumequeue和index文件最后一次刷盘时间戳
abort
如果abort文件存储则表示broker非正常关闭,否则表示broker正常关闭。该文件是在broker启动的过程中创建的。
config
broker运行期间一些配置信息,主要包含以下信息:
consumerFilter.json
该文件保存的是每个topic中消息的过滤逻辑
consumerOffset.json
该文件保存的是每个consumer group的消费进度
delayOffset.json
该文件保存的是延迟消息队列拉取进展
subscriptionGroup.json
该文件保存的是每个消费者的订阅信息
topics.json
该文件保存的是topic的配置信息
2.RocketMQ消息存储整体架构
消息存储架构图中有三个与消息存储相关的文件,分别是commitlog、consumequeue和index。RocketMQ通过使用内存映射文件来提高IO访问性能,无论是commitlog、consumequeue还是index,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件。commitlog和consumequeue的文件名称是该文件第一条消息对应的全局物理偏移量,index的文件名称是以创建文件的时间戳命名。
在RocketMQ中所有topic的消息都存储在同一个文件中,这样就确保了发送时顺序写文件及消息发送的高性能和高吞吐量。但是RocketMQ是基于topic的消息订阅机制,这样便给消息消费以及消息检索带来了极大的不便。为了提高consumer消费消息的效率,RocketMQ引入了consumequeue,consumequeue文件组织方式是${ROCKET_HOME}/store/consumequeue/topic名称/queueid/,它记录的是消息的commitlog offset、消息大小和tag hashcode。为了提高消息检索的功能,RocketMQ中引入了index文件,其hash冲突设计理念借鉴了Java中HashMap的结构。index文件包含三个部分:IndexHeader、Hash槽和Index条目,其中IndexHeader记录了index中包含消息的最大及最小存储时间、最大及最小物理偏移量、hashSlot个数、index条目列表当前已使用的个数,Index条目记录的是消息key的hashcode、消息的commitlog offset、消息与第一条消息的时间戳差值及该条目的前一条目的index索引。(注意:根据key hashcode定位hash槽可能会引发hash冲突,index文件为了解决hash冲突其解决方法是每个hash槽存储的是落在这个槽的hashcode最新的index的索引,新的index条目的最后四个字节存储该槽上一个条目的index的下标。)
消息存储架构图可以简化为以下流程:
producer发送消息到broker
broker采用同步或者异步方式将消息刷盘持久化
broker的master和slave之间数据同步
broker后台服务线程ReputMessageService分发请求构建consumequeue和index文件
本篇文章我们一起先来看下消息的写入流程。
3.MappedFile与MappedFileQueue
在RocketMQ中使用MappedFile和MappedFileQueue来封装存储文件,MappedFile是RocketMQ内存映射文件的具体实现,MappedFileQueue是MappedFile的管理容器,MappedFileQueue是对存储目录的封装。下图可以表示出两者的关系:
MappedFile重要属性如下所示:
MappedFileQueue重要属性如下所示:
4.消息写入
4.1 消息写入流程
消息写入的整体流程如下图所示:
Commitlog#putMessage流程如下图所示:
下面我们详细分析写入流程中几个比较重要的方法:
getLastMappedFile(final long startOffset, boolean needCreate)
appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)及doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)
4.2 获取最新的mappedFile
获取最新的mappedFile的方法是getLastMappedFile(final long startOffset, boolean needCreate),其实现逻辑如下:
在MappedFileQueue中使用CopyOnWriteArrayList<MappedFile> mappedFiles记录了mappedFile的集合,在写入数据时我们总是在最新的mappedFile中写入数据,所以首先从mappedFiles中获取最后一个mappedFile
最新的mappedFile为空,这种情况下计算待创建的mappedFile的起始offset。mappedFile为空的场景是第一次使用broker
最新的mappedFile不为空并且已经写满了,这样情况下也需要计算待创建的mappedFile的起始offset,计算方法是最新mappedFile的初始偏移量与每个mappedFile大小的和
如果待创建的mappedFile的offset不为-1并且needCreate为true,构建出待创建的mappedFile的文件路径nextFilePath以及再下一个mappedFile的文件路径nextNextFilePath,然后调用allocateMappedFileService服务的putRequestAndReturnMappedFile方法构建AllocateRequest(该请求实现了compareTo方法,请求是按照文件名称从小到大排序的,即创建mappedFile是有序的)请求并将请求放在其待处理的队列中,后台allocateMappedFileService服务会从请求队列中获取请求并创建mappedFile。创建mappedFile的方法是allocateMappedFileService服务中的mmapOperation(),这里面需要注意:创建mappedFile有两种不同的方式。
方式一:
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
这种方式是在broker的配置文件中刷盘方式是异步刷盘并且TransientStorePoolEnable为true的情况下生效,该方式下MappedFile 会将向TransientStorePool 申请的堆外内存(Direct ByteBuffer)空间作为 writeBuffer,写入消息时先将消息写入 writeBuffer,然后将消息提交至 fileChannel 最后再 flush。
方式二:
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
这种方式是直接创建 MappedFile 内存映射文件字节缓冲区mappedByteBuffer,将消息写入 mappedByteBuffer 再 flush。
如果最新的mappedFile不为空则直接返回该mappedFile即可
getLastMappedFile(final long startOffset, boolean needCreate)的实现如下:
4.3 追加消息到mappedFile
追加消息到mappedFile的实现方法是appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb),其实现逻辑如下:
获取mappedFile写指针位置
判断写指针的位置与文件大小的关系,如果写指针的位置小于文件大小则按照消息的类型(普通消息及批量消息)调用AppendMessageCallback的回调函数doAppend追加消息,doAppend方法是追加消息的核心实现,其实现逻辑是:
计算消息写入的位置
为消息创建msgId,其创建规则是4个字节IP+4个字节的端口号+8字节的消息偏移量
在commitlog的topicQueueTable记录consumequeue的信息
序列化消息(注意:producer发送的消息格式和broker最终存储的消息格式是不一样的),broker端存储的消息的格式如下:
将消息写入消息队列缓存中
构建追加消息的AppendMessageResult并返回结果
更新mappedFile写指针位置及文件最后写入的时间
字段 | 字段含义 | 字段大小 |
---|---|---|
TOTALSIZE | 消息条目总长度 | 4 |
MAGICCODE | 魔数,用来判断消息是正常消息还是空消息 | 4 |
BODYCRC | 消息体CRC校验码 | 4 |
QUEUEID | 消息消费队列id | 4 |
FLAG | 消息flag | 4 |
QUEUEOFFSET | 消息在消息消费队列的偏移量 | 8 |
PHYSICALOFFSET | 消息在commitlog中的偏移量 | 8 |
SYSFLAG | 消息系统flag,例如是否压缩、是否是事务消息 | 4 |
BORNTIMESTAMP | 消息生产者调用消息发送API的时间戳 | 8 |
BORNHOST | 消息发送者的IP和端口号 | 8 |
STORETIMESTAMP | 消息存储时间 | 8 |
STOREHOSTADDRESS | broker的IP和端口号 | 8 |
RECONSUMETIMES | 消息重试次数 | 4 |
Prepared Transaction Offset | 事务消息物理偏移量 | 8 |
bodyLength | 消息体长度 | 4 |
body | 消息体内容 | bodyLength |
topicLength | Topic名称内容大小 | 1 |
topicData | topic的值 | topicLength |
propertiesLength | 消息属性大小 | 2 |
propertiesData | 消息属性 | propertiesLength |
appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)的实现如下:
doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)的实现如下:
5.消息刷盘
消息刷盘分为同步刷盘和异步刷盘,同步刷盘只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。异步刷盘能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt)的处理逻辑如下:首先会根据消息刷盘类型分为两类:同步和异步,然后不同的类型有不同的处理方式,这里需要注意异步刷盘的分支中还会再分为两种:启用TransientStorePoolEnable和不不启用TransientStorePoolEnable
实现消息同步刷盘的服务是GroupCommitService,该服务是在broker启动时启动的,在GroupCommitService服务中有两个存放GroupCommitRequest的list,分别是requestsWrite和requestsRead,在handleDiskFlush方法中GroupCommitRequest被put到requestsWrite中,GroupCommitService服务会每10毫秒执行一次swapRequests()方法,该方法会交换requestsWrite和requestsRead中的请求,GroupCommitService服务在后台会一直执行doCommit()方法,这个方法会不断从requestsRead中获取GroupCommitRequest并执行flush操作,最后清空requestsRead。在 GroupCommitService中使用requestsWrite和requestsRead可以避免提交刷盘请求与消费刷盘请求的锁竞争。整个过程可以使用下图来表示:
异步刷盘分为两种情况:
(1)TransientStorePoolEnable为false
TransientStorePoolEnable为false时,是使用FlushRealTimeService服务来进行刷盘操作,该服务的核心逻辑如下:首先从配置文件中获取flushCommitLogTimed、flushIntervalCommitLog、flushPhysicQueueLeastPages和flushPhysicQueueThoroughInterval,计算距离上次刷盘的时间差,判断是否超过flushPhysicQueueThoroughInterval,如果超过了flushPhysicQueueThoroughInterval则本次刷盘将忽略flushPhysicQueueLeastPages,会将所有内存缓存的全部数据刷盘到文件中,最后会调用flush将内存中的数据写到磁盘并更新checkpoint文件中commitlog文件的更新时间戳。
(2)TransientStorePoolEnable为true
TransientStorePoolEnable为true时会先使用CommitRealTimeService来将writeBuffer中的数据提交到fileChannel中之后会唤醒FlushCommitLogService服务来进行刷盘操作,CommitRealTimeService服务的核心逻辑如下:
综上,异步刷盘两种情况可以用下图来说明:
觉得本文有用,请转发或点击“在看”,让更多同行看到
资料/文章推荐:
长按二维码关注公众号
*本公众号所发布内容仅代表作者观点,不代表社区立场;封面图片由版权图库授权使用