当前位置: 首页 > news >正文

RocketMQ 事务消息详解及生产使用场景

 

博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌

博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦

🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。

🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟

Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》

https://blog.csdn.net/qq_57756904/category_12173599.html

有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。

最后再送一句:最好是学会了,而不是学废了!!

2

一、事务消息核心概念

RocketMQ事务消息是解决分布式事务的一种重要方案,它通过"半消息"机制确保本地事务与消息发送的原子性。

1. 事务消息流程

[事务执行流程图]
1. 生产者发送"半消息" → 2. Broker存储半消息 → 3. 执行本地事务
4. 根据本地事务结果提交/回滚 → 5. Broker提交/回滚消息 → 6. 消费者可见/丢弃

2. 关键状态

  • PREPARED:半消息状态,对消费者不可见

  • COMMIT:事务提交,消息对消费者可见

  • ROLLBACK:事务回滚,消息被删除

  • UNKNOWN:需要检查本地事务状态

二、核心使用场景

1. 订单支付场景(经典案例)

// 订单服务
public void processPayment(Order order) {// 1. 发送事务消息TransactionSendResult result = producer.sendMessageInTransaction("payment-tx-topic",MessageBuilder.withPayload(order).build(),order.getOrderId());// 2. 本地事务执行(在TransactionListener中)
}// 事务监听器
@RocketMQTransactionListener
public class PaymentTransactionListener implements RocketMQTransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {String orderId = (String) arg;// 更新订单状态为"支付中"orderDao.updateStatus(orderId, "PROCESSING");return LocalTransactionState.UNKNOWN;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String orderId = msg.getKeys();Order order = orderDao.getById(orderId);return "PAID".equals(order.getStatus()) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}
}

2. 库存扣减场景

// 库存服务消费者
@RocketMQMessageListener(topic = "inventory-tx-topic",consumerGroup = "inventory-group"
)
public class InventoryConsumer implements RocketMQListener<InventoryDTO> {@Transactional@Overridepublic void onMessage(InventoryDTO dto) {// 幂等检查if (inventoryLogDao.exists(dto.getTxId())) {return;}// 扣减库存inventoryDao.reduceStock(dto.getSku(), dto.getQuantity());// 记录日志inventoryLogDao.insert(new InventoryLog(dto.getTxId()));}
}

3. 跨服务数据同步

// 用户服务(数据变更方)
public void updateUser(User user) {// 1. 开启本地事务TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 2. 更新DBuserDao.update(user);// 3. 发送事务消息TransactionSendResult result = producer.sendMessageInTransaction("user-update-topic",MessageBuilder.withPayload(user).build(),null);transactionManager.commit(status);} catch (Exception e) {transactionManager.rollback(status);throw e;}
}

三、事务消息实现细节

1. 生产者配置

@Bean
public TransactionMQProducer transactionProducer(@Value("${rocketmq.name-server}") String nameServer) {TransactionMQProducer producer = new TransactionMQProducer("tx-producer-group");producer.setNamesrvAddr(nameServer);producer.setTransactionListener(new YourTransactionListener());// 事务检查线程池ExecutorService executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),r -> new Thread(r, "tx-check-thread"));producer.setExecutorService(executor);return producer;
}

2. 消息状态检查策略

public class OrderTransactionListener implements RocketMQTransactionListener {@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 建议采用三级检查策略:// 1. 先查Redis快速返回String status = redis.get("tx_status:" + msg.getTransactionId());if ("COMMITTED".equals(status)) return COMMIT;if ("ROLLBACK".equals(status)) return ROLLBACK;// 2. 查本地数据库OrderTx tx = txDao.getByTxId(msg.getTransactionId());if (tx != null) {redis.setex("tx_status:" + msg.getTransactionId(), tx.getStatus(), 5, TimeUnit.MINUTES);return tx.isSuccess() ? COMMIT : ROLLBACK;}// 3. 查业务表最终状态Order order = orderDao.getByTxId(msg.getTransactionId());if (order != null) {return "PAID".equals(order.getStatus()) ? COMMIT : ROLLBACK;}return UNKNOWN; // 继续等待下次检查}
}

四、生产环境注意事项

1. 必须实现的保障措施

  • 幂等设计:消费者必须实现幂等处理

    @Transactional
    public void consume(MessageExt msg) {if (processed(msg.getMsgId())) return;// 业务处理recordProcessed(msg.getMsgId());
    }
  • 事务状态持久化:本地事务状态必须落库

    CREATE TABLE tx_log (tx_id VARCHAR(64) PRIMARY KEY,biz_type VARCHAR(32),status VARCHAR(16),create_time DATETIME,update_time DATETIME
    );

2. 性能优化建议

  • 异步提交:对于非关键路径可以使用异步提交

    producer.sendMessageInTransaction(msg, new TransactionSendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步处理成功}@Overridepublic void onException(Throwable e) {// 异常处理}
    });

  • 批量事务消息:5.0+版本支持批量事务消息

    List<Message> messages = ...;
    producer.sendMessageInTransaction(messages, null);

3. 监控指标

指标名称监控方式阈值建议
事务消息TPSRocketMQ Console根据机器配置
平均处理耗时Prometheus + Grafana< 500ms
事务检查失败率自定义监控< 0.1%
未完成事务数定时扫描tx_log表< 100

五、与其它方案的对比

1. 对比本地消息表

RocketMQ事务消息本地消息表
实现复杂度中(依赖MQ)高(需自实现)
可靠性取决于实现
性能中(需DB操作)
实时性依赖扫描间隔

2. 对比Seata

[适用场景对比图]
RocketMQ事务消息 → 消息驱动场景
Seata → 复杂业务事务场景

六、典型问题解决方案

1. 事务消息堆积处理

// 补偿任务示例
@Scheduled(fixedDelay = 60000)
public void compensateTimeoutTransactions() {List<TransactionLog> timeouts = txLogDao.findTimeoutTransactions(30);timeouts.forEach(tx -> {MessageExt msg = queryMessageByTxId(tx.getTxId());LocalTransactionState state = checkLocalTransaction(msg);if (state != UNKNOWN) {producer.endTransaction(msg, state, null);txLogDao.updateStatus(tx.getTxId(), state.name());}});
}

2. 网络分区处理

public LocalTransactionState checkLocalTransaction(MessageExt msg) {if (isNetworkPartition()) {// 网络分区时保守策略return LocalTransactionState.ROLLBACK_MESSAGE;}// 正常检查逻辑
}

总结

最佳使用场景

  1. 需要保证本地操作与消息发送一致性的场景

  2. 跨系统数据最终一致性要求高的场景

  3. 对性能要求较高的分布式事务场景

不适用场景

  1. 需要强一致性的金融核心交易

  2. 事务执行时间可能很长的业务(超过消息检查时间窗口)

实际生产中,建议将事务消息与本地消息表结合使用,关键业务增加补偿任务,实现最大程度的可靠性保障。

3

相关文章:

  • CQF预备知识:一、微积分 —— 1.2.2 函数f(x)的类型详解
  • 微服务架构中的 RabbitMQ:异步通信与服务解耦(二)
  • RabbitMQ可靠传输——持久性、发送方确认
  • 比斯特自动化|移动电源全自动点焊机:高效点焊助力移动电源制造
  • 随机链表的复制问题详解与代码实现
  • 高等数学-极限
  • AutoMapper .net Framework 的 Model转换扩展方法
  • 小球弹弹弹
  • 数据库5——审计及触发器
  • Linux Docker安装【再探完美版教程】
  • sqlserver数据库查询执行慢的sql、查询隔离级别、设置快照模式、查询锁表进程、锁表sql、解锁等
  • [每日一题] 3362. 零数组变换 iii
  • Excel 密码忘记了?巧用PassFab for Excel 解密帮您找回数据!
  • 二十一、面向对象底层逻辑-scope作用域接口设计
  • deepseek调用
  • 内存管理子系统学习记录
  • 语义分割的image
  • Excel合并单元格后,如何自动批量生成序号列
  • 【人工智障生成日记1】从零开始训练本地小语言模型
  • Google Agent Development Kit与MCP初试
  • 梵克雅宝官网中国官方网站/开网店怎么开 新手无货源
  • 网络营销的特点包括哪些/广州网站优化外包
  • 为何公司做的网站很丑/google推广专员招聘
  • php能干嘛 wordpress/自动app优化最新版
  • 沈阳哪家网站做的好/苏州seo关键词优化外包
  • 宝安网站建设公司/平台推广引流怎么做