衡阳哪有做网站推广的网站优化效果
文章目录
- 1. 前言
- 2. recover 异常退出恢复服务
- 3. CommitLog#recoverAbnormally
- 4. CommitLog#isMappedFileMatchedRecover
- 5. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
RocketMQ 存储部分系列文章:
-
【RocketMQ 存储】- RocketMQ存储类 MappedFile
-
【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
-
【RocketMQ 存储】- broker 端存储单条消息的逻辑
-
【RocketMQ 存储】- broker 端存储批量消息的逻辑
-
【RocketMQ 存储】- 同步刷盘和异步刷盘
-
【RocketMQ 存储】- 同步刷盘服务 GroupCommitService
-
【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService
-
【RocketMQ 存储】- 异步提交服务 CommitRealTimeService
-
【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile
-
【RocketMQ 存储】消息重放服务-ReputMessageService
-
【RocketMQ 存储】CommitLogDispatcherBuildConsumeQueue 构建 ConsumeQueue 索引
-
【RocketMQ 存储】CommitLogDispatcherBuildIndex 构建 IndexFile 索引
-
【RocketMQ 存储】ConsumeQueue 刷盘服务 FlushConsumeQueueService
-
【RocketMQ 存储】- ConsumeQueue 过期清除服务CleanConsumeQueueService
-
【RocketMQ 存储】- CommitLog 过期清除服务 CleanCommitLogService
-
【RocketMQ 存储】- 正常退出恢复逻辑 recoverNormally
2. recover 异常退出恢复服务
如果 RocketMQ 因为某种原因异常关闭,这种情况下 abort
文件是不会被删掉的,那么在 broker 启动的时候初始化 DefaultMessageStore 就会检测出是异常退出重启,这种情况下就会调用异常退出恢复方法 recoverAbnormally
。
这个方法我们在上一篇文章 【RocketMQ 存储】- 正常退出恢复逻辑 recoverNormally 中已经讲解过,这里就不多说了,下面就简单看下文件是否是异常退出的判断逻辑。
/*** Temp 临时文件,RocketMQ 启动的时候会创建一个大小为 0 的文件,当服务正常关闭(destory、shutdown),就会调用删除方法把这个临时文件删掉,* 否则如果是异常关闭那么就不会删掉这个文件,这样一来只需要根据这个文件就能判断服务是不是正常关闭的* @return*/
private boolean isTempFileExist() {// 获取临时文件的位置: ${home}/store/abortString fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());// 构建 File 文件File file = new File(fileName);// 判断这个文件是否存在return file.exists();
}
同时也看下异常退出方法的入口。
/*** 恢复 CommitLog 和 ConsumeQueue 中的数据到内存中* @param lastExitOK*/
private void recover(final boolean lastExitOK) {// 恢复所有 ConsumeQueue 文件,返回的是 ConsumeQueue 中存储的最大有效 CommitLog 偏移量long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();// 上一次 Broker 退出是正常退出还是异常退出if (lastExitOK) {// 这里就是正常退出,所以正常恢复 CommitLogthis.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {// 这里就是异常退出,所以异常恢复 CommitLogthis.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}// 最后恢复 topicQueueTablethis.recoverTopicQueueTable();
}
3. CommitLog#recoverAbnormally
异常恢复不同于正常恢复,正常恢复由于在 broker 启动前的写入是正常的,所以正常恢复可以从倒数第三个文件开始进行遍历,但是异常恢复不确定在哪一个文件出了问题,所以需要从后往前遍历找到第一个正确的 CommitLog 文件。
// 从最后一个文件开始往前找到第一个正确存储 CommitLog 消息的文件
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// 校验整个 CommitLog 文件是不是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}
}
接着从第一个正确的 CommitLog 文件开始恢复,恢复的流程和正常恢复的流程差不多,有一点不同的是由于是异常恢复,所以恢复的时候对于正常的消息索引需要重新构建 ConsumeQueue 和 IndexFile 索引,当遇到第一条异常的消息 (CRC 校验不通过,魔术不合法、记录的长度和实际求出来的长度不一样) 就会退出恢复流程,表示找到第一条不合法的消息,接着根据这条消息的物理偏移量去销毁 CommitLog 和 ConsumeQueue 中的无效文件,因为里面的非法我们基本都在上一篇文章正常恢复中讲过,所以整个恢复逻辑直接看下面注释就行。
/*** 异常恢复,异常恢复由于没有记录 ConsumeQueue 和 CommitLog 的最新刷盘时间点,所以需要从后往前遍历找到第一个正确的 CommitLog 文件* @param maxPhyOffsetOfConsumeQueue*/
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {// recover by the minimum time stamp// 是否需要启用 CRC32 校验文件,就是确保消息在数据传输和文件存储过程中没有出现问题,如位翻转、数据损坏等// 由于 CRC32 校验需要计算校验和,因此会对性能产生一定的影响boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();// 所有的 MappedFile 文件final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// 从最后一个文件开始往前找到第一个正确存储 CommitLog 消息的文件int index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// 校验整个 CommitLog 文件是不是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}// 从第一个正确的 CommitLog 文件开始恢复if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}// 获取 CommotLog 对应的 ByteBuffer 视图ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();// 获取文件的初始偏移量,默认是文件名long processOffset = mappedFile.getFileFromOffset();// 已经校验过的有效 offsetlong mappedFileOffset = 0;while (true) {// 校验本条消息是否合法DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);// 消息大小int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {// 消息正常,同时没有到文件的尾部if (size > 0) {// 合法的 mappedFileOffset 加上消息大小mappedFileOffset += size;// 如果允许消息索引重复转发构建if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {// 这里判断下如果这条消息的物理偏移量小于 CommitLog 中的提交位置if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {// 重新构建 ConsumeQueue、IndexFile 索引this.defaultMessageStore.doDispatch(dispatchRequest);}} else {// 重新构建 ConsumeQueue、IndexFile 索引this.defaultMessageStore.doDispatch(dispatchRequest);}}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offset// 这里就是到文件尾部了,就需要跳到下一个文件继续恢复else if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {// 这里就是没有到最后一个文件,所以设置下下一个文件的各个参数mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}} else {// 当前消息异常,那么后续所有文件都不需要恢复了log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());break;}}// CommitLog 文件的最大有效偏移量processOffset += mappedFileOffset;// 设置刷盘位置this.mappedFileQueue.setFlushedWhere(processOffset);// 设置提交位置this.mappedFileQueue.setCommittedWhere(processOffset);// 删掉有效偏移量 processOffset 之后的文件this.mappedFileQueue.truncateDirtyFiles(processOffset);// 上面清除了 CommitLog 中的无效数据,下面就要清除 ConsumeQueue 中的无效数据if (maxPhyOffsetOfConsumeQueue >= processOffset) {log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);// 删除 ConsumeQueue 中的无效数据this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}}// Commitlog case files are deletedelse {// 这里就是 CommitLog 下面的所有文件都不存在log.warn("The commitlog files are deleted, and delete the consume queue files");// 重置最新刷盘位置和提交位置this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);// 销毁所有 ConsumeQueue 文件this.defaultMessageStore.destroyLogics();}
}
4. CommitLog#isMappedFileMatchedRecover
这个方法用于校验整个 CommitLog 文件是不是一个正确的文件,主要是校验下面几个方面的内容。
- 如果魔数 != MESSAGE_MAGIC_CODE,就说明不是合法的 CommitLog 文件,校验失败
- 如果消息在 broker 端的存储时间为 0,就表示不是正常的消息,校验失败
由于 StoreCheckPoint 文件中存储了 ConsumeQueue、IndexFile、CommitLog 的最新刷盘的消息的存储时间戳,啥意思呢?就是消息刷盘的时候会记录下来最新的消息在 broker 端存储的时间,也就是 storeTimeStamp
。
// CommitLog 文件的最新刷盘的消息存储在 broker 端的时间戳
private volatile long physicMsgTimestamp = 0;
// ConsumeQueue 文件的最新刷盘的消息的存储时间戳
private volatile long logicsMsgTimestamp = 0;
// IndexFile 文件的最新刷盘的消息的存储时间戳
private volatile long indexMsgTimestamp = 0;
校验的时候就需要用这几个变量来判断,比如判断 IndexFile 合不合法就这个文件最新的存储时间 storeTimestamp
和记录的 indexMsgTimestamp
比较,如果是 storeTimestamp <= indexMsgTimestamp
,那么说明这个文件最新消息的存储时间比记录的时间戳都要小,也就是这个文件里面的消息都被刷盘了。
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {// 检测就是判断下如果 checkpoint 中存储的 IndexFile 最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了return true;}
}
如果是没有使用安全的 Index 索引模式,那么就校验 ConsumeQueue 和 IndexFile 的时间戳。
// 这里就是普通模式,检查下 checkpoint 中最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了return true;
}# StoreCheckPoint#getMinTimestamp
/*** 获取 physicMsgTimestamp 和 logicsMsgTimestamp 的最小值并且将去 3s* @return*/
public long getMinTimestamp() {long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);min -= 1000 * 3;if (min < 0)min = 0;return min;
}
下面给出全部的逻辑。
/*** 检查文件是否是一个正常的 MappedFile 文件,也就是 CommitLog 里面的消息是不是正确的* @param mappedFile* @return*/
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {// 获取 ByteBuffer 视图ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();// 获取文件开头的魔数int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);// 如果魔数 != -626843481,就说明不是合法的 CommitLog 文件,校验失败if (magicCode != MESSAGE_MAGIC_CODE) {return false;}// 获取 sysFlag,也就是消息的属性类型int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);// 获取下消息生成的 Producer 端地址int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;// 获取消息在 broker 端存储的偏移量int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;// 获取消息在 broker 端存储的最新时间long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);if (0 == storeTimestamp) {// 如果存储时间为 0,就表示不是正常的消息,校验错误return false;}// 使用安全的 Index 索引模式,也就是要对 IndexFile 进行检测if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {// 检测就是判断下如果 checkpoint 中存储的 IndexFile 最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了return true;}} else {// 这里就是普通模式,检查下 checkpoint 中最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了return true;}}return false;
}
5. 小结
到这里我们就讲完异常退出恢复逻辑 recoverAbnormally 了,里面很多逻辑在前一篇文章中也说过了,所以这里就不详细解释每一个方法的逻辑。
如有错误,欢迎指出!!!