当前位置: 首页 > news >正文

广州专业网站制作平台企业网站开发费用

广州专业网站制作平台,企业网站开发费用,怎么做义工网站,功能网站开发引言 在分布式消息队列系统中,消息的高效检索是核心需求之一。RocketMQ 作为一款高性能的分布式消息中间件,其索引文件(IndexFile)机制为消息的快速定位提供了强大支持。本文将深入探讨 RocketMQ 中 IndexFile 的设计思路、数据结…

引言

在分布式消息队列系统中,消息的高效检索是核心需求之一。RocketMQ 作为一款高性能的分布式消息中间件,其索引文件(IndexFile)机制为消息的快速定位提供了强大支持。本文将深入探讨 RocketMQ 中 IndexFile 的设计思路、数据结构及核心源码实现。

一、IndexFile 概述

1.1 索引文件的作用

RocketMQ 的消息存储采用顺序写 CommitLog 的方式保证高性能,但这种方式在随机读场景下效率较低。IndexFile 作为一种辅助存储结构,旨在为消息提供基于 Key 的快速检索能力,弥补了顺序写在查询场景下的不足。

1.2 索引文件的核心功能

  • 基于 Key 的消息检索:支持通过业务 Key 快速定位消息

  • 时间范围查询:支持按时间范围检索消息

  • 哈希索引结构:采用哈希表实现高效的键值映射

  • 文件滚动机制:单个文件大小固定,支持滚动创建新文件

二、IndexFile 的数据结构设计

2.1 文件整体结构

IndexFile 采用固定大小设计,默认每个文件大小为 400MB,由三部分组成:

  1. 文件头(IndexHeader):40 字节,存储文件元信息

  2. 哈希槽(Hash Slots):默认 500 万个,每个槽占 4 字节

  3. 索引项(Index Items):默认 2000 万个,每个项占 20 字节

+-------------------+
|    IndexHeader    |  40字节
+-------------------+
|                   |
|    Hash Slots     |  500万*4字节
|                   |
+-------------------+
|                   |
|   Index Items     |  2000万*20字节
|                   |
+-------------------+

2.2 数据结构详解

2.2.1 IndexHeader

存储文件的关键元信息,包含:

  • beginTimestamp:文件中第一条消息的存储时间

  • endTimestamp:文件中最后一条消息的存储时间

  • beginPhyOffset:文件中第一条消息的物理偏移量

  • endPhyOffset:文件中最后一条消息的物理偏移量

  • hashSlotCount:已使用的哈希槽数量

  • indexCount:已存储的索引项数量

        //内存映射区域private final ByteBuffer byteBuffer;//起始时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//起始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结尾的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽位的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index索引的数量private final AtomicInteger indexCount = new AtomicInteger(1);

    2.2.2 哈希槽(Hash Slot)

    采用链式哈希表结构解决冲突,每个槽存储的是链表头节点在 Index Items 中的位置索引。如果发生哈希冲突,则新的索引项会被添加到链表头部,形成 LIFO 结构。

     2.2.3 索引项(Index Item)

      每个索引项包含以下字段:

  • keyHash:Key 的哈希值

  • phyOffset:消息在 CommitLog 中的物理偏移量

  • timestamp:消息存储时间戳

  • nextIndex:指向下一个索引项的位置,形成链表结构

三、IndexFile 源码分析

3.1 核心属性

   /*** hash槽位数量*/private final int hashSlotNum;/*** 索引数量*/private final int indexNum;/*** 每一个索引文件都会有一个mappedFile*/private final MappedFile mappedFile;/*** 内存映射的区域*/private final MappedByteBuffer mappedByteBuffer;/*** 索引头*/private final IndexHeader indexHeader;

3.2 核心方法解析

3.2.1 构造函数

/*** 构造函数的初始化* @param fileName 文件的名称* @param hashSlotNum hash槽位的数量* @param indexNum 索引的数量* @param endPhyOffset 结尾的物理偏移量* @param endTimestamp 结尾的时间戳* @throws IOException*/public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//索引文件的总大小  40字节+hashSlotNum(5百万)*4字节+indexNum(5百万*4)*20字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//基于索引文件和索引文件的大小进行构建一个MappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取文件的内存映射区域this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//哈希槽位的数量this.hashSlotNum = hashSlotNum;//index索引的数量this.indexNum = indexNum;//获取到内存映射区域的ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//构建一个index头this.indexHeader = new IndexHeader(byteBuffer);//给索引头进行设置开始的物理偏移量和结束的物理偏移量if (endPhyOffset > 0) {this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}//给索引头设置起始的时间戳和结束的时间戳if (endTimestamp > 0) {this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}

3.2.2 添加索引项

 /*** 写入索引*  rocketMq 是支持按照key进行搜索的,所以在写入索引的时候,写入的是(key的哈希值+物理偏移量+存储时间戳)*。 先根据key的hash值获取hash的槽位,根据hash槽位获取到索引的位置,根据索引的位置 获取到在commitLog中的物理偏移量* @param key 索引的key* @param phyOffset 物理偏移量* @param storeTimestamp 存储时间戳* @return*/public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//判断写入的索引数量是否超过了 规定的索引数量(5百万*4) 此时就可以继续写入if (this.indexHeader.getIndexCount() < this.indexNum) {//获取到key的hash值int keyHash = indexKeyHashMethod(key);//key的hash值 针对hash槽位的数量进行取模int slotPos = keyHash % this.hashSlotNum;//获取到hash槽位的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//基于槽位的绝对位置 可以从内存映射区中读取槽位的值int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//槽位的值小于0 或者槽位的值大于index索引的数量if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//存储时间戳 - 开始时间戳 获得时间差long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;//如果开始时间戳小于等于0 时间差值 重置为0// 如果时间差值大于Integer.MAX_VALUE 重置为Integer.MAX_VALUE// 如果时间差值小于0 时间差值重置为0if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//获取绝对的index的位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//给绝对索引的位置设置key的hash值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//给绝对索引位置+4 设置物理偏移量this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//给绝对索引位置+4+8 设置时间差值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//给绝对索引位置+4+8+4 槽位的值this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//给绝对槽位的位置设置 当前是第几个index(索引)this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//如果此时的index数量小于等于1 就设置开始的物理偏移量和开始时间戳if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}//如果槽位的值等于invalidIndex 进行累加hash槽位的数量if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}//累加索引的数量this.indexHeader.incIndexCount();//设置结尾的物理偏移量和时间戳this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}

 索引写入流程:

  1. 消息写入 CommitLog 后,异步生成索引任务

  2. IndexService 处理索引任务,调用 IndexFile 的 putKey 方法

  3. 计算 Key 的哈希值,确定哈希槽位置

  4. 将索引项写入 Index Items,并更新哈希槽指向新索引项

  5. 更新文件头信息,记录最新的时间戳和偏移量

3.2.3 根据 Key 查询消息

 /*** 查询消息的物理偏移量* @param phyOffsets 存储的物理偏移量的list* @param key 消息的核心key* @param maxNum 最大查询的范围* @param begin 从什么位置开始查(时间戳)* @param end   查询到什么位置结束 (时间戳)*/public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//先获取到key的哈希值int keyHash = indexKeyHashMethod(key);//根据key的hash值获取到hash槽位的绝对位置int slotPos = keyHash % this.hashSlotNum;//定位到slot的绝对位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取slot的value值 获取这个slot最后存放的第几indexint slotValue = this.mappedByteBuffer.getInt(absSlotPos);//如果slot的value值小于等于0 或者大于index的数量 或者index数量小于等于1 属于异常情况 不处理if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {//正常情况下 进入for循环 从slot写入的最后一个index 开始遍历查询for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}//定位这个index的绝对位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//从这个index的绝对位置开始读取 先读取这个index对应的key的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//再去读取出来这条消息的物理位置long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//再读取这条消息对应的时间差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//再读取这条消息对应的前一个index的索引值int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;//根据起始时间和差值做一个换算 就可以算出你存储的时间戳long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;//如果说你存储的时间戳在指定的时间区间内boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果key的hash值和这个index的key的hash值一致 且这个index的存储时间在指定的时间区间内if (keyHash == keyHashRead && timeMatched) {//把物理偏移量添加到list中phyOffsets.add(phyOffsetRead);}if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}//开始读取上一个nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}

索引查询流程

  1. 根据 Key 计算哈希值,定位哈希槽

  2. 从哈希槽获取链表头节点位置

  3. 遍历链表,检查每个索引项的哈希值和时间范围

  4. 收集符合条件的物理偏移量

  5. 根据物理偏移量从 CommitLog 中读取实际消息

总结

IndexFile 作为 RocketMQ 的核心组件之一,通过精心设计的哈希索引结构,为消息提供了高效的基于 Key 的检索能力。其固定大小的文件设计、内存映射机制以及时间范围查询特性,使得 RocketMQ 在处理海量消息时依然能够保持出色的检索性能。理解 IndexFile 的设计与实现原理,对于深入掌握 RocketMQ 的工作机制以及在生产环境中进行性能调优都具有重要意义。

 

http://www.dtcms.com/a/433021.html

相关文章:

  • 南沙移动网站建设英文网站建设教程
  • 高端互联网网站玉环城乡建设规划局网站
  • 学会了vue 能搭建一个网站平台网站开发博客
  • 法律网站建设学习之家网站
  • 第1篇|IEC 61400-1 的地位与演变:从“设计圣经”看2019版的关键变化
  • vps运行iis网站 需要输入账号和密码加强网站建设的原因
  • 企业网站策划过程怎么建立微网站?
  • flash网站方案医院网站建设的规划
  • 收录网站源码江西威乐建设集团有限公司企业网站
  • 自己做的公司网站百度搜不到怎么注销自己名下的公司
  • 景安一个空间怎么做多个网站淄博桓台网站建设定制
  • 网站开发做什么科目做电商哪个平台比较好
  • 电子商务网站建设需求分析百度爱采购优化
  • 江西省城乡建设厅网站如何做网站的内链优化
  • mysql 注册网站网站运营有前途吗
  • 最简单的做网站的软件公司网站建设关键字描述
  • 长春火车站附近有什么好玩的地方网页设计入门基础教程
  • 网站的内容网络营销顾问是什么
  • 百度商桥怎么绑定网站平台推广方式
  • 威海市建设局官方网站江苏seo推广网站建设
  • 网站建设 别墅如何制作个人网页最简单的方法
  • 背景全屏网站fevr wordpress
  • 手机建站最好的网站芜湖市建设办网站
  • MAC系统安装python及anaconda相关问题记录
  • 儿童网站html模板wordpress 淘宝客模板
  • 广州网站设计成功柚米科技无锡网站制作公司哪家好
  • 网站建设季度考核评价工作总结中国石油大学网站建设
  • 东莞网站推广培训下载应用商店app并安装到手机上
  • 营销型网站推广方式的论文手机网站单页怎么做
  • 自己站网站公司名称可以和网站域名不同吗