当前位置: 首页 > news >正文

RocketMQ 消息存储机制 CommitLog和ConsumerQu

Rocketmq在消息存储方面做了很多的优化,那也是为了提升rocketmq的性能。首先要知道消息不会保存在内存里面,如果生产者发送的消息是保存在内存里面,那很有可能消息会丢失。

所以Rocketmq和kafka一样会把消息保存在磁盘文件当中,既然保存在文件当中就要考虑对文件读写性能的问题。

生产者去发送消息,生产者发送的消息一定是保存在commitlog这个文件里面。

之前不是说的生产者发送的消息保存在某个主题,某个队列里面?实际上这只是映射成逻辑的关系。

所有的消息都是在一个文件里面,commitlog里面,当然如果这个文件存储不下,还会有另外一个commitlog文件。

所以生产者所有的消息都会保存在这个文件里面。保存在这个文件里面对于消费者来说,它要去消费某个主题。生产者可能会往t1这个主题去发,t2这个主题也会去发,也可能会往t3主题去发。在这个文件当中会有很多主题的消息,对于消费者去这个文件检索t1主题的消息其实是性能比较差的。

Rocketmq会去做这么一件事情,它会去创建逻辑上的一些划分,比如生产者负载均衡的往队列上面去发。但是保存依赖是保存在commitlog里面,这个时候为了提高消费者的性能,这个时候就会有逻辑上的队列叫做consumerquenue0,那这里面保存了什么?

ConsumeQueue 0 1 2 3

消息消费队列,引⼊的⽬的主要是提⾼消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进⾏的,如果要遍历commitlog⽂件中根据 topic检索消息是⾮常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息⼤⼩size和消息Tag的HashCode值。consumequeue⽂件可以看成是基于topiccommitlog索引⽂,故consumequeue⽂件夹的组织⽅式如下:

topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue⽂件采取定⻓设计,每⼀个条⽬共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息⻓度、8字节tag hashcode,单个⽂件由30W个 条⽬组成,可以像数组⼀样随机访问每⼀个条⽬,每个ConsumeQueue⽂件⼤⼩约 5.72M;

也就是所有的数据都在commitlog里面,怎么知道topic1 里面的messagequenue0的数据是哪些?因为所有的数据都在commitlog里面。所以consumer quenue0告诉你了topic1里面messagequenue0里面的数据在commitlog当中起始的偏移量是多少。

在消费者进行消费的时候,每次消费完它也上报消费者的偏移量,也就是哪些数据 topic1里面messagequenue0里面哪些数据已经被消费者消费了。也需要把这个告知Rocketmq。

因为消费者挂掉,那么同组的消费者可以接着上个消费者的位置继续消费。所以还会保存consumer的offset。

所有的消息都在commitlog里面,我们知道这些消息在topic里面的某个队列里面。虽然保存在commitlog里面,但是实际上会进行逻辑的划分。划分在topic里面某个message quenue里面。

具体的offset,在哪个位置,保存在对应的consumerquenue0 1 2里面,这样消费者在消费的时候效率明显得到了提升。

CommitLog
消息主体以及元数据的存储主体,存储Producer端写⼊的消息主体内容,消息内容不是定⻓的。单个⽂件⼤⼩默认1G ,⽂件名⻓度为20位,左边补零,剩余为起始偏 移量,⽐如00000000000000000000代表了第⼀个⽂件,起始偏移量为0,⽂件⼤⼩
1G=1073741824;当第⼀个⽂件写满了,第⼆个⽂件为00000000001073741824, 起始偏移量为1073741824,以此类推。消息主要是顺序写⼊⽇志⽂件,当⽂件满 了,写⼊下⼀个⽂件;

在broker里面配置了commitlog位置和consumerquenue位置

#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/broker-a-master/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/broker-a-master/store/consumequeue[root@localhost commitlog]# pwd
/usr/local/rocketmq/broker-a-master/store/commitlog
[root@localhost commitlog]# ls
00000000000000000000[root@localhost commitlog]# ll
total 860
-rw-r--r--. 1 root root 1073741824 Aug  9 09:46 00000000000000000000

所有的消息都会保存在000000这个文件里面,这个文件大小是1G,当这个文件保存的数据超过了这个文件大小,那么就会创建出新的文件来。

consumer quenue里面有我们创建的topic,

[root@localhost ~]# ls  /usr/local/rocketmq/broker-a-master/store/consumequeue
TopicTest

可以看到这个主题对应的了4个队列

[root@localhost consumequeue]# cd TopicTest/
[root@localhost TopicTest]# ls
0  1  2  3

0里面也放了一些文件,这个文件其实就是consumerquenue0,

[root@localhost TopicTest]# ll 0
total 24
-rw-r--r--. 1 root root 6000000 Aug  9 09:46 00000000000000000000
[root@localhost TopicTest]# ll 1
total 24
-rw-r--r--. 1 root root 6000000 Aug  9 09:46 00000000000000000000

其实就是这个文件,这个文件里面放了相应的偏移量,这里只会保存具体偏移量的数据。

在发送消息的时候要往topic1上的messagequenue0上面发,但是都保存在这个commitlog里面。消息的逻辑位置在具体的物理上的位置在哪?那么就在consumerquenue0的文件里面,

http://www.dtcms.com/a/330306.html

相关文章:

  • 第八课:python的运算符
  • 从 VLA 到 VLM:低延迟RTSP|RTMP视频链路在多模态AI中的核心角色与工程实现
  • 论文分享 | Flashboom:一种声东击西攻击手段以致盲基于大语言模型的代码审计
  • 04-spring-手写spring-demo-aop0V1
  • Canal解析MySQL Binlog原理与应用
  • Unity、C#常用的时间处理类
  • Laravel 使用ssh链接远程数据库
  • 使用 Simple Floating Menu 插件轻松实现浮动联系表单
  • AI一周事件(2025年8月6日-8月12日)
  • [ Mybatis 多表关联查询 ] resultMap
  • ResourcelessTransactionManager的作用
  • 第三天-如何在DBC中描述CAN Signal的“负数/值”
  • JetPack系列教程(六):Paging——让分页加载不再“秃”然
  • 理财学习资料推荐
  • 谈一些iOS组件化相关的东西
  • C# 多线程:并发编程的原理与实践
  • C++中的STL标准模板库和string
  • Heterophily-aware Representation Learning on Heterogeneous Graphs
  • AI - 工具调用
  • AI智能体记忆策略
  • 10 ABP 模块系统
  • [转]SURREAL数据集国内下载链接
  • Deep Agents:用于复杂任务自动化的 AI 代理框架
  • nm命令和nm -D命令参数
  • 19. 重载的方法能否根据返回值类型进行区分
  • Java之String类
  • 3.Cursor提效应用场景实战
  • UEdior富文本编辑器接入AI
  • 算法篇----分治(归并排序)
  • 云电竞盒子对游戏性能有影响吗?