暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ之存储层

不修边幅的创客 2020-10-10
634
点击上方蓝色字体,选择“设为星标”



RocketMQ中针对消息的存储,有三个很重要的文件,1、原始消息文件,主要通过CommitLog对消息的存储进行抽象封装;2、消息索引文件,主要通过IndexService实现对索引文件的存储;3、消费逻辑队列文件,主要是通过ConsumeQueue记录消息在CommitLog中的位置。



1

消息文件的存储
复制
MapedFile是对MappedByteBuffer的封装,具有创建文件(使用非堆区内存), 写入,提交,读取,释放,关闭等功能,通过。对关键属性和方法做一下解释。
1.fileFromOffset
每个文件的大小是1G = 1024 * 1024 * 1024,每个文件的命名都是在队列MapedFileQueue中的偏移量,比如:
第一个文件名为:00000000000000000000,
那么第二个文件名就为:00000000001073741824,
那么第三个文件名就为:00000000002147483648。
...
剩下的以此类推。

2.appendMessage(Object msg,AppendMessageCallback cb)

     该方法通过回调AppendMessageCallback实现数据存储。


3.selectMapedBuffer(int pos, int size)

     返回从pos到size的内存映射,用于读取数据。3

MapedFileQueue类代表了MapedFile组成的队列(由大小相同的多个文件构成),重要方法解释:

1.getLastMapedFile
获取队列最后一个MapedFile对象,以下两种情况会创建新的MapedFile对象:a、最后一个不存在 b、文件写满了


2.findMapedFileByOffset(final long offset)

根据offset/filesize计算该offset所在那个文件中。


介绍完上面两个关键类,我们在看消息的存储。消息到达broker主要通过SendMessageProcessor.processRequest(ctx,request)进行处理,通过代码追踪很快定位到CommitLog.putMessage(msg)方法,该方法里对消息进行落盘处理,这里我们主要看两行代码,代码如下:

复制
//获取到队列中最新MapedFile文件
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
    log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: "
            + msg.getBornHostString());
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//在当前MapedFile中追加消息到文件尾部
result = mapedFile.appendMessage(msg, this.appendMessageCallback);

复制


appendMessage对文件进行验证是否有空余空间,注意:这里没有对剩余空间与消息大小做对比,这里只是要文件有剩余就继续执行存储操作,而消息大小与剩余空间比较放到AppendMessageCallBack.doAppend中处理了。代码如下:
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;

    int currentPos = this.wrotePostion.get();

    // 表示有空余空间
    if (currentPos < this.fileSize) {
        //类似于浅拷贝,byteBuffer与this.mappedByteBuffer共享子序列
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        //在该byteBuffset上设置新的position
        byteBuffer.position(currentPos);
        //传入当前MapedFile文件的起始offset、byteBuffer、剩余空间和消息
        AppendMessageResult result =
                cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
        this.wrotePostion.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }

    // 上层应用应该保证不会走到这里
    log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
            + this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

复制


在doAppend中,对消息进行解析,按照一定的顺序put到ByteBuffer中,注意此时的消息在内存中,并没有落盘,刷盘主要通过FlushRealTimeService任务做,时间间隔为1秒,这里注意:虽然每隔一秒执行一次,也不一定刷盘,内存中的消息数据量大小必须大于等于4 * 4K,才执行刷盘。代码如下:
/**
 * 1s执行一次,把内存未落盘的数据落盘
 */

class FlushRealTimeService extends FlushCommitLogService {
  
   public void run() {

   ...

     try {
            //间隔1s
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            }
            else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            //调用MapedFileQueue.commit()对消息进行落盘,flushPhysicQueueLeastPages = 4 ;
            //注意:并不是每次都刷盘,内存中的消息堆积到:对消息进行落盘,flushPhysicQueueLeastPages * 一页大小(4K)才会刷盘,考虑到磁盘IO的影响
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(
                    storeTimestamp);
            }
        }
        catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }

        ...
   }
}

复制


MapedFile.commit(int flushLeastPages)方法具体执行消息刷盘,代码如下:
/**
 * 消息刷盘
 *
 * @param flushLeastPages
 * 至少刷几个page
 * @return
 */

public int commit(final int flushLeastPages) {
    //未刷盘的数据必须有 > 4 * 4K(一页是4K,现在是4页)
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = this.wrotePostion.get();
            //强制刷一次盘
            //不要经常调用MappedByteBuffer.force()方法,这个方法强制操作系统将内存中的内容写入硬盘,所以如果你在每次写内存映射文件后都调用force()方法,你就不能真正从内存映射文件中获益,而是跟disk IO差不多。
            this.mappedByteBuffer.force();
            this.committedPosition.set(value);
            this.release();
        }
        else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            this.committedPosition.set(this.wrotePostion.get());
        }
    }

    return this.getCommittedPosition();
}

复制

为了提高消息落盘的效率和性能,作者使用了MappedByteBuffer进行内存映射,这种通过内存映射的方法访问硬盘上的文件,效率要比read和write系统调用高:

read()是系统调用,首先将文件从硬盘拷贝到内核空间的一个缓冲区,再将这些数据拷贝到用户空间,实际上进行了两次数据拷贝;

map()也是系统调用,但没有进行数据拷贝,当缺页中断发生时,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝。

MappedByteBuffer使用虚拟内存,因此分配(map)的内存大小不受JVM的-Xmx参数限制,但是也是有大小限制的。如果当文件超出1.5G限制时(当前为1G,处理起来更简单),可以通过position参数重新map文件后面的内容。MappedByteBuffer在处理大文件时的确性能很高,但也存在一些问题,如内存占用、文件关闭不确定,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。



2

消费队列文件存储
复制
ConsumeQueue是消费队列的实现,该消费队列主要存储了消息在CommitLog的位置,与CommitLog类似,其内部采用MapedFileQueue实现了消息位置文件队列功能。一个topic和一个queueId对应一个ConsumeQueue。默认queue存储30W条消息。每个消息大小为20个字节,详细如下:

offset(long 8字节)+size(int 4字节)+tagsCode(long 8字节)

消费队列文件名规则:消息数(30W)*消息位置固定大小(20字节)=6000000字节,故每6000000字节一个文件,文件名依次递增,前缀不够20位补0,类似如下:

00000000000000000000

00000000000006000000

00000000000012000000

剩下的以此类推,如下图为实际情况,可以看到每个文件大小:6000000字节,文件名相差:6000000字节。


ConsumeQueue中有一些重要的属性和方法,提前做一个介绍。

1.属性 mapedFileQueue

存储消息消费消息的文件,由于一个文件只存30W条消息,所以如果需要一个队列来管理这些文件。


2.方法:putMessagePostionInfo(long offset, int size, long tagsCode,long cqOffset)

其中:offset:消息在commitLog中的起始位置;size:消息长度;cqOffset:该消息在topic对应的queue中的下标,参照Commitlog.topicQueueTable。

该方法主要实现了消息位置的存储,并产生消息文件:{storePathRootDir}/consumequeue/{topic}/${queueId}/消息位置队列文件

3.方法:getIndexBuffer(long startIndex)
startIndex代表了起始偏移量索引。
该方法会根据startIndex查找到相应的索引文件,并返回该文件当前写到的位置到startIndex的消息分区。
startIndex为客户端目前消费的进度,实际为消息位置队列文件的消费偏移量索引。

CommitLog.putMessage在存储消息成功之后,需要把当前消息的commitOffset及消息其它信息封装成DispatchRequest进行分发,一方面由ConsumeQueue构建消费队列文件,另一方面还需要由IndexService构建消息的索引文件。

DispatchRequest dispatchRequest = new DispatchRequest(//
    topic,// 1
    queueId,// 2
    result.getWroteOffset(),// 3
    result.getWroteBytes(),// 4
    tagsCode,// 5
    msg.getStoreTimestamp(),// 6
    result.getLogicsOffset(),// 7
    msg.getKeys(),// 8
    /**
     * Transaction
     */

    msg.getSysFlag(),// 9
    msg.getPreparedTransactionOffset());// 10

this.defaultMessageStore.putDispatchRequest(dispatchRequest);

复制


DispatchMessageService主要负责消息的分发,这里主要看其doDispatch方法。

public void putDispatchRequest(final DispatchRequest dispatchRequest) {
    this.dispatchMessageService.putRequest(dispatchRequest);
}

复制


private void doDispatch() {
    if (!this.requestsRead.isEmpty()) {
        for (DispatchRequest req : this.requestsRead) {

            final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
            // 1、分发消息位置信息到ConsumeQueue
            switch (tranType) {
            case MessageSysFlag.TransactionNotType:
            case MessageSysFlag.TransactionCommitType:
                // 将请求发到具体的Consume Queue
                DefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(),
                    req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(),
                    req.getStoreTimestamp(), req.getConsumeQueueOffset());
                break;
            case MessageSysFlag.TransactionPreparedType:
            case MessageSysFlag.TransactionRollbackType:
                break;
            }
        }
        //2、构建索引文件
        if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray());
        }

        this.requestsRead.clear();
    }
}

复制


下一章节,我们着重讲索引文件的存储,这一节我们先看ConsumeQueue消费队列的储存。DefaultMessageStore.putMessagePostionInfo()方法。代码如下:

public void putMessagePostionInfo(String topic, int queueId, long offset, int size, long tagsCode,
        long storeTimestamp, long logicOffset)
 
{
    //根据topic和queueId获取对应的consumeQueue对象
    ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
    //追加消息到当前队列
    cq.putMessagePostionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}

复制


一个ConsumeQueue中可能会有多个文件,因为上面提到一个文件最多放30W条消息,如果大于30W需要创建新MapedFile文件,这也就解释了,为什么ConsumeQueue中有mapedFileQueue属性。ConsumeQueue的putMessagePostionInfoWrapper方法负责存储当前消息,其内部调用了putMessagePostionInfo方法进行处理。

/**
 * 存储一个20字节的信息,putMessagePostionInfo只有一个线程调用,所以不需要加锁
 *
 * @param offset
 * 消息对应的CommitLog offset
 * @param size
 * 消息在CommitLog存储的大小
 * @param tagsCode
 * tags 计算出来的长整数
 * @return 是否成功
 */

private boolean putMessagePostionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset)
 
{
    // 在数据恢复时会走到这个流程
    if (offset <= this.maxPhysicOffset) {
        return true;
    }

    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQStoreUnitSize);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQStoreUnitSize;
    //1、获取最新的MapedFile
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
    if (mapedFile != null) {
        // 纠正MapedFile逻辑队列索引顺序
        if (mapedFile.isFirstCreateInQueue() && cqOffset != 0 && mapedFile.getWrotePostion() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.fillPreBlank(mapedFile, expectLogicOffset);
            log.info("fill pre blank space " + mapedFile.getFileName() + " " + expectLogicOffset + " "
                    + mapedFile.getWrotePostion());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mapedFile.getWrotePostion() + mapedFile.getFileFromOffset();
            if (expectLogicOffset != currentLogicOffset) {
                // XXX: warn and notify me
                logError
                    .warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",//
                        expectLogicOffset, //
                        currentLogicOffset,//
                        this.topic,//
                        this.queueId,//
                        expectLogicOffset - currentLogicOffset//
                    );
            }
        }

        // 记录物理队列最大offset
        this.maxPhysicOffset = offset;
        //2、追加消息到消费队列文件
        return mapedFile.appendMessage(this.byteBufferIndex.array());
    }

    return false;
}

复制


因为MapedFile被抽象出来了,所以实现跟CommitLog文件相似,这里不在细说,当append消息之后,消息在内存中,并没有落盘,ConsumeQueue的落盘,需要看:FlushConsumeQueueService.doFlush()方法。FlushConsumeQueueService的run由单线程每隔一秒执行一次,专门做消息的刷盘。doFlush()方法会对所有的ConsumeQueue进行刷盘(因为一个broker上的topic可能有多个queueId)。

ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables =
        DefaultMessageStore.this.consumeQueueTable;

for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
    for (ConsumeQueue cq : maps.values()) {
        boolean result = false;
        for (int i = 0; i < retryTimes && !result; i++) {
            result = cq.commit(flushConsumeQueueLeastPages);
        }
    }
}

复制


通过代码发现执行的还是CosumeQueue.commit()方法,其内部丢给MapedFileQueue.commit()方法,下面的逻辑就跟CommitLog一致了,也是调用mappedByteBuffer.force(),把内存中的消息刷到磁盘中。

public boolean commit(final int flushLeastPages) {
    return this.mapedFileQueue.commit(flushLeastPages);
}

复制



3

索引文件存储
复制
IndexFile在构建时是以当前时间被格式化的字符串命名的(精确到毫秒),每个文件的允许放的索引个数:4 * 5000000 = 2千万条,这个参数maxIndexNum在MessageStoreConfig中设置。

try {
    String fileName =
            this.storePath + File.separator
                    + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
    indexFile =
            new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                lastUpdateIndexTimestamp);
    this.readWriteLock.writeLock().lock();
    this.indexFileList.add(indexFile);
}

复制

IndexService消息索引服务,主要根据topic和keys(producer设置)建立索引,并提供查询功能。前文讲过:RocketMQ之msgKey查询逻辑 就是根据根据索引进行查询,这里我们主要介绍,索引文件的存储。
上一章节讲了,分发消息时,一方面是ConumeQueue构建消费队列文件,还有就是根据当前消息构建所索引文件,IndexService的putRquest方法提供消息请求,并把消息放到LinkedBlockingQueue中,然后由IndexService的run()方法不停的从阻塞队列中获取request并构建索引,注意:阻塞队列的长度为30W,如果满了,后续消息会被丢弃。代码如下:
/**
 * 向队列中添加请求,队列满情况下,丢弃请求
 */

public void putRequest(final Object[] reqs) {
    boolean offer = this.requestQueue.offer(reqs);
    if (!offer) {
        if (log.isDebugEnabled()) {
            log.debug("putRequest index failed, {}", reqs);
        }
    }
}

复制


@Override
public void run() 
{
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
        try {
            Object[] req = this.requestQueue.poll(3000, TimeUnit.MILLISECONDS);

            if (req != null) {
              //对当前的请求构建索引
                this.buildIndex(req);
            }
        }
        catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

复制


this.buildIndex()方法主要针对每个request中的每条消息的每个keys(多个key用空格分开)构建索引,如代码所示:
//尝试获取IndexFile
IndexFile indexFile = retryGetAndCreateIndexFile();
...

//循环每条消息的keys,用这些keys为当前消息构建索引
if (keys != null && keys.length() > 0) {
    String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
    for (String key : keyset) {
        // TODO 是否需要TRIM
        if (key.length() > 0) {

          //直到putKey执行成功结束
            for (boolean ok =
                    indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),
                        msg.getStoreTimestamp()); !ok;) {
                log.warn("index file full, so create another one, " + indexFile.getFileName());
                indexFile = retryGetAndCreateIndexFile();
                if (null == indexFile) {
                    breakdown = true;
                    break MSG_WHILE;
                }

                ok =
                        indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),
                            msg.getStoreTimestamp());
            }
        }
    }
}

复制


在IndexFile.putKey()方法中,把索引写到MappedByteBuffer中,如果当前方法返回false,则表示需要创建新的索引文件。代码如下:
//上一次写入的位置
int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
                + this.indexHeader.getIndexCount() * INDEX_SIZE;
// 写入真正索引
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

// 更新哈希槽
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

// 第一次写入
if (this.indexHeader.getIndexCount() <= 1) {
    this.indexHeader.setBeginPhyOffset(phyOffset);
    this.indexHeader.setBeginTimestamp(storeTimestamp);
}

this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);

复制


索引已经在内存里面了,但是什么时候触发刷盘动作呢。我们发现,IndexService.this.flush(flushThisFile)方法只在IndexService的getAndCreateLastIndexFile方法中调用,该方法被IndexService的retryGetAndCreateIndexFile方法调用,这里说一下结论,IndexFile的刷盘,只有在创建新文件的时候,才对上一个文件进行刷盘,此前的索引内容一直在内存中
// 每创建一个新文件,之前文件要刷盘
if (indexFile != null) {
    final IndexFile flushThisFile = prevIndexFile;
    Thread flushThread = new Thread(new Runnable() {
        @Override
        public void run() {
            IndexService.this.flush(flushThisFile);
        }
    }, "FlushIndexFileThread");

    flushThread.setDaemon(true);
    flushThread.start();
}

复制




文章转载自不修边幅的创客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论