RocketMQ 索引文件(IndexFile)详解:结构、原理与源码剖析
引言
在分布式消息队列系统中,消息的高效检索是核心需求之一。RocketMQ 作为一款高性能的分布式消息中间件,其索引文件(IndexFile)机制为消息的快速定位提供了强大支持。本文将深入探讨 RocketMQ 中 IndexFile 的设计思路、数据结构及核心源码实现。
一、IndexFile 概述
1.1 索引文件的作用
RocketMQ 的消息存储采用顺序写 CommitLog 的方式保证高性能,但这种方式在随机读场景下效率较低。IndexFile 作为一种辅助存储结构,旨在为消息提供基于 Key 的快速检索能力,弥补了顺序写在查询场景下的不足。
1.2 索引文件的核心功能
-
基于 Key 的消息检索:支持通过业务 Key 快速定位消息
-
时间范围查询:支持按时间范围检索消息
-
哈希索引结构:采用哈希表实现高效的键值映射
-
文件滚动机制:单个文件大小固定,支持滚动创建新文件
二、IndexFile 的数据结构设计
2.1 文件整体结构
IndexFile 采用固定大小设计,默认每个文件大小为 400MB,由三部分组成:
-
文件头(IndexHeader):40 字节,存储文件元信息
-
哈希槽(Hash Slots):默认 500 万个,每个槽占 4 字节
-
索引项(Index Items):默认 2000 万个,每个项占 20 字节
+-------------------+
| IndexHeader | 40字节
+-------------------+
| |
| Hash Slots | 500万*4字节
| |
+-------------------+
| |
| Index Items | 2000万*20字节
| |
+-------------------+
2.2 数据结构详解
2.2.1 IndexHeader
存储文件的关键元信息,包含:
-
beginTimestamp:文件中第一条消息的存储时间
-
endTimestamp:文件中最后一条消息的存储时间
-
beginPhyOffset:文件中第一条消息的物理偏移量
-
endPhyOffset:文件中最后一条消息的物理偏移量
-
hashSlotCount:已使用的哈希槽数量
-
indexCount:已存储的索引项数量
//内存映射区域private final ByteBuffer byteBuffer;//起始时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//起始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结尾的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽位的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index索引的数量private final AtomicInteger indexCount = new AtomicInteger(1);
2.2.2 哈希槽(Hash Slot)
采用链式哈希表结构解决冲突,每个槽存储的是链表头节点在 Index Items 中的位置索引。如果发生哈希冲突,则新的索引项会被添加到链表头部,形成 LIFO 结构。
2.2.3 索引项(Index Item)
每个索引项包含以下字段:
-
keyHash:Key 的哈希值
-
phyOffset:消息在 CommitLog 中的物理偏移量
-
timestamp:消息存储时间戳
-
nextIndex:指向下一个索引项的位置,形成链表结构
三、IndexFile 源码分析
3.1 核心属性
/*** hash槽位数量*/private final int hashSlotNum;/*** 索引数量*/private final int indexNum;/*** 每一个索引文件都会有一个mappedFile*/private final MappedFile mappedFile;/*** 内存映射的区域*/private final MappedByteBuffer mappedByteBuffer;/*** 索引头*/private final IndexHeader indexHeader;
3.2 核心方法解析
3.2.1 构造函数
/*** 构造函数的初始化* @param fileName 文件的名称* @param hashSlotNum hash槽位的数量* @param indexNum 索引的数量* @param endPhyOffset 结尾的物理偏移量* @param endTimestamp 结尾的时间戳* @throws IOException*/public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//索引文件的总大小 40字节+hashSlotNum(5百万)*4字节+indexNum(5百万*4)*20字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//基于索引文件和索引文件的大小进行构建一个MappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取文件的内存映射区域this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//哈希槽位的数量this.hashSlotNum = hashSlotNum;//index索引的数量this.indexNum = indexNum;//获取到内存映射区域的ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//构建一个index头this.indexHeader = new IndexHeader(byteBuffer);//给索引头进行设置开始的物理偏移量和结束的物理偏移量if (endPhyOffset > 0) {this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}//给索引头设置起始的时间戳和结束的时间戳if (endTimestamp > 0) {this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}
3.2.2 添加索引项
/*** 写入索引* rocketMq 是支持按照key进行搜索的,所以在写入索引的时候,写入的是(key的哈希值+物理偏移量+存储时间戳)*。 先根据key的hash值获取hash的槽位,根据hash槽位获取到索引的位置,根据索引的位置 获取到在commitLog中的物理偏移量* @param key 索引的key* @param phyOffset 物理偏移量* @param storeTimestamp 存储时间戳* @return*/public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//判断写入的索引数量是否超过了 规定的索引数量(5百万*4) 此时就可以继续写入if (this.indexHeader.getIndexCount() < this.indexNum) {//获取到key的hash值int keyHash = indexKeyHashMethod(key);//key的hash值 针对hash槽位的数量进行取模int slotPos = keyHash % this.hashSlotNum;//获取到hash槽位的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//基于槽位的绝对位置 可以从内存映射区中读取槽位的值int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//槽位的值小于0 或者槽位的值大于index索引的数量if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//存储时间戳 - 开始时间戳 获得时间差long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;//如果开始时间戳小于等于0 时间差值 重置为0// 如果时间差值大于Integer.MAX_VALUE 重置为Integer.MAX_VALUE// 如果时间差值小于0 时间差值重置为0if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//获取绝对的index的位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//给绝对索引的位置设置key的hash值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//给绝对索引位置+4 设置物理偏移量this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//给绝对索引位置+4+8 设置时间差值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//给绝对索引位置+4+8+4 槽位的值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//给绝对槽位的位置设置 当前是第几个index(索引)this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//如果此时的index数量小于等于1 就设置开始的物理偏移量和开始时间戳if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}//如果槽位的值等于invalidIndex 进行累加hash槽位的数量if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}//累加索引的数量this.indexHeader.incIndexCount();//设置结尾的物理偏移量和时间戳this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}
索引写入流程:
-
消息写入 CommitLog 后,异步生成索引任务
-
IndexService 处理索引任务,调用 IndexFile 的 putKey 方法
-
计算 Key 的哈希值,确定哈希槽位置
-
将索引项写入 Index Items,并更新哈希槽指向新索引项
-
更新文件头信息,记录最新的时间戳和偏移量
3.2.3 根据 Key 查询消息
/*** 查询消息的物理偏移量* @param phyOffsets 存储的物理偏移量的list* @param key 消息的核心key* @param maxNum 最大查询的范围* @param begin 从什么位置开始查(时间戳)* @param end 查询到什么位置结束 (时间戳)*/public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//先获取到key的哈希值int keyHash = indexKeyHashMethod(key);//根据key的hash值获取到hash槽位的绝对位置int slotPos = keyHash % this.hashSlotNum;//定位到slot的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取slot的value值 获取这个slot最后存放的第几indexint slotValue = this.mappedByteBuffer.getInt(absSlotPos);//如果slot的value值小于等于0 或者大于index的数量 或者index数量小于等于1 属于异常情况 不处理if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {//正常情况下 进入for循环 从slot写入的最后一个index 开始遍历查询for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}//定位这个index的绝对位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//从这个index的绝对位置开始读取 先读取这个index对应的key的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//再去读取出来这条消息的物理位置long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//再读取这条消息对应的时间差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//再读取这条消息对应的前一个index的索引值int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;//根据起始时间和差值做一个换算 就可以算出你存储的时间戳long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;//如果说你存储的时间戳在指定的时间区间内boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果key的hash值和这个index的key的hash值一致 且这个index的存储时间在指定的时间区间内if (keyHash == keyHashRead && timeMatched) {//把物理偏移量添加到list中phyOffsets.add(phyOffsetRead);}if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}//开始读取上一个nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}
索引查询流程
-
根据 Key 计算哈希值,定位哈希槽
-
从哈希槽获取链表头节点位置
-
遍历链表,检查每个索引项的哈希值和时间范围
-
收集符合条件的物理偏移量
-
根据物理偏移量从 CommitLog 中读取实际消息
总结
IndexFile 作为 RocketMQ 的核心组件之一,通过精心设计的哈希索引结构,为消息提供了高效的基于 Key 的检索能力。其固定大小的文件设计、内存映射机制以及时间范围查询特性,使得 RocketMQ 在处理海量消息时依然能够保持出色的检索性能。理解 IndexFile 的设计与实现原理,对于深入掌握 RocketMQ 的工作机制以及在生产环境中进行性能调优都具有重要意义。