当前位置: 首页 > news >正文

深入理解 RocketMQ:生产者详解

在生产者一章的基本概念包括消息,Tag,Keys,队列和生产者的介绍。

一、消息

  • topic:消息所属 topic 的名称
  • flagflag 字段最常见的用途是表示消息体的压缩类型
    • 第 0 位 (0x1): 用来标记消息体是否被压缩。如果这一位是 1,表示 body 字段是经过压缩的数据。
    • 压缩算法类型: 接下来的几位可以用来指明具体使用了哪种压缩算法。例如:
      • ZLIB 压缩: (flag & 0x1) == 1
      • LZ4 压缩: (flag & 0x2) == 2 (需要先检查第0位)
      • 等等…
  • body:表示消息的存储内容
  • properties:表示消息属性
    • KEYS: 代表这条消息的业务关键词
    • TAGS:消息标签,方便服务器过滤使用。目前只支持每个消息设置一个
    • DelayTimeLevel:消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
    • WaitStoreMsgOK:表示消息是否在服务器落盘后才返回应答
  • transactionId:会在事务消息中使用。
public class Message implements Serializable {private static final long serialVersionUID = 8445773977080406428L;private String topic;private int flag;private Map<String, String> properties;private byte[] body;private String transactionId;
}
  • Tag: 不管是 RocketMQ 的 Tag 过滤还是延迟消息等都会利用 Properties 消息属性机制,这些特殊信息使用了系统保留的属性Key,设置自定义属性时需要避免和系统属性Key冲突。
  • Keys: 服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。

二、Tag

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。

  • Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

2.1. 什么时候该用 Topic,什么时候该用 Tag?

核心理念:Topic 是资源边界,Tag 是业务筛选

Topic(主题):在消息中间件中,Topic 是一个一等公民 (First-Class Citizen)。它代表了物理或逻辑上的资源边界。创建一个新的 Topic,通常意味着创建了一套新的队列、存储机制、甚至独立的消费资源池。它定义了“一类”消息,这类消息在本质上是不同的。

Tag(标签):Tag 是 Topic 下的二级分类机制,是一个轻量级的过滤条件。它用于筛选 Topic 内本质相同,但业务细分不同的消息。它不创建新的物理资源,只是在消费时提供一个筛选的钩子。

理解了这个核心差异,我们再来详细看这四个准则。

2.1.1. 消息类型是否一致 -> 技术实现机制的根本不同

这是最刚性的一条准则,必须使用不同的 Topic,没有任何商量余地。

  • 详细阐述
    • 普通消息:遵循标准的先进先出(FIFO)队列模型,投递和消费逻辑最简单。
    • 顺序消息:要求一组消息严格按照发送顺序投递到同一个队列,并且被同一个消费者锁定消费。这需要在生产者端进行特殊路由(使用MessageQueueSelector),在消费者端进行队列锁定。其底层的消费模型和并发控制与普通消息完全不同。
    • 定时/延时消息:消息发送到 Broker 后,不会立即进入目标 Topic 的队列,而是先被暂存到一个特殊的内部 Topic(例如 RocketMQ 的 SCHEDULE_TOPIC_XXXX)。由一个专门的调度服务在预定时间到达后,才将消息“恢复”并投递到原始 Topic。
    • 事务消息:涉及到两阶段提交(2PC)。消息先以“半消息(Half Message)”的形式发送,对消费者不可见。待本地事务执行成功后,再发送 Commit 确认,消息才对消费者可见。如果本地事务回滚,则发送 Rollback。这套机制也需要 Broker 提供特殊的存储和处理逻辑。
  • 结论:这四种消息类型在 Broker 端的处理链路、存储模型和消费机制是完全不同的。Broker 需要根据 Topic 来判断应该启动哪一套处理流程。因此,将它们混在一个 Topic 里,系统将无法工作。Topic 在这里是区分技术实现模型的唯一标识。

2.1.2. 业务是否相关联 -> 数据边界与领域驱动设计 (DDD)

这个准则关注的是业务内聚性

  • 详细阐述
    • “淘宝交易消息” vs. “京东物流消息” (使用不同 Topic)
      • 领域不同:这代表了两个完全独立的商业生态和业务领域(Bounded Context)。它们的数据模型、生命周期、业务规则、演进速度都毫无关联。
      • 治理与演进:如果将它们放在同一个 Topic(如 E-Commerce-Topic)下,任何一方的消息体结构变更都可能影响到另一方的消费者,造成解析错误。消费者代码需要写大量的 if-else 来判断消息来源,这是一种典型的“反模式”。
      • 关注点分离:创建独立的 Topic(Taobao_Trade_Topic, JD_Logistics_Topic)可以确保团队可以独立地开发、迭代和运维自己的系统,互不干扰。这就像微服务架构中,不同的服务有自己独立的数据库一样。
    • “天猫交易消息”中的电器、女装、化妆品 (使用 Tag)
      • 领域内聚:这些订单都属于“天猫交易”这个同一个业务领域。它们共享一套核心的业务流程(下单、支付、履约)、相似的数据结构(订单号、用户ID、金额、时间等),并且可能被同一个下游系统(如结算中心、积分服务)所处理。
      • 子集关系:这是一种典型的“全集与子集”关系。“天猫交易”是全集,“电器订单”是其中的一个子集。使用 Tag (Tag_Appliance, Tag_WomensWear) 是对此类子集进行分类和筛选的完美方式。
      • 消费灵活性:一个消费者可以只订阅“电器”和“化妆品” (Tag_Appliance || Tag_Cosmetics);而另一个数据分析服务则可以订阅所有 *,获取全量交易数据。Tag 提供了这种强大的消费灵活性。

2.1.3. 消息优先级是否一致 -> 服务水平协议 (SLA) 与资源隔离

这个准则关注的是服务质量(QoS)

  • 详细阐述
    • 问题根源:消息队列的单个队列是严格 FIFO 的。如果一个要求“1小时内必须处理”的高优先级消息,不幸地排在了一万个“24小时内处理即可”的低优先级消息之后,那么它的高优先级就形同虚设,其 SLA 必然无法满足。这被称为队头阻塞 (Head-of-Line Blocking)
    • Tag 无法解决:Tag 仅仅是消费端的过滤器,它不能改变消息在队列中的物理存储顺序。
    • Topic 作为解决方案:通过创建不同的 Topic(Logistics_Hema_Urgent_Topic, Logistics_Tmall_Normal_Topic),我们实际上是为不同优先级的消息创建了独立的物理通道和资源池
      • 物理隔离:高优先级消息拥有自己专属的队列,不会被低优先级消息阻塞。
      • 资源专用:我们可以为消费高优先级 Topic 的 Consumer Group 分配更多的机器、更高的 CPU/内存配额,确保它们能够被快速、优先地处理。这实现了真正的资源隔离和服务等级保障

2.1.4. 消息量级是否相当 -> 吞吐量与公平性(防“饿死”)

这个准则关注的是性能和系统公平性

  • 详细阐述
    • 问题根源:这与优先级问题类似,但更侧重于吞吐量差异。想象一条高速公路的一个车道(一个队列),如果一辆需要快速通过的“救护车”(低量级、高实时性消息),被前方一个由上万辆卡车组成的“超长运输车队”(超高量级的批处理消息)所堵塞,那么“救护车”将永远无法及时到达目的地。
    • “饿死”现象:当海量消息瞬间涌入一个 Topic 时,会占满 Broker 的队列和缓冲区。此时,即使有零星的、但很重要的低量级消息想进入,也可能因为资源被占满而处理延迟,甚至失败。消费端也可能因为被海量消息淹没,而来不及处理那些混在其中的重要消息。
    • Topic 作为解决方案:将它们拆分到不同的 Topic(Realtime_Critical_Event_Topic, Massive_Data_Sync_Topic),相当于为“救护车”和“运输车队”修建了专属车道。这确保了低量级、高时效性的消息拥有一个干净、快速的通道,避免了因资源争抢而被高量级消息“饿死”的风险。

三、Keys

Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

   // 订单IdString orderId = "20034568923546";message.setKeys(orderId);

当调用 producer.send(msg) 时,在消息被真正发送到网络之前,RocketMQ 的客户端(Producer)内部会自动为这条消息生成一个全局唯一的ID,并将其设置到消息的 properties 里的一个特殊键上,通常是 UNIQ_KEY。它的用途:

  • 消息实例的唯一指纹:它的首要职责是为每一次 send() 操作产生的消息体赋予一个在时空上都独一无二的“指纹”或“身份证号”。这个ID与业务内容完全无关。
  • 发送端可靠性诊断:在消息发送前,这个ID就已经生成。如果客户端日志中记录了这个ID,但服务端始终找不到,这有助于定位问题是在客户端、网络还是Broker入口处丢失。
  • 为Broker生成最终MSG ID提供素材:当消息到达Broker并被成功存储后,Broker会生成一个最终的、不可变的 Message ID (MSG ID)。这个最终ID通常由Broker的地址、消息在CommitLog中的物理偏移量构成。而生产者生成的UNIQ_KEY会作为消息的一个重要属性被存下来,成为整个消息链路追踪的一部分,尽管它不是最终的存储ID。
  • 应对客户端重试时的消息去重:在某些场景下(虽然RocketMQ的默认重试是在失败后选择另一Broker,但此ID为去重提供了基础),如果客户端因网络超时而重试发送,这两次发送的消息 keys 是相同的,但自动生成的 UNIQ_KEY 是不同的。这为在Broker端或客户端实现更精细的去重逻辑提供了可能性,以识别这是否是“同一业务内容的不同投递尝试”。

为什么这么设计?

  • 不能只用 keys:我们无法强制要求所有开发者都设置一个全局唯一的 keyskeys 的唯一性仅限于业务范畴,两个不同的业务完全可能产生相同的 keys(比如订单系统的ID和用户系统的ID恰好一样)。系统需要一个与业务无关的、可靠的唯一标识来作为消息的“指纹”。

  • 不能只用 UNIQ_KEY:这个ID是无业务含义的“天书”,你不可能让客服人员拿着一串如 C0A8033A591E18B4AAC2794D868C0000 的ID去和用户沟通。我们需要一个与业务实体(如订单号)直接关联的、可读的键来进行查询。

四、队列

在 Apache RocketMQ 的世界里,主题(Topic)队列(Queue) 是两个相辅相成、密不可分的核心概念。理解它们的区别与联系,是设计高并发、可水平扩展消息应用架构的基石。

4.1. Topic 与 Queue 的关系:逻辑与物理的映射

我们可以用一个形象的比喻来理解二者的关系:

  • Topic (主题):可以看作是一个逻辑上的消息分类或目的地,例如“订单消息主题”。所有关于订单的消息都应该发送到这个 Topic。它定义了消息“是什么”。
  • Queue (队列):则是 Topic 的物理分区实体。为了支持高并发和水平扩展,一个逻辑上的 Topic 被划分为一个或多个物理上的 Queue。它定义了消息“存储在哪里以及如何被并发处理”。

简而言之,Topic 是消息的逻辑归属,而 Queue 是承载消息、实现并发的物理单元。

4.2. 为什么需要队列?—— 并发与水平扩展的基石

将 Topic 划分为多个 Queue 的设计,是 RocketMQ 实现高性能和高扩展性的关键所在。它同时提升了消息发送和消费的并发度。

  1. 提升发送并发度:如果一个 Topic 只有一个队列,那么在同一时刻,所有生产者都必须竞争写入这一个队列,这会形成性能瓶颈。通过将 Topic 分为多个队列(甚至分布在不同的 Broker 机器上),多个生产者可以并行地向不同的队列发送消息,极大地提高了写入的吞吐量。
  2. 提升消费并发度:这是队列最重要的作用。一个消费者组(Consumer Group)可以启动多个消费者实例。这些实例会自动地、平均地分摊对 Topic下所有队列的消费。例如,一个有 8 个队列的 Topic,最多可以被一个消费组中的 8 个消费者实例并行处理,每个消费者负责一个队列。这使得消费能力可以通过增加消费者实例数量来轻松地水平扩展

4.3. 队列(Queue)的核心特性

  1. 消息的唯一归属:在标准发送场景下(不考虑因网络失败等导致的重试),一条消息只会被精确地写入其 Topic 下的一个队列中,不会在多个队列中重复存在。
  2. 先进先出(FIFO):在单个队列内部,消息的存储和消费严格遵循“先进先出”的顺序。这为实现“顺序消息”提供了基础保障。
  3. 位点(Offset):消息在队列中的坐标
    • 每一条存储在队列中的消息,都会被分配一个唯一的、连续递增的序号,这个序号就是它的位点(Offset)。它就像消息在队列这个“大数组”中的索引,从 0 开始。
    • 起始位点(MinOffset):代表队列中现存最老那条消息的位点。通常为 0,除非队列因存储策略清除了旧消息。
    • 最大位点(MaxOffset):代表即将要写入下一条消息的位置,因此,其数值也等于当前队列中消息的总条数。例如,一个队列有 3 条消息(Offset 分别为 0, 1, 2),那么它的 MaxOffset 就是 3
  4. 分布式部署:为了高可用和负载均衡,一个 Topic 的多个队列可以(也建议)分散部署在不同的 Broker 服务器上。即使某一台 Broker 宕机,其他 Broker 上的队列依然可以继续提供服务,保证了系统的健壮性。

总的来说,RocketMQ 通过将逻辑上的 Topic 划分为物理上的 Queue,实现了功能强大、灵活可扩展的架构。Topic 定义了业务边界,而 Queue 则为跨越多个 Broker 的并行处理提供了物理基础,最终使得系统能够从容应对高并发的生产和消费负载。

五、生产者

作为消息的发送方,生产者(Producer)在 Apache RocketMQ 中扮演着至关重要的角色。它不仅仅是数据的发送者,更是业务场景与消息模型的连接点。RocketMQ 提供了丰富的消息类型,以精准匹配不同的架构挑战。为特定场景选择正确的消息类型,是构建高效、可靠系统的第一步。

以下是四种核心消息类型及其最典型的应用场景:

5.1. 延时消息 (Delayed Message)

  • 核心场景:在未来的特定时间点,触发一个预设的业务操作。
  • 典型用例电商超时未支付,自动关闭订单。
    • 流程:当用户创建订单时,应用会立即向 Broker 发送一条延时消息,例如,设定延迟时间为 30 分钟。
    • 执行:30 分钟后,消息才会被投递给消费者。消费者收到消息后,会检查该订单的支付状态:
      • 若订单未支付,则执行“关闭订单”操作。
      • 若订单已支付,则忽略此消息,不作任何处理。
  • 设计思想:通过将“定时巡检”的压力从业务服务器转移到消息中间件,极大地简化了业务应用的开发逻辑,并提高了系统的可扩展性。

5.2. 顺序消息 (Ordered Message)

  • 核心场景:保证一组相互关联的消息,能够严格按照其发送顺序被消费者处理。
  • 典型用例完整的订单处理流程。
    • 流程:一个订单的生命周期包含多个状态:订单创建 -> 订单付款 -> 订单发货 -> 订单完成
    • 执行:为保证业务逻辑的正确性,这些状态变更消息必须按顺序处理。通过将同一订单ID的所有消息投递到同一个消息队列(Message Queue),RocketMQ 保证了这些消息能被消费者按序获取和处理,避免了状态错乱。
  • 设计思想:通过消息分区(Partitioning)机制,将需要保证顺序的“消息流”锁定在单一的消费上下文中,实现了“局部有序”,兼顾了顺序性与系统整体的并行处理能力。

5.3. 批量消息 (Batch Message)

  • 核心场景:在对实时性要求不高,但对吞吐量要求极高的场景下,显著提升发送性能。
  • 典型用例海量日志收集与处理。
    • 流程:日志采集端在短时间内会产生数百万条日志。如果每条日志都作为一条独立消息发送,会产生巨大的网络I/O和Broker请求开销。
    • 执行:通过将多条日志消息打包成一个批量消息,在一次网络请求中发送出去。这大大减少了网络往返次数和Broker的服务端开销,使吞吐量提升数倍甚至数十倍。
  • 设计思想:这是一种用延迟换吞吐的经典优化策略,适用于数据上报、流式数据处理等不追求单条消息低延迟的场景。

5.4. 事务消息 (Transactional Message)

  • 核心场景:确保本地业务操作(如数据库操作)与消息发送这两个分布式步骤之间,具备原子性(要么都成功,要么都失败)。
  • 典型用例银行跨行转账中的扣款与通知。
    • 流程:用户A向用户B转账。系统需要先在A的账户中执行扣款操作(本地DB事务),然后发送一条“已扣款”的消息通知下游服务进行后续处理。
    • 执行:使用事务消息,可以保证:如果扣款成功,那么“已扣款”消息一定能被成功发送;如果扣款失败,或者消息发送最终失败,那么整个扣款操作将会回滚,就好像什么都没发生过一样。
  • 设计思想:通过两阶段提交(2PC)的模式,实现了分布式事务的最终一致性,是构建可靠金融级别应用的关键技术。

5.5. 架构黄金法则:一种主题,一类消息

在生产环境中,强烈建议为不同类型的消息使用不同的主题(Topic)。

这是一个必须遵守的核心架构原则。将多种消息类型混用于同一个 Topic 会带来严重的运维风险和潜在的逻辑混乱。

  • 技术隔离:不同消息类型的底层处理机制完全不同。例如,延时消息需要进入调度队列,事务消息有“半消息”状态。混用 Topic 会让 Broker 的处理逻辑变得混乱,甚至无法正常工作。
  • 运维清晰:一个命名清晰的 Topic(如 Order_Transaction_Topic)能够让开发者和运维人员立刻明白其用途和行为特性。这对于监控、告警和故障排查至关重要。
  • 风险规避:清晰的 Topic 划分可以避免因误操作或配置错误导致的行为异常,保证了系统的稳定性和可维护性。
http://www.dtcms.com/a/278039.html

相关文章:

  • 并行并发丨C++ 协程、现场池 学习笔记
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十三课——图像浮雕效果的FPGA实现
  • 语言模型常用的激活函数(Sigmoid ,GeLU ,SwiGLU,GLU,SiLU,Swish)
  • 算法-汽水瓶兑换
  • Spring AI 项目实战(十七):Spring Boot + AI + 通义千问星辰航空智能机票预订系统(附完整源码)
  • 【webrtc】gcc当前可用码率3:x264响应码率改变
  • 系规备考论文:论IT服务部署实施方法
  • 西藏氆氇新生:牦牛绒混搭液态金属的先锋尝试
  • 分布式锁踩坑记:当“防重“变成了“重复“
  • JAVA并发——什么是Java的原子性、可见性和有序性
  • Redis缓存设计与性能优化指南
  • 使用Starrocks替换Clickhouse的理由
  • C++封装、多态、继承
  • 在 Ubuntu 下安装 MySQL 数据库
  • 从文本中 “提取” 商业洞察“DatawhaleAI夏令营”
  • 电路分析基础(02)-电阻电路的等效变换
  • Matlab批量转换1km降水数据为tiff格式
  • 【LeetCode100】--- 5.盛水最多的容器【复习回顾】
  • ssm学习笔记day05
  • QT 多线程 管理串口
  • 《[系统底层攻坚] 张冬〈大话存储终极版〉精读计划启动——存储架构原理深度拆解之旅》-系统性学习笔记(适合小白与IT工作人员)
  • springboot高校竞赛赛事管理系统 计算机毕业设计源码23756
  • Java行为型模式---策略模式
  • 第1章 概 述
  • dll文件缺失解决方法
  • C++——static成员
  • HiPPO: Recurrent Memory with Optimal Polynomial Projections论文精读(逐段解析)
  • QT控件命名简写
  • Linux内核高效之道:Slab分配器与task_struct缓存管理
  • 编译器优化——LLVM IR,零基础入门