In the previous article we covered RocketMQ's synchronous and asynchronous flushing. In this one we analyze file deletion and recovery. Before diving in, let's get an intuitive feel for the commitLog and consumeQueue files.
Recovery is essentially rebuilding these files according to the data structures shown in the figure above.
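To make the figure concrete: each consumeQueue entry is a fixed 20-byte unit holding an 8-byte commitLog physical offset, a 4-byte message size and an 8-byte tagsCode, which is exactly what the recovery code later in this article reads back with getLong/getInt/getLong. A minimal, self-contained sketch of that layout (an illustration, not RocketMQ source):
import java.nio.ByteBuffer;

//A sketch of one consumeQueue entry, the unit the recovery code below keeps reading
public class ConsumeQueueEntrySketch {
    //matches ConsumeQueue.CQ_STORE_UNIT_SIZE used by the recovery code later in this article
    static final int CQ_STORE_UNIT_SIZE = 20;

    public static void main(String[] args) {
        ByteBuffer unit = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        unit.putLong(0L);   //8 bytes: physical offset of the message inside the commitLog
        unit.putInt(123);   //4 bytes: message size
        unit.putLong(99L);  //8 bytes: tagsCode (hash of the tag, or an address into the ext file)
        unit.flip();
        System.out.printf("offset=%d size=%d tagsCode=%d%n", unit.getLong(), unit.getInt(), unit.getLong());
    }
}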
1. RocketMQ file recovery
1.1. Why file recovery is necessary
Data that has been flushed to the CommitLog but whose ConsumeQueue/Index entries have not yet been built
Data whose ConsumeQueue/Index entries have been built and flushed, but whose CommitLog data has not finished flushing
All files have a fixed size, so the exact flush position inside the most recent file has to be determined
1.2. The overall recovery flow
1.3. Source code analysis
DefaultMessageStore.load
public boolean load() {
boolean result = true;
try {
//determine whether the last shutdown was abnormal (i.e. whether the abort temp file still exists)
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
//delayed (scheduled) messages
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// load and initialize the commitLog
result = result && this.commitLog.load();
// load and initialize the consumeQueues
result = result && this.loadConsumeQueue();
if (result) {
//restore the store checkpoint
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//initialize the index files
this.indexService.load(lastExitOK);
//all files have been initialized above; now start the actual data recovery
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
//perform recovery
private void recover(final boolean lastExitOK) {
//the maximum commitLog physical offset referenced by the last entry across all logic queues
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
//normal recovery of the commitLog, keeping it consistent with the consumeQueues
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
//abnormal (crash) recovery of the commitLog, keeping it consistent with the consumeQueues
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
//rebuild the topicQueueTable maintained by the commitLog: HashMap<String /* topic-queueId */, Long /* offset */>,
//i.e. the max logical offset of each queue of each topic,
//and correct the min logical offset of each queue
this.recoverTopicQueueTable();
}
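recoverTopicQueueTable itself is not listed above. Roughly, it walks every ConsumeQueue, records the max logical offset per topic-queueId into the commitLog's topicQueueTable, and fixes up each queue's min logical offset against the commitLog's min physical offset. A paraphrased sketch of that logic (method names follow my reading of the source and may differ slightly between versions):
//Paraphrased sketch of DefaultMessageStore.recoverTopicQueueTable (not verbatim source)
public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueId */, Long/* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            //max logical offset of this queue, used when assigning queue offsets to new messages
            table.put(key, logic.getMaxOffsetInQueue());
            //drop logical entries that point below the commitLog's current min physical offset
            logic.correctMinOffset(minPhyOffset);
        }
    }
    this.commitLog.setTopicQueueTable(table);
}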
1.3.1. CommitLog.load
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
MappedFileQueue.load:
public boolean load() {
//storePath: the directory where the commitLog files live
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
//create the mappedFile for this physical file
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//during load all three positions are temporarily set to the file size; the real positions are corrected later during recovery
mappedFile.setWrotePosition(this.mappedFileSize);
//flushed position
mappedFile.setFlushedPosition(this.mappedFileSize);
//committed position
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
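One detail worth spelling out about the load above: getFileFromOffset(), which the recovery code relies on later, works because every commitLog (and consumeQueue) file is named after the global physical offset of its first byte, zero-padded to 20 digits. A small self-contained sketch of that convention (an illustration, not RocketMQ source):
//Illustration of the mapped-file naming convention (not RocketMQ source)
public class MappedFileNameSketch {
    //the default commitLog file size is 1GB; consumeQueue files use a smaller fixed size
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    static String offset2FileName(long offset) {
        return String.format("%020d", offset);   //e.g. 1073741824 -> 00000000001073741824
    }

    static long fileName2Offset(String fileName) {
        return Long.parseLong(fileName);         //what MappedFile keeps as fileFromOffset
    }

    public static void main(String[] args) {
        System.out.println(offset2FileName(0));                 //first file
        System.out.println(offset2FileName(MAPPED_FILE_SIZE));  //second file starts at 1GB
    }
}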
The code above completes the initialization of the commitLog files. Next, let's look at the corresponding initialization of the consumeQueue files.
1.3.2. DefaultMessageStore.loadConsumeQueue
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//list all topic directories
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
//list all queueId directories under this topic
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId;
try {
//queue id
queueId = Integer.parseInt(fileQueueId.getName());
} catch (NumberFormatException e) {
continue;
}
//initialize the consumeQueue
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
//put the consumeQueue into the table (topic -> queueId -> ConsumeQueue)
this.putConsumeQueue(topic, queueId, logic);
//load all mappedFiles belonging to this consumeQueue
if (!logic.load()) {
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}
ConsumeQueue.load:
public boolean load() {
//the mappedFileQueue.load() call here is the same as in 1.3.1, so we won't analyze it again
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
if (isExtReadEnable()) {
result &= this.consumeQueueExt.load();
}
return result;
}
At this point we have finished initializing the consumeQueues and their mappedFiles; all the initialization needed before recovery is done.
We will not go through the index file code here; you can read it yourself against the figure shown above.
1.4. File recovery
1.4.1. DefaultMessageStore.recoverConsumeQueue
//recover the consumeQueue mapped files
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
//recover every queueId under every topic
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
//ConsumeQueue.recover: recover this consumeQueue's own mapped files
public void recover() {
//get all the mappedFiles (initialization was already completed during load above)
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
//start recovery from the third file from the end
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
//the three fields of a consumeQueue entry (20 bytes in total)
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
//the whole current file contained valid entries
if (mappedFileOffset == mappedFileSizeLogics) {
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
//not finished yet, continue recovery with the next file
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
processOffset += mappedFileOffset;
//record the recovered offset as the flush/commit position
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
//truncate dirty data beyond processOffset; only the last mappedFile needs its positions recalculated, the earlier ones are known to be full (a new file is only created once the previous one has been written full)
this.mappedFileQueue.truncateDirtyFiles(processOffset);
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
At this point the consumeQueue mappedFiles have been recovered.
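truncateDirtyFiles, which both the consumeQueue recovery above and the commitLog recovery below call, is not listed. The idea is: keep everything before processOffset, rewind the write/flush/commit positions of the file that contains processOffset, and destroy any file that lies entirely beyond it. A paraphrased sketch (not verbatim source):
//Paraphrased sketch of MappedFileQueue.truncateDirtyFiles (not verbatim source)
public void truncateDirtyFiles(long offset) {
    List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
    for (MappedFile file : this.mappedFiles) {
        long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
        if (fileTailOffset > offset) {
            if (offset >= file.getFileFromOffset()) {
                //offset falls inside this file: rewind its positions to the valid length
                file.setWrotePosition((int) (offset % this.mappedFileSize));
                file.setCommittedPosition((int) (offset % this.mappedFileSize));
                file.setFlushedPosition((int) (offset % this.mappedFileSize));
            } else {
                //the whole file lies beyond the valid data: destroy and drop it
                file.destroy(1000);
                willRemoveFiles.add(file);
            }
        }
    }
    //remove the destroyed files from the mappedFiles list
    this.deleteExpiredFile(willRemoveFiles);
}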
1.4.2. CommitLog.recoverNormally (normal recovery)
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
//get all the commitLog mapped files
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Begin recovering from the third file from the end
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
processOffset += mappedFileOffset;
//reset the flush/commit positions
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear redundant consumeQueue data (normally the consumeQueue should not be ahead of the commitLog)
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
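Both recovery paths depend on checkMessageAndReturnSize, which is not listed above. In essence it tries to decode one message at the buffer's current position and reports one of three outcomes: a valid message (success, size > 0), end-of-file padding (success, size == 0, switch to the next file), or broken data (failure, stop and truncate here). A simplified sketch of that contract (the real method also parses all message fields, optionally verifies the body CRC, and fills a complete DispatchRequest):
//Simplified sketch of the contract of CommitLog.checkMessageAndReturnSize (not verbatim source)
private DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean checkCRC) {
    int totalSize = byteBuffer.getInt();    //first 4 bytes: total length of this message
    int magicCode = byteBuffer.getInt();    //next 4 bytes: magic code
    if (magicCode == BLANK_MAGIC_CODE) {
        //end-of-file padding reached: success with size 0, the caller moves to the next file
        return new DispatchRequest(0, true);
    }
    if (magicCode != MESSAGE_MAGIC_CODE) {
        //half-written or corrupted data: failure, the caller stops and truncates from here
        return new DispatchRequest(-1, false);
    }
    //... decode the remaining fields and, when checkCRC is true, verify the body CRC ...
    return new DispatchRequest(totalSize, true); //a valid message occupying totalSize bytes
}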
1.4.3. CommitLog.recoverAbnormally (abnormal recovery)
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
//get all the commitLog mapped files
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Look for the file from which to begin recovery:
//search backwards from the last file for the first one that can be safely recovered from
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
//check whether recovery can safely start from this mappedFile (based on the store checkpoint)
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
//re-dispatch the message to rebuild the consumeQueue and index entries
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear redundant (dirty) consumeQueue data beyond the commitLog
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
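The starting file for abnormal recovery is picked by isMappedFileMatchedRecover, which is not listed above. Roughly, it reads the first message of the file and only accepts the file when that message's store timestamp is not newer than the minimum timestamp recorded in the store checkpoint, i.e. everything before this file is known to be safely persisted. A paraphrased sketch (constant and method names follow my reading of the source and may differ between versions):
//Paraphrased sketch of CommitLog.isMappedFileMatchedRecover (not verbatim source)
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    //magic code of the first message in this file
    int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
    if (magicCode != MESSAGE_MAGIC_CODE) {
        return false; //not a valid message, keep searching backwards
    }
    //store timestamp of the first message in this file
    long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
    if (storeTimestamp == 0) {
        return false;
    }
    //recovery may start here only if this file's first message is no newer than what the
    //store checkpoint says has been safely flushed (optionally including the index checkpoint)
    StoreCheckpoint checkpoint = this.defaultMessageStore.getStoreCheckpoint();
    long safeTimestamp = this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
        && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()
        ? checkpoint.getMinTimestampIndex()
        : checkpoint.getMinTimestamp();
    return storeTimestamp <= safeTimestamp;
}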
That completes our analysis of RocketMQ's file recovery.
2. RocketMQ file deletion
2.1. What triggers file deletion
If a file other than the one currently being written has not been updated for a configurable time interval, it is considered expired and can be deleted (the related configuration options are sketched right after this list).
If disk space is running low, deletion of expired files is also triggered.
executeDeleteFilesManualy can delete expired files manually; mqAdmin does not currently expose this externally.
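For reference, these triggers are controlled by a handful of MessageStoreConfig options. A small sketch of the relevant knobs; the default values shown are my understanding of common defaults and may differ between RocketMQ versions:
import org.apache.rocketmq.store.config.MessageStoreConfig;

//The broker-side options behind the deletion triggers (defaults below are assumptions, check your version)
public class DeletionConfigSketch {
    public static void main(String[] args) {
        MessageStoreConfig config = new MessageStoreConfig();
        config.setFileReservedTime(72);               //keep a file for 72 hours before it counts as expired
        config.setDeleteWhen("04");                   //run the time-based deletion at 04:00 every day
        config.setDiskMaxUsedSpaceRatio(75);          //start space-based deletion when the disk is over 75% used
        config.setCleanResourceInterval(10000);       //the periodic clean task runs every 10 seconds (ms)
        config.setDeleteCommitLogFilesInterval(100);  //pause 100ms between deleting two commitLog files
    }
}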
2.2. Source code analysis
The whole process is driven by a scheduled task: DefaultMessageStore#addScheduleTask periodically invokes DefaultMessageStore#cleanFilesPeriodically, which calls CleanCommitLogService#run (clean the commitLog) and CleanConsumeQueueService#run (clean the consumeQueues, which in turn triggers IndexService#deleteExpiredFiles to clean the index files).
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
........................
private void cleanFilesPeriodically() {
//delete expired commitLog files
this.cleanCommitLogService.run();
//delete expired consumeQueue and index files
this.cleanConsumeQueueService.run();
}
That is the overall deletion flow. This article only analyzes the deletion of commitLog files; please walk through the other file types yourself.
Let's continue with cleanCommitLogService.run():
public void run() {
try {
//delete expired files
this.deleteExpiredFiles();
//retry deleting files whose previous destroy did not complete
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
private void deleteExpiredFiles() {
int deleteCount = 0;
//how long (in hours) a file is retained before it is considered expired
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
//how long to pause between deleting two physical files
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
//the interval after which a mapped file that is still referenced will be forcibly destroyed
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
//has the configured deletion time of day arrived
boolean timeup = this.isTimeToDelete();
//is the disk running out of space
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
//whether to delete immediately, regardless of file age
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
//perform the actual deletion
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
private boolean isTimeToDelete() {
//the configured time of day for deletion (default: 04, i.e. 4 a.m.)
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
if (UtilAll.isItTimeToDo(when)) {
DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
return true;
}
return false;
}
With the comments the code above should be straightforward: before deleting anything it checks a few conditions, namely whether the configured deletion time of day has arrived, whether disk usage is too high, and whether a manual delete was requested, and it only deletes files when at least one of them holds.
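isSpaceToDelete is not listed above. Roughly, it measures how full the commitLog (and consumeQueue) partition is, returns true once usage exceeds diskMaxUsedSpaceRatio, and above higher internal watermarks also sets cleanImmediately so that files are removed regardless of their age. A simplified sketch of that logic (thresholds and helper names reflect my reading of the source and may differ between versions):
//Simplified sketch of CleanCommitLogService.isSpaceToDelete (not verbatim source)
private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    cleanImmediately = false;
    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    if (physicRatio > diskSpaceWarningLevelRatio) {
        //roughly 90% used: mark the disk as full (writes are rejected) and force immediate cleanup
        DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        cleanImmediately = true;
    } else if (physicRatio > diskSpaceCleanForciblyRatio) {
        //roughly 85% used: still writable, but clean up regardless of file age
        cleanImmediately = true;
    } else {
        DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    }
    if (physicRatio < 0 || physicRatio > ratio) {
        log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
        return true;
    }
    //the same kind of check is then repeated for the consumeQueue partition
    return false;
}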
Now let's look at where the files are actually deleted.
MappedFileQueue.deleteExpiredFileByTime — this is where the physical files are removed
//expiredTime: the configured retention time (already converted to milliseconds)
//deleteFilesInterval: the pause between deleting two physical files; more than one file may need to be deleted and deletion is relatively expensive, so consecutive deletions are spaced out
//intervalForcibly: how long a file that is still being read is spared before it is forcibly destroyed
//cleanImmediately: whether to delete immediately, regardless of the retention time
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//if the file is still being read, destruction can be deferred; after intervalForcibly has passed it will be forced
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
deleteExpiredFile(files);
return deleteCount;
}
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
//only once the reference count has dropped and the cleanup (unmapping) is done can the physical file be deleted
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
//delete the physical file
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
//still referenced, do not delete yet
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
public void shutdown(final long intervalForcibly) {
if (this.available) {
//first attempt: mark the file unavailable and release our own reference; if readers still hold it, deletion is deferred
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
//on subsequent attempts: if the file is still being read after intervalForcibly, force the reference count down so cleanup can proceed
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
public boolean isCleanupOver() {
return this.refCount.get() <= 0 && this.cleanupOver;
}
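The deferred deletion above rests on a simple reference-counting scheme: MappedFile extends a ReferenceResource-style base class, readers call hold() before touching the buffer and release() when done, and the actual unmapping/deletion only proceeds once the count drops to zero (that is what cleanupOver means). A minimal self-contained sketch of that idea (an illustration, not RocketMQ source):
import java.util.concurrent.atomic.AtomicLong;

//Minimal sketch of the reference-counting idea behind MappedFile/ReferenceResource
public class RefCountedResourceSketch {
    private final AtomicLong refCount = new AtomicLong(1); //the owner holds one reference
    private volatile boolean available = true;
    private volatile boolean cleanupOver = false;

    public synchronized boolean hold() {    //a reader takes a reference before using the resource
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;                //resource is live, reference taken
            }
            refCount.getAndDecrement();     //raced with cleanup, undo the increment
        }
        return false;
    }

    public void release() {                 //readers (and finally the owner) drop their reference
        if (refCount.decrementAndGet() <= 0) {
            cleanupOver = true;             //last reference gone: safe to unmap / delete the file
        }
    }

    public boolean shutdown() {             //conceptually what MappedFile.shutdown() does
        if (available) {
            available = false;              //stop new readers from taking references
            release();                      //drop the owner's reference
        }
        return cleanupOver;
    }
}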
That wraps up our analysis of RocketMQ file deletion and recovery. Corrections and suggestions are welcome. Thanks for reading.
Note:
This article draws on 陈厚道's sharing in the official RocketMQ community.