(3)Seata AT 模式的事务一致性保证机制
您说得非常对!方案一(Seata AT模式)我只展示了正向逻辑,确实需要补充完整的反向逻辑和一致性保证机制。让我详细解释并补充完整代码。
Seata AT 模式的事务一致性保证机制
核心原理:二阶段提交 + 回滚日志
- 一阶段:执行业务SQL,生成回滚日志(undo_log)
- 二阶段提交:删除回滚日志
- 二阶段回滚:用回滚日志进行数据恢复
完整的事务流程(包含反向逻辑)
graph TDA[全局事务开始] --> B[一阶段: 执行业务SQL]B --> C[生成undo_log回滚日志]C --> D{所有分支成功?}D -- 是 --> E[二阶段提交]E --> F[删除undo_log]F --> G[事务成功]D -- 否 --> H[二阶段回滚]H --> I[使用undo_log恢复数据]I --> J[删除undo_log]J --> K[事务回滚]
完整的代码实现(包含异常处理)
1. 全局事务入口(OrderService)
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate AccountFeignClient accountFeignClient;@Autowiredprivate StorageFeignClient storageFeignClient;@Override@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class, timeoutMills = 300000)public void createOrder(OrderDTO orderDTO) {try {log.info("=== 开始创建订单全局事务 ===");// 1. 创建订单(状态为创建中)Order order = createOrderRecord(orderDTO);// 2. 扣减库存(可能会抛出异常)decreaseStorage(orderDTO.getProductId(), orderDTO.getCount());// 3. 扣减余额(可能会抛出异常)decreaseAccount(orderDTO.getUserId(), orderDTO.getMoney());// 4. 更新订单状态为成功updateOrderStatus(order.getId(), 1); // 1-成功log.info("=== 订单创建成功 ===");} catch (Exception e) {log.error("=== 订单创建失败,全局事务回滚 ===", e);// 这里不需要手动回滚,@GlobalTransactional 会自动触发回滚// Seata 会通过undo_log自动回滚已经执行的SQLthrow new RuntimeException("订单创建失败: " + e.getMessage(), e);}}private Order createOrderRecord(OrderDTO orderDTO) {Order order = new Order();order.setUserId(orderDTO.getUserId());order.setProductId(orderDTO.getProductId());order.setCount(orderDTO.getCount());order.setMoney(orderDTO.getMoney());order.setStatus(0); // 0-创建中orderMapper.insert(order);log.info("创建订单记录: {}", order);return order;}private void decreaseStorage(Long productId, Integer count) {try {ResponseEntity<String> result = storageFeignClient.decrease(productId, count);if (!result.getStatusCode().is2xxSuccessful()) {throw new RuntimeException("库存服务调用失败: " + result.getBody());}log.info("库存扣减成功: productId={}, count={}", productId, count);} catch (Exception e) {log.error("库存扣减失败", e);throw new RuntimeException("库存不足或服务异常", e);}}private void decreaseAccount(Long userId, BigDecimal money) {try {ResponseEntity<String> result = accountFeignClient.decrease(userId, money);if (!result.getStatusCode().is2xxSuccessful()) {throw new RuntimeException("账户服务调用失败: " + result.getBody());}log.info("账户扣减成功: userId={}, money={}", userId, money);} catch (Exception e) {log.error("账户扣减失败", e);throw new RuntimeException("余额不足或服务异常", e);}}private void updateOrderStatus(Long orderId, Integer status) {Order order = new Order();order.setId(orderId);order.setStatus(status);orderMapper.updateById(order);}
}
2. 库存服务异常处理增强
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {@Autowiredprivate StorageMapper storageMapper;@Override@Transactionalpublic void decrease(Long productId, Integer count) {try {Storage storage = storageMapper.selectByProductId(productId);if (storage == null) {throw new RuntimeException("商品不存在: " + productId);}if (storage.getResidue() < count) {// 这里抛出异常会触发全局回滚throw new RuntimeException("库存不足: 当前库存=" + storage.getResidue() + ", 需要数量=" + count);}// 执行业务SQL - Seata代理会拦截并生成undo_logstorage.setResidue(storage.getResidue() - count);storage.setUsed(storage.getUsed() + count);storageMapper.updateById(storage);log.info("库存扣减完成: productId={}, 扣减数量={}, 剩余库存={}", productId, count, storage.getResidue());} catch (Exception e) {log.error("库存扣减业务异常", e);throw e; // 异常继续抛出,触发全局回滚}}/*** 查询库存(用于前端校验)*/public Storage getStorage(Long productId) {return storageMapper.selectByProductId(productId);}
}
3. 账户服务异常处理增强
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {@Autowiredprivate AccountMapper accountMapper;@Override@Transactionalpublic void decrease(Long userId, BigDecimal money) {try {Account account = accountMapper.selectByUserId(userId);if (account == null) {throw new RuntimeException("账户不存在: " + userId);}if (account.getBalance().compareTo(money) < 0) {// 余额不足,抛出异常触发回滚throw new RuntimeException("余额不足: 当前余额=" + account.getBalance() + ", 需要金额=" + money);}// 执行业务SQL - Seata代理会拦截并生成undo_logaccount.setBalance(account.getBalance().subtract(money));accountMapper.updateById(account);log.info("账户扣减完成: userId={}, 扣减金额={}, 剩余余额={}", userId, money, account.getBalance());} catch (Exception e) {log.error("账户扣减业务异常", e);throw e; // 异常继续抛出,触发全局回滚}}
}
4. 异常场景测试用例
@SpringBootTest
@Slf4j
public class DistributedTransactionTest {@Autowiredprivate OrderService orderService;@Testpublic void testNormalScenario() {// 正常场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));orderService.createOrder(orderDTO);// 应该成功,所有数据一致}@Testpublic void testInsufficientInventory() {// 库存不足场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 1000, new BigDecimal("100.00")); // 数量太大try {orderService.createOrder(orderDTO);Assert.fail("应该抛出异常");} catch (Exception e) {log.info("预期中的异常: {}", e.getMessage());// 应该所有数据都回滚:订单不存在或状态为失败,库存和余额不变}}@Testpublic void testInsufficientBalance() {// 余额不足场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("1000000.00")); // 金额太大try {orderService.createOrder(orderDTO);Assert.fail("应该抛出异常");} catch (Exception e) {log.info("预期中的异常: {}", e.getMessage());// 应该所有数据都回滚}}@Testpublic void testServiceTimeout() {// 服务超时场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));// 模拟账户服务超时(需要在AccountService中模拟sleep)try {orderService.createOrder(orderDTO);Assert.fail("应该超时异常");} catch (Exception e) {log.info("超时异常: {}", e.getMessage());// Seata会检测超时并触发回滚}}
}
5. 事务状态查询和补偿接口
@RestController
@RequestMapping("/transaction")
@Slf4j
public class TransactionMonitorController {@Autowiredprivate DataSource dataSource;/*** 查询全局事务状态*/@GetMapping("/status/{xid}")public String getTransactionStatus(@PathVariable String xid) {try (Connection conn = dataSource.getConnection();PreparedStatement ps = conn.prepareStatement("SELECT * FROM undo_log WHERE xid = ? ORDER BY log_created DESC")) {ps.setString(1, xid);ResultSet rs = ps.executeQuery();if (rs.next()) {return "事务存在,状态: " + rs.getInt("log_status");} else {return "事务已完成或不存在";}} catch (Exception e) {return "查询失败: " + e.getMessage();}}/*** 手动补偿接口(用于异常情况)*/@PostMapping("/compensate/{orderId}")public String manualCompensate(@PathVariable Long orderId) {log.warn("手动触发事务补偿: orderId={}", orderId);// 这里可以实现手动补偿逻辑// 1. 检查订单状态// 2. 如果处于中间状态,进行补偿操作// 3. 发送通知等return "补偿操作已触发";}
}
一致性保证的关键点
- undo_log 机制:Seata 在执行业务SQL时自动生成回滚日志
- 全局锁:防止脏写,确保数据隔离性
- 超时控制:
@GlobalTransactional(timeoutMills = 300000)
设置事务超时 - 异常传播:任何服务的异常都会触发全局回滚
- 幂等性设计:服务需要支持重试和幂等调用
监控和运维建议
- 监控undo_log表:定期清理已完成的事务日志
- 设置事务超时:避免长时间占用资源
- 日志记录:详细记录事务执行过程
- 告警机制:对失败的事务进行告警
- 人工补偿接口:提供手动干预的能力
这样完整的实现确保了分布式事务的一致性,无论是正向流程还是异常情况都能正确处理。