基于 ShardingSphere + Seata 的最终一致性事务完整示例实现
以下是基于 ShardingSphere + Seata 的最终一致性事务完整示例实现(以电商订单+库存场景为例):
一、环境准备
1. 基础环境
-
数据库:2 个 MySQL 实例(
order_db
订单库、stock_db
库存库),每个库需创建undo_log
表(Seata 回滚日志表):-- Seata AT 模式所需回滚日志表(每个库执行) CREATE TABLE IF NOT EXISTS undo_log (id BIGINT AUTO_INCREMENT,branch_id BIGINT NOT NULL,xid VARCHAR(100) NOT NULL,context VARCHAR(128) NOT NULL,rollback_info LONGBLOB NOT NULL,log_status INT NOT NULL,log_created DATETIME NOT NULL,log_modified DATETIME NOT NULL,PRIMARY KEY (id),UNIQUE KEY ux_undo_log (xid, branch_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
中间件:
- ShardingSphere-JDBC 5.4.0(分库分表)
- Seata Server 2.0.0(事务协调器)
2. Seata Server 部署(macOS)
# 下载 Seata Server
wget https://github.com/seata/seata/releases/download/v2.0.0/seata-server-2.0.0.tar.gz
tar -zxvf seata-server-2.0.0.tar.gz
cd seata-server-2.0.0# 配置 registry.conf(使用 Nacos 作为注册中心)
vim conf/registry.conf
# 修改内容(示例):
registry {type = "nacos"nacos {serverAddr = "127.0.0.1:8848" # 本地 Nacos 地址namespace = ""cluster = "default"}
}# 启动 Seata Server
sh bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file # 内存模式存储事务日志(测试用)
二、Spring Boot 项目配置
1. 依赖引入(pom.xml
)
<dependencies><!-- Spring Boot 基础 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.4</version></dependency><!-- ShardingSphere-JDBC --><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId><version>5.4.0</version></dependency><!-- Seata 依赖 --><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>2.0.0</version></dependency><!-- MySQL 驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency>
</dependencies>
2. 核心配置(application.yml
)
spring:application:name: order-serviceshardingsphere:datasource:names: order_db,stock_db # 订单库和库存库order_db:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3306/order_db?useSSL=false&serverTimezone=UTCusername: rootpassword: 123456stock_db:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3306/stock_db?useSSL=false&serverTimezone=UTCusername: rootpassword: 123456props:sql-show: true # 打印 SQL 路由日志cloud:nacos:discovery:server-addr: 127.0.0.1:8848 # Nacos 注册中心地址seata:tx-service-group: my_test_tx_group # 事务组名称(需与 Seata Server 配置匹配)registry:type: nacos # 使用 Nacos 注册nacos:server-addr: 127.0.0.1:8848service:vgroup-mapping:my_test_tx_group: default # 事务组映射到 default 集群
三、业务代码实现
1. 库存服务(操作 stock_db
)
@Data
@TableName("t_stock")
public class Stock {@TableId(type = IdType.AUTO)private Long id;private Long goodsId; // 商品 IDprivate Integer quantity; // 库存数量
}
public interface StockMapper extends BaseMapper<Stock> {/** 扣减库存 */@Update("UPDATE t_stock SET quantity = quantity - 1 WHERE goods_id = #{goodsId} AND quantity > 0")int deductStock(@Param("goodsId") Long goodsId);
}
2. 订单服务(操作 order_db
)
@Data
@TableName("t_order")
public class Order {@TableId(type = IdType.ASSIGN_ID)private Long orderId;private Long userId;private Long goodsId;private LocalDateTime createTime;
}
public interface OrderMapper extends BaseMapper<Order> {/** 创建订单 */int insert(Order order);
}
3. 全局事务入口(跨库操作)
@Service
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate StockMapper stockMapper;/*** 创建订单并扣减库存(跨库事务)* @GlobalTransactional:Seata 全局事务注解*/@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)public void createOrder(Long userId, Long goodsId) {// 1. 扣减库存(stock_db 库)int deductResult = stockMapper.deductStock(goodsId);if (deductResult <= 0) {throw new RuntimeException("库存不足");}// 2. 创建订单(order_db 库)Order order = new Order();order.setUserId(userId);order.setGoodsId(goodsId);order.setCreateTime(LocalDateTime.now());orderMapper.insert(order);// 模拟异常(触发回滚)// throw new RuntimeException("人为异常,触发事务回滚");}
}
四、验证最终一致性
-
正常流程:调用
createOrder(1, 1001)
,观察:stock_db.t_stock
库存减 1。order_db.t_order
新增订单记录。- Seata Server 日志显示事务提交成功。
-
异常回滚:取消注释
throw new RuntimeException(...)
,调用接口后观察:stock_db.undo_log
表生成回滚日志。- 库存自动恢复(Seata 执行反向补偿操作)。
- 订单表无新增记录。
关键说明
-
Seata AT 模式原理:
- 事务发起方(
OrderService
)向 Seata Server 注册全局事务。 - 分支事务(扣库存、创建订单)执行前,Seata 会记录数据的「旧值」到
undo_log
。 - 若全局事务提交,Seata 标记
undo_log
为已提交(无实际操作)。 - 若全局事务回滚,Seata 使用
undo_log
中的旧值恢复数据。
- 事务发起方(
-
性能优化:Seata AT 模式通过「无锁读」和「回滚日志」避免长事务锁,适合高并发场景(如电商大促)。
通过此示例,可实现分库分表下的跨库事务最终一致性,满足大多数业务场景需求。