当前位置: 首页 > news >正文

RokcetMQ事务消息详解

1. RocketMQ事务事务消息是什么?
是RocketMQ提出的保证本地和RokcetMQ组件保证跨系统数据一致性的解决方案

2. RokcetMQ事务消息的业务价值在哪里?
发送消息后,用户对于数据库记录做出更改,而发送消息喝修改数据分别隶属于RocketMQ与数据库两个不同的介质,这个时候可能会处于网络异常造成数据不一致。没有事务消息的时候一般是保持最终一致性。用事务消息更类似于用分布式事务进行数据一致性的保证

3.RocketMQ事务消息流程
分为两阶段提交,第一次发送的时候是“暂不可以消费”的半事务消息;
数据修改成功之后会进行提交,将消息变成“可消费”状态

4.RokectMQ的事务消息场景举例——电商场景
比如说电商新用户注册送积分
流程如下:
用户注册成功之后,首先将用户信息存储到数据库,然后发一条MQ消息出来,并且立即返回。与此同时,活动模块会订阅MQ消息,给用户赠送积分

因为引入了RocketMQ进行系统的解耦,虽然完整利用了MQ的异步、削峰、解耦的功能,但是会引入另外一个新的问题——假如用户信息存入了数据库但是MQ消息发送失败了呢;假如数据库存入失败但是消息发送成功了呢(但是这个一般不会,因为先存数据库再发MQ,所以这一步失败了会回滚)——执行顺序反过来就会有这个问题

5.RokcetMQ使用事项
业务自己实现的接口

public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}


当事务发起⽅成功向RocketMQ发送准备执⾏事务的消息后,RocketMQ会回调 RocketMQLocalTransactionListener接⼝中的executeLocalTransaction(Message,Object)方法。executeLocalTransaction(Message,Object)⽅法中主要接收两个参数:⼀个是Message类 型参数,表示回传的消息;另⼀个是Object类型参数,是事务发起⽅调⽤RocketMQ的send()⽅ 法时传递的参数。此⽅法会返回事务的状态,当返回COMMIT时,表示事务提交,当返回 ROLLBACK时,表示事务回滚,当返回UNKNOW时,表示事务回调。当需要回查本地事务状态时,调用checkLocalTransaction(Message)⽅法。checkLocal Transaction(Message)⽅法中接收⼀个Message类型参数,表示要回查的事务消息。此⽅法返 回事务的状态,同executeLocalTransaction(Message,Object)⽅法返回的事务状态,这⾥不再赘述。

重要的三个状态

  • COMMIT_MESSAGE:明确本地事务成功,通知 Broker 提交消息。(消息只有被commit才对消费者可见)

  • ROLLBACK_MESSAGE:明确本地事务失败,通知 Broker 回滚消息。

  • UNKNOW:表示业务方此时也无法确定本地事务的状态(比如,查询事务状态的DB或下游服务暂时不可用)。通知 Broker:“我暂时不知道,请稍后重试回查”。这为系统的健壮性提供了保障

6.RocketMQ事务消息原理解析
首先是Apache RocketMQ官方文档给的图和给的标准流程

  1. 生产者将消息发送至 RocketMQ 服务端; 

  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息; 

  3. 生产者开始执行本地事务逻辑; 

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者;
    • 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。 
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果; 

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。

这里的回查机制很重要——回查机制本质上是通过RocketMQ回查服务端的接口,所以整个回查机制需要业务自己写
比如说电商场景,我们做回查的时候就需要进行判断DB中是否有相关的记录即可——当然如果业务比较复杂可以做多个的连表

接着从代码去入手,看看RocketMQ是怎么实现上述流程的
首先是看看这个

sendMessageInTransaction的代码如下
public TransactionSendResult sendMessageInTransaction(final Message msg,final TransactionListener localTransactionListener, final Object arg)throws MQClientException {// 获取事务监听器,这个监听器主要用于Broker反向回查事务状态。TransactionListener transactionListener = getCheckListener();// 如果调用者没有传入本地事务执行器,并且生产者也没有设置默认的事务回查监听器,// 那么就无法执行本地事务和处理事务回查,因此抛出异常。if (null == localTransactionListener && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// 事务消息不支持延迟投递,所以如果用户设置了延迟级别,需要在这里清除掉。if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}// 使用验证器检查消息的合法性(例如Topic、Body是否为空等)。Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 为消息设置一个特殊属性,标记它是一个“事务预备消息”(也叫半消息)。MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");// 将生产者的组名也放入消息属性中,Broker进行事务回查时,需要根据这个组名找到对应的生产者实例。MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 第一阶段:发送半消息到Broker。sendResult = this.send(msg);} catch (Exception e) {// 如果发送半消息失败,直接抛出异常,事务流程结束。throw new MQClientException("send message Exception", e);}// 初始化本地事务状态为“未知”。LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;// 用于记录执行本地事务时可能发生的异常。Throwable localException = null;// 根据半消息的发送结果,决定下一步操作。switch (sendResult.getSendStatus()) {case SEND_OK: { // 半消息发送成功try {// 有些Broker版本会返回一个事务ID,如果返回了,就设置到消息的用户属性中。if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}// 从消息的唯一键属性中获取事务ID。String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {// 将获取到的ID设置到消息的TransactionId字段中。msg.setTransactionId(transactionId);}// 第二阶段:执行本地事务。if (null != localTransactionListener) {// 优先使用本次调用传入的事务监听器来执行本地事务。localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);} else {// 如果没有传入,则使用生产者预设的默认事务监听器(主要用于回查逻辑)。log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}// 如果本地事务执行器返回null,也当作“未知”状态处理。if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}// 如果本地事务的执行结果不是“提交”,则打印一条info日志,方便排查问题。if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return: {} messageTopic: {} transactionId: {} tag: {} key: {}",localTransactionState, msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys());}} catch (Throwable e) {// 如果执行本地事务时发生任何异常。log.error("executeLocalTransactionBranch exception, messageTopic: {} transactionId: {} tag: {} key: {}",msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys(), e);// 记录这个异常。本地事务状态仍然是初始化的“未知”状态。localException = e;}}break;// 如果半消息发送结果是刷盘超时、同步从节点超时或从节点不可用。case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:// 这些状态意味着Broker不能保证半消息被成功持久化,所以我们直接将本地事务状态设置为“回滚”。// 这样做可以避免执行本地事务后,半消息却丢失的情况。localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:// 其他发送状态,暂时不做任何处理,本地事务状态保持“未知”。break;}/** 这里的代码省略了第三阶段:提交或回滚事务。* 完整的逻辑会在这个switch之后,根据 `localTransactionState` 的值,* 向Broker发送COMMIT或ROLLBACK指令。* 如果是 UNKNOW 状态,则什么都不做,等待Broker的回查。*/// ... a endTransaction method would likely be called here ...// 返回最终的事务发送结果对象。TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;
}

对于这个监听器来说我们需要实现如下的接口——执行本地事务和消息的回查机制

public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.** @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);/*** When no response to prepare(half) message. broker will send check message to check the transaction status, and this* method will be invoked to get local transaction status.** @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

对于发送端,你可能还会有疑惑——为什么这个发送端是需要进行本地事务监听器又做了一个可以传入参数的事务监听器呢?
笔者认为是可以做一个方法级别的监听器——负责多业务的使用——不必冗余在一个监听器
同事可以做一个事务的回查

拿电商来说,应该是在支付的时候有

  • 创建订单:扣减库存,然后发送消息。

  • 用户退款:更新退款单状态,然后发送消息。

  • 发放优惠券:记录发放日志,然后发送消息。

这三个流程
如果只有一个监听器——没有方法的监听器对于电商来说代码应该是

public class GodTransactionListener implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String bizType = msg.getProperty("BIZ_TYPE");if ("CREATE_ORDER".equals(bizType)) {// 执行创建订单的本地事务...} else if ("REFUND".equals(bizType)) {// 执行退款的本地事务...} else if ("COUPON".equals(bizType)) {// 执行发券的本地事务...}// ... 代码变得非常复杂和臃肿return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String bizType = msg.getProperty("BIZ_TYPE");if ("CREATE_ORDER".equals(bizType)) {// 检查订单数据库状态...} else if ("REFUND".equals(bizType)) {// 检查退款单数据库状态...}// ... 回查逻辑也非常复杂return LocalTransactionState.COMMIT_MESSAGE;}
}

但是如果做监听器的方法级别,我们只有回查需要做,监听器可以不写executeLocalTransaction的代码后续的写于方法级别的监听器代码即可

// 专门负责回查的监听器
public class MyCheckListener implements TransactionListener {// 这个方法现在可以忽略,因为我们不会用它来执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW;}// 核心职责:统一处理所有业务的回查@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String bizKey = msg.getKeys(); // 假设业务主键放在了Keys里String topic = msg.getTopic();if ("TopicOrder".equals(topic)) {// 去订单库查订单状态...} else if ("TopicRefund".equals(topic)) {// 去退款库查退款单状态...}// ... 逻辑清晰return LocalTransactionState.COMMIT_MESSAGE;}
}



7.RocketMQ事务消息与Kafka事务消息原理对比

http://www.dtcms.com/a/422448.html

相关文章:

  • Athena + S3 数据分析实战(深度版):从数据湖到可视化 BI
  • IP纯净度检测工具
  • 第四部分:VTK常用类详解(第114章 vtkStreamTracer流线追踪类)
  • MATLAB的CFAR(恒虚警率)图像目标检测
  • 2025三掌柜赠书活动第三十五期 AI辅助React Web应用开发实践:基于React 19和GitHub Copilot
  • HRPC在Polaris存储系统中的应用
  • 网站在百度无法验证码怎么办网站开发技术有包括
  • 【AI时代速通QT】第八节:Visual Studio与Qt-从项目迁移到多版本管理
  • Spring线程池:ThreadPoolExecutor与ThreadPoolTaskExecutor终极对比
  • IDEA创建SpringBoot项目使用JDK1.8
  • 深入分析JAR和WAR包的区别 (指南七)
  • 详解 OpenCV 中的仿射变换:原理与实战案例
  • 计算机视觉(opencv)——基于 dlib 和 CNN卷积神经网络 的人脸检测
  • 黑色背景的网站开发工具微信商城收费吗
  • html快速学习
  • 门户网站 模板之家办公室门户网站建设和管理工作
  • Git 基础 - 查看提交历史
  • 《Linux 构建工具核心:make 命令、进度条、Gitee》
  • vlan batch { vlan-id1 [ to vlan-id2 ] } 概念及题目
  • 济宁网站建设服务互联网公司怎么赚钱
  • Linux-简单命令
  • Linux ​​ls​​ 命令进阶:从隐藏文件到递归显示,成为文件浏览大师
  • VPS服务器锁等待超时处理,如何有效解决数据库性能瓶颈
  • 英伟达服务器维修市场崛起:捷智算GPU维修中心的技术突围之路
  • 第四部分:VTK常用类详解(第102章 vtkButtonWidget按钮控件类)
  • 进阶02:Labview操作者框架
  • 3.0 labview使用SQLServer
  • 网站营销理念网站建设作业怎么写
  • Apache Doris 大数据仓库全面解析
  • Spring Cloud RabbitMQ 详解:从基础概念到秒杀实战