RocketMQ 事务消息
在分布式系统中,跨服务的数据一致性是绕不开的难题。RocketMQ 提供的事务消息机制,通过两阶段提交思想完美解决了“本地事务与消息发送原子性”问题。本文将从原理拆解、流程分析到代码实战,带你彻底搞懂 RocketMQ 事务消息的实现逻辑。
一、为什么需要事务消息?
先看一个经典场景:用户在电商平台下单,需要完成两个操作——本地数据库创建订单 + 发送消息通知库存系统扣减库存。这两个操作必须同时成功或同时失败,否则会出现“订单创建了但库存没扣”或“库存扣了但订单没创建”的不一致问题。
传统方案的痛点:
- 先发消息后执行本地事务:若消息发送成功但本地事务失败,会导致库存无辜扣减。
- 先执行本地事务再发消息:若本地事务成功但消息发送失败,会导致订单创建了但库存未扣。
RocketMQ 的事务消息通过“两阶段提交”机制,解决了这一分布式事务难题,确保本地事务与消息发送的原子性。
二、核心原理:两阶段提交 + 回查机制
RocketMQ 事务消息的核心逻辑可概括为:先发送“预备消息”,再执行本地事务,最后根据事务结果决定提交或回滚消息。若中间出现异常,通过“回查机制”兜底确认事务状态。
1. 三大核心阶段拆解
(1)准备阶段(Prepare Phase):发送预备消息
生产者首先向 Broker 发送一条“预备消息”(Prepare Message):
- 这条消息会被 Broker 持久化存储,但标记为“暂不可消费”状态,消费者无法感知。
- 目的是确保消息已经“落地”,为后续提交/回滚提供基础。
(2)提交/回滚阶段(Commit/Rollback Phase):根据本地事务结果处理
生产者发送预备消息成功后,立即执行本地事务(如数据库操作):
- 若本地事务执行成功:向 Broker 发送“Commit”指令,Broker 将预备消息标记为“可消费”,消费者即可收到消息。
- 若本地事务执行失败:向 Broker 发送“Rollback”指令,Broker 会删除预备消息,消费者不会收到。
(3)事务状态检查阶段(Check Phase):解决异常场景
若因网络中断、生产者宕机等原因,Broker 未收到 Commit/Rollback 指令,会触发“回查机制”:
- Broker 会定期(默认 60 秒,可配置)向生产者发送“事务状态查询”请求。
- 生产者需实现回调接口,查询本地事务的实际执行结果,再返回 Commit 或 Rollback 指令。
- 确保即使中间过程异常,最终消息状态也能与本地事务一致。
三、实战代码:事务消息的完整实现
下面通过代码示例,展示 RocketMQ 事务消息的生产者实现(以 Java 为例)。
1. 引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency>
2. 核心实现:事务生产者 + 事务监听器
(1)初始化事务生产者
public class TransactionProducer {public static void main(String[] args) throws MQClientException {// 1. 创建事务生产者(指定生产者组)TransactionMQProducer producer = new TransactionMQProducer("Transaction_Group");// 2. 设置 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 3. 注册事务监听器(核心:处理本地事务 + 回查逻辑)producer.setTransactionListener(new TransactionListenerImpl());// 4. 启动生产者producer.start();try {// 5. 发送预备消息(事务消息的入口)Message msg = new Message("Transaction_Topic", // 主题"Order_Tag", // 标签"Order_12345".getBytes() // 消息体(例如订单ID));TransactionSendResult result = producer.sendMessageInTransaction(msg, null);System.out.println("预备消息发送结果:" + result.getSendStatus());} catch (Exception e) {e.printStackTrace();}// 保持生产者运行,等待回查(实际生产环境需保持服务存活)Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));}
}
(2)实现事务监听器(核心逻辑)
public class TransactionListenerImpl implements TransactionListener {// 阶段1:执行本地事务(发送预备消息后触发)@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 模拟本地事务:例如创建订单String orderId = new String(msg.getBody());boolean isSuccess = createOrderInDB(orderId); // 数据库操作if (isSuccess) {// 本地事务成功 → 返回事务提交状态return LocalTransactionState.COMMIT_MESSAGE;} else {// 本地事务失败 → 返回事务回滚状态return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {// 异常情况 → 暂时不确定,等待回查(重要!避免直接回滚)return LocalTransactionState.UNKNOW;}}// 阶段2:事务回查(Broker 未收到结果时触发)@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 查询本地事务实际状态(例如查数据库中订单是否存在)String orderId = new String(msg.getBody());boolean isOrderExists = checkOrderInDB(orderId);if (isOrderExists) {// 订单已创建 → 确认提交return LocalTransactionState.COMMIT_MESSAGE;} else {// 订单未创建 → 确认回滚return LocalTransactionState.ROLLBACK_MESSAGE;}}// 模拟:创建订单(本地事务逻辑)private boolean createOrderInDB(String orderId) {// 实际场景:执行数据库 insert 操作System.out.println("创建订单成功:" + orderId);return true; // 模拟成功}// 模拟:查询订单状态(回查逻辑)private boolean checkOrderInDB(String orderId) {// 实际场景:执行数据库 select 操作System.out.println("回查订单状态:" + orderId);return true; // 模拟订单存在}
}
四、关键细节:事务消息的存储与状态管理
- Broker 端的消息存储:
预备消息会被存入专门的“事务消息存储队列”,并标记状态为PREPARED
。
-
- 收到 Commit 指令后,状态更新为
COMMITTED
,消息被移至普通消费队列,供消费者消费。 - 收到 Rollback 指令后,状态更新为
ROLLBACKED
,消息被删除。
- 收到 Commit 指令后,状态更新为
- 回查机制的配置:
-
- 回查间隔:默认 60 秒,可通过
transactionCheckInterval
配置。 - 最大回查次数:默认 15 次,超过后消息会被标记为“异常”,需人工介入(可配置死信队列存储)。
- 回查间隔:默认 60 秒,可通过
- 消费者的处理:
消费者无需区分事务消息和普通消息,只需正常订阅主题即可。只有状态为COMMITTED
的消息才会被投递到消费者。
五、总结
RocketMQ 事务消息通过“预备消息打底、本地事务执行、结果确认、回查兜底”的全流程设计,完美解决了分布式系统中“本地操作与消息发送原子性”问题。其核心是通过两阶段提交思想,结合持久化存储和定时回查,确保最终数据一致性。
核心要点
- 事务消息的核心机制:两阶段提交(预备消息 + 提交/回滚) + 回查机制。
- 三大状态:
COMMIT_MESSAGE
(提交)、ROLLBACK_MESSAGE
(回滚)、UNKNOW
(待回查)。 - 回查的触发条件:生产者未及时返回 Commit/Rollback 结果(如网络异常、生产者宕机)。
常见误区
- 本地事务异常时直接返回 Rollback:建议先返回
UNKNOW
,等待回查确认,避免因临时异常导致误回滚。 - 忽略回查逻辑的实现:回查是兜底机制,必须确保能通过本地存储(如数据库)查询事务实际状态。
- 事务消息过度使用:事务消息会增加 Broker 存储和网络开销,非核心场景可考虑本地消息表等轻量方案。