【RocketMQ】RocketMQ原生 API 实现的分布式事务完整方案
以下是基于 RocketMQ 原生 API 实现的分布式事务完整方案(包含订单服务生产者、库存服务消费者及所有辅助类),可直接运行:
一、项目结构
src/main/java/
├── order/ # 订单服务(生产者)
│ ├── DBUtil.java # 订单库连接工具
│ ├── OrderDTO.java # 订单数据传输对象
│ └── OrderTransactionProducer.java # 事务消息生产者
└── inventory/ # 库存服务(消费者)├── InventoryDBUtil.java # 库存库连接工具└── StockConsumer.java # 库存扣减消费者
二、数据库表结构(提前创建)
-- 订单库(order_db)
CREATE DATABASE order_db;
USE order_db;-- 订单表
CREATE TABLE order_table (`id` bigint NOT NULL COMMENT '订单ID',`user_id` bigint NOT NULL COMMENT '用户ID',`product_id` bigint NOT NULL COMMENT '商品ID',`quantity` int NOT NULL COMMENT '购买数量',`status` tinyint NOT NULL COMMENT '状态:0-创建中 1-已确认 2-已取消',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB;-- 本地事务日志表
CREATE TABLE local_transaction_log (`id` bigint NOT NULL AUTO_INCREMENT,`transaction_id` varchar(64) NOT NULL COMMENT 'RocketMQ事务ID',`business_id` bigint NOT NULL COMMENT '订单ID',`status` tinyint NOT NULL COMMENT '状态:0-处理中 1-成功 2-失败',PRIMARY KEY (`id`),UNIQUE KEY `uk_transaction_id` (`transaction_id`)
) ENGINE=InnoDB;-- 库存库(inventory_db)
CREATE DATABASE inventory_db;
USE inventory_db;-- 库存表
CREATE TABLE inventory_table (`id` bigint NOT NULL AUTO_INCREMENT,`product_id` bigint NOT NULL COMMENT '商品ID',`stock` int NOT NULL COMMENT '库存数量',`version` int NOT NULL DEFAULT 0 COMMENT '乐观锁版本',PRIMARY KEY (`id`),UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB;-- 初始化库存数据
INSERT INTO inventory_table (product_id, stock) VALUES (1001, 100);
三、完整代码实现
1. 订单服务 - 数据库连接工具(DBUtil.java)
package order;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;public class DBUtil {private static final HikariDataSource dataSource;static {// 配置数据库连接池HikariConfig config = new HikariConfig();config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");config.setUsername("root"); // 替换为你的数据库用户名config.setPassword("123456"); // 替换为你的数据库密码config.setDriverClassName("com.mysql.cj.jdbc.Driver");config.setMaximumPoolSize(10);dataSource = new HikariDataSource(config);}// 获取数据库连接public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}
2. 订单服务 - 订单数据传输对象(OrderDTO.java)
package order;public class OrderDTO {private Long orderId; // 订单IDprivate Long userId; // 用户IDprivate Long productId; // 商品IDprivate Integer quantity; // 购买数量// Getter和Setterpublic Long getOrderId() { return orderId; }public void setOrderId(Long orderId) { this.orderId = orderId; }public Long getUserId() { return userId; }public void setUserId(Long userId) { this.userId = userId; }public Long getProductId() { return productId; }public void setProductId(Long productId) { this.productId = productId; }public Integer getQuantity() { return quantity; }public void setQuantity(Integer quantity) { this.quantity = quantity; }
}
3. 订单服务 - 事务消息生产者(OrderTransactionProducer.java)
package order;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.*;public class OrderTransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {// 1. 创建事务生产者TransactionMQProducer producer = new TransactionMQProducer("order_producer_group");producer.setNamesrvAddr("127.0.0.1:9876"); // RocketMQ NameServer地址// 2. 设置线程池(处理本地事务和回查)ExecutorService executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2000),r -> new Thread(r, "order-transaction-thread"));producer.setExecutorService(executor);// 3. 设置事务监听器(核心逻辑)producer.setTransactionListener(new TransactionListener() {/*** 执行本地事务:创建订单*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {OrderDTO order = (OrderDTO) arg;String transactionId = msg.getTransactionId(); // RocketMQ生成的事务IDConnection conn = null;try {conn = DBUtil.getConnection();conn.setAutoCommit(false); // 开启事务// 步骤1:创建订单(状态为"创建中")PreparedStatement orderStmt = conn.prepareStatement("INSERT INTO order_table (id, user_id, product_id, quantity, status) " +"VALUES (?, ?, ?, ?, 0)");orderStmt.setLong(1, order.getOrderId());orderStmt.setLong(2, order.getUserId());orderStmt.setLong(3, order.getProductId());orderStmt.setInt(4, order.getQuantity());orderStmt.executeUpdate();// 步骤2:记录事务日志(用于回查)PreparedStatement logStmt = conn.prepareStatement("INSERT INTO local_transaction_log (transaction_id, business_id, status) " +"VALUES (?, ?, 1)");logStmt.setString(1, transactionId);logStmt.setLong(2, order.getOrderId());logStmt.executeUpdate();conn.commit(); // 提交事务System.out.println("本地事务成功:订单创建完成,订单ID=" + order.getOrderId());return LocalTransactionState.COMMIT_MESSAGE; // 提交消息} catch (Exception e) {// 本地事务失败,回滚并记录失败日志if (conn != null) {try {conn.rollback();PreparedStatement logStmt = conn.prepareStatement("INSERT INTO local_transaction_log (transaction_id, business_id, status) " +"VALUES (?, ?, 2)");logStmt.setString(1, transactionId);logStmt.setLong(2, order.getOrderId());logStmt.executeUpdate();conn.commit();} catch (SQLException ex) {ex.printStackTrace();}}System.err.println("本地事务失败:订单创建失败,订单ID=" + order.getOrderId());e.printStackTrace();return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息} finally {if (conn != null) {try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }}}}/*** 事务回查:确认本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String transactionId = msg.getTransactionId();try (Connection conn = DBUtil.getConnection()) {PreparedStatement stmt = conn.prepareStatement("SELECT status FROM local_transaction_log WHERE transaction_id = ?");stmt.setString(1, transactionId);ResultSet rs = stmt.executeQuery();if (rs.next()) {int status = rs.getInt("status");if (status == 1) {System.out.println("事务回查:本地事务成功,transactionId=" + transactionId);return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("事务回查:本地事务失败,transactionId=" + transactionId);return LocalTransactionState.ROLLBACK_MESSAGE;}}} catch (SQLException e) {e.printStackTrace();}System.out.println("事务回查:未找到日志,transactionId=" + transactionId);return LocalTransactionState.UNKNOW; // 等待下次回查}});// 4. 启动生产者producer.start();System.out.println("订单服务生产者启动成功");// 5. 模拟创建订单(实际应从接口接收参数)OrderDTO order = new OrderDTO();order.setOrderId(System.currentTimeMillis()); // 用时间戳作为订单IDorder.setUserId(10001L); // 模拟用户IDorder.setProductId(1001L); // 商品ID(与库存表对应)order.setQuantity(5); // 购买数量// 6. 发送半事务消息Message message = new Message("stock_topic", // 主题(库存服务监听)"deduct", // 标签order.getOrderId().toString().getBytes() // 消息体(订单ID));producer.sendMessageInTransaction(message, order); // 发送事务消息// 等待10秒后关闭(生产环境无需此步骤)Thread.sleep(10000);producer.shutdown();System.out.println("订单服务生产者关闭");}
}
4. 库存服务 - 数据库连接工具(InventoryDBUtil.java)
package inventory;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;public class InventoryDBUtil {private static final HikariDataSource dataSource;static {// 配置数据库连接池HikariConfig config = new HikariConfig();config.setJdbcUrl("jdbc:mysql://localhost:3306/inventory_db?useSSL=false&serverTimezone=UTC");config.setUsername("root"); // 替换为你的数据库用户名config.setPassword("123456"); // 替换为你的数据库密码config.setDriverClassName("com.mysql.cj.jdbc.Driver");config.setMaximumPoolSize(10);dataSource = new HikariDataSource(config);}// 获取数据库连接public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}
5. 库存服务 - 消息消费者(StockConsumer.java)
package inventory;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;public class StockConsumer {public static void main(String[] args) throws MQClientException {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("inventory_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876"); // RocketMQ NameServer地址// 2. 订阅主题和标签(监听库存扣减消息)consumer.subscribe("stock_topic", "deduct");// 3. 注册消息监听器(核心:扣减库存)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 解析消息:消息体为订单IDLong orderId = Long.parseLong(new String(msg.getBody()));System.out.println("收到扣库存消息,订单ID=" + orderId);// 实际场景:通过订单ID查询订单表获取商品ID和数量(此处简化)Long productId = 1001L; // 与初始化库存的商品ID一致Integer quantity = 5; // 与订单服务的购买数量一致// 扣减库存(乐观锁实现)boolean success = deductStock(productId, quantity);if (success) {System.out.println("库存扣减成功,商品ID=" + productId + ",扣减数量=" + quantity);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功} else {System.out.println("库存扣减失败(乐观锁冲突),将重试,商品ID=" + productId);return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重试}} catch (Exception e) {System.err.println("库存扣减异常,将重试");e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重试}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 4. 启动消费者consumer.start();System.out.println("库存服务消费者启动成功,等待消息...");}/*** 扣减库存(乐观锁实现,保证幂等性和并发安全)*/private static boolean deductStock(Long productId, Integer quantity) throws SQLException {Connection conn = null;try {conn = InventoryDBUtil.getConnection();conn.setAutoCommit(false); // 开启事务// 步骤1:查询当前库存和版本号PreparedStatement queryStmt = conn.prepareStatement("SELECT stock, version FROM inventory_table WHERE product_id = ?");queryStmt.setLong(1, productId);ResultSet rs = queryStmt.executeQuery();if (!rs.next()) {throw new RuntimeException("商品不存在,productId=" + productId);}int currentStock = rs.getInt("stock");int currentVersion = rs.getInt("version");// 步骤2:检查库存是否充足if (currentStock < quantity) {throw new RuntimeException("库存不足,productId=" + productId + ",当前库存=" + currentStock);}// 步骤3:扣减库存(乐观锁:where条件包含version,防止并发问题)PreparedStatement updateStmt = conn.prepareStatement("UPDATE inventory_table SET stock = stock - ?, version = version + 1 " +"WHERE product_id = ? AND version = ?");updateStmt.setInt(1, quantity);updateStmt.setLong(2, productId);updateStmt.setInt(3, currentVersion);int rows = updateStmt.executeUpdate();if (rows == 0) {// 乐观锁冲突(其他线程已修改库存),扣减失败conn.rollback();return false;}conn.commit(); // 提交事务return true;} catch (SQLException e) {if (conn != null) conn.rollback();throw e;} finally {if (conn != null) {try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }}}}
}
四、运行说明
-
依赖说明:
需在pom.xml
中添加以下依赖(Maven 项目):<dependencies><!-- RocketMQ 客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><!-- MySQL 驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- 数据库连接池 --><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>5.0.1</version></dependency> </dependencies>
-
启动 RocketMQ 服务:
先安装 RocketMQ 并启动 NameServer 和 Broker:# 启动 NameServer nohup sh bin/mqnamesrv & # 启动 Broker(默认端口 9876) nohup sh bin/mqbroker -n localhost:9876 &
-
运行顺序:
- 先启动库存服务消费者(
StockConsumer.main
) - 再启动订单服务生产者(
OrderTransactionProducer.main
)
- 先启动库存服务消费者(
-
验证结果:
- 订单表(
order_table
)新增一条状态为 0 的订单 - 库存表(
inventory_table
)中商品 1001 的库存从 100 变为 95 - 本地事务日志表(
local_transaction_log
)新增一条状态为 1 的记录
- 订单表(
五、核心特性说明
- 分布式事务保障:通过 RocketMQ 半事务消息 + 本地事务 + 回查机制,确保订单创建和库存扣减最终一致
- 幂等性处理:库存扣减使用乐观锁(
version
字段),避免重复消费导致超扣 - 异常重试:消费者消费失败时返回
RECONSUME_LATER
,触发 RocketMQ 自动重试 - 事务回查:解决生产者宕机或网络异常导致的事务状态未知问题
该方案可直接用于电商下单-扣库存等核心场景,如需扩展其他服务(如支付、物流),只需新增对应的消息消费者并保证幂等性即可。
为什么本地事务成功后仍需要回查?
即使本地事务(如创建订单)已经成功提交,RocketMQ 仍然需要 事务回查机制,核心原因是 分布式系统中存在“状态同步异常”的可能性——即生产者本地事务成功,但 Broker 未收到“Commit”指令,导致消息无法投递,最终引发数据不一致。
具体场景:为什么本地事务成功后仍需要回查?
假设订单服务的本地事务(创建订单)已经成功提交,但在以下情况中,Broker 会始终认为消息状态未知,导致库存服务永远收不到扣库存消息,最终订单创建成功但库存未扣减(数据不一致):
1. 生产者发送“Commit”指令失败
- 流程:本地事务成功提交 → 生产者准备向 Broker 发送“Commit”指令 → 网络突然中断 / 生产者宕机 → “Commit”指令未到达 Broker。
- 结果:Broker 存储的仍是“半事务消息”(不可投递),但生产者本地事务已成功(订单已创建),此时必须通过回查让 Broker 知道“需要Commit”。
2. 生产者与 Broker 之间的网络分区
- 场景:生产者所在机房与 Broker 机房网络临时断开,生产者无法发送“Commit”指令,但自身本地事务已执行成功。
- 结果:Broker 长时间收不到状态确认,需主动回查生产者,确认本地事务状态。
3. 生产者处理逻辑异常
- 场景:本地事务成功提交,但生产者在生成“Commit”指令的代码中抛出异常(如空指针),导致指令未发出。
- 结果:Broker 未收到状态确认,需通过回查发现“本地事务实际已成功”。
回查机制的核心作用:解决“状态同步缺口”
RocketMQ 事务消息的核心是 “半事务消息 + 本地事务 + 状态确认” 的三段式设计,但前两段(半消息、本地事务)与第三段(状态确认)之间存在“同步缺口”:
- 本地事务的成功与否,只有生产者的数据库知道;
- Broker 仅能通过生产者主动发送的“Commit/Rollback”指令感知状态;
- 当“状态确认指令”因各种异常丢失时,Broker 必须通过 主动回查生产者的数据库,填补这个缺口,确保最终状态一致。
代码中的回查逻辑如何生效?
在你提供的代码中,checkLocalTransaction
方法正是为回查设计的:
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {String transactionId = msg.getTransactionId();// 查本地事务日志表:如果状态为1(成功),则返回Committry (Connection conn = DBUtil.getConnection()) {PreparedStatement stmt = conn.prepareStatement("SELECT status FROM local_transaction_log WHERE transaction_id = ?");stmt.setString(1, transactionId);ResultSet rs = stmt.executeQuery();if (rs.next() && rs.getInt("status") == 1) {return LocalTransactionState.COMMIT_MESSAGE; // 回查发现本地成功,通知Broker提交}}// ...
}
- 当本地事务成功提交后,
local_transaction_log
表中会记录status=1
; - 即使“Commit”指令丢失,Broker 回查时也能通过该日志确认“本地事务成功”,从而触发消息投递,最终保证订单创建与库存扣减的一致性。
总结
回查机制是 RocketMQ 为解决 “分布式系统中状态同步不可靠” 而设计的兜底方案。它不依赖生产者主动发送的状态指令,而是通过直接查询生产者的本地事务日志,确保 Broker 最终能获取正确的事务状态,避免因网络、宕机等异常导致的数据不一致。
简单说:本地事务成功 ≠ Broker 知道它成功,回查就是让 Broker“主动确认”的过程。
触发事务回查场景
RocketMQ 的事务回查机制并非无条件触发,而是在特定异常场景下才会执行,目的是解决“生产者本地事务状态与 Broker 消息状态不一致”的问题。以下详细说明触发条件及回查与“主动提交”的关系:
一、触发事务回查的 3 种核心场景
只有当 Broker 长时间无法确定事务最终状态时,才会触发回查,具体场景如下:
1. 生产者未发送任何事务确认指令(Commit/Rollback)
- 正常流程中,生产者执行完本地事务后,必须向 Broker 发送
COMMIT
或ROLLBACK
指令(如代码中return LocalTransactionState.COMMIT_MESSAGE
)。 - 若因以下原因导致指令未发出,Broker 会触发回查:
- 生产者执行本地事务成功后,在发送
COMMIT
前宕机/重启; - 网络中断:生产者发送的
COMMIT/ROLLBACK
指令未到达 Broker; - 生产者代码异常:本地事务成功,但生成确认指令时抛出异常(如空指针),导致指令未发送。
- 生产者执行本地事务成功后,在发送
2. 生产者发送了 UNKNOW 状态
- 若生产者执行本地事务后返回
LocalTransactionState.UNKNOW
(表示“暂时无法确定状态”),Broker 会认为事务状态未知,触发回查。 - 例如:本地事务依赖第三方接口(如支付回调),暂时无法确认结果时,可先返回
UNKNOW
,等待后续回查。
3. Broker 未在超时时间内收到确认指令
- RocketMQ 有默认超时机制(默认 60 秒,可通过
transactionTimeout
配置):若 Broker 收到半事务消息后,超过超时时间仍未收到生产者的确认指令(COMMIT/ROLLBACK
),则触发回查。 - 目的是避免因极端异常(如生产者彻底宕机后重启)导致消息状态永久未知。
二、回查与“主动提交”的关系:回查是“主动提交失败”的兜底
回查不会替代“主动提交”,而是在“主动提交失败”时的补充机制,二者关系如下:
-
正常情况:
生产者执行本地事务后,会主动向 Broker 发送COMMIT
或ROLLBACK
。Broker 收到后,直接处理消息(投递/删除),不会触发回查。 -
异常情况:
若主动提交失败(如指令丢失、生产者宕机),Broker 因未收到确认,会触发回查,通过查询生产者的本地事务日志(如local_transaction_log
表)确认状态,最终决定消息是否投递。
举例:
- 你的代码中,若本地事务成功并返回
COMMIT_MESSAGE
,且该指令成功到达 Broker,则 Broker 直接投递消息,不会回查。 - 若该
COMMIT
指令因网络中断丢失,Broker 超时后触发回查,通过查询local_transaction_log
表中status=1
,最终仍会投递消息(保证一致性)。
三、回查的执行逻辑(避免无意义重复回查)
RocketMQ 对回查有严格的重试机制,避免无意义的重复回查:
- 重试次数:默认最多回查 15 次(可通过
transactionCheckMax
配置),超过次数后默认按ROLLBACK
处理。 - 重试间隔:每次回查间隔逐渐延长(如 10s、30s、1min…),避免频繁访问生产者。
- 终止条件:回查过程中,若生产者返回明确的
COMMIT
或ROLLBACK
,则停止回查。
总结
- 触发回查的本质:Broker 无法通过生产者的“主动确认指令”获取事务状态(指令丢失、超时、状态未知)。
- 与主动提交的关系:回查是主动提交失败后的兜底机制,正常情况下(主动提交成功)不会触发。
- 核心目的:通过查询生产者本地事务日志,解决分布式系统中“状态同步不可靠”的问题,保证最终一致性。
简单说:回查是 Broker 对“未知状态”的主动追问,只有当“主动汇报”失败时才会发生。