当前位置: 首页 > news >正文

【RocketMQ】RocketMQ原生 API 实现的分布式事务完整方案

以下是基于 RocketMQ 原生 API 实现的分布式事务完整方案(包含订单服务生产者、库存服务消费者及所有辅助类),可直接运行:

一、项目结构

src/main/java/
├── order/                  # 订单服务(生产者)
│   ├── DBUtil.java         # 订单库连接工具
│   ├── OrderDTO.java       # 订单数据传输对象
│   └── OrderTransactionProducer.java  # 事务消息生产者
└── inventory/              # 库存服务(消费者)├── InventoryDBUtil.java  # 库存库连接工具└── StockConsumer.java    # 库存扣减消费者

二、数据库表结构(提前创建)

-- 订单库(order_db)
CREATE DATABASE order_db;
USE order_db;-- 订单表
CREATE TABLE order_table (`id` bigint NOT NULL COMMENT '订单ID',`user_id` bigint NOT NULL COMMENT '用户ID',`product_id` bigint NOT NULL COMMENT '商品ID',`quantity` int NOT NULL COMMENT '购买数量',`status` tinyint NOT NULL COMMENT '状态:0-创建中 1-已确认 2-已取消',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB;-- 本地事务日志表
CREATE TABLE local_transaction_log (`id` bigint NOT NULL AUTO_INCREMENT,`transaction_id` varchar(64) NOT NULL COMMENT 'RocketMQ事务ID',`business_id` bigint NOT NULL COMMENT '订单ID',`status` tinyint NOT NULL COMMENT '状态:0-处理中 1-成功 2-失败',PRIMARY KEY (`id`),UNIQUE KEY `uk_transaction_id` (`transaction_id`)
) ENGINE=InnoDB;-- 库存库(inventory_db)
CREATE DATABASE inventory_db;
USE inventory_db;-- 库存表
CREATE TABLE inventory_table (`id` bigint NOT NULL AUTO_INCREMENT,`product_id` bigint NOT NULL COMMENT '商品ID',`stock` int NOT NULL COMMENT '库存数量',`version` int NOT NULL DEFAULT 0 COMMENT '乐观锁版本',PRIMARY KEY (`id`),UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB;-- 初始化库存数据
INSERT INTO inventory_table (product_id, stock) VALUES (1001, 100);

三、完整代码实现

1. 订单服务 - 数据库连接工具(DBUtil.java)
package order;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;public class DBUtil {private static final HikariDataSource dataSource;static {// 配置数据库连接池HikariConfig config = new HikariConfig();config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");config.setUsername("root");       // 替换为你的数据库用户名config.setPassword("123456");     // 替换为你的数据库密码config.setDriverClassName("com.mysql.cj.jdbc.Driver");config.setMaximumPoolSize(10);dataSource = new HikariDataSource(config);}// 获取数据库连接public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}
2. 订单服务 - 订单数据传输对象(OrderDTO.java)
package order;public class OrderDTO {private Long orderId;      // 订单IDprivate Long userId;       // 用户IDprivate Long productId;    // 商品IDprivate Integer quantity;  // 购买数量// Getter和Setterpublic Long getOrderId() { return orderId; }public void setOrderId(Long orderId) { this.orderId = orderId; }public Long getUserId() { return userId; }public void setUserId(Long userId) { this.userId = userId; }public Long getProductId() { return productId; }public void setProductId(Long productId) { this.productId = productId; }public Integer getQuantity() { return quantity; }public void setQuantity(Integer quantity) { this.quantity = quantity; }
}
3. 订单服务 - 事务消息生产者(OrderTransactionProducer.java)
package order;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.*;public class OrderTransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {// 1. 创建事务生产者TransactionMQProducer producer = new TransactionMQProducer("order_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");  // RocketMQ NameServer地址// 2. 设置线程池(处理本地事务和回查)ExecutorService executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2000),r -> new Thread(r, "order-transaction-thread"));producer.setExecutorService(executor);// 3. 设置事务监听器(核心逻辑)producer.setTransactionListener(new TransactionListener() {/*** 执行本地事务:创建订单*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {OrderDTO order = (OrderDTO) arg;String transactionId = msg.getTransactionId();  // RocketMQ生成的事务IDConnection conn = null;try {conn = DBUtil.getConnection();conn.setAutoCommit(false);  // 开启事务// 步骤1:创建订单(状态为"创建中")PreparedStatement orderStmt = conn.prepareStatement("INSERT INTO order_table (id, user_id, product_id, quantity, status) " +"VALUES (?, ?, ?, ?, 0)");orderStmt.setLong(1, order.getOrderId());orderStmt.setLong(2, order.getUserId());orderStmt.setLong(3, order.getProductId());orderStmt.setInt(4, order.getQuantity());orderStmt.executeUpdate();// 步骤2:记录事务日志(用于回查)PreparedStatement logStmt = conn.prepareStatement("INSERT INTO local_transaction_log (transaction_id, business_id, status) " +"VALUES (?, ?, 1)");logStmt.setString(1, transactionId);logStmt.setLong(2, order.getOrderId());logStmt.executeUpdate();conn.commit();  // 提交事务System.out.println("本地事务成功:订单创建完成,订单ID=" + order.getOrderId());return LocalTransactionState.COMMIT_MESSAGE;  // 提交消息} catch (Exception e) {// 本地事务失败,回滚并记录失败日志if (conn != null) {try {conn.rollback();PreparedStatement logStmt = conn.prepareStatement("INSERT INTO local_transaction_log (transaction_id, business_id, status) " +"VALUES (?, ?, 2)");logStmt.setString(1, transactionId);logStmt.setLong(2, order.getOrderId());logStmt.executeUpdate();conn.commit();} catch (SQLException ex) {ex.printStackTrace();}}System.err.println("本地事务失败:订单创建失败,订单ID=" + order.getOrderId());e.printStackTrace();return LocalTransactionState.ROLLBACK_MESSAGE;  // 回滚消息} finally {if (conn != null) {try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }}}}/*** 事务回查:确认本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String transactionId = msg.getTransactionId();try (Connection conn = DBUtil.getConnection()) {PreparedStatement stmt = conn.prepareStatement("SELECT status FROM local_transaction_log WHERE transaction_id = ?");stmt.setString(1, transactionId);ResultSet rs = stmt.executeQuery();if (rs.next()) {int status = rs.getInt("status");if (status == 1) {System.out.println("事务回查:本地事务成功,transactionId=" + transactionId);return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("事务回查:本地事务失败,transactionId=" + transactionId);return LocalTransactionState.ROLLBACK_MESSAGE;}}} catch (SQLException e) {e.printStackTrace();}System.out.println("事务回查:未找到日志,transactionId=" + transactionId);return LocalTransactionState.UNKNOW;  // 等待下次回查}});// 4. 启动生产者producer.start();System.out.println("订单服务生产者启动成功");// 5. 模拟创建订单(实际应从接口接收参数)OrderDTO order = new OrderDTO();order.setOrderId(System.currentTimeMillis());  // 用时间戳作为订单IDorder.setUserId(10001L);                       // 模拟用户IDorder.setProductId(1001L);                     // 商品ID(与库存表对应)order.setQuantity(5);                          // 购买数量// 6. 发送半事务消息Message message = new Message("stock_topic",          // 主题(库存服务监听)"deduct",               // 标签order.getOrderId().toString().getBytes()  // 消息体(订单ID));producer.sendMessageInTransaction(message, order);  // 发送事务消息// 等待10秒后关闭(生产环境无需此步骤)Thread.sleep(10000);producer.shutdown();System.out.println("订单服务生产者关闭");}
}
4. 库存服务 - 数据库连接工具(InventoryDBUtil.java)
package inventory;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;public class InventoryDBUtil {private static final HikariDataSource dataSource;static {// 配置数据库连接池HikariConfig config = new HikariConfig();config.setJdbcUrl("jdbc:mysql://localhost:3306/inventory_db?useSSL=false&serverTimezone=UTC");config.setUsername("root");       // 替换为你的数据库用户名config.setPassword("123456");     // 替换为你的数据库密码config.setDriverClassName("com.mysql.cj.jdbc.Driver");config.setMaximumPoolSize(10);dataSource = new HikariDataSource(config);}// 获取数据库连接public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}
5. 库存服务 - 消息消费者(StockConsumer.java)
package inventory;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;public class StockConsumer {public static void main(String[] args) throws MQClientException {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("inventory_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876");  // RocketMQ NameServer地址// 2. 订阅主题和标签(监听库存扣减消息)consumer.subscribe("stock_topic", "deduct");// 3. 注册消息监听器(核心:扣减库存)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 解析消息:消息体为订单IDLong orderId = Long.parseLong(new String(msg.getBody()));System.out.println("收到扣库存消息,订单ID=" + orderId);// 实际场景:通过订单ID查询订单表获取商品ID和数量(此处简化)Long productId = 1001L;  // 与初始化库存的商品ID一致Integer quantity = 5;    // 与订单服务的购买数量一致// 扣减库存(乐观锁实现)boolean success = deductStock(productId, quantity);if (success) {System.out.println("库存扣减成功,商品ID=" + productId + ",扣减数量=" + quantity);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  // 消费成功} else {System.out.println("库存扣减失败(乐观锁冲突),将重试,商品ID=" + productId);return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // 重试}} catch (Exception e) {System.err.println("库存扣减异常,将重试");e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // 重试}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 4. 启动消费者consumer.start();System.out.println("库存服务消费者启动成功,等待消息...");}/*** 扣减库存(乐观锁实现,保证幂等性和并发安全)*/private static boolean deductStock(Long productId, Integer quantity) throws SQLException {Connection conn = null;try {conn = InventoryDBUtil.getConnection();conn.setAutoCommit(false);  // 开启事务// 步骤1:查询当前库存和版本号PreparedStatement queryStmt = conn.prepareStatement("SELECT stock, version FROM inventory_table WHERE product_id = ?");queryStmt.setLong(1, productId);ResultSet rs = queryStmt.executeQuery();if (!rs.next()) {throw new RuntimeException("商品不存在,productId=" + productId);}int currentStock = rs.getInt("stock");int currentVersion = rs.getInt("version");// 步骤2:检查库存是否充足if (currentStock < quantity) {throw new RuntimeException("库存不足,productId=" + productId + ",当前库存=" + currentStock);}// 步骤3:扣减库存(乐观锁:where条件包含version,防止并发问题)PreparedStatement updateStmt = conn.prepareStatement("UPDATE inventory_table SET stock = stock - ?, version = version + 1 " +"WHERE product_id = ? AND version = ?");updateStmt.setInt(1, quantity);updateStmt.setLong(2, productId);updateStmt.setInt(3, currentVersion);int rows = updateStmt.executeUpdate();if (rows == 0) {// 乐观锁冲突(其他线程已修改库存),扣减失败conn.rollback();return false;}conn.commit();  // 提交事务return true;} catch (SQLException e) {if (conn != null) conn.rollback();throw e;} finally {if (conn != null) {try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }}}}
}

四、运行说明

  1. 依赖说明
    需在 pom.xml 中添加以下依赖(Maven 项目):

    <dependencies><!-- RocketMQ 客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><!-- MySQL 驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- 数据库连接池 --><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>5.0.1</version></dependency>
    </dependencies>
    
  2. 启动 RocketMQ 服务
    先安装 RocketMQ 并启动 NameServer 和 Broker:

    # 启动 NameServer
    nohup sh bin/mqnamesrv &
    # 启动 Broker(默认端口 9876)
    nohup sh bin/mqbroker -n localhost:9876 &
    
  3. 运行顺序

    • 先启动库存服务消费者(StockConsumer.main
    • 再启动订单服务生产者(OrderTransactionProducer.main
  4. 验证结果

    • 订单表(order_table)新增一条状态为 0 的订单
    • 库存表(inventory_table)中商品 1001 的库存从 100 变为 95
    • 本地事务日志表(local_transaction_log)新增一条状态为 1 的记录

五、核心特性说明

  • 分布式事务保障:通过 RocketMQ 半事务消息 + 本地事务 + 回查机制,确保订单创建和库存扣减最终一致
  • 幂等性处理:库存扣减使用乐观锁(version 字段),避免重复消费导致超扣
  • 异常重试:消费者消费失败时返回 RECONSUME_LATER,触发 RocketMQ 自动重试
  • 事务回查:解决生产者宕机或网络异常导致的事务状态未知问题

该方案可直接用于电商下单-扣库存等核心场景,如需扩展其他服务(如支付、物流),只需新增对应的消息消费者并保证幂等性即可。
在这里插入图片描述


为什么本地事务成功后仍需要回查?

即使本地事务(如创建订单)已经成功提交,RocketMQ 仍然需要 事务回查机制,核心原因是 分布式系统中存在“状态同步异常”的可能性——即生产者本地事务成功,但 Broker 未收到“Commit”指令,导致消息无法投递,最终引发数据不一致。

具体场景:为什么本地事务成功后仍需要回查?

假设订单服务的本地事务(创建订单)已经成功提交,但在以下情况中,Broker 会始终认为消息状态未知,导致库存服务永远收不到扣库存消息,最终订单创建成功但库存未扣减(数据不一致):

1. 生产者发送“Commit”指令失败
  • 流程:本地事务成功提交 → 生产者准备向 Broker 发送“Commit”指令 → 网络突然中断 / 生产者宕机 → “Commit”指令未到达 Broker。
  • 结果:Broker 存储的仍是“半事务消息”(不可投递),但生产者本地事务已成功(订单已创建),此时必须通过回查让 Broker 知道“需要Commit”。
2. 生产者与 Broker 之间的网络分区
  • 场景:生产者所在机房与 Broker 机房网络临时断开,生产者无法发送“Commit”指令,但自身本地事务已执行成功。
  • 结果:Broker 长时间收不到状态确认,需主动回查生产者,确认本地事务状态。
3. 生产者处理逻辑异常
  • 场景:本地事务成功提交,但生产者在生成“Commit”指令的代码中抛出异常(如空指针),导致指令未发出。
  • 结果:Broker 未收到状态确认,需通过回查发现“本地事务实际已成功”。

回查机制的核心作用:解决“状态同步缺口”

RocketMQ 事务消息的核心是 “半事务消息 + 本地事务 + 状态确认” 的三段式设计,但前两段(半消息、本地事务)与第三段(状态确认)之间存在“同步缺口”:

  • 本地事务的成功与否,只有生产者的数据库知道;
  • Broker 仅能通过生产者主动发送的“Commit/Rollback”指令感知状态;
  • 当“状态确认指令”因各种异常丢失时,Broker 必须通过 主动回查生产者的数据库,填补这个缺口,确保最终状态一致。

代码中的回查逻辑如何生效?

在你提供的代码中,checkLocalTransaction 方法正是为回查设计的:

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {String transactionId = msg.getTransactionId();// 查本地事务日志表:如果状态为1(成功),则返回Committry (Connection conn = DBUtil.getConnection()) {PreparedStatement stmt = conn.prepareStatement("SELECT status FROM local_transaction_log WHERE transaction_id = ?");stmt.setString(1, transactionId);ResultSet rs = stmt.executeQuery();if (rs.next() && rs.getInt("status") == 1) {return LocalTransactionState.COMMIT_MESSAGE; // 回查发现本地成功,通知Broker提交}}// ...
}
  • 当本地事务成功提交后,local_transaction_log 表中会记录 status=1
  • 即使“Commit”指令丢失,Broker 回查时也能通过该日志确认“本地事务成功”,从而触发消息投递,最终保证订单创建与库存扣减的一致性。

总结

回查机制是 RocketMQ 为解决 “分布式系统中状态同步不可靠” 而设计的兜底方案。它不依赖生产者主动发送的状态指令,而是通过直接查询生产者的本地事务日志,确保 Broker 最终能获取正确的事务状态,避免因网络、宕机等异常导致的数据不一致。

简单说:本地事务成功 ≠ Broker 知道它成功,回查就是让 Broker“主动确认”的过程。


触发事务回查场景

RocketMQ 的事务回查机制并非无条件触发,而是在特定异常场景下才会执行,目的是解决“生产者本地事务状态与 Broker 消息状态不一致”的问题。以下详细说明触发条件及回查与“主动提交”的关系:

一、触发事务回查的 3 种核心场景

只有当 Broker 长时间无法确定事务最终状态时,才会触发回查,具体场景如下:

1. 生产者未发送任何事务确认指令(Commit/Rollback)
  • 正常流程中,生产者执行完本地事务后,必须向 Broker 发送 COMMITROLLBACK 指令(如代码中 return LocalTransactionState.COMMIT_MESSAGE)。
  • 若因以下原因导致指令未发出,Broker 会触发回查:
    • 生产者执行本地事务成功后,在发送 COMMIT 前宕机/重启;
    • 网络中断:生产者发送的 COMMIT/ROLLBACK 指令未到达 Broker;
    • 生产者代码异常:本地事务成功,但生成确认指令时抛出异常(如空指针),导致指令未发送。
2. 生产者发送了 UNKNOW 状态
  • 若生产者执行本地事务后返回 LocalTransactionState.UNKNOW(表示“暂时无法确定状态”),Broker 会认为事务状态未知,触发回查。
  • 例如:本地事务依赖第三方接口(如支付回调),暂时无法确认结果时,可先返回 UNKNOW,等待后续回查。
3. Broker 未在超时时间内收到确认指令
  • RocketMQ 有默认超时机制(默认 60 秒,可通过 transactionTimeout 配置):若 Broker 收到半事务消息后,超过超时时间仍未收到生产者的确认指令(COMMIT/ROLLBACK),则触发回查。
  • 目的是避免因极端异常(如生产者彻底宕机后重启)导致消息状态永久未知。

二、回查与“主动提交”的关系:回查是“主动提交失败”的兜底

回查不会替代“主动提交”,而是在“主动提交失败”时的补充机制,二者关系如下:

  1. 正常情况
    生产者执行本地事务后,会主动向 Broker 发送 COMMITROLLBACK。Broker 收到后,直接处理消息(投递/删除),不会触发回查

  2. 异常情况
    若主动提交失败(如指令丢失、生产者宕机),Broker 因未收到确认,会触发回查,通过查询生产者的本地事务日志(如 local_transaction_log 表)确认状态,最终决定消息是否投递。

举例

  • 你的代码中,若本地事务成功并返回 COMMIT_MESSAGE,且该指令成功到达 Broker,则 Broker 直接投递消息,不会回查
  • 若该 COMMIT 指令因网络中断丢失,Broker 超时后触发回查,通过查询 local_transaction_log 表中 status=1,最终仍会投递消息(保证一致性)。

三、回查的执行逻辑(避免无意义重复回查)

RocketMQ 对回查有严格的重试机制,避免无意义的重复回查:

  • 重试次数:默认最多回查 15 次(可通过 transactionCheckMax 配置),超过次数后默认按 ROLLBACK 处理。
  • 重试间隔:每次回查间隔逐渐延长(如 10s、30s、1min…),避免频繁访问生产者。
  • 终止条件:回查过程中,若生产者返回明确的 COMMITROLLBACK,则停止回查。

总结

  • 触发回查的本质:Broker 无法通过生产者的“主动确认指令”获取事务状态(指令丢失、超时、状态未知)。
  • 与主动提交的关系:回查是主动提交失败后的兜底机制,正常情况下(主动提交成功)不会触发。
  • 核心目的:通过查询生产者本地事务日志,解决分布式系统中“状态同步不可靠”的问题,保证最终一致性。

简单说:回查是 Broker 对“未知状态”的主动追问,只有当“主动汇报”失败时才会发生

http://www.dtcms.com/a/496366.html

相关文章:

  • 江苏省住房和城乡建设局网站首页网站图片文字排版错误
  • 做网站自动赚钱十堰seo优化教程
  • AUTOSAR图解==>AUTOSAR_AP_TR_SystemTests
  • 手机网站转微信小程序网上商城运营推广思路
  • 乡村振兴 统筹发展PPT(63页)
  • 沈阳网站选禾钻科技有哪些网页设计软件
  • instanceof和类型转换
  • MySQL 企业版数据脱敏与去标识化
  • 物流信息网站wordpress下载样式
  • 网站建设与维护要用到代码吗网站实用性
  • 常州住房和城乡建设部网站北京建设集团网站
  • 正规的GEO优化师培训哪家好
  • 建设银行甘肃省行网站wordpress请求接口的方式
  • 怎么开网站做网红淮安网站建设公司
  • 昌平建设网站徐州seo推广优化
  • 国内旅行做行程网站网站建设公司怎么谈单
  • vscode制作个人网站做爰片免费观看网站
  • 教育网站 网页赏析找公司做网站要注意什么问题
  • 卷积神经网络(CNN)入门实践及Sequential 容器封装
  • 高端网站建设 磐石网络专注自己的服务器建网站
  • 普陀建设网站wordpress开启全站ssl
  • 近期的笔试和面试的复盘
  • 公司邮箱免费注册seo营销培训咨询
  • 从控制到执行:理解 MCP Server 与 Agent 的关系
  • 学做网站网自己的代码放WordPress
  • 创建蛋糕网站建设方案开发网站心得
  • 2025 GEO 在线优化服务商有哪些?这几款各有侧重,哪家好?
  • 取消cine camera的预览窗口
  • public,private与protected
  • 【题解】B2614【深基1.习6】鸡兔共笼