当前位置: 首页 > news >正文

RocketMQ的事务消息机制

事务消息的实现流程

在这里插入图片描述

  1. 首先 生产者 发送一个半消息(也称 half 消息)至 MQ 中。为什么要先发送一个 half 消息呢?这是为了保证生产者和 MQ 之间的通信正常,如果无法正常通信,则消费者可以直接返回一个异常,也就不用处理后面的逻辑。

  2. 如果 half 消息发送成功,MQ 收到这个 half 消息后,会返回一个 success 响应给生产者。

  3. 生产者接收到 MQ 返回的 success 响应后,开始处理本地的业务逻辑,并提交本地事务。

  4. 如果生产者 本地事务提交成功,则会向 MQ 中发送 commit,表示将 half 消息提交,MQ 就会执行第 5 步操作;如果生产者本地事务提交失败,则直接回滚本地事务,并向 MQ 中发送 rollback,表示将之前的 half 消息进行回滚,MQ 接收到 rollback 消息后,就会将 half 消息删除。

  5. 如果 commit,则将 half 消息写入到磁盘。

  6. 如果 MQ 长时间没有接收到 commit 或者 rollback 消息,例如:生产者在处理本地业务时宕机了,或者发送的 commit、rollback 因为在弱网环境,数据丢失了。那么 MQ 就会在一定时间后尝试调用生产者 提供的一个接口,通过这个接口来判断 half 消息的状态。

    所以生产者 提供的接口,需要实现的业务逻辑是:通过数据库中对应数据的状态来判断,之前的 half 消息对应的业务是否执行成功。如果 MQ 从这个接口中得知 half 消息执行成功了,那么 MQ 就会将 half 消息持久化到本地磁盘,如果得知没有执行成功,那么就会将 half 消息删除。

  7. 消费者 从 MQ 中消费对应的消息。

  8. 消费者 处理本地业务逻辑,然后提交本地事务。

简单来讲就是:

生产者发送一个半消息,这个半消息对消费者不可见,然后执行本地事务,比如在本地数据库操作,如果成功,就提交事务消息,这时候消息才被消费者看到;如果失败,就回滚,消息会被删除。如果本地事务执行状态不明确,RocketMQ会回调生产者的接口来确认状态。这样可以保证消息的可靠传输,进而实现分布式事务的最终一致性。

实现流程说完了,可能你现在有各种各样的疑惑?

half 消息是个啥?

它和我们正常发送的普通消息是一样的,都是存储在 MQ 中,唯一不同的是 half 在 MQ 中不会立马被消费者消费到,除非这个 half 消息被 commit 了。(至于为什么未 commit 的 half 消息无法被消费者读取到,这是因为在 MQ 内部,对于事务消息而言,在 commit 之前,会先放在一个内部队列中,只有 commit 了,才会真正将消息放在消费者能读取到的 topic 队列中)

为什么要先发送 half 消息?

主要是为了保证生产者和 MQ 之间是否能正常通信,如果两者之间都不能正常通信,直接返回异常就可以了。

如果 MQ 接收到了 half 消息,但是在返回 success 响应的时候,因为网络原因,导致生产者没有接收到 success 响应,这个时候是什么现象?

当生产者发送 half 消息后,它会等待 MQ 给自己返回 success 响应,如果没有接收到,那么生产者 也会直接结束,返回异常,不再执行后续逻辑。不执行后续逻辑,这样生产者 也就不会提交 commit 消息给 MQ,MQ 长时间没接收到 commit 消息,那么它就会主动回调生产者 的一个接口,生产者 通过接口,查询本地数据后,发现这条消息对应的业务并没有正常执行,那么就告诉 MQ,这个 half 消息不能 commit,需要 rollback,MQ 知道后,就将 half 消息进行删除。

如果生产者本地事务执行失败了,怎么办?

生产者本地事务执行失败后,先对自己本地事务进行回滚,然后再向 MQ 发送 rollback 操作。

生产者本地事务提交成功或失败后,向 MQ 发送的 commit 或者 rollback 消息,因为网络问题丢失了,又该怎么处理?

和上一个问题一样,MQ 长时间没有接收到 half 消息的 commit 或者 rollback 消息,MQ 会主动回调生产者的接口,通过这个接口来判断自己该对这个 half 消息如何处理。

前面说的全是事务消息的实现流程,这和事务消息如何保证数据的最终一致性有什么关系呢?

有关系。首先,生产者 执行本地事务并提交和向 MQ 中发送消息这是两个写操作,然后通过 RocketMQ 的事务消息,我们保证了这两个写操作要么都执行成功,要么都执行失败。然后让其他系统,如消费者通过消费 MQ 中的消息,然后再去执行自己本地的事务,这样到最后,生产者 和 消费者这两个系统的数据状态是不是达到了一致?这就是最终一致性的含义。

如果要求生产者和消费者的数据状态,在生产者返回给客户端之间,这两者就达到一致,这是强一致性,RocketMQ 是没法保证强一致性的。

目前通过「可靠消息来保证数据的最终一致性」是很多大厂都采用的方案,基本都是通过 MQ 和补偿机制来保证数据的一致性。

消费者本地事务提交失败了,怎么办?

如果消费者本地事务提交失败了,可以通过补偿机制进行多次重试,直到成功。如果重试多次后,还是提交失败,例如此时消费者对应的 DB 宕机了,这个时候只要消费者不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 会在一定时间后,继续将这条消息推送给消费者,消费者 就可以继续执行本地事务并提交了,直到成功。这样,依旧是保证了生产者和消费者 数据的最终一致性。

代码实现

使用 RokcetMQ 的事务消息主要涉及到两个部分:

如何发送半事务消息,这个可以通过「TransactionMQProducer」 类来实现。

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup");
TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null);
// 通过result来判断half消息是否发送成功
if(result.getSendStatus() == SendStatus.SEND_OK){// 成功
}else{// 失败
}

在前面我们提到了生产者需要提供一个接口,用来供 MQ 回调生产者,实际上这个接口就是一个监听器:'「TransactionListener」的方法。这是一个接口,提供了两个方法。

public interface TransactionListener {// 当half消息发送成功后,我们在这里实现自己的业务逻辑,然后commit或者rollback 给MQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);// 这个方法就是供MQ回调的方法,MQ通过回调该方法来判断half消息的状态// 可以看到,这个方法的参数是MessageExt,也就是half消息的内容,如果根据MessageExt,我们完全能在生产者中判断之前的业务是否处理成功LocalTransactionState checkLocalTransaction(final MessageExt msg);}

实际使用时,我们需要实现该接口,例如:

public class MyTransactionListener implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try{// 处理业务逻辑// ....// 业务逻辑处理成功,commitreturn LocalTransactionState.COMMIT_MESSAGE;}catch (Exception e){}// 业务处理失败,rollbackreturn LocalTransactionState.ROLLBACK_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return null;}}

另外,在创建 producer 时,指定我们实现的监听器

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup");
transactionMQProducer.setTransactionListener(new MyTransactionListener());

举个栗子:

背景:质检冻结和释放操作需要跨三个系统:WMS、ERP、TMS。痛点是这些操作需要保持一致性;

流程大概是这样的:

  1. 事务发起方(WMS质检服务)发送一个半消息到RocketMQ。

  2. 执行本地事务,比如在质检系统的数据库中记录冻结操作的状态。

  3. 如果本地事务成功,提交事务消息,否则回滚。

  4. 消息被投递到对应的系统(ERP、TMS),消费者处理消息,执行各自的业务逻辑。

  5. 如果消费者处理失败,消息会重试,直到成功。如果超过一定次数仍失败,可能需要人工介入或者触发补偿机制。

  6. 补偿机制包括定期检查未完成的事务,或者通过记录事务日志(需要一个事务日志表,记录每个事务的各个步骤的状态,补偿任务定期扫描该表,找出未完成的事务并进行处理),在出现问题时进行回滚或重试。

保证幂等性:

在消费者端检查是否已经处理过该消息,避免重复处理导致数据不一致。可以通过在数据库中记录消息的唯一ID来实现。

[质检服务]          | ① 发送半消息↓
[RocketMQ] --② 回调检查--> [本地事务]| ③ 投递消息↓
[ERP/TMS 消费者] --④ 失败重试--> [消息队列]| ⑤ 记录日志↓
[补偿服务] ←⑥ 定时扫描→ [事务日志表]
1. 生产者端(质检服务)
// 冻结请求对象
@Data
public class FreezeRequest {private String freezeId;  // 全局唯一IDprivate String skuCode;   // 商品编码private Integer quantity; // 冻结数量
}// 事务消息生产者
@Service
public class WmsFreezeProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 核心事务方法@Transactionalpublic void createFreeze(FreezeRequest request) {// 1. 预占本地库存wmsInventoryService.preFreeze(request.getSkuCode(), request.getQuantity());// 2. 发送事务消息TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("FREEZE_TOPIC", MessageBuilder.withPayload(request).setHeader("FREEZE_ID", request.getFreezeId()).build(),request);// 3. 记录事务日志transactionLogService.saveFreezeLog(request.getFreezeId());}
}// 本地事务监听器
@RocketMQTransactionListener
class WmsTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {FreezeRequest request = (FreezeRequest) arg;try {// 真实冻结库存wmsInventoryService.realFreeze(request.getSkuCode(), request.getQuantity());return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {// 解冻预占库存wmsInventoryService.cancelPreFreeze(request.getSkuCode());return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String freezeId = msg.getHeaders().get("FREEZE_ID", String.class);// 检查本地事务状态return wmsInventoryService.isFreezeCompleted(freezeId) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}
}
2. 消费者端(ERP)
// 消息处理器
@Service
@RocketMQMessageListener(topic = "FREEZE_TOPIC",consumerGroup = "ERP_FREEZE_GROUP",selectorExpression = "ERP_TAG" // 消息过滤
)
public class ErpFreezeConsumer implements RocketMQListener<FreezeRequest> {@Autowiredprivate ErpAccountService accountService;@Autowiredprivate MessageLogDAO messageLogDAO;@Override@Transactionalpublic void onMessage(FreezeRequest request) {// 1. 幂等检查if (messageLogDAO.exists(request.getFreezeId())) {return;}// 2. 冻结资金accountService.freezeAmount(request.getFreezeId(), request.getTotalAmount());// 3. 记录消费日志messageLogDAO.save(new MessageLog(request.getFreezeId(), "ERP_FREEZE", LocalDateTime.now()));}
}
3. 消费者端(TMS)
@Service
@RocketMQMessageListener(topic = "FREEZE_TOPIC",consumerGroup = "TMS_FREEZE_GROUP",selectorExpression = "TMS_TAG"
)
public class TmsLockConsumer implements RocketMQListener<FreezeRequest> {@Autowiredprivate TransportPlanService planService;@Overridepublic void onMessage(FreezeRequest request) {// 1. 检查是否已处理(Redis实现幂等)String key = "tms_lock:" + request.getFreezeId();if (redisTemplate.opsForValue().setIfAbsent(key, "1", 24, HOURS)) {// 2. 锁定运输计划planService.lockPlan(request.getSkuCode(), request.getQuantity());// 3. 异步记录日志(提升性能)CompletableFuture.runAsync(() -> messageLogDAO.save(new MessageLog(...)));}}
}
3. 补偿服务
// 定时补偿任务
@Component
public class FreezeCompensation {@Autowiredprivate MessageLogDAO messageLogDAO;@Autowiredprivate WmsService wmsService;@Autowiredprivate ErpService erpService;// 每5分钟扫描一次异常@Scheduled(cron = "0 */5 * * * ?") public void compensateFreeze() {// 1. 查找超时未完成的事务List<MessageLog> timeoutLogs = messageLogDAO.findTimeoutLogs(LocalDateTime.now().minusMinutes(30));timeoutLogs.forEach(log -> {// 2. 根据业务类型补偿switch (log.getSystemType()) {case "ERP":if (!erpService.checkFreeze(log.getFreezeId())) {erpService.retryFreeze(log); // 重试冻结}break;case "TMS":TransportPlan plan = planService.getPlan(log.getFreezeId());if (plan.getStatus() != LOCKED) {planService.forceLock(log); // 强制锁定}break;}// 3. 超过3次重试转人工if (log.getRetryCount() > 3) {alertService.notifyHuman(log); // 人工干预}});}
}
关键点
  1. 事务消息保障
    • 半消息机制确保业务操作与消息发送原子性
    • 双重确认机制(本地事务+状态回查)
  2. 消费端设计
    • 消息去重表实现消费幂等
    • 消费失败自动重试(16次阶梯重试策略)
  3. 补偿机制
    • 定时任务扫描事务日志
    • 自动重试 + 人工干预双通道
    • 业务逆向接口标准化设计
效果
  1. 事务成功率:通过三级保障机制(消息重投+消费重试+补偿任务),将成功率从92.6%提升至99.9%
  2. 系统解耦:响应时间从800ms降至200ms,系统可用性提升至99.99%
  3. 运维成本:日常库存异常减少200+/日,问题排查效率提升70%

最佳实践

  1. 消息标签过滤
    为不同系统添加消息标签,避免无效消息传输:

    // WMS发送时指定标签
    rocketMQTemplate.syncSend("FREEZE_TOPIC:ERP_TAG", message);
    rocketMQTemplate.syncSend("FREEZE_TOPIC:TMS_TAG", message);
    
  2. 混合幂等方案
    采用 DB+Redis 双重校验:

    public boolean checkProcessed(String freezeId) {// 先查Redis快速返回if (redis.hasKey(freezeId)) return true;// 再查数据库保证可靠性return messageLogDAO.exists(freezeId);
    }
    
  3. 补偿策略优化
    根据业务类型设置不同补偿策略:

    # application.yml
    compensation:erp:initial-delay: 1mmax-attempts: 5backoff: 2tms:initial-delay: 30s  max-attempts: 3backoff: 1.5
    

相关文章:

  • vue 手机端 封装全局使用的提示框 (vant)
  • YOGA Air X ILL10(83CX)/YOGA 14 ILL10X(83LC)2025款恢复开箱状态原装出厂Win11系统OEM镜像
  • SEMI E40-0200 STANDARD FOR PROCESSING MANAGEMENT(加工管理标准)-(三)完结
  • 【LeetCode 42】接雨水(单调栈、DP、双指针)
  • 深入剖析 I/O 复用之 select 机制
  • C#简易Modbus从站仿真器
  • 2025年排名前十进销存软件大测评
  • Coding Practice,48天强训(32)
  • 【嵌入式开发-IIC】
  • OptiStruct动力分析超单元卡片说明(2)
  • 【嵌入式开发-xxxxx】
  • 计算机网络笔记(十五)——3.2点对点协议PPP
  • MySQL数据库创建、删除、修改
  • 2.4 点云数据存储格式——轻量文本型存储格式
  • 20250508在WIN10下使用移远的4G模块EC200A-CN直接上网
  • WEB UI自动化测试之Pytest框架学习
  • 前端取经路——框架修行:React与Vue的双修之路
  • 稳定性_李雅普诺夫——Lyapunov直接法
  • 数据结构-非线性结构-二叉树
  • Qt开发经验 --- 避坑指南(8)
  • 视频丨习近平同普京在主观礼台出席红场阅兵式
  • 聆听百年唐调正声:唐文治王蘧常吟诵传习的背后
  • 中日有关部门就日本水产品输华问进行第三次谈判,外交部回应
  • 上海:企业招用高校毕业生可享受1500元/人一次性扩岗补助
  • 盖茨:20年内将捐出几乎全部财富,盖茨基金会2045年关闭
  • 身临其境感受伟人思想力量,“马克思书房”在上海社科馆揭幕