MQ 最终一致性实现跨库转账
一、专题预告:分布式事务解决方案
- MQ 最终一致性方案:通过跨库转账掌握异步场景下的事务一致性保障;
- TCC 方案:针对强一致性需求场景,提供通用 TCC 框架与实战案例,解决复杂业务的分布式事务问题。
二、前置知识:先打好基础,理解更高效
学习本节内容前,建议先掌握以下核心知识点,避免理解断层:
分布式事务基础概念:
分布式事务定义:跨多个服务 / 数据库的事务,需保证 “要么都成功,要么最终一致”;
CAP 原则:分布式系统中一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)无法同时满足,需根据业务取舍;
BASE 理论:基于 CAP 原则演化,通过 “基本可用(Basically Available)、软状态(Soft State)、最终一致性(Eventual Consistency)” 实现分布式系统的可用性与一致性平衡;
幂等性保障:确保消息重复消费时业务无异常;
MQ 核心能力:理解 MQ 在分布式事务中的角色。
三、MQ 最终一致性方案:核心逻辑先看懂
1. 方案本质
MQ 最终一致性的核心是 “本地事务与消息投递强绑定”+“消费端幂等重试”,确保分布式事务的两个参与者最终达成一致,流程如下:
- 上游服务(如转账中的付款方):执行本地业务(扣减余额)→ 投递 “事务消息”(确保业务成功则消息必投递);
- 下游服务(如转账中的收款方):消费消息→ 执行本地业务(增加余额)→ 幂等处理(防重复消费)+ 衰减式重试(防临时失败)。
2. 适用场景
特别适合 “分布式事务有 2 个参与者,且下游业务一定能成功” 的场景,例如:
- 跨库转账(付款方扣钱 + 收款方加钱);
- 订单创建后同步库存(订单服务创建订单 + 库存服务扣减库存);
- 平台账户提现(平台扣减余额 + 第三方支付打款)。
四、实战案例:跨库转账全流程落地
本节以 “跨库转账” 为案例(为简化演示,付款方与收款方用同一数据库的同一账户表,但按分布式事务逻辑实现),带大家从 “环境准备→代码实现→测试验证” 完整走一遍。
1. 第一步:准备数据库表
先创建账户表,存储用户 ID、用户名与余额,作为转账的业务载体:
-- 账户表:存储用户账户信息
drop table if exists t_account;
create table if not exists t_account
(
id varchar(32) not null primary key comment '用户id(唯一标识)',
name varchar(50) not null comment '用户名',
balance decimal(12, 2) not null comment '账户余额(精确到分)'
) comment '跨库转账账户表';
-- 初始化测试数据:路人1(余额1000元)、路人2(余额0元)
insert ignore into t_account value ('1','chen1','1000.00');
insert ignore into t_account value ('2','chen2','0.00');
2. 第二步:核心代码解析(通用框架已封装,只需写业务)
(1)上游服务:付款方扣减余额 + 投递事务消息
核心接口:AccountController#transfer,只需干 2 件事(通用逻辑已封装在父类):
/*** 跨库转账接口(上游服务:付款方逻辑)* @param transferDTO 转账参数(付款方ID、收款方ID、转账金额)* @return 转账结果*/@PostMapping("/account/transfer")public Result<Void> transfer(@RequestBody TransferDTO transferDTO) {// 1. 执行本地业务:扣减付款方余额(封装事务,失败则回滚)accountService.deductBalance(transferDTO.getFromAccountId(),new BigDecimal(transferDTO.getTransferPrice()));// 2. 投递事务消息:确保扣减余额成功后,消息必投递到MQmsgSender.sendTransactionMsg("transfer_topic", // 转账消息主题"transfer_key", // 路由键(对应下游队列)TransferMsg.builder().fromAccountId(transferDTO.getFromAccountId()).toAccountId(transferDTO.getToAccountId()).transferPrice(new BigDecimal(transferDTO.getTransferPrice())).build());return ResultUtils.success("转账请求已提交,正在处理");}
(2)下游服务:消费消息 + 给收款方加钱
核心类:TransferMsgConsumer,只需实现业务逻辑(幂等、重试等通用能力由父类提供):
@Component@Slf4jpublic class TransferMsgConsumer extends AbstractIdempotentRetryConsumer<TransferMsg> {@Autowiredprivate AccountService accountService;/*** 消费转账消息(下游服务:收款方逻辑)* 父类已实现:幂等消费(防重复加钱)、失败衰减式重试(如1s→3s→5s重试)*/@Overrideprotected void idempotentConsume(TransferMsg msg) {log.info("开始处理收款逻辑:收款方ID={},转账金额={}",msg.getToAccountId(), msg.getTransferPrice());// 核心业务:给收款方增加余额(业务失败会触发重试,重试超限会告警)accountService.increaseBalance(msg.getToAccountId(),msg.getTransferPrice());log.info("收款逻辑处理完成:收款方ID={},余额已更新", msg.getToAccountId());}/*** 幂等键生成:用“转账唯一标识”作为幂等键(防重复消费)* 此处简化用“付款方ID+收款方ID+转账金额+时间戳”生成,实际可加全局唯一ID*/@Overrideprotected String generateIdempotentKey(TransferMsg msg) {return msg.getFromAccountId() + "_" + msg.getToAccountId() + "_" + msg.getTransferPrice().toString();}}
(3)通用框架优势:父类帮你做重复工作
从类图可见,TransferMsgConsumer 继承了 2 个父类,封装了核心通用能力,无需重复开发:
- AbstractRetryConsumer:提供失败衰减式重试(可配置重试次数、间隔),避免临时故障导致业务失败;
- AbstractIdempotentRetryConsumer:在重试基础上增加幂等消费,通过 “幂等键” 确保同一消息只处理一次,防重复加钱。
3. 第三步:启动应用与测试验证
(1)启动服务
运行启动类:确保 MQ(如 RabbitMQ/Kafka)、数据库已正常启动。
(2)初始余额查询
执行 SQL 查看测试账户初始余额:
select * from t_account
(3)执行转账请求
通过 HTTP 测试用例发起转账,让路人 1 给路人 2 转 10 元:
### 跨库转账请求
POST http://localhost:8080/account/transfer
Accept: application/json
Content-Type: application/json
{
"fromAccountId": "1", // 付款方ID(路人1)
"toAccountId": "2", // 收款方ID(路人2)
"transferPrice": "10.00" // 转账金额(10元)
}
(4)验证最终余额
再次执行 SQL 查看账户余额,确认事务最终一致:
select * from t_account;
五、MQ 最终一致性方案:关键注意事项
- 事务消息必须与本地业务强绑定:上游服务需确保 “本地业务成功” 与 “消息投递成功” 要么都成功,要么都失败(可通过 MQ 的事务消息机制实现,如 RabbitMQ 的 confirm 机制、RocketMQ 的事务消息);
- 消费端必须做幂等:消息可能因重试重复投递,需通过 “唯一幂等键”(如业务流水号)确保业务不重复执行;
- 重试策略要合理:采用 “衰减式重试”(如 1s→3s→5s→10s),避免频繁重试压垮下游服务,重试次数建议设为 5-8 次,超限后触发告警(如钉钉 / 邮件),人工介入处理;
- 业务需满足 “下游一定能成功”:若下游业务存在 “必然失败” 场景(如收款方账户不存在),需额外设计补偿逻辑(如上游退款),避免数据不一致。