RocketMQ 事务消息详解及生产使用场景
博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌
博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦
🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。
🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟
Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》https://blog.csdn.net/qq_57756904/category_12173599.html
有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。
最后再送一句:最好是学会了,而不是学废了!!
2
一、事务消息核心概念
RocketMQ事务消息是解决分布式事务的一种重要方案,它通过"半消息"机制确保本地事务与消息发送的原子性。
1. 事务消息流程
[事务执行流程图]
1. 生产者发送"半消息" → 2. Broker存储半消息 → 3. 执行本地事务
4. 根据本地事务结果提交/回滚 → 5. Broker提交/回滚消息 → 6. 消费者可见/丢弃
2. 关键状态
-
PREPARED:半消息状态,对消费者不可见
-
COMMIT:事务提交,消息对消费者可见
-
ROLLBACK:事务回滚,消息被删除
-
UNKNOWN:需要检查本地事务状态
二、核心使用场景
1. 订单支付场景(经典案例)
// 订单服务
public void processPayment(Order order) {// 1. 发送事务消息TransactionSendResult result = producer.sendMessageInTransaction("payment-tx-topic",MessageBuilder.withPayload(order).build(),order.getOrderId());// 2. 本地事务执行(在TransactionListener中)
}// 事务监听器
@RocketMQTransactionListener
public class PaymentTransactionListener implements RocketMQTransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {String orderId = (String) arg;// 更新订单状态为"支付中"orderDao.updateStatus(orderId, "PROCESSING");return LocalTransactionState.UNKNOWN;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String orderId = msg.getKeys();Order order = orderDao.getById(orderId);return "PAID".equals(order.getStatus()) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}
}
2. 库存扣减场景
// 库存服务消费者
@RocketMQMessageListener(topic = "inventory-tx-topic",consumerGroup = "inventory-group"
)
public class InventoryConsumer implements RocketMQListener<InventoryDTO> {@Transactional@Overridepublic void onMessage(InventoryDTO dto) {// 幂等检查if (inventoryLogDao.exists(dto.getTxId())) {return;}// 扣减库存inventoryDao.reduceStock(dto.getSku(), dto.getQuantity());// 记录日志inventoryLogDao.insert(new InventoryLog(dto.getTxId()));}
}
3. 跨服务数据同步
// 用户服务(数据变更方)
public void updateUser(User user) {// 1. 开启本地事务TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 2. 更新DBuserDao.update(user);// 3. 发送事务消息TransactionSendResult result = producer.sendMessageInTransaction("user-update-topic",MessageBuilder.withPayload(user).build(),null);transactionManager.commit(status);} catch (Exception e) {transactionManager.rollback(status);throw e;}
}
三、事务消息实现细节
1. 生产者配置
@Bean
public TransactionMQProducer transactionProducer(@Value("${rocketmq.name-server}") String nameServer) {TransactionMQProducer producer = new TransactionMQProducer("tx-producer-group");producer.setNamesrvAddr(nameServer);producer.setTransactionListener(new YourTransactionListener());// 事务检查线程池ExecutorService executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),r -> new Thread(r, "tx-check-thread"));producer.setExecutorService(executor);return producer;
}
2. 消息状态检查策略
public class OrderTransactionListener implements RocketMQTransactionListener {@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 建议采用三级检查策略:// 1. 先查Redis快速返回String status = redis.get("tx_status:" + msg.getTransactionId());if ("COMMITTED".equals(status)) return COMMIT;if ("ROLLBACK".equals(status)) return ROLLBACK;// 2. 查本地数据库OrderTx tx = txDao.getByTxId(msg.getTransactionId());if (tx != null) {redis.setex("tx_status:" + msg.getTransactionId(), tx.getStatus(), 5, TimeUnit.MINUTES);return tx.isSuccess() ? COMMIT : ROLLBACK;}// 3. 查业务表最终状态Order order = orderDao.getByTxId(msg.getTransactionId());if (order != null) {return "PAID".equals(order.getStatus()) ? COMMIT : ROLLBACK;}return UNKNOWN; // 继续等待下次检查}
}
四、生产环境注意事项
1. 必须实现的保障措施
-
幂等设计:消费者必须实现幂等处理
@Transactional public void consume(MessageExt msg) {if (processed(msg.getMsgId())) return;// 业务处理recordProcessed(msg.getMsgId()); }
-
事务状态持久化:本地事务状态必须落库
CREATE TABLE tx_log (tx_id VARCHAR(64) PRIMARY KEY,biz_type VARCHAR(32),status VARCHAR(16),create_time DATETIME,update_time DATETIME );
2. 性能优化建议
-
异步提交:对于非关键路径可以使用异步提交
producer.sendMessageInTransaction(msg, new TransactionSendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步处理成功}@Overridepublic void onException(Throwable e) {// 异常处理} });
-
批量事务消息:5.0+版本支持批量事务消息
List<Message> messages = ...; producer.sendMessageInTransaction(messages, null);
3. 监控指标
指标名称 | 监控方式 | 阈值建议 |
---|---|---|
事务消息TPS | RocketMQ Console | 根据机器配置 |
平均处理耗时 | Prometheus + Grafana | < 500ms |
事务检查失败率 | 自定义监控 | < 0.1% |
未完成事务数 | 定时扫描tx_log表 | < 100 |
五、与其它方案的对比
1. 对比本地消息表
RocketMQ事务消息 | 本地消息表 | |
---|---|---|
实现复杂度 | 中(依赖MQ) | 高(需自实现) |
可靠性 | 高 | 取决于实现 |
性能 | 高 | 中(需DB操作) |
实时性 | 高 | 依赖扫描间隔 |
2. 对比Seata
[适用场景对比图]
RocketMQ事务消息 → 消息驱动场景
Seata → 复杂业务事务场景
六、典型问题解决方案
1. 事务消息堆积处理
// 补偿任务示例
@Scheduled(fixedDelay = 60000)
public void compensateTimeoutTransactions() {List<TransactionLog> timeouts = txLogDao.findTimeoutTransactions(30);timeouts.forEach(tx -> {MessageExt msg = queryMessageByTxId(tx.getTxId());LocalTransactionState state = checkLocalTransaction(msg);if (state != UNKNOWN) {producer.endTransaction(msg, state, null);txLogDao.updateStatus(tx.getTxId(), state.name());}});
}
2. 网络分区处理
public LocalTransactionState checkLocalTransaction(MessageExt msg) {if (isNetworkPartition()) {// 网络分区时保守策略return LocalTransactionState.ROLLBACK_MESSAGE;}// 正常检查逻辑
}
总结
最佳使用场景:
-
需要保证本地操作与消息发送一致性的场景
-
跨系统数据最终一致性要求高的场景
-
对性能要求较高的分布式事务场景
不适用场景:
-
需要强一致性的金融核心交易
-
事务执行时间可能很长的业务(超过消息检查时间窗口)
实际生产中,建议将事务消息与本地消息表结合使用,关键业务增加补偿任务,实现最大程度的可靠性保障。
3