当前位置: 首页 > news >正文

RocketMQ 索引文件(IndexFile)详解:结构、原理与源码剖析

引言

在分布式消息队列系统中,消息的高效检索是核心需求之一。RocketMQ 作为一款高性能的分布式消息中间件,其索引文件(IndexFile)机制为消息的快速定位提供了强大支持。本文将深入探讨 RocketMQ 中 IndexFile 的设计思路、数据结构及核心源码实现。

一、IndexFile 概述

1.1 索引文件的作用

RocketMQ 的消息存储采用顺序写 CommitLog 的方式保证高性能,但这种方式在随机读场景下效率较低。IndexFile 作为一种辅助存储结构,旨在为消息提供基于 Key 的快速检索能力,弥补了顺序写在查询场景下的不足。

1.2 索引文件的核心功能

  • 基于 Key 的消息检索:支持通过业务 Key 快速定位消息

  • 时间范围查询:支持按时间范围检索消息

  • 哈希索引结构:采用哈希表实现高效的键值映射

  • 文件滚动机制:单个文件大小固定,支持滚动创建新文件

二、IndexFile 的数据结构设计

2.1 文件整体结构

IndexFile 采用固定大小设计,默认每个文件大小为 400MB,由三部分组成:

  1. 文件头(IndexHeader):40 字节,存储文件元信息

  2. 哈希槽(Hash Slots):默认 500 万个,每个槽占 4 字节

  3. 索引项(Index Items):默认 2000 万个,每个项占 20 字节

+-------------------+
|    IndexHeader    |  40字节
+-------------------+
|                   |
|    Hash Slots     |  500万*4字节
|                   |
+-------------------+
|                   |
|   Index Items     |  2000万*20字节
|                   |
+-------------------+

2.2 数据结构详解

2.2.1 IndexHeader

存储文件的关键元信息,包含:

  • beginTimestamp:文件中第一条消息的存储时间

  • endTimestamp:文件中最后一条消息的存储时间

  • beginPhyOffset:文件中第一条消息的物理偏移量

  • endPhyOffset:文件中最后一条消息的物理偏移量

  • hashSlotCount:已使用的哈希槽数量

  • indexCount:已存储的索引项数量

        //内存映射区域private final ByteBuffer byteBuffer;//起始时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//起始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结尾的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽位的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index索引的数量private final AtomicInteger indexCount = new AtomicInteger(1);

    2.2.2 哈希槽(Hash Slot)

    采用链式哈希表结构解决冲突,每个槽存储的是链表头节点在 Index Items 中的位置索引。如果发生哈希冲突,则新的索引项会被添加到链表头部,形成 LIFO 结构。

     2.2.3 索引项(Index Item)

      每个索引项包含以下字段:

  • keyHash:Key 的哈希值

  • phyOffset:消息在 CommitLog 中的物理偏移量

  • timestamp:消息存储时间戳

  • nextIndex:指向下一个索引项的位置,形成链表结构

三、IndexFile 源码分析

3.1 核心属性

   /*** hash槽位数量*/private final int hashSlotNum;/*** 索引数量*/private final int indexNum;/*** 每一个索引文件都会有一个mappedFile*/private final MappedFile mappedFile;/*** 内存映射的区域*/private final MappedByteBuffer mappedByteBuffer;/*** 索引头*/private final IndexHeader indexHeader;

3.2 核心方法解析

3.2.1 构造函数

/*** 构造函数的初始化* @param fileName 文件的名称* @param hashSlotNum hash槽位的数量* @param indexNum 索引的数量* @param endPhyOffset 结尾的物理偏移量* @param endTimestamp 结尾的时间戳* @throws IOException*/public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//索引文件的总大小  40字节+hashSlotNum(5百万)*4字节+indexNum(5百万*4)*20字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//基于索引文件和索引文件的大小进行构建一个MappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取文件的内存映射区域this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//哈希槽位的数量this.hashSlotNum = hashSlotNum;//index索引的数量this.indexNum = indexNum;//获取到内存映射区域的ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//构建一个index头this.indexHeader = new IndexHeader(byteBuffer);//给索引头进行设置开始的物理偏移量和结束的物理偏移量if (endPhyOffset > 0) {this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}//给索引头设置起始的时间戳和结束的时间戳if (endTimestamp > 0) {this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}

3.2.2 添加索引项

 /*** 写入索引*  rocketMq 是支持按照key进行搜索的,所以在写入索引的时候,写入的是(key的哈希值+物理偏移量+存储时间戳)*。 先根据key的hash值获取hash的槽位,根据hash槽位获取到索引的位置,根据索引的位置 获取到在commitLog中的物理偏移量* @param key 索引的key* @param phyOffset 物理偏移量* @param storeTimestamp 存储时间戳* @return*/public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//判断写入的索引数量是否超过了 规定的索引数量(5百万*4) 此时就可以继续写入if (this.indexHeader.getIndexCount() < this.indexNum) {//获取到key的hash值int keyHash = indexKeyHashMethod(key);//key的hash值 针对hash槽位的数量进行取模int slotPos = keyHash % this.hashSlotNum;//获取到hash槽位的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//基于槽位的绝对位置 可以从内存映射区中读取槽位的值int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//槽位的值小于0 或者槽位的值大于index索引的数量if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//存储时间戳 - 开始时间戳 获得时间差long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;//如果开始时间戳小于等于0 时间差值 重置为0// 如果时间差值大于Integer.MAX_VALUE 重置为Integer.MAX_VALUE// 如果时间差值小于0 时间差值重置为0if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//获取绝对的index的位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//给绝对索引的位置设置key的hash值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//给绝对索引位置+4 设置物理偏移量this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//给绝对索引位置+4+8 设置时间差值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//给绝对索引位置+4+8+4 槽位的值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//给绝对槽位的位置设置 当前是第几个index(索引)this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//如果此时的index数量小于等于1 就设置开始的物理偏移量和开始时间戳if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}//如果槽位的值等于invalidIndex 进行累加hash槽位的数量if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}//累加索引的数量this.indexHeader.incIndexCount();//设置结尾的物理偏移量和时间戳this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}

 索引写入流程:

  1. 消息写入 CommitLog 后,异步生成索引任务

  2. IndexService 处理索引任务,调用 IndexFile 的 putKey 方法

  3. 计算 Key 的哈希值,确定哈希槽位置

  4. 将索引项写入 Index Items,并更新哈希槽指向新索引项

  5. 更新文件头信息,记录最新的时间戳和偏移量

3.2.3 根据 Key 查询消息

 /*** 查询消息的物理偏移量* @param phyOffsets 存储的物理偏移量的list* @param key 消息的核心key* @param maxNum 最大查询的范围* @param begin 从什么位置开始查(时间戳)* @param end   查询到什么位置结束 (时间戳)*/public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//先获取到key的哈希值int keyHash = indexKeyHashMethod(key);//根据key的hash值获取到hash槽位的绝对位置int slotPos = keyHash % this.hashSlotNum;//定位到slot的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取slot的value值 获取这个slot最后存放的第几indexint slotValue = this.mappedByteBuffer.getInt(absSlotPos);//如果slot的value值小于等于0 或者大于index的数量 或者index数量小于等于1 属于异常情况 不处理if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {//正常情况下 进入for循环 从slot写入的最后一个index 开始遍历查询for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}//定位这个index的绝对位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//从这个index的绝对位置开始读取 先读取这个index对应的key的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//再去读取出来这条消息的物理位置long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//再读取这条消息对应的时间差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//再读取这条消息对应的前一个index的索引值int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;//根据起始时间和差值做一个换算 就可以算出你存储的时间戳long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;//如果说你存储的时间戳在指定的时间区间内boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果key的hash值和这个index的key的hash值一致 且这个index的存储时间在指定的时间区间内if (keyHash == keyHashRead && timeMatched) {//把物理偏移量添加到list中phyOffsets.add(phyOffsetRead);}if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}//开始读取上一个nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}

索引查询流程

  1. 根据 Key 计算哈希值,定位哈希槽

  2. 从哈希槽获取链表头节点位置

  3. 遍历链表,检查每个索引项的哈希值和时间范围

  4. 收集符合条件的物理偏移量

  5. 根据物理偏移量从 CommitLog 中读取实际消息

总结

IndexFile 作为 RocketMQ 的核心组件之一,通过精心设计的哈希索引结构,为消息提供了高效的基于 Key 的检索能力。其固定大小的文件设计、内存映射机制以及时间范围查询特性,使得 RocketMQ 在处理海量消息时依然能够保持出色的检索性能。理解 IndexFile 的设计与实现原理,对于深入掌握 RocketMQ 的工作机制以及在生产环境中进行性能调优都具有重要意义。

 

相关文章:

  • 用 Python 实现了哪些办公自动化
  • 力扣第157场双周赛
  • 湖北理元理律师事务所债务优化方案:让还款与生活平衡的艺术
  • 基于PyTorch的残差网络图像分类实现指南
  • SGMD辛几何模态分解
  • 【MATLAB代码】主动声纳多路径目标测距与定位,测距使用互相关,频率、采样率可调、声速可调,定位使用三边法|订阅专栏后可直接查看源代码
  • 第一章 半导体基础知识
  • 华为OD机试真题——出租车计费/靠谱的车 (2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • 网络安全--PHP第二天
  • 华为OD机试真题——启动多任务排序(2025B卷:200分)Java/python/JavaScript/C/C++/GO最佳实现
  • Qt for Android 安卓低功耗蓝牙(BLE)开发环境搭建
  • JavaWeb:SpringBoot工作原理详解
  • 【第五篇】 SpringBoot中的高级配置
  • 可编程幻彩LED灯条的设计
  • 3d世界坐标系转屏幕坐标系
  • 详解3DGS
  • JVM相关
  • 什么是智能体agent?
  • linux 新增驱动宏config.in配置
  • Python之Pandas
  • 母婴网站 模板/网站关键词seo排名
  • 公司官网的作用/潍坊百度快速排名优化
  • 58同城做网站/网络营销策划书2000字
  • 教你做文案的网站推荐/百度云盘官网登录入口
  • 网站建设佰首选金手指四/关键字优化用什么系统
  • 网站建设在什么税控盘/百度模拟点击