Seata与shardingsphere的集成,单日10万+数据入库及一致性
自己写的,亲测有效。
其实绝大部分程序员是没有机会接触大规模大数据的分布式系统的,因此虽然对分库分表、数据一致性有不少了解,却比较少的直接上手,尤其是搭建初始的框架。
笔者最近正在做一个项目,日高峰入库单表10万+数据,因此需要考虑分库分表,同时由于系统涉及支付,需要从多个支付渠道扣款、扣减库存,因此在分布式系统中需要实现数据一致性。RPC采用Dubbo服务。
需要说明的是,该方案采用的是shardingsphere的分布式事务AT模式,事务管理器采用的是Seata,这个与直接使用seata-starter还不一样。属于官网所说的ShardingSphere 集成 Seata 柔性事务
对于单应用的分库分表、RCP调用的分库分表都可以做到DB回滚。但是对于MQ、Redis的一致性还未完全的验证,但实际上对于MQ/redis等,我相信大多数研发都有可以解决,完全可以采用在事务提交后,异步发送即可,对于发送失败的,自己重试即可,不能认为这一切就是银弹,高级的程序员看任何事情都应该是双面的看待。
如下就是解决方案示例:
一、引入包
<properties>
<shardingsphere.version>5.2.1</shardingsphere.version>
<seata.version>2.0.0</seata.version>
<mybatis-plus.version>3.5.2</mybatis-plus.version>
<dubbo.version>3.3.3</dubbo.version>
<spring.boot-version>2.7.18</spring.boot-version>
</properties>
<!-- ShardingSphere -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- ShardingSphere通过seata进行AT的远程数据库回滚 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-base-seata-at</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2021.0.5.0</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
二、Seata服务器及集成Seata的客户端应用
2.1 Seata服务器安装及配置
1、seata的服务器采用2.0.0版本,seata服务器需要4张独立的表,并建议seata的服务器采用单独的数据库,同时seata服务器也依赖nacos配置,请提前安装好nacos。
2.1.1 seata服务器建表语句如下
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for branch_table
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for distributed_lock
-- ----------------------------
DROP TABLE IF EXISTS `distributed_lock`;
CREATE TABLE `distributed_lock` (
`lock_key` char(20) NOT NULL,
`lock_value` varchar(20) NOT NULL,
`expire` bigint DEFAULT NULL,
PRIMARY KEY (`lock_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for global_table
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`status` tinyint NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int DEFAULT NULL,
`begin_time` bigint DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status`,`gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for lock_table
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(128) DEFAULT NULL,
`transaction_id` bigint DEFAULT NULL,
`branch_id` bigint NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`status` tinyint NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SET FOREIGN_KEY_CHECKS = 1;
2.1.2 Seata服务器的yml文件
采用docker安装,该文件可以在宿主机上通过-v 映射到容器中,这里可以看到
namespace: "local" group: "SEATA_GROUP" data-id: seataServer.properties
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
type: nacos
nacos:
server-addr: 192.168.3.118:8848
username: nacos
password: nacos
namespace: "local"
group: "SEATA_GROUP"
data-id: seataServer.properties
registry:
type: nacos
nacos:
application: seata-server
server-addr: 192.168.3.118:8848
username: nacos
password: nacos
namespace: "local"
group: "SEATA_GROUP"
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**
2.1.2 到nacos上建立配置
创建local命名空间,命名空间的名字和ID都是local。
创建Data id是seataServer.properties
创建Group 是 SEATA_GROUP
配置格式选择:properties
内容如下,这里配置的DB就是seata独立的数据库,里面是之前创建的4张表,也可以使用业务库。
# 存储模式
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://192.168.3.118:3306/seata?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
# 事务日志存储模式(与 store.mode=db 配合)
store.db.lockTable=lock_table
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
# 事务日志存储优化
store.db.queryLimit=1000
store.db.maxWait=5000
# 参与事务的客户端的事务组与TC集群的映射关系(一个集群内的所有应用都采用同一个名字,例如lili-sharding-java-group)
service.vgroupMapping.lili-sharding-java-group=default
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
logging.file.path=/seata-server/logs
2.1.2 使用docker部署Seata服务器
这里就是把yml配置文件映射到docker容器中,至此seata服务器部署完毕。
docker run -d --name seata-server-home \
-p 8091:8091 \
-p 7091:7091 \
-v /Users/hezhixiong/Documents/software/seata/application-home.yml:/seata-server/resources/application.yml \
-e SEATA_IP=192.168.3.118 \
-e SEATA_PORT=8091 \
registry.cn-hangzhou.aliyuncs.com/haiju_base/seata-server:2.0.0
2.2 JAVA应用集成Seata客户端
2.2.1 Seata应用yml配置
尤其注意tx-service-group,这里的名字与seata服务器里配置的要一致,简单来说就是分布式系统中所有应用都配置统一个名字即可,application-id 每个应用都不一样。
seata:
enabled: false # 这里直接关闭,不要怀疑,因为我们使用是shardingsphere集成Seata柔性事务,已在resource目录下的seata.conf中配置,seata其实被shardingsphere包装了
enable-auto-data-source-proxy: false # 由于使用了分库分表数据源,这里必须要关闭自动代理,由shardingsphere代理数据源
application-id: lili-sharding-java # 应用名称
tx-service-group: lili-sharding-java-group # 事务组名称(决定了多个微服务是否能参与到同一个分布式事务中,如果多个微服务需要参与同一个分布式事务,它们的tx-service-group必须配置为相同的)
service:
vgroup-mapping:
lili-sharding-java-group: default # 这里于seata-server中配置的service.vgroupMapping.lili-sharding-java-group=default 保持一致
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
username: ${spring.cloud.nacos.discovery.username}
password: ${spring.cloud.nacos.discovery.password}
namespace: ${spring.cloud.nacos.discovery.namespace}
group: SEATA_GROUP # 这里的Group需要与SEATA-SERVER注册nacos的GROUP一直,而不一定是本应用在nacos上注册的GROUP一致,通常应用注册的group是DEFAULT_GROUP
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
username: ${spring.cloud.nacos.discovery.username}
password: ${spring.cloud.nacos.discovery.password}
namespace: ${spring.cloud.nacos.discovery.namespace}
group: SEATA_GROUP # 这里的Group需要与SEATA-SERVER注册nacos的GROUP一直,而不一定是本应用在nacos上注册的GROUP一致,通常应用注册的group是DEFAULT_GROUP
data-id: seataServer.properties # 配置文件名称,这是seata-server在nacos中的配置文件名称,用于客户端读取Seata 服务端的全局配置(如事务存储模式、数据库连接等)
tcc:
fence:
log-table-name: tcc_fence_log # 防悬挂表名,可以配置分库分表也可以不配置,配置的话需要写按照xid的分库分表算法
cleanPeriod: 1h # 防悬挂清理周期,默认为 1h,表示一消失清理一次
2.2.2 Seata应用seata.conf配置
由于集成了分库分表,需要在应用的resource目录下,创建seata.conf文件,内容如下
########## ShardingSphere对seata客户端的配置解析,具体详见org.apache.shardingsphere.transaction.base.seata.at.SeataATShardingSphereTransactionManager
# 是否启用seata. 默认为true
sharding.transaction.seata.at.enable=true
# 当前客户端的id
client.application.id=lili-sharding-java
# 事务分组. 等价于 seata.tx-service-group
client.transaction.service.group=lili-sharding-java-group
2.2.3 Seata应用连接到数据库中创建事务日志表及tcc表。
如果是分库分表的,要求每个分库下都要创建
-- AT模式需要使用的表(无论是否分库分表,只要是分库就必须在所有的业务库中创建该表)
CREATE TABLE `undo_log`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime NOT NULL COMMENT 'create datetime',
`log_modified` datetime NOT NULL COMMENT 'modify datetime',
`ext` varchar(100) DEFAULT NULL COMMENT 'ext info',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
-- ========= 注意区分是否tcc_fence_log分库分表,不分库分表,数据库位于00库下即可 ========
-- 不分库分表,数据库位于00库下,性能不够,可考虑分库分表
-- TCC 模式需要使用的表
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
三、Shardingsphere的配置
3.1 yml文件配置
这里的分库分表算法,同时支持依据userNo、orderNo分库分表。你们可以自己写自己的算法。
这里有个重要的是default-data-source-name: ds00 # 对于不分库分表的,全部路由到00库。
spring:
shardingsphere:
datasource:
names: ds00, ds01 # 定义2个数据源,如果要修改ds名字,需要同步修改分库分表算法代码
# 业务库 ds00
ds00:
url: jdbc:mysql://127.0.0.1:3306/cinema00?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&useInformationSchema=true
username: root
password: 123456
type: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
# 连接池配置
connection-timeout: 45000 # 45秒连接超时
minimum-idle: 5 # 初始和最小空闲连接数
maximum-pool-size: 50 # 最大连接数,依据数据库的最大连接数限制配置
idle-timeout: 300000 # 空闲连接超时时间
max-lifetime: 1800000 # 连接最大存活时间
connection-test-query: SELECT 1 # 连接检测查询
# 业务库 ds01
ds01:
url: jdbc:mysql://127.0.0.1:3306/cinema01?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&useInformationSchema=true
username: root
password: 123456
type: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
# 连接池配置
connection-timeout: 45000 # 45秒连接超时
minimum-idle: 5 # 初始和最小空闲连接数
maximum-pool-size: 50 # 最大连接数
idle-timeout: 300000 # 空闲连接超时时间
max-lifetime: 1800000 # 连接最大存活时间
connection-test-query: SELECT 1 # 连接检测查询
rules:
sharding:
default-data-source-name: ds00 # 对于不分库分表的,全部路由到00库
tables:
ORDER_INFO: # order_info 表分片规则
actualDataNodes: ds0${0..1}.ORDER_INFO_0${0..15} # 分库分表逻辑,由数据源名 + 表名组成例如ds00.order_info_07,ds01.order_info_13
database-strategy:
complex:
shardingColumns: USER_NO, ORDER_NO
shardingAlgorithmName: db-complex-algorithm
table-strategy:
complex:
shardingColumns: USER_NO, ORDER_NO
shardingAlgorithmName: table-complex-algorithm
# 分库算法(2库)
sharding-algorithms:
db-complex-algorithm:
type: CLASS_BASED
props:
strategy: COMPLEX
algorithmClassName: com.lili.sharding.rule.db.DbComplexShardingAlgorithm
table-complex-algorithm:
type: CLASS_BASED
props:
strategy: COMPLEX
algorithmClassName: com.lili.sharding.rule.table.TableComplexShardingAlgorithm
key-generators:
snowflake:
type: SNOWFLAKE
props:
sql-show: true # 打印 SQL
四、代码实战
注意:
不使用seata的@GlobalTransactional 注解实现,因为我们采用的是分库分表的柔性事务。
4.1 如果只是多表数据库操作,不涉及RPC调用,直接使用@Transactional注解即可
/**
* 跨库出错将全部回滚,支持直接使用spring事务,支持跨库事务回滚
* (特别推荐,尽可能采用第3种方式,在不得以的情况下才使用seata,毕竟seata的稳定性保证是不容易的一件事)
* 1、@Transactional必须采用代理的方式,调用方必须采用spring bean的方式调用,事务才可以生效,如果当前类直接调用本类自己的方法,即使方法上有@Transactional,事务将不会生效
* 2、如果一次请求,不涉及RPC调用,那么就直接使用spring事务即可。
* 3、如果不涉及RCP调用,但需要使用redis/mq,那么建议使用@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT),在另一个类的方法在事务提交后发送,如果发送失败采用补偿机制。
* 4、如果一次请求,涉及到RPC调用,那么就叠加@ShardingSphereTransactionType使用,来保证系统与跨库事务的一致性。注意,我们不需要使用seata的 @GlobalTransactional。原因是本系统采用的是基于shardingsphere的柔性属于,底层基于seata的封装
*/
@Transactional(rollbackFor = Exception.class)
public String add2OrderUseSpringTransactional() {
String userNo1 = "fasfafadsfa"; //按预期HASH%2 该用户订单落在00库
String orderInfo1 = "orderInfo1" + System.currentTimeMillis();
// 交流流水号DB中是唯一索引,第一条数据模拟变化,保证不唯一
String transNo1 = "transNo" + System.currentTimeMillis();
String userNo2 = "12312455523"; // 按预期HASH%2 该用户订单落在01库
String orderInfo2 = "orderInfo2" + System.currentTimeMillis();
// 固定的交流流水号DB中是唯一索引,在第二次请求时,会触发数据库异常,00库中的数据也会被回购
String transNo2 = "conflictingTransNo";
String orderNo1 = this.addOrder(userNo1, orderInfo1, transNo1);
String orderNo2 = this.addOrder(userNo2, orderInfo2, transNo2);
return orderNo1 + "::::" + orderNo2;
}
4.2 涉及RCP调用分别在服务方与消费方设置不同注解
4.2.1 全局事务dubbo发起方消费者代码,需要使用2个注解
@Transactional(rollbackFor = Exception.class)
@ShardingSphereTransactionType(TransactionType.BASE)
public String add2OrderUseSeata(Boolean inRollback, Boolean outRollback) {
String userNo1 = "fasfafadsfa"; //按预期HASH%2 该用户订单落在00库
String orderInfo1 = "orderInfo1" + System.currentTimeMillis();
// 交流流水号DB中是唯一索引,第一条数据模拟变化,保证不唯一
String transNo1 = "transNo" + System.currentTimeMillis();
String userNo2 = "12312455523"; // 按预期HASH%2 该用户订单落在01库
String orderInfo2 = "orderInfo2" + System.currentTimeMillis();
// 固定的交流流水号DB中是唯一索引,因此第二次插入会报唯一索引异常,此时通过seata的全局事务保障在00库中的订单回滚
String transNo2 = inRollback ? "conflictingTransNo" : "transNo" + System.currentTimeMillis();
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();
System.out.println("xid:" + xid + ",branchType:" + branchType);
// 先调用1次本系统的DB
String orderNo1 = this.addOrder(userNo1, orderInfo1, transNo1);
// 再调用1次远程RPC(可以制造调用成功或调用失败)
PayInfo payInfo = new PayInfo();
payInfo.setUserNo(userNo1);
payInfo.setOrderNo(orderNo1);
String outTrans = outRollback ? "conflictingTransNo" : transNo1;
payInfo.setOutTransNo(outTrans);
String payNo = payProxy.addPay(payInfo);
// 最后再调用1次本系统的DB
String orderNo2 = this.addOrder(userNo2, orderInfo2, transNo2);
return orderNo1 + "::::" + orderNo2 + "::::" + payNo;
}
4.2.2 全局事务dubbo服务提供方代码,需要使用1个注解
@DubboService
public class PayFacadeServiceImpl implements IPayFacade {
@Resource
private PayService payService;
@Override
@ShardingSphereTransactionType(TransactionType.BASE)
public String addPay(PayInfo pay) {
return payService.addPay(pay);
}
}
至此整合dubbo、分库分表、seata、mybatis-plus的应用就完成了