当前位置: 首页 > news >正文

RocketMQ 事务消息

在分布式系统中,跨服务的数据一致性是绕不开的难题。RocketMQ 提供的事务消息机制,通过两阶段提交思想完美解决了“本地事务与消息发送原子性”问题。本文将从原理拆解、流程分析到代码实战,带你彻底搞懂 RocketMQ 事务消息的实现逻辑。

一、为什么需要事务消息?

先看一个经典场景:用户在电商平台下单,需要完成两个操作——本地数据库创建订单 + 发送消息通知库存系统扣减库存。这两个操作必须同时成功或同时失败,否则会出现“订单创建了但库存没扣”或“库存扣了但订单没创建”的不一致问题。

传统方案的痛点:

  • 先发消息后执行本地事务:若消息发送成功但本地事务失败,会导致库存无辜扣减。
  • 先执行本地事务再发消息:若本地事务成功但消息发送失败,会导致订单创建了但库存未扣。

RocketMQ 的事务消息通过“两阶段提交”机制,解决了这一分布式事务难题,确保本地事务与消息发送的原子性。

二、核心原理:两阶段提交 + 回查机制

RocketMQ 事务消息的核心逻辑可概括为:先发送“预备消息”,再执行本地事务,最后根据事务结果决定提交或回滚消息。若中间出现异常,通过“回查机制”兜底确认事务状态。

1. 三大核心阶段拆解

(1)准备阶段(Prepare Phase):发送预备消息

生产者首先向 Broker 发送一条“预备消息”(Prepare Message):

  • 这条消息会被 Broker 持久化存储,但标记为“暂不可消费”状态,消费者无法感知。
  • 目的是确保消息已经“落地”,为后续提交/回滚提供基础。
(2)提交/回滚阶段(Commit/Rollback Phase):根据本地事务结果处理

生产者发送预备消息成功后,立即执行本地事务(如数据库操作):

  • 若本地事务执行成功:向 Broker 发送“Commit”指令,Broker 将预备消息标记为“可消费”,消费者即可收到消息。
  • 若本地事务执行失败:向 Broker 发送“Rollback”指令,Broker 会删除预备消息,消费者不会收到。
(3)事务状态检查阶段(Check Phase):解决异常场景

若因网络中断、生产者宕机等原因,Broker 未收到 Commit/Rollback 指令,会触发“回查机制”:

  • Broker 会定期(默认 60 秒,可配置)向生产者发送“事务状态查询”请求。
  • 生产者需实现回调接口,查询本地事务的实际执行结果,再返回 Commit 或 Rollback 指令。
  • 确保即使中间过程异常,最终消息状态也能与本地事务一致。

三、实战代码:事务消息的完整实现

下面通过代码示例,展示 RocketMQ 事务消息的生产者实现(以 Java 为例)。

1. 引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency>

2. 核心实现:事务生产者 + 事务监听器

(1)初始化事务生产者
public class TransactionProducer {public static void main(String[] args) throws MQClientException {// 1. 创建事务生产者(指定生产者组)TransactionMQProducer producer = new TransactionMQProducer("Transaction_Group");// 2. 设置 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 3. 注册事务监听器(核心:处理本地事务 + 回查逻辑)producer.setTransactionListener(new TransactionListenerImpl());// 4. 启动生产者producer.start();try {// 5. 发送预备消息(事务消息的入口)Message msg = new Message("Transaction_Topic",  // 主题"Order_Tag",          // 标签"Order_12345".getBytes()  // 消息体(例如订单ID));TransactionSendResult result = producer.sendMessageInTransaction(msg, null);System.out.println("预备消息发送结果:" + result.getSendStatus());} catch (Exception e) {e.printStackTrace();}// 保持生产者运行,等待回查(实际生产环境需保持服务存活)Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));}
}
(2)实现事务监听器(核心逻辑)
public class TransactionListenerImpl implements TransactionListener {// 阶段1:执行本地事务(发送预备消息后触发)@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 模拟本地事务:例如创建订单String orderId = new String(msg.getBody());boolean isSuccess = createOrderInDB(orderId); // 数据库操作if (isSuccess) {// 本地事务成功 → 返回事务提交状态return LocalTransactionState.COMMIT_MESSAGE;} else {// 本地事务失败 → 返回事务回滚状态return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {// 异常情况 → 暂时不确定,等待回查(重要!避免直接回滚)return LocalTransactionState.UNKNOW;}}// 阶段2:事务回查(Broker 未收到结果时触发)@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 查询本地事务实际状态(例如查数据库中订单是否存在)String orderId = new String(msg.getBody());boolean isOrderExists = checkOrderInDB(orderId);if (isOrderExists) {// 订单已创建 → 确认提交return LocalTransactionState.COMMIT_MESSAGE;} else {// 订单未创建 → 确认回滚return LocalTransactionState.ROLLBACK_MESSAGE;}}// 模拟:创建订单(本地事务逻辑)private boolean createOrderInDB(String orderId) {// 实际场景:执行数据库 insert 操作System.out.println("创建订单成功:" + orderId);return true; // 模拟成功}// 模拟:查询订单状态(回查逻辑)private boolean checkOrderInDB(String orderId) {// 实际场景:执行数据库 select 操作System.out.println("回查订单状态:" + orderId);return true; // 模拟订单存在}
}

四、关键细节:事务消息的存储与状态管理

  1. Broker 端的消息存储
    预备消息会被存入专门的“事务消息存储队列”,并标记状态为 PREPARED
    • 收到 Commit 指令后,状态更新为 COMMITTED,消息被移至普通消费队列,供消费者消费。
    • 收到 Rollback 指令后,状态更新为 ROLLBACKED,消息被删除。
  1. 回查机制的配置
    • 回查间隔:默认 60 秒,可通过 transactionCheckInterval 配置。
    • 最大回查次数:默认 15 次,超过后消息会被标记为“异常”,需人工介入(可配置死信队列存储)。
  1. 消费者的处理
    消费者无需区分事务消息和普通消息,只需正常订阅主题即可。只有状态为 COMMITTED 的消息才会被投递到消费者。

五、总结

RocketMQ 事务消息通过“预备消息打底、本地事务执行、结果确认、回查兜底”的全流程设计,完美解决了分布式系统中“本地操作与消息发送原子性”问题。其核心是通过两阶段提交思想,结合持久化存储和定时回查,确保最终数据一致性。

核心要点

  1. 事务消息的核心机制:两阶段提交(预备消息 + 提交/回滚) + 回查机制
  2. 三大状态:COMMIT_MESSAGE(提交)、ROLLBACK_MESSAGE(回滚)、UNKNOW(待回查)。
  3. 回查的触发条件:生产者未及时返回 Commit/Rollback 结果(如网络异常、生产者宕机)。

常见误区

  • 本地事务异常时直接返回 Rollback:建议先返回 UNKNOW,等待回查确认,避免因临时异常导致误回滚。
  • 忽略回查逻辑的实现:回查是兜底机制,必须确保能通过本地存储(如数据库)查询事务实际状态。
  • 事务消息过度使用:事务消息会增加 Broker 存储和网络开销,非核心场景可考虑本地消息表等轻量方案。
http://www.dtcms.com/a/490425.html

相关文章:

  • 做网站的不肯给ftp企业163邮箱登录
  • reactNative 遇到的问题记录
  • 使用 Cloudflare Turnstile 实现 Java 后端的人机验证
  • 【论文阅读】Knowledge Circuits in Pretrained Transformers
  • SpringBoot3集成Mybatis(开启第一个集成Mybatis的后端接口)
  • 论文阅读 (2) :Reducing Divergence in GPGPU Programs with Loop Merging
  • React 01
  • 建设开发网站潍坊百度网站优化
  • AI 在数据库操作中的各类应用场景、方案与实践指南
  • ASTM C615/C615M-23 花岗石检测
  • 用php做的网站论文抖音的商业营销手段
  • 子数组/子串问题
  • 办公空间设计网站浙江恒元建设网站
  • 银河麒麟 aarch64 linux 里面的 qt 怎么安装kit
  • 2025电脑价格数据集/构建电脑价格预测模型/数据量为 10 万行
  • Linux 系统下 MySQL 的安装配置
  • 16、Docker Compose 安装Kafka(含Zookeeper)
  • QT(c++)开发自学笔记:2.TCP/IP
  • C语言基础语法进阶
  • 池州网站建设公司好的网站你知道
  • 从零起步学习MySQL || 第五章:select语句的执行过程是怎么样的?(结合源码深度解析)
  • 专业的家居网站建设网站单页支付宝支付怎么做的
  • CC10-判断链表中是否有环
  • 【ZEGO即构开发者日报】谷歌推出新款视频生成模型 Veo 3.1;腾讯开源通用文本表示模型Youtu-Embedding;AI 陪伴赛道观察……
  • [Sora] 视频自动编码器(VAE) | `encode_``decode`
  • 算法沉淀第四天(Winner)
  • 西藏地图飞线html
  • 网站建设与管理课程代码做徽章标牌的企业网站
  • selenium实现自动化脚本的常用函数
  • 大语言模型,一个巨大的矩阵