当前位置: 首页 > news >正文

rabbitmq分布式事务

1. 总体架构图(一眼看懂)

┌------------------┐        1.本地事务             ┌------------------┐
│   订单服务        │  ---►DB+event表(同一事务)---►  │  定时补偿任务      │
│  (producer)     │                              └------------------┘
│                  │        2.发送消息              ▲
│                  ├-------------------------------►│┌------------------┐
│                  │  3.ConfirmCallback ack/nack   │ │  rabbit mq       │
│                  │◄-------------------------------┤└------------------┘
└------------------┘                             ││                                        ││ 5.补偿/回滚  4.消费失败/业务校验失败        │▼                                        ▼
┌------------------┐        6.对账任务            ┌------------------┐
│  补偿/对账服务     │------------------------►.   │   库存/账户服务    │
└------------------┘                             └------------------┘

2. 角色与职责清单

组件职责关键技术点
订单服务1. 本地事务落库+写事件表
2. 发送消息并监听 confirm
@Transactional+Publisher Confirm
事件表仅 5 列即可:id,biz_id,event_type,payload,status,create_time状态枚举:UNSENT/SENT/DONE
补偿任务UNSENT 重投;扫 DLX 报警;触发补偿消息@Scheduled(fixedDelay=5s)
库存服务1. 幂等消费
2. 业务校验失败 立即发补偿消息
3. 技术异常抛异常触发重投
唯一索引/SETNX+手动 basicAck/Nack
对账任务每日离线 FULL JOIN 业务表 vs 事件表;输出差异SQL/Spark 均可
补偿服务监听补偿队列,做“冲正”:关闭订单、退款、加回库存普通消费者,逻辑与业务反向

3. 生产端:可靠投递(代码级)

spring:rabbitmq:publisher-confirm-type: correlated   # 开启 confirm
@Transactional
public void createOrder(OrderDTO dto){// 1. 落单Order order = orderDao.insert(dto);// 2. 同一事务写事件eventDao.insert(Event.builder().bizId(order.getId()).eventType("ORDER_CREATED").payload(JSON.toJSONString(dto)).status("UNSENT").build());
}// 3. 事务提交后异步发消息
@EventListener(TransactionPhase.AFTER_COMMIT)
public void sendAfterTx(OrderCreatedEvent event){Event evt = eventDao.findByBizId(event.getOrderId());CorrelationData cd = new CorrelationData(evt.getId());rabbitTemplate.convertAndSend("order.event.exchange","stock.reduce",evt.getPayload(),cd);
}// 4. ConfirmCallback 更新状态
rabbitTemplate.setConfirmCallback((cd, ack, cause) -> {if (ack) {eventDao.updateStatus(cd.getId(), "SENT");} else {log.warn("消息未送达 broker, 等定时任务补偿");}
});

4. 消费端:幂等 + 业务失败补偿

@RabbitListener(queues = "stock.reduce.queue")
public void handle(Message msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {String orderId = JSON.parseObject(msg.getBody()).getString("orderId");int quantity   = JSON.parseObject(msg.getBody()).getIntValue("quantity");try {// 1. 幂等判断(唯一索引)if (stockDao.alreadyDeduct(orderId)) {channel.basicAck(tag, false);return;}// 2. 真正扣减boolean ok = stockDao.deduct(orderId, quantity);if (!ok) {                // 库存不足——业务失败sendCompensate(orderId, "STOCK_LACK");}channel.basicAck(tag, false);} catch (Exception e) {       // 技术异常channel.basicNack(tag, false, true); // 重新投递}
}private void sendCompensate(String orderId, String reason){CompensateCmd cmd = new CompensateCmd(orderId, reason);rabbitTemplate.convertAndSend("compensate.exchange", "order.cancel", cmd);
}

5. 补偿服务(所谓“回滚”)

@RabbitListener(queues = "order.cancel.queue")
public void handleCancel(CompensateCmd cmd) {Order order = orderDao.find(cmd.getOrderId());if (order == null || order.getStatus() == "CLOSED") return;// 1. 关闭订单orderDao.updateStatus(cmd.getOrderId(), "CLOSED");// 2. 退款accountService.refund(order.getUserId(), order.getAmount());// 3. 释放库存(幂等)stockDao.addBack(cmd.getOrderId());// 4. 事件表记 DONE,防止再对账eventDao.updateStatus(cmd.getOrderId(), "DONE");
}

6. 对账任务(离线)

-- 每日凌晨运行
INSERT INTO diff_${biz_date}
SELECTCOALESCE(b.order_id, m.biz_id) AS order_id,CASE WHEN b.order_id IS NULL THEN 'ONLY_MSG'WHEN m.biz_id  IS NULL THEN 'ONLY_BIZ'ELSE 'STATUS_MISMATCH' END AS type
FROM biz_order b
FULL JOIN event_msg m ON b.order_id = m.biz_id
WHERE b.order_date = ${biz_date}AND (b.order_id IS NULL OR m.biz_id IS NULL OR b.status != m.status);
  • ONLY_BIZ → 补发消息
  • ONLY_MSG → 发补偿消息关闭订单
  • 差异 >0 条即钉钉 @oncall

7. 异常场景全覆盖

场景表现兜底手段
本地事务失败订单都没落库无消息,业务方自己回滚
confirm 丢失事件表仍 UNSENT定时任务 5s 重扫重投
消费端技术异常代码 NPE重新投递(requeue=true)
消费端业务失败库存不足立即发补偿消息 + ack
消息一直重试失败超过最大重试DLX,人工处理
对账不一致漏消息/脏消息离线补平或冲正

8. 性能 & 运维要点

  • 事件表 按月分区,保留 90 天自动过期;
  • 补偿任务单机即可,限流 200 TPS 防止打爆 MQ;
  • 生产端开启 batch-send 可提升 3 倍吞吐;
  • 所有队列 镜像模式 ≥2 节点,磁盘持久化;
  • 监控看板:UNSENT 积压量、DLX 增长率、confirm 失效率

9. 一句话总结

RabbitMQ 没有“回滚”按钮,只有“补发 + 补偿 + 对账”三板斧;
把本地消息表、confirm、幂等、补偿、对账 5 步全部落地,即可在纯 RabbitMQ 环境下实现生产级分布式事务

下面给出一份可直接复制到 IDE 跑通的「RabbitMQ 最终一致性」完整 Demo,所有配置、SQL、Java 代码、MQ 声明全部贴全。
Spring Boot 3.2 + RabbitMQ 3.12


  1. 工程骨架(Maven 依赖)
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version>
</parent><properties><java.version>17</java.version>
</properties><dependencies><!-- web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- rabbit --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- mybatis-plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

  1. application.yml(全部配置)
server:port: 8080spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/shop?useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: 123456rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlated   # 开启 publisher confirmpublisher-returns: true              # 消息不可达会触发 returntemplate:mandatory: true                    # 配合 returns 使用listener:type: simplesimple:acknowledge-mode: manual         # 手工 ackprefetch: 5                      # 限流default-requeue-rejected: false  # 业务异常不重新入队,直接 DLX
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmapper-locations: classpath*:/mapper/**/*.xml

  1. MySQL 表 SQL(一次性执行)
CREATE DATABASE IF NOT EXISTS shop DEFAULT CHARACTER SET utf8mb4;
USE shop;-- 订单表
CREATE TABLE t_order (id            BIGINT PRIMARY KEY AUTO_INCREMENT,user_id       BIGINT      NOT NULL,sku_id        BIGINT      NOT NULL,quantity      INT         NOT NULL,amount        DECIMAL(10,2) NOT NULL,status        VARCHAR(32) NOT NULL DEFAULT 'INIT',create_time   DATETIME DEFAULT CURRENT_TIMESTAMP
);-- 本地事件表
CREATE TABLE t_local_event (id            BIGINT PRIMARY KEY AUTO_INCREMENT,biz_id        VARCHAR(64) NOT NULL,event_type    VARCHAR(64) NOT NULL,payload       TEXT        NOT NULL,status        VARCHAR(32) NOT NULL DEFAULT 'UNSENT',create_time   DATETIME DEFAULT CURRENT_TIMESTAMP,update_time   DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,INDEX idx_bizid_type (biz_id, event_type),INDEX idx_status (status)
) ENGINE=InnoDB;-- 库存表(为了演示简单)
CREATE TABLE t_stock (id            BIGINT PRIMARY KEY AUTO_INCREMENT,sku_id        BIGINT NOT NULL UNIQUE,available     INT    NOT NULL
);-- 扣库存流水(幂等)
CREATE TABLE t_stock_flow (id            BIGINT PRIMARY KEY AUTO_INCREMENT,order_id      VARCHAR(64) NOT NULL UNIQUE,sku_id        BIGINT NOT NULL,quantity      INT    NOT NULL,status        VARCHAR(32) NOT NULL,create_time   DATETIME DEFAULT CURRENT_TIMESTAMP
);

  1. RabbitMQ 配置类(声明队列、交换机、DLX)
@Configuration
public class RabbitConfig {/* ---------------- 业务队列 ---------------- */public static final String STOCK_REDUCE_QUEUE = "stock.reduce.queue";public static final String STOCK_EXCHANGE     = "stock.topic";public static final String STOCK_RK           = "stock.reduce";/* ---------------- 补偿队列 ---------------- */public static final String COMPENSATE_QUEUE   = "order.compensate.queue";public static final String COMPENSATE_EX      = "compensate.topic";public static final String COMPENSATE_RK      = "order.cancel";/* ---------------- 死信参数 ---------------- */private static final String DLX_NAME  = "dlx.topic";private static final String DLQ_NAME  = "stock.reduce.dlq";@BeanTopicTopicExchange stockExchange() {return ExchangeBuilder.topicExchange(STOCK_EXCHANGE).durable(true).build();}@BeanTopicTopicExchange dlxExchange() {return ExchangeBuilder.topicExchange(DLX_NAME).durable(true).build();}@Beanpublic Queue stockReduceQueue() {return QueueBuilder.durable(STOCK_REDUCE_QUEUE).withArgument("x-dead-letter-exchange", DLX_NAME).withArgument("x-dead-letter-routing-key", DLQ_NAME).build();}@Beanpublic Binding stockBinding() {return BindingBuilder.bind(stockReduceQueue()).to(stockExchange()).with(STOCK_RK);}@Beanpublic Queue dlq() {return QueueBuilder.durable(DLQ_NAME).build();}@Beanpublic Binding dlqBinding() {return BindingBuilder.bind(dlq()).to(dlxExchange()).with(DLQ_NAME);}/* 补偿交换机队列 */@Beanpublic TopicExchange compensateEx() {return ExchangeBuilder.topicExchange(COMPENSATE_EX).durable(true).build();}@Beanpublic Queue compensateQueue() {return QueueBuilder.durable(COMPENSATE_QUEUE).build();}@Beanpublic Binding compensateBinding() {return BindingBuilder.bind(compensateQueue()).to(compensateEx()).with(COMPENSATE_RK);}
}

  1. 实体 & Mapper(MyBatis-Plus)
@Data
@TableName("t_order")
public class Order {private Long id;private Long userId;private Long skuId;private Integer quantity;private BigDecimal amount;private String status;private LocalDateTime createTime;
}@Data
@TableName("t_local_event")
public class LocalEvent {private Long id;private String bizId;private String eventType;private String payload;private String status;private LocalDateTime createTime;private LocalDateTime updateTime;
}@Mapper
public interface OrderMapper extends BaseMapper<Order> {}@Mapper
public interface LocalEventMapper extends BaseMapper<LocalEvent> {}

  1. 事务消息发送工具(confirm + 本地事件)
@Component
@RequiredArgsConstructor
public class EventPublisher {private final RabbitTemplate rabbitTemplate;private final LocalEventMapper eventMapper;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {eventMapper.updateStatus(correlationData.getId(), "SENT");} else {// 不处理,等定时任务}});}/*** 事务内仅落库;AFTER_COMMIT 再调本方法*/public void publish(String bizId, String eventType, Object payload) {LocalEvent event = new LocalEvent();event.setBizId(bizId);event.setEventType(eventType);event.setPayload(JSON.toJSONString(payload));event.setStatus("UNSENT");eventMapper.insert(event);CorrelationData cd = new CorrelationData(event.getId().toString());rabbitTemplate.convertAndSend(RabbitConfig.STOCK_EXCHANGE,RabbitConfig.STOCK_RK,event.getPayload(),cd);}
}

  1. 订单服务(本地事务 + 事件)
@Service
@RequiredArgsConstructor
public class OrderService {private final OrderMapper orderMapper;private final EventPublisher publisher;private final ApplicationEventPublisher appEventPublisher;@Transactionalpublic String createOrder(Long userId, Long skuId, Integer quantity) {// 1. 落单Order order = new Order();order.setUserId(userId);order.setSkuId(skuId);order.setQuantity(quantity);order.setAmount(BigDecimal.valueOf(quantity * 100)); // 单价 100order.setStatus("INIT");orderMapper.insert(order);// 2. 写事件表(同一事务)publisher.publish(order.getId().toString(), "ORDER_CREATED",Map.of("orderId", order.getId(),"skuId", skuId,"quantity", quantity));return order.getId().toString();}
}

  1. 库存服务(幂等 + 业务失败补偿)
@Component
@RequiredArgsConstructor
public class StockConsumer {private final StockFlowMapper flowMapper;private final RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitConfig.STOCK_REDUCE_QUEUE)public void reduce(Message msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {String body = new String(msg.getBody());JSONObject json = JSON.parseObject(body);String orderId = json.getString("orderId");Long skuId = json.getLong("skuId");Integer quantity = json.getInteger("quantity");// 1. 幂等if (flowMapper.exists(orderId)) {channel.basicAck(tag, false);return;}// 2. 业务校验Integer available = flowMapper.availableOf(skuId);if (available < quantity) {// 库存不足 -> 补偿sendCompensate(orderId, "STOCK_NOT_ENOUGH");channel.basicAck(tag, false);  // 必须 ackreturn;}// 3. 扣减flowMapper.deduct(skuId, quantity);flowMapper.insert(orderId, skuId, quantity, "SUCCESS");channel.basicAck(tag, false);} catch (Exception e) {log.error("扣库存异常", e);channel.basicNack(tag, false, true); // 重试}}private void sendCompensate(String orderId, String reason) {Map<String, String> cmd = Map.of("orderId", orderId, "reason", reason);rabbitTemplate.convertAndSend(RabbitConfig.COMPENSATE_EX,RabbitConfig.COMPENSATE_RK,JSON.toJSONString(cmd));}
}

  1. 补偿消费者(关闭订单 + 退款)
@Component
@RequiredArgsConstructor
public class CompensateConsumer {private final OrderMapper orderMapper;@RabbitListener(queues = RabbitConfig.COMPENSATE_QUEUE)public void compensate(String json) {JSONObject cmd = JSON.parseObject(json);String orderId = cmd.getString("orderId");Order order = orderMapper.selectById(orderId);if (order == null || "CLOSED".equals(order.getStatus())) return;// 1. 关单order.setStatus("CLOSED");orderMapper.updateById(order);// 2. 退款(demo 直接日志)log.warn(">>>> 退款操作 orderId={}, amount={}", orderId, order.getAmount());// 3. 释放库存由 stock 服务监听同一条 cmd 自行加回}
}

  1. 定时补偿任务(扫 UNSENT)
@Component
@RequiredArgsConstructor
public class ResendScheduler {private final LocalEventMapper eventMapper;private final RabbitTemplate rabbitTemplate;// 每 5 秒扫一次@Scheduled(fixedDelay = 5000)public void resend() {List<LocalEvent> list = eventMapper.selectList(new QueryWrapper<LocalEvent>().eq("status", "UNSENT").le("create_time", LocalDateTime.now().minusSeconds(10))); // 兜底延迟for (LocalEvent e : list) {CorrelationData cd = new CorrelationData(e.getId().toString());rabbitTemplate.convertAndSend(RabbitConfig.STOCK_EXCHANGE,RabbitConfig.STOCK_RK,e.getPayload(),cd);}}
}

  1. 启动类
@SpringBootApplication
@EnableScheduling
@MapperScan("com.example.mapper")
public class TxApplication {public static void main(String[] args) {SpringApplication.run(TxApplication.class, args);}
}

  1. 快速自测

  2. 启动 RabbitMQ、MySQL,执行上方 SQL

  3. curl -X POST http://localhost:8080/order?userId=1&skuId=10&quantity=2

  4. 观察控制台:

  • 订单落库 → 事件 UNSENT → 消息发送 → 收到 confirm → 事件变 SENT → 库存扣减成功
  • 把库存改成 1 再下单,日志出现 >>>> 退款操作,订单状态变为 CLOSED,即补偿成功。

  1. 小结

这份代码直接把前面所有本地消息表、confirm、幂等、补偿、对账落地:

  • 零消息丢失:confirm + 定时重扫 UNSENT
  • 幂等:t_stock_flow.order_id 唯一索引
  • 业务失败:立即发补偿消息,关单+退款
  • 技术异常:basicNack 重投 → 进 DLX → 人工
http://www.dtcms.com/a/406435.html

相关文章:

  • vue动态插槽 #[i] 和 v-slot:[i] 对于Prettier的区别
  • EasyGBS如何构建全域覆盖的应急管理与安全生产解决方案?
  • 【数据结构OJ】BFS算法的可视化:二叉树“层序遍历”
  • RabbitMQ:在Linux上安装RabbitMQ
  • 大数据毕业设计选题推荐:基于Hadoop+Spark的全球能源消耗数据分析与可视化系统
  • 从避障到实时建图:机器学习如何让无人机更智能、更安全、更实用(附微型机载演示示例)
  • ui做的好的网站专业的深圳网站建设公司哪家好
  • 最简单的 Web 打印方案:用 5 分钟上手 web-print-pdf(npm 包)
  • 深度学习在自动驾驶上应用(二)
  • OpenLayers地图交互 -- 章节十二:键盘平移交互详解
  • Unity 透视摄像机视野适配不同分辨率的屏幕
  • 可持续金融的新范式:拆解欧盟ESG监管体系及其全球影响力
  • 【数据保护】一种安全高效的全匿踪纵向联邦学习方法
  • 阿里云物联网平台seo站外优化平台
  • 网站开发软件 手机网站做app有什么意义
  • WorldSimBench: 迈向作为世界模拟器的视频生成模型——论文解读
  • 嵌入式 - 内核驱动1 - 配置linux驱动
  • 工作中学习自己的Qt知识误区-Version3
  • C#连接达梦(DM)数据库
  • 服务器独立显卡可以亮机但进不了系统怎么办
  • 超高密度2kW GaN基低压电机驱动器的设计
  • 「日拱一码」100 机器学习辅助定向进化MLDE
  • C++项目:仿muduo库高并发服务器------EventLoop模块的设计
  • 电子商务网站开发综合实训报告h5页面制作工具包括
  • 全栈信创+AI大模型:百分点科技BD-OS重塑数据治理基座
  • 时隔一天第二阶段他来了 html!!!!!!!!!!!
  • [创业之路-596]:半导体生产中所需要光源的上下游产业链
  • spring-ai简单示例
  • sqlsugar sqlite
  • IP 授权管理标识:守护 IP 价值,解锁商业新可能