【进阶篇-消息队列】——RocketMQ如何实现事务的
目录
- 一、RocketMQ如何实现事务的
- 1.1、普通业务代码实现RocketMQ 的事务大致流程
- 1.2、通过 RocketMQ 的源代码分析事务消息是如何实现的
- 1.2.1、 RocketMQ 在 Producer 端事务消息的实现
- 1.2.1、 RocketMQ 在 Broker端事务消息和事务反查的实现
本文来源:极客时间vip课程笔记
一、RocketMQ如何实现事务的
1.1、普通业务代码实现RocketMQ 的事务大致流程
-
首先我们一起通过普通业务代码来看 RocketMQ 的事务大致流程。
public class CreateOrderService {@Injectprivate OrderDao orderDao; // 注入订单表的DAO@Injectprivate ExecutorService executorService; //注入一个ExecutorServiceprivate TransactionMQProducer producer;// 初始化transactionListener 和 producer@Initpublic void init() throws MQClientException {TransactionListener transactionListener = createTransactionListener();producer = new TransactionMQProducer("myGroup");producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();}// 创建订单服务的请求入口@PUT@RequestMapping(...)public boolean createOrder(@RequestBody CreateOrderRequest request) {// 根据创建订单请求创建一条消息Message msg = createMessage(request);// 发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, request);// 返回:事务是否成功return sendResult.getSendStatus() == SendStatus.SEND_OK;}private TransactionListener createTransactionListener() {return new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {CreateOrderRequest request = (CreateOrderRequest ) arg;try {// 执行本地事务创建订单orderDao.createOrderInDB(request);// 如果没抛异常说明执行成功,提交事务消息return LocalTransactionState.COMMIT_MESSAGE;} catch (Throwable t) {// 失败则直接回滚事务消息return LocalTransactionState.ROLLBACK_MESSAGE;}}// 反查本地事务