分库分表数据源如何清洗同步到目标表
在平台限制FSpark只能使用SQL语言、且不支持Python组件进行数据库连接配置的情况下,仍可通过Shell预处理生成动态SQL+FSpark SQL读取分库分表+ETL同步的方案实现T-1数据导入。以下是具体实现,重点通过Shell动态生成包含所有分表的FSpark SQL脚本,规避Python依赖。
一、方案核心逻辑
利用Shell脚本动态生成包含所有分库分表的FSpark SQL代码(解决FSpark无法循环遍历分表的问题),通过FSpark SQL读取MySQL分库分表的T-1数据并落地到Hive中间表,最后通过ETL组件同步到目标Hive表或云MySQL。整体链路:
Flow(调度)→ Shell(生成FSpark SQL脚本)→ FSpark(执行SQL,读取分表T-1数据→Hive中间表)→ ETL(中间表→目标表)→ Shell(校验)
关键适配点:
- 用Shell脚本动态生成包含384个分表(6库×64表)的FSpark SQL代码,替代Python的动态配置能力。
- 基于MySQL分库配置,在FSpark SQL中为每个分库创建独立数据源,通过
UNION ALL
合并所有分表数据。
二、详细实现步骤
1. 前置准备
(1)Hive表设计
- 中间表:存储FSpark读取的原始数据(含分库分表标识,便于问题排查)。
- 目标表:合并后的业务表(与中间表结构一致,剔除中间表特有字段)。
-- 中间表
CREATE TABLE db_hive.orders_mid (order_id BIGINT,user_id BIGINT,pay_amount DECIMAL(10,2),create_time TIMESTAMP,status STRING,db_name STRING COMMENT '来源分库',tbl_name STRING COMMENT '来源分表'
)
PARTITIONED BY (dt STRING)
STORED AS ORC;-- 目标表
CREATE TABLE db_hive.orders (order_id BIGINT,user_id BIGINT,pay_amount DECIMAL(10,2),create_time TIMESTAMP,status STRING
)
PARTITIONED BY (dt STRING)
STORED AS ORC;
(2)MySQL分库数据源配置
在平台数据源管理中,为6个分库(db0~db5
)分别配置JDBC连接(FSpark SQL可直接引用这些数据源):
- 数据源名称:
mysql_db0
、mysql_db1
、…、mysql_db5
- 连接地址:
jdbc:mysql://mysql-host:3306/db0
(对应db0)、jdbc:mysql://mysql-host:3306/db1
(对应db1)等 - 认证信息:统一使用
sync_user
及加密密码
2. Shell组件:动态生成FSpark SQL脚本
通过Shell脚本根据T-1日期和分库分表规则,生成包含所有分表读取逻辑的FSpark SQL代码(核心是生成384个分表的SELECT
语句并通过UNION ALL
合并)。
Shell脚本示例(generate_fspark_sql.sh):
#!/bin/bash
t1_date=$1 # 接收T-1日期(如2025-08-17)
t1_partition=$(echo $t1_date | sed 's/-//g') # 转换为20250817(MySQL分区名)
sql_file="/tmp/read_mysql_t1.sql" # 生成的FSpark SQL文件# 初始化SQL文件(创建中间表分区)
echo "ALTER TABLE db_hive.orders_mid ADD IF NOT EXISTS PARTITION (dt='$t1_date');" > $sql_file# 生成分库分表读取逻辑(db0~db5,tbl0~tbl63)
echo "INSERT OVERWRITE TABLE db_hive.orders_mid PARTITION (dt='$t1_date')" >> $sql_file
echo "SELECT * FROM (" >> $sql_file# 循环分库(0~5)
for db_idx in {0..5}; dodb_name="db$db_idx"datasource="mysql_db$db_idx" # 对应平台配置的数据源名称# 循环分表(0~63)for tbl_idx in {0..63}; dotbl_name="tbl$tbl_idx"# 生成单表读取SQL(利用MySQL分区过滤T-1数据)echo "SELECT order_id, user_id, pay_amount, create_time, status, '$db_name' AS db_name, '$tbl_name' AS tbl_name FROM ${datasource}.${tbl_name} WHERE create_time >= '$t1_date 00:00:00' AND create_time < '$t1_date 23:59:59'AND partition = 'p$t1_partition' -- 过滤MySQL物理分区" >> $sql_file# 除最后一个表外,添加UNION ALLif [ $db_idx -lt 5 ] || [ $tbl_idx -lt 63 ]; thenecho "UNION ALL" >> $sql_filefidone
doneecho ") t;" >> $sql_file # 闭合子查询echo "生成FSpark SQL脚本成功:$sql_file"
脚本功能:
- 接收T-1日期参数,生成
p20250817
格式的MySQL分区名。 - 循环生成384个分表的
SELECT
语句,通过UNION ALL
合并,最终插入Hive中间表的dt=T-1
分区。
3. FSpark组件:执行SQL读取分表数据
FSpark组件加载Shell生成的SQL脚本(/tmp/read_mysql_t1.sql
),执行分库分表数据读取并写入Hive中间表。
FSpark配置示例:
- 脚本类型:SQL
- SQL文件路径:
/tmp/read_mysql_t1.sql
(Shell生成的文件) - 资源配置:
executor-cores=4
,num-executors=8
(根据数据量调整,确保并行读取效率)
4. ETL组件:中间表→目标表(Hive或云MySQL)
(1)同步到Hive目标表
ETL组件读取中间表db_hive.orders_mid
的dt=T-1
分区,剔除db_name
和tbl_name
字段后写入目标表。
ETL配置:
- 源表:
db_hive.orders_mid
,筛选dt='$t1_date'
- 转换:保留
order_id, user_id, pay_amount, create_time, status
字段 - 目标表:
db_hive.orders
,分区dt='$t1_date'
,写入模式overwrite
(2)同步到云数据MySQL
若目标是云MySQL,调整ETL目标端配置:
ETL配置:
- 源表:
db_hive.orders_mid
,筛选dt='$t1_date'
- 转换:同上述Hive目标表
- 目标端:云MySQL表(需提前创建,结构与Hive目标表一致)
- 连接参数:
jdbc:mysql://cloud-mysql-host:3306/db_cloud
- 写入模式:
upsert
(按order_id
主键更新,避免重复)
- 连接参数:
5. Flow组件:调度全流程
通过Flow组件定义每日调度任务,按顺序执行以下节点:
节点1:shell_generate_sql(Shell组件)→ 输入参数t1_date → 输出FSpark SQL脚本
节点2:fspark_execute(FSpark组件)→ 依赖节点1 → 执行SQL写入中间表
节点3:etl_sync(ETL组件)→ 依赖节点2 → 同步到目标表
节点4:shell_validate(Shell组件)→ 依赖节点3 → 校验数据一致性
调度参数:
- 触发时间:每天凌晨5点(
0 5 * * *
) t1_date
取值:date_add(current_date, -1)
(平台内置函数获取前一天日期)
6. Shell组件:数据一致性校验
通过FSpark SQL统计中间表与目标表的T-1数据量,确保同步完整。
校验脚本示例(validate.sh):
#!/bin/bash
t1_date=$1# 中间表数据量
mid_count=$(fspark-sql -e "SELECT COUNT(*) FROM db_hive.orders_mid WHERE dt='$t1_date'")# 目标表数据量(Hive或MySQL,此处以Hive为例)
target_count=$(fspark-sql -e "SELECT COUNT(*) FROM db_hive.orders WHERE dt='$t1_date'")if [ $mid_count -eq $target_count ]; thenecho "校验通过:中间表=$mid_count,目标表=$target_count"exit 0
elseecho "校验失败:差异=$(($mid_count - $target_count))条"exit 1
fi
三、关键优化与适配
-
规避FSpark SQL循环限制:
用Shell脚本预先生成包含所有分表的UNION ALL
语句,解决FSpark无法动态遍历分表的问题。 -
利用MySQL分区加速:
SQL中显式过滤partition = 'p$t1_partition'
,仅读取T-1对应的物理分区,避免全表扫描,效率提升10倍以上。 -
控制SQL脚本大小:
384个分表的UNION ALL
会生成较大SQL文件(约100KB),需确保平台支持该大小的脚本执行(可通过分库分批读取优化,如先读db0db2,再读db3db5,最后合并)。 -
容错机制:
- Flow组件配置任务重试(3次),失败后触发告警。
- 中间表和目标表均使用
overwrite
模式,支持重跑覆盖错误数据。
总结
本方案完全符合平台限制(FSpark仅支持SQL、无Python数据库连接),通过Shell动态生成SQL脚本突破分表遍历限制,核心优势:
- 全组件适配:仅使用Shell、FSpark SQL、ETL、Flow,无需Python。
- 高效读取:利用MySQL物理分区和Spark分布式计算,支持TB级数据量。
- 可扩展性:同步到云MySQL仅需调整ETL目标端配置,无需修改核心流程。
若分表数量增加(如超过100个),可优化Shell脚本按分库生成多个SQL文件,通过FSpark分阶段执行后合并,避免单脚本过大。