RocketMQ存储核心:MappedFile解析
DefaultMappedFile
DefaultMappedFile
是对单个物理文件进行内存映射(Memory Mapped File, MMF)操作的封装。MappedFileQueue
中管理的 MappedFile
列表,其元素的具体实现就是 DefaultMappedFile
。无论是 CommitLog 还是 ConsumeQueue,最终的数据读写都落在这个类上。
在分析代码之前,必须先理解什么是内存映射文件。
简单来说,MMF 是一种将磁盘上的文件直接映射到进程的虚拟内存空间的技术。一旦映射完成,应用程序就可以像访问内存一样直接读写文件内容(通过一个 ByteBuffer
),而不需要频繁地调用 read()
、write()
等系统调用。优点:
- 高性能:绕过了用户态和内核态之间的数据拷贝。写入数据时,只是写入到内存(Page Cache),由操作系统负责后续异步刷盘。读取数据时,如果数据在 Page Cache 中,则直接从内存返回,速度极快。
- 简化编程:将文件IO操作简化为内存操作。
- 充分利用 Page Cache:操作系统会自动缓存文件数据,实现高效的预读和缓存。
DefaultMappedFile
正是基于 Java NIO 的 MappedByteBuffer
实现了 MMF。
核心属性
DefaultMappedFile
包含许多重要属性来维护一个映射文件的状态。
// ... existing code ...
public class DefaultMappedFile extends AbstractMappedFile {
// ... existing code ...// 用于原子更新 wrotePosition, committedPosition, flushedPositionprotected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;// 当前文件已写入的位置。这是数据写入的“前沿”。protected volatile int wrotePosition;// 已提交的位置。当启用 TransientStorePool 时,数据先写入堆外内存,再提交到 FileChannel。protected volatile int committedPosition;// 已刷写到磁盘的位置。这是数据持久化的保证。protected volatile int flushedPosition;protected int fileSize; // 文件大小protected FileChannel fileChannel; // 文件通道/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/// 堆外内存缓冲区,用于 TransientStorePool 机制protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;protected String fileName; // 文件名protected long fileFromOffset; // 文件起始的全局偏移量protected File file; // 文件对象protected MappedByteBuffer mappedByteBuffer; // 核心:内存映射的 ByteBuffer
// ... existing code ...
mappedByteBuffer
: 这是 MMF 的核心,是物理文件在内存中的映射。对它的读写最终会反映到磁盘文件中。- 三个关键位置指针:
wrotePosition
: 表示当前文件已经写入了多少数据。写入新消息时,会从这个位置开始。committedPosition
: 当启用TransientStorePool
(一种性能优化,见下文)时,数据会先写入一个临时的writeBuffer
,然后才被提交(commit
)到fileChannel
。这个指针记录了提交的进度。如果没启用,它通常等于wrotePosition
。flushedPosition
: 表示已经从操作系统的 Page Cache 刷写到物理磁盘的进度。只有小于这个位置的数据才是真正持久化的。
AtomicIntegerFieldUpdater
: RocketMQ 对性能追求极致。为了无锁地、线程安全地更新这三个位置指针,它没有使用AtomicInteger
(会额外创建对象),而是使用了AtomicIntegerFieldUpdater
,可以直接对DefaultMappedFile
对象的volatile
字段进行原子操作,开销更小。transientStorePool
和writeBuffer
: 这是 RocketMQ 的一个重要性能优化,称为“堆外内存池”。- 背景: 直接写入
mappedByteBuffer
可能会触发缺页中断(Page Fault),导致写操作阻塞。 - 机制: 开启此功能后,数据会先写入从
transientStorePool
借来的一个普通的DirectByteBuffer
(即writeBuffer
)。这个写入过程非常快,不会有缺页中断。然后,一个后台线程负责将writeBuffer
中的数据批量提交(commit
)到fileChannel
,再由另一个线程负责刷盘(flush
)。 - 效果: 将消息写入的耗时与磁盘IO解耦,使得写入操作的延迟非常低且稳定。
- 背景: 直接写入
初始化 (init
)
这是 DefaultMappedFile
的生命周期起点。
// ... existing code ...private void init(final String fileName, final int fileSize) throws IOException {this.fileName = fileName;this.fileSize = fileSize;this.file = new File(fileName);// 文件名就是其在 MappedFileQueue 中的起始偏移量this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;UtilAll.ensureDirOK(this.file.getParent());try {// 1. 创建 RandomAccessFile 并获取 FileChannelthis.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();// 2. 核心步骤:将文件映射到内存this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 3. 更新全局统计信息TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;
// ... existing code ...} finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}}
// ... existing code ...
init
方法的核心就是调用 fileChannel.map()
创建 MappedByteBuffer
,完成文件到内存的映射。如果启用了 transientStorePool
,还会从池中借一个 ByteBuffer
赋值给 writeBuffer
。
写入数据 (appendMessage...
)
这是最主要的数据写入接口。
// ... existing code ...public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;// 1. 获取当前可写位置int currentPos = WROTE_POSITION_UPDATER.get(this);// 2. 检查文件是否已满if (currentPos < this.fileSize) {// 3. 获取用于写入的 ByteBuffer (可能是 mappedByteBuffer 或 writeBuffer)ByteBuffer byteBuffer = appendMessageBuffer().slice();byteBuffer.position(currentPos);AppendMessageResult result;// ...// 4. 通过回调函数将消息内容序列化到 ByteBuffer 中result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);// ...// 5. 原子地更新 wrotePositionWROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}// 文件已满,返回错误log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}protected ByteBuffer appendMessageBuffer() {this.mappedByteBufferAccessCountSinceLastSwap++;// 如果 writeBuffer 不为 null (即启用了 TransientStorePool),则返回它,否则返回 mappedByteBufferreturn writeBuffer != null ? writeBuffer : this.mappedByteBuffer;}
// ... existing code ...
写入流程清晰:
- 获取当前可写位置
wrotePosition
。 - 检查文件空间是否足够。
- 调用
appendMessageBuffer()
获取正确的缓冲区。 - 使用
AppendMessageCallback
回调,将消息序列化到缓冲区。这种设计将文件操作与消息格式解耦。 - 原子更新
wrotePosition
。
数据持久化 (commit
和 flush
)
commit(int commitLeastPages)
:- 如果未使用
transientStorePool
(writeBuffer
为 null),则无需 commit,直接返回。 - 否则,它会调用
commit0()
,将writeBuffer
中从lastCommittedPosition
到wrotePosition
的数据写入fileChannel
。 - 最后更新
committedPosition
。
- 如果未使用
flush(int flushLeastPages)
:- 这是真正将数据持久化到磁盘的操作。
- 它首先检查是否满足刷盘条件(例如,脏数据页数是否达到
flushLeastPages
)。 - 调用
fileChannel.force(false)
或mappedByteBuffer.force()
,这是一个阻塞操作,会强制操作系统将 Page Cache 中的数据写入磁盘。 - 更新
flushedPosition
。
RocketMQ 的同步刷盘和异步刷盘策略,就是通过在不同时机、由不同线程调用 flush()
方法来实现的。
读取数据 (selectMappedBuffer
)
// ... existing code ...@Overridepublic SelectMappedBufferResult selectMappedBuffer(int pos) {// 1. 获取可读的最大位置int readPosition = getReadPosition();// 2. 检查请求的位置是否合法if (pos < readPosition && pos >= 0) {// 3. 增加引用计数,防止文件被清理if (this.hold()) {this.mappedByteBufferAccessCountSinceLastSwap++;// 4. 从 mappedByteBuffer 创建一个分片ByteBuffer byteBuffer = this.mappedByteBuffer.slice();byteBuffer.position(pos);int size = readPosition - pos;ByteBuffer byteBufferNew = byteBuffer.slice();byteBufferNew.limit(size);// 5. 包装成 SelectMappedBufferResult 返回,其中包含了对 MappedFile 的引用return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);}}return null;}
// ... existing code ...public int getReadPosition() {// 如果使用堆外内存池,只能读取已提交的数据;否则可以读取已写入的数据return transientStorePool == null || !transientStorePool.isRealCommit() ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);}
// ... existing code ...
读取流程:
- 获取可读位置
readPosition
。注意getReadPosition()
的逻辑:如果启用了transientStorePool
,消费者只能读到已经commit
到fileChannel
的数据,保证了数据一致性。 - 校验请求位置
pos
的合法性。 - 调用
hold()
增加引用计数。这是一个非常重要的并发控制手段,确保在读取期间,该文件不会被其他线程销毁或清理。 - 创建
mappedByteBuffer
的一个只读分片(slice),并包装成SelectMappedBufferResult
返回。 - 调用者必须在使用完
SelectMappedBufferResult
后调用其release()
方法,这会相应地减少引用计数,使得文件最终可以被安全地回收。
资源清理 (shutdown
, cleanup
, destroy
)
DefaultMappedFile
继承自 AbstractMappedFile
,后者实现了 ReferenceResource
接口,使用引用计数法来管理资源生命周期。
shutdown()
: 将引用计数减到 0,并尝试调用cleanup()
。cleanup()
: 当引用计数为 0 且文件已关闭时执行。它会调用UtilAll.cleanBuffer(this.mappedByteBuffer)
来解除内存映射,并关闭fileChannel
。解除映射是释放虚拟内存的关键步骤。destroy()
: 调用shutdown()
后,再从文件系统上删除该文件。
总结
DefaultMappedFile
是 RocketMQ 高性能存储的核心实现。它通过内存映射文件(MMF)技术,将磁盘文件操作转换为了高效的内存操作。
其精髓在于:
- MMF 的高效利用:充分利用了操作系统的 Page Cache,减少了用户态/内核态切换和数据拷贝,实现了极高的IO吞吐量。
- 精细的状态追踪:通过
wrotePosition
、committedPosition
、flushedPosition
三个指针,精确地描述了数据在写入、提交、持久化三个阶段的状态。 - 极致的并发优化:使用
AtomicIntegerFieldUpdater
进行无锁状态更新,以及基于引用计数的资源管理,保证了高并发下的线程安全和高性能。 - TransientStorePool 优化:通过引入堆外内存作为一级缓存,将消息写入延迟与磁盘IO解耦,提供了稳定且低延迟的写入性能。
可以说,DefaultMappedFile
是一个将 Java NIO 功能、并发编程技巧和系统级性能优化思想完美结合的典范。
MappedFileQueue
MappedFileQueue
是 Apache RocketMQ 存储实现中的一个核心组件。它是一个“映射文件队列”。它的主要职责是管理一组大小固定的、顺序的 MappedFile
文件,并将这些物理上分离的文件逻辑上视为一个连续的、无限增长的队列。在 RocketMQ 中,CommitLog、ConsumeQueue 以及 TimerLog 都使用 MappedFileQueue
来管理其物理存储文件。
MappedFileQueue
通过几个关键属性来维护整个文件队列的状态:
// ... existing code ...
public class MappedFileQueue implements Swappable {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);// 文件存储的根目录路径protected final String storePath;// 单个 MappedFile 的大小,例如 CommitLog 默认为 1GBprotected final int mappedFileSize;// 核心数据结构,存储所有 MappedFile 实例。// 使用 CopyOnWriteArrayList 保证了读写安全和高读性能。protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<>();// 预分配 MappedFile 的服务,用于提前创建下一个文件,减少写入时的延迟protected final AllocateMappedFileService allocateMappedFileService;// 已经刷写到磁盘的物理偏移量protected long flushedWhere = 0;// 已经提交到文件通道的物理偏移量(对于开启 transientStorePool 有效)protected long committedWhere = 0;// 存储中最新消息的时间戳protected volatile long storeTimestamp = 0;public MappedFileQueue(final String storePath, int mappedFileSize,AllocateMappedFileService allocateMappedFileService) {
// ... existing code ...
storePath
: 指定了这组文件存储在磁盘上的目录。mappedFileSize
: 定义了队列中每个文件的固定大小。当一个文件写满后,会自动创建下一个文件。mappedFiles
: 这是最核心的属性,它是一个CopyOnWriteArrayList
列表,按顺序存储了所有的MappedFile
对象。文件名通常是该文件起始的物理偏移量,例如00000000000000000000
,00000000001073741824
(如果文件大小为1GB)。- 为什么用
CopyOnWriteArrayList
? 因为对MappedFile
队列的访问是典型的“读多写少”场景。查找文件(读操作)非常频繁,而创建新文件或删除旧文件(写操作)相对较少。CopyOnWriteArrayList
提供了无锁的读操作,性能极高。写操作虽然会复制整个列表,但由于频率低,这个开销是可以接受的,并且保证了线程安全。
- 为什么用
allocateMappedFileService
: 这是一个可选的后台服务。当启用时,它会提前创建并预热下一个MappedFile
,这样当需要新文件时,可以直接从服务中获取,避免了创建文件和内存映射时的延迟,对写入性能有很大提升。flushedWhere
和committedWhere
: 这两个字段追踪数据持久化的进度。committedWhere
表示数据从堆外内存(transientStorePool
)提交到文件通道(FileChannel)的位置,flushedWhere
表示数据从文件通道真正刷写到磁盘的位置。
加载与恢复 (Loading & Recovery)
当 Broker 启动时,需要加载磁盘上已有的文件来恢复之前的状态。
load()
&doLoad(List<File> files)
:load()
方法首先列出storePath
目录下的所有文件。doLoad()
接收文件列表,并进行处理:- 排序: 按文件名(即起始偏移量)升序排序,确保文件队列的有序性。
- 校验: 检查文件大小是否等于
mappedFileSize
。如果大小不匹配,会加载失败,需要人工干预。 - 创建
MappedFile
: 为每个合法的文件创建一个DefaultMappedFile
实例,并将其添加到mappedFiles
列表中。 - 设置状态: 对于加载的每个文件,其写入、提交、刷盘位置都设置为文件大小,表示这些文件都是已经写满的旧文件。
// ... existing code ...public boolean doLoad(List<File> files) {// ascending orderfiles.sort(Comparator.comparing(File::getName));for (int i = 0; i < files.size(); i++) {
// ... existing code ...if (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, please check it manually");return false;}try {MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {
// ... existing code ...}}return true;}
// ... existing code ...
truncateDirtyFiles(long offset)
: 在异常停机后,最后一个文件可能只写了一部分(称为“脏数据”)。此方法用于将文件队列截断到给定的安全偏移量offset
。- 它会遍历
mappedFiles
,找到offset
所在的文件,并将该文件的写入位置(wrotePosition
)等重置为offset
在文件内的位置。 - 所有起始偏移量大于
offset
的文件都会被销毁和删除。
- 它会遍历
文件查找与创建
getLastMappedFile(...)
: 这是写入操作的入口。当需要写入数据时(例如CommitLog::putMessage
),会调用此方法获取当前可写的MappedFile
。- 如果
mappedFiles
为空,或者最后一个MappedFile
已写满,它会计算出新文件的起始偏移量。 - 如果需要创建 (
needCreate
为true
),它会调用tryCreateMappedFile
来创建新文件。
- 如果
// ... existing code ...public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {return tryCreateMappedFile(createOffset);}return mappedFileLast;}
// ... existing code ...
tryCreateMappedFile(long createOffset)
: 根据给定的起始偏移量创建新的MappedFile
。它会优先使用allocateMappedFileService
(如果可用)来获取预分配的文件,否则就直接new DefaultMappedFile()
。findMappedFileByOffset(final long offset, ...)
: 这是读取操作的核心。根据一个全局的物理偏移量offset
,快速定位到它所在的MappedFile
。- 它首先会进行范围检查,判断
offset
是否在队列的有效范围内。 - 然后通过数学计算
(offset / mappedFileSize) - (firstFileOffset / mappedFileSize)
来估算出目标文件在mappedFiles
列表中的索引,这大大提高了查找效率。 - 由于并发原因(例如文件刚被删除),直接用索引访问可能会失败,所以有一个 fallback 逻辑会遍历列表来查找。
- 它首先会进行范围检查,判断
文件生命周期管理 (删除)
磁盘空间是有限的,旧的文件需要被删除。
deleteExpiredFileByTime(...)
: 根据时间来删除过期文件。它会遍历文件列表,检查每个文件的最后修改时间,如果(当前时间 - 最后修改时间) > 过期时间
,则销毁并删除该文件。为了防止跳跃删除(比如中间某个文件由于某种原因没被删除),一旦遇到一个未过期的文件,就会停止向后检查。deleteExpiredFileByOffset(...)
: 根据偏移量来删除文件。这主要用于 ConsumeQueue。ConsumeQueue 的清理依赖于 CommitLog 的清理。当 CommitLog 中最早的消息被删除后,会计算出一个物理偏移量,所有 ConsumeQueue 中指向该偏移量之前的索引都可以被清除了。此方法会检查每个 ConsumeQueue 文件中记录的最大物理偏移量,如果这个最大偏移量小于给定的offset
,就说明整个文件都已过期,可以被删除。deleteExpiredFile(List<MappedFile> files)
: 这是一个内部辅助方法,负责将给定的MappedFile
列表从mappedFiles
中移除。
数据持久化 (刷盘)
MappedFileQueue
自身不直接写入数据,但它负责触发刷盘和提交操作。
flush(int flushLeastPages)
: 刷盘操作。- 根据
flushedWhere
找到需要刷盘的MappedFile
。 - 调用该
MappedFile
的flush()
方法,执行真正的刷盘逻辑。 - 更新
flushedWhere
到新的刷盘位置。
- 根据
commit(int commitLeastPages)
: 提交操作(当transientStorePoolEnable
为true
时)。- 根据
committedWhere
找到需要提交的MappedFile
。 - 调用该
MappedFile
的commit()
方法,将数据从堆外内存写入到 FileChannel。 - 更新
committedWhere
。
- 根据
总结
MappedFileQueue
是 RocketMQ 存储层一个设计精巧且至关重要的类。它成功地将多个离散的物理文件抽象成了一个单一、连续的逻辑队列,极大地简化了上层(如 CommitLog 和 ConsumeQueue)的实现逻辑。
其关键设计思想包括:
- 分片存储: 将无限增长的队列数据切分成固定大小的文件,便于管理和删除。
- 逻辑连续: 通过基于偏移量的文件名和有序列表,维护了文件的逻辑连续性。
- 高效查找: 利用偏移量和文件大小的数学关系,实现了
O(1)
复杂度的文件定位。 - 并发安全: 采用
CopyOnWriteArrayList
适应了“读多写少”的并发场景,保证了高性能和线程安全。 - 生命周期管理: 提供了灵活的按时间或按偏移量的文件删除机制,有效管理了磁盘空间。
- 性能优化: 通过
AllocateMappedFileService
预分配机制,消除了写入时创建文件所带来的性能抖动。