深入拆解消息队列的存储
文章目录
- 一、消息队列元数据的存储
- 1.方案一:基于第三方组件
- 2.方案二:集群内部实现
- 二、消息数据的存储
- 1.数据存储结构设计
- 2.消息数据的分段实现
- (1)Kafka
- (2)RocketMQ
- (3)Pulsar
- (4)RabbitMQ
- (5)分段存储的本质
- (6)总结对比
- 3.消息数据存储格式
- (1)消息写入文件的格式
- (2)消息内容的格式
- ①Kafka 消息格式
- ②RocketMQ 消息格式
- ③Pulsar 消息格式
- ④RabbitMQ 消息格式
- ⑤消息格式对比总结
- 三、消息队列数据清理机制
- 方案一:ACK删除机制详解
- 特点分析
- 方案二:时间/大小删除机制详解
- 清理策略详解
- 特点分析
- 方案三:ACK+过期结合机制详解
- Group维度ACK机制
- 特点分析
- 延时删除机制详解
- 问题分析
- 延时删除实现机制
- 延时删除流程图
存储模块作为消息队列高吞吐、低延时、高可靠特性的基础保证,可以说是最核心的模块。
消息队列中的数据一般分为元数据和消息数据。元数据是指 Topic、Group、User、ACL、Config 等集群维度的资源数据信息,消息数据指客户端写入的用户的业务数据。
一、消息队列元数据的存储
-
元数据信息的特点:
- 数据量小:元数据相对于消息数据来说体积较小
- 读写频率低:不会频繁进行读写操作
- 强一致性要求:必须保证数据的强一致性和高可靠性
- 零容错:不允许出现数据丢失
- 全局通知:需要通知所有Broker节点执行相应操作
-
两种主要存储方案:
1.方案一:基于第三方组件
典型实现:
- Kafka + ZooKeeper:元数据存储在ZooKeeper中
- Pulsar + ZooKeeper:同样使用ZooKeeper作为元数据存储
- RocketMQ + NameServer:元数据存储在Broker+NameServer中
优点:
- ✅ 集成方便:可直接复用第三方组件的成熟功能
- ✅ 开发成本低:无需重新开发一致性存储机制
- ✅ 功能完善:复用已有的Hook机制、高性能读写等能力
缺点:
- ❌ 部署复杂:增加系统部署和运维复杂度
- ❌ 稳定性风险:第三方组件故障会影响整个系统
- ❌ 数据一致性:可能出现第三方组件与Broker间数据不一致
2.方案二:集群内部实现
典型实现:
- Kafka (无ZooKeeper版本):基于Raft协议实现内部元数据存储
- RabbitMQ:使用Mnesia数据库
- RedPanda:Kafka的C++版本,内置元数据服务
优点:
- ✅ 部署简单:无需额外部署第三方组件
- ✅ 稳定性高:不依赖外部服务,减少故障点
- ✅ 数据一致性:集群内部保证数据一致性
缺点:
- ❌ 开发成本高:需要实现复杂的一致性协议
- ❌ 前期投入大:需要大量开发人力和时间
- 总结:
当前主流选择:方案一(基于第三方组件)
- 主要考虑:开发成本和快速上线需求
- 适用场景:项目初期、资源有限的团队
长期最优选择:方案二(集群内部实现)
- 主要优势:运维成本低、稳定性高、无外部依赖
- 适用场景:成熟项目、有充足开发资源的团队
二、消息数据的存储
1.数据存储结构设计
底层数据存储结构当前有两种方案:
- 第一种方案,每个分区对应一个文件的形式去存储数据。
- 单个文件读和写都是顺序的,性能最高。
- 但是当文件很多且都有读写的场景下,硬盘层面就会退化为随机读写,性能会严重下降。
当文件很多且都有读写的场景下,每个文件的读写操作可能会分散在硬盘的不同位置,导致硬盘无法进行连续的顺序读写,而是需要频繁地移动磁头或访问不同的存储区域,从而退化为随机读写。这种随机读写会显著降低性能,因为硬盘在处理随机访问时需要更多的寻道时间和旋转延迟,而顺序读写则可以充分利用硬盘的高速缓存和数据预取机制,提高效率。
- 第二种方案,每个节点上所有分区的数据都存储在同一个文件中,这种方案需要为每个分区维护一个对应的索引文件,索引文件里会记录每条消息在 File 里面的位置信息。
- 因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高。
- 但是在消费的时候,因为多个分区数据存储在同一个文件中,同一个分区的数据在底层存储上是不连续的,硬盘层面会出现随机读的情况,导致读取的性能降低。
不过随机读带来的性能问题,可以通过给底层配备高性能的硬件来缓解。所以当前比较多的消息队列选用的是第二种方案,但是 Kafka为了保证更高的吞吐性能,选用的是第一种方案。
关于 FD 的占用问题。Linux 上的 FD 数是可以配置的,比如配置几十万个 FD 没问题,所以我们一般不会用完系统的 FD 限制,这一点在实际的落地中不需要太担心。
但是不管是方案一还是方案二,在数据存储的过程中,如果单个文件过大,在文件加载、写入和检索的时候,性能就会有问题,并且消息队列有自动过期机制,如果单个文件过大,数据清理时会很麻烦,效率很低。所以,我们的消息数据都会分段存储。
2.消息数据的分段实现
(1)Kafka
- Kafka 的消息以 Topic 为单位,每个 Topic 可以分为多个 Partition(分区)。
- 每个 Partition 内部又被分为多个 Segment(段),每个 Segment 是一个实际的日志文件。
- Producer 发送消息时,消息会根据分区策略(如 key hash)被写入某个 Partition。
- Partition 内的 Segment 按顺序存储,Segment 达到一定大小或时间后会切换到新 Segment。
(2)RocketMQ
- RocketMQ 的消息以 Topic 为单位,每个 Topic 下有多个 MessageQueue(队列,类似分区)。
- 每个 MessageQueue 内部消息按顺序存储,底层以 CommitLog 文件分段(Segment)存储。
- CommitLog 文件达到设定大小后会切换到新文件,实现分段。
(3)Pulsar
- Pulsar 的消息以 Topic 为单位,每个 Topic 可以分为多个 Partition。
- 每个 Partition 内部由 BookKeeper 管理,BookKeeper 以 Ledger(账本)为单位分段存储消息。
- Ledger 达到一定大小或时间后会切换到新 Ledger。
(4)RabbitMQ
- RabbitMQ 的消息以 Queue(队列)为单位,通常不做分区。
- 队列内部消息顺序存储,底层存储可以采用环形缓冲区或磁盘分段(如持久化时)。
- 分段不是 RabbitMQ 的核心设计,但在持久化时会将消息写入多个磁盘文件(Segment)。
(5)分段存储的本质
在操作系统层面,分段存储(Segmented Storage)通常指的是将大文件拆分为多个较小的文件(Segment),每个 Segment 在磁盘上对应一个独立的文件。这样做有以下好处:
- 便于管理:小文件易于删除、备份、迁移。
- 高效查找:通过索引快速定位到某个 Segment。
- 顺序写入:顺序写磁盘效率高,减少磁盘碎片。
- 支持并发:多个 Segment 可并发读写。
文件系统的作用
- 文件映射:每个 Segment 通常对应一个磁盘文件(如
00000000000000000000.log
)。 - 顺序写入:消息写入时,操作系统通过缓冲区(Page Cache)将数据顺序写入文件,最终刷盘(fsync)保证持久化。
- 文件切换:当 Segment 文件达到设定大小或时间后,创建新文件,旧文件只读或归档。
以 Kafka 为例
- 创建新 Segment 文件
当当前 Segment 文件大小达到阈值(如1GB),Kafka 调用操作系统 API(如open
、write
)创建新文件。 - 顺序写入消息
Producer 发送的消息被写入当前 Segment 文件,操作系统负责将数据写入 Page Cache,定期刷盘。 - Segment 文件只读
Segment 文件写满后,变为只读,后续只做读取或删除操作。 - 文件删除
消息过期或被消费后,Kafka 调用unlink
删除 Segment 文件,操作系统回收空间。
(6)总结对比
消息队列 | 分段单位 | 分区/分片 | 分段文件/对象 |
---|---|---|---|
Kafka | Partition+Segment | 支持 | 日志文件(Segment) |
RocketMQ | MessageQueue+CommitLog | 支持 | CommitLog 文件 |
Pulsar | Partition+Ledger | 支持 | Ledger(账本) |
RabbitMQ | Queue+Segment | 不常用 | 持久化文件(Segment) |
3.消息数据存储格式
(1)消息写入文件的格式
消息写入文件的格式指消息是以什么格式写入到文件中的,比如 JSON 字符串或二进制。从性能和空间冗余的角度来看,消息队列中的数据基本都是以二进制的格式写入到文件的。
(2)消息内容的格式
①Kafka 消息格式
Kafka 消息以 Record Batch 为单位存储,每个 Batch 包含多条消息记录。
- Record 格式详解:
字段详细说明:
- baseOffset: 批次中第一条消息的偏移量
- batchLength: 整个批次的字节长度(不包括此字段本身)
- partitionLeaderEpoch: 分区Leader的纪元,用于检测数据一致性
- magic: 消息格式版本号,当前为2
- crc: CRC32校验和,从attributes字段开始计算
- attributes: 位标志,包含压缩类型、时间戳类型等信息
- producerId: 生产者的唯一标识,用于幂等性和事务
- producerEpoch: 生产者的纪元,防止僵尸生产者
- baseSequence: 批次中第一条消息的序列号
②RocketMQ 消息格式
- CommitLog 文件格式:
字段详细说明:
- TOTALSIZE: 整条消息的总字节数
- MAGICCODE: 固定魔数,用于验证消息格式
- BODYCRC: 消息体的CRC32校验值
- QUEUEID: 消息所属的队列ID
- FLAG: 消息标志位,包含压缩、事务等信息
- QUEUEOFFSET: 在MessageQueue中的逻辑偏移量
- PHYSICALOFFSET: 在CommitLog中的物理偏移量
- SYSFLAG: 系统标志,标识消息类型(普通、事务、定时等)
- BORNTIMESTAMP: 消息在生产者端产生的时间戳
- BORNHOST: 生产者的IP地址和端口
- STORETIMESTAMP: 消息在Broker端存储的时间戳
- RECONSUMETIMES: 消息重新消费的次数
③Pulsar 消息格式
- Message Metadata 格式:
- Batch Message 格式:
字段详细说明:
- producer_name: 生产者的唯一标识名称
- sequence_id: 生产者维护的消息序列号,用于去重
- publish_time: 消息发布到Broker的时间戳
- properties: 用户自定义的键值对属性
- partition_key: 用于分区路由的键值
- compression: 压缩算法类型(NONE、LZ4、ZLIB、ZSTD)
- num_messages_in_batch: 批次中包含的消息数量
- event_time: 消息的业务时间戳
- encryption_keys: 用于端到端加密的密钥信息
- schema_version: Schema演进的版本信息
④RabbitMQ 消息格式
- AMQP 0-9-1 消息格式:
- 持久化存储格式:
字段详细说明:
- content-type: 消息体的MIME类型
- content-encoding: 消息体的编码方式
- headers: 用户自定义的消息头部信息
- delivery-mode: 投递模式(1=非持久化,2=持久化)
- priority: 消息优先级(0-255)
- correlation-id: 用于关联请求和响应的标识符
- reply-to: 指定回复消息的队列名称
- expiration: 消息的TTL(生存时间)
- message-id: 消息的唯一标识符
- timestamp: 消息的时间戳
- type: 消息类型标识
- user-id: 发送消息的用户标识
- app-id: 发送消息的应用标识
⑤消息格式对比总结
特性 | Kafka | RocketMQ | Pulsar | RabbitMQ |
---|---|---|---|---|
批处理支持 | ✅ Record Batch | ❌ 单条消息 | ✅ Batch Message | ❌ 单条消息 |
压缩支持 | ✅ 批次级别 | ✅ 消息级别 | ✅ 消息级别 | ❌ 不支持 |
事务支持 | ✅ 生产者ID+纪元 | ✅ 事务偏移量 | ✅ 事务ID | ✅ 事务性队列 |
Schema演进 | ❌ 应用层 | ❌ 应用层 | ✅ 内置支持 | ❌ 应用层 |
加密支持 | ❌ 传输层 | ❌ 传输层 | ✅ 端到端加密 | ✅ TLS |
消息路由 | ✅ 分区键 | ✅ 队列选择 | ✅ 分区键 | ✅ 路由键 |
每种消息队列的格式设计都反映了其核心设计理念:
- Kafka: 高吞吐量,批处理优化
- RocketMQ: 功能丰富,支持事务和定时消息
- Pulsar: 云原生,支持多租户和Schema演进
- RabbitMQ: 标准AMQP协议,功能完整的消息代理
三、消息队列数据清理机制
- 数据保留周期概览:
说明:消息队列的数据保留周期根据业务需求而定。短期保留适用于实时性要求高的场景,常规保留是大多数业务的标准配置,长期保留仅在特殊合规或审计需求下使用。保留时间越长,存储成本越高,管理复杂度也随之增加。
- 三种主要清理机制:
说明:三种清理机制各有特点。ACK删除机制简单直接,适合任务队列场景(RabbitMQ);时间/大小删除机制支持数据重放,适合流处理场景(kafka、RocketMQ);ACK+过期结合机制兼顾两者优势,适合复杂的多租户场景(Pulsar)。选择哪种机制主要取决于业务对数据重放、存储成本、消费一致性的要求。
方案一:ACK删除机制详解
说明:ACK删除机制的核心是"消费即删除"。当消费者成功处理消息后,发送ACK确认,消息队列立即删除该消息。如果处理失败,消息会被转移到死信队列,避免阻塞后续消息的消费。这种机制保证了消息只被消费一次,但代价是无法进行消息重放。
特点分析
说明:ACK删除机制的优点在于逻辑简单、存储效率高、不会出现重复消费问题。但缺点也很明显:一旦消息被删除就无法恢复,存在数据丢失风险;需要额外的死信队列机制来处理异常情况;不适合需要多次处理同一消息的场景。
方案二:时间/大小删除机制详解
说明:时间/大小删除机制将消息的消费和删除解耦。消费者处理完消息后只提交偏移量,表示消费进度,消息本身不会立即删除。系统通过后台清理线程定期检查消息是否超过保留时间或存储大小限制,满足条件才进行删除。这种机制支持消息重放和多消费者组订阅。
清理策略详解
说明:清理策略通常有两个维度:时间和大小。时间策略根据消息的创建时间或最后访问时间来判断是否过期;大小策略根据分区或主题的总存储大小来决定是否需要清理老数据。异步清理线程定期执行,避免影响正常的消息生产和消费性能。实际应用中,两种策略通常会组合使用。
特点分析
说明:时间/大小删除机制的最大优势是灵活性强,支持消息重放和多消费者组场景,数据安全性高。但也带来了一些挑战:消费者需要自己管理偏移量,可能出现重复消费;需要更多的存储空间;消费进度的管理和监控相对复杂。
方案三:ACK+过期结合机制详解
说明:ACK+过期结合机制是前两种方案的融合。它保留了ACK的概念,但ACK是基于消费者组(Group)的。同一个消费者组内的消息被ACK后不会重复消费,但不同的消费者组可以独立消费同一条消息。消息的最终删除仍然基于时间或大小策略,而不是ACK状态。
Group维度ACK机制
说明:在Group维度ACK机制中,每个消费者组都有独立的消费状态。即使所有消费者组都已经ACK了某条消息,该消息仍然会保留,直到达到过期时间或大小限制才会被删除。这样既避免了同一消费者组内的重复消费,又支持了多消费者组的独立订阅。
特点分析
说明:ACK+过期结合机制集合了前两种方案的优点,但实现复杂度也最高。它需要维护每个消费者组的状态信息,存储开销也比较大。这种机制特别适合云原生和多租户场景,能够很好地平衡数据安全性、消费一致性和系统灵活性。
延时删除机制详解
问题分析
说明:在分段存储中,一个文件包含多条消息,这些消息的删除时机可能不同。如果每次ACK都立即删除对应的消息,就需要频繁地修改文件内容,进行随机读写操作,严重影响性能。延时删除机制通过批量操作和顺序删除来解决这个问题。
延时删除实现机制
说明:延时删除机制的核心是"标记删除"。当收到删除请求时,系统不会立即修改文件,而是在内存和Backlog文件中记录删除标记。在后续的消费过程中,系统会检查这些标记,跳过已标记删除的消息。只有当整个分段文件中的所有消息都可以删除时,才会物理删除整个文件。
延时删除流程图
说明:延时删除的完整流程包括:接收删除请求后先检查是否可以删除整个分段;如果不能,则进行标记删除;在消费时检查标记并跳过已删除的消息;定期检查分段状态,当整个分段都可删除时执行物理删除。这种机制大大减少了文件操作的频率,提高了系统性能。