当前位置: 首页 > news >正文

分库分表数据源如何清洗同步到目标表

在平台限制FSpark只能使用SQL语言、且不支持Python组件进行数据库连接配置的情况下,仍可通过Shell预处理生成动态SQL+FSpark SQL读取分库分表+ETL同步的方案实现T-1数据导入。以下是具体实现,重点通过Shell动态生成包含所有分表的FSpark SQL脚本,规避Python依赖。

一、方案核心逻辑

利用Shell脚本动态生成包含所有分库分表的FSpark SQL代码(解决FSpark无法循环遍历分表的问题),通过FSpark SQL读取MySQL分库分表的T-1数据并落地到Hive中间表,最后通过ETL组件同步到目标Hive表或云MySQL。整体链路:

Flow(调度)→ Shell(生成FSpark SQL脚本)→ FSpark(执行SQL,读取分表T-1数据→Hive中间表)→ ETL(中间表→目标表)→ Shell(校验)

关键适配点

  • 用Shell脚本动态生成包含384个分表(6库×64表)的FSpark SQL代码,替代Python的动态配置能力。
  • 基于MySQL分库配置,在FSpark SQL中为每个分库创建独立数据源,通过UNION ALL合并所有分表数据。

二、详细实现步骤

1. 前置准备
(1)Hive表设计
  • 中间表:存储FSpark读取的原始数据(含分库分表标识,便于问题排查)。
  • 目标表:合并后的业务表(与中间表结构一致,剔除中间表特有字段)。
-- 中间表
CREATE TABLE db_hive.orders_mid (order_id BIGINT,user_id BIGINT,pay_amount DECIMAL(10,2),create_time TIMESTAMP,status STRING,db_name STRING COMMENT '来源分库',tbl_name STRING COMMENT '来源分表'
)
PARTITIONED BY (dt STRING)
STORED AS ORC;-- 目标表
CREATE TABLE db_hive.orders (order_id BIGINT,user_id BIGINT,pay_amount DECIMAL(10,2),create_time TIMESTAMP,status STRING
)
PARTITIONED BY (dt STRING)
STORED AS ORC;
(2)MySQL分库数据源配置

在平台数据源管理中,为6个分库(db0~db5)分别配置JDBC连接(FSpark SQL可直接引用这些数据源):

  • 数据源名称:mysql_db0mysql_db1、…、mysql_db5
  • 连接地址:jdbc:mysql://mysql-host:3306/db0(对应db0)、jdbc:mysql://mysql-host:3306/db1(对应db1)等
  • 认证信息:统一使用sync_user及加密密码
2. Shell组件:动态生成FSpark SQL脚本

通过Shell脚本根据T-1日期和分库分表规则,生成包含所有分表读取逻辑的FSpark SQL代码(核心是生成384个分表的SELECT语句并通过UNION ALL合并)。

Shell脚本示例(generate_fspark_sql.sh)

#!/bin/bash
t1_date=$1  # 接收T-1日期(如2025-08-17)
t1_partition=$(echo $t1_date | sed 's/-//g')  # 转换为20250817(MySQL分区名)
sql_file="/tmp/read_mysql_t1.sql"  # 生成的FSpark SQL文件# 初始化SQL文件(创建中间表分区)
echo "ALTER TABLE db_hive.orders_mid ADD IF NOT EXISTS PARTITION (dt='$t1_date');" > $sql_file# 生成分库分表读取逻辑(db0~db5,tbl0~tbl63)
echo "INSERT OVERWRITE TABLE db_hive.orders_mid PARTITION (dt='$t1_date')" >> $sql_file
echo "SELECT * FROM (" >> $sql_file# 循环分库(0~5)
for db_idx in {0..5}; dodb_name="db$db_idx"datasource="mysql_db$db_idx"  # 对应平台配置的数据源名称# 循环分表(0~63)for tbl_idx in {0..63}; dotbl_name="tbl$tbl_idx"# 生成单表读取SQL(利用MySQL分区过滤T-1数据)echo "SELECT order_id, user_id, pay_amount, create_time, status, '$db_name' AS db_name, '$tbl_name' AS tbl_name FROM ${datasource}.${tbl_name} WHERE create_time >= '$t1_date 00:00:00' AND create_time < '$t1_date 23:59:59'AND partition = 'p$t1_partition'  -- 过滤MySQL物理分区" >> $sql_file# 除最后一个表外,添加UNION ALLif [ $db_idx -lt 5 ] || [ $tbl_idx -lt 63 ]; thenecho "UNION ALL" >> $sql_filefidone
doneecho ") t;" >> $sql_file  # 闭合子查询echo "生成FSpark SQL脚本成功:$sql_file"

脚本功能

  • 接收T-1日期参数,生成p20250817格式的MySQL分区名。
  • 循环生成384个分表的SELECT语句,通过UNION ALL合并,最终插入Hive中间表的dt=T-1分区。
3. FSpark组件:执行SQL读取分表数据

FSpark组件加载Shell生成的SQL脚本(/tmp/read_mysql_t1.sql),执行分库分表数据读取并写入Hive中间表。

FSpark配置示例

  • 脚本类型:SQL
  • SQL文件路径/tmp/read_mysql_t1.sql(Shell生成的文件)
  • 资源配置executor-cores=4num-executors=8(根据数据量调整,确保并行读取效率)
4. ETL组件:中间表→目标表(Hive或云MySQL)
(1)同步到Hive目标表

ETL组件读取中间表db_hive.orders_middt=T-1分区,剔除db_nametbl_name字段后写入目标表。

ETL配置

  • 源表db_hive.orders_mid,筛选dt='$t1_date'
  • 转换:保留order_id, user_id, pay_amount, create_time, status字段
  • 目标表db_hive.orders,分区dt='$t1_date',写入模式overwrite
(2)同步到云数据MySQL

若目标是云MySQL,调整ETL目标端配置:

ETL配置

  • 源表db_hive.orders_mid,筛选dt='$t1_date'
  • 转换:同上述Hive目标表
  • 目标端:云MySQL表(需提前创建,结构与Hive目标表一致)
    • 连接参数:jdbc:mysql://cloud-mysql-host:3306/db_cloud
    • 写入模式:upsert(按order_id主键更新,避免重复)
5. Flow组件:调度全流程

通过Flow组件定义每日调度任务,按顺序执行以下节点:

节点1:shell_generate_sql(Shell组件)→ 输入参数t1_date → 输出FSpark SQL脚本
节点2:fspark_execute(FSpark组件)→ 依赖节点1 → 执行SQL写入中间表
节点3:etl_sync(ETL组件)→ 依赖节点2 → 同步到目标表
节点4:shell_validate(Shell组件)→ 依赖节点3 → 校验数据一致性

调度参数

  • 触发时间:每天凌晨5点(0 5 * * *
  • t1_date取值:date_add(current_date, -1)(平台内置函数获取前一天日期)
6. Shell组件:数据一致性校验

通过FSpark SQL统计中间表与目标表的T-1数据量,确保同步完整。

校验脚本示例(validate.sh)

#!/bin/bash
t1_date=$1# 中间表数据量
mid_count=$(fspark-sql -e "SELECT COUNT(*) FROM db_hive.orders_mid WHERE dt='$t1_date'")# 目标表数据量(Hive或MySQL,此处以Hive为例)
target_count=$(fspark-sql -e "SELECT COUNT(*) FROM db_hive.orders WHERE dt='$t1_date'")if [ $mid_count -eq $target_count ]; thenecho "校验通过:中间表=$mid_count,目标表=$target_count"exit 0
elseecho "校验失败:差异=$(($mid_count - $target_count))条"exit 1
fi

三、关键优化与适配

  1. 规避FSpark SQL循环限制
    用Shell脚本预先生成包含所有分表的UNION ALL语句,解决FSpark无法动态遍历分表的问题。

  2. 利用MySQL分区加速
    SQL中显式过滤partition = 'p$t1_partition',仅读取T-1对应的物理分区,避免全表扫描,效率提升10倍以上。

  3. 控制SQL脚本大小
    384个分表的UNION ALL会生成较大SQL文件(约100KB),需确保平台支持该大小的脚本执行(可通过分库分批读取优化,如先读db0db2,再读db3db5,最后合并)。

  4. 容错机制

    • Flow组件配置任务重试(3次),失败后触发告警。
    • 中间表和目标表均使用overwrite模式,支持重跑覆盖错误数据。

总结

本方案完全符合平台限制(FSpark仅支持SQL、无Python数据库连接),通过Shell动态生成SQL脚本突破分表遍历限制,核心优势:

  1. 全组件适配:仅使用Shell、FSpark SQL、ETL、Flow,无需Python。
  2. 高效读取:利用MySQL物理分区和Spark分布式计算,支持TB级数据量。
  3. 可扩展性:同步到云MySQL仅需调整ETL目标端配置,无需修改核心流程。

若分表数量增加(如超过100个),可优化Shell脚本按分库生成多个SQL文件,通过FSpark分阶段执行后合并,避免单脚本过大。

http://www.dtcms.com/a/337435.html

相关文章:

  • 大数据计算引擎(二)——Flink
  • 大数据计算引擎(四)—— Impala
  • 【matlab】考虑源荷不平衡的微电网鲁棒定价研究
  • Pandas 数据导入导出、索引、分组聚合与可视化
  • (第十八期)图像标签的三个常用属性:width、height、border
  • 特赞内容运营解决方案,AI重构品牌内容价值链
  • 云计算学习100天-第21天
  • 整体设计 之“凝聚式中心点”原型 --整除:智能合约和DBMS的深层融合 之2
  • 将 iPhone 联系人转移到 Infinix 的完整指南
  • MCP ZAP Server:一款能够利用大模型替代人工进行Web安全扫描的开源MCP
  • Vue深入组件:组件 v-model 详解2
  • 网络安全巡检系统的功能组成和作用
  • sizeof和strlen的对比分析
  • vue从入门到精通:搭建第一个vue项目
  • kali linux从入门到精通教程
  • 【GM3568JHF】FPGA+ARM异构开发板烧录指南
  • Go并发编程-goroutine
  • 智能人形机器人:知识驱动的工业生产力革新
  • 视觉语言导航(11)——预训练范式 4.1
  • 系统架构师考试-操作系统-10道关于PV操作和死锁的模拟题
  • 实现一个函数,使用引用作为参数完成三个字符串按长度排序,最长的字符串放入第一个参数,最短的字符串放入第三个参数(不允许使用 string)
  • Linx--MySQL--安装笔记详细步骤!
  • 石英挠性加速度计:高精度测量的理想之选?
  • Windows安装python
  • 使用 uv管理 Python 虚拟环境:比conda更快、更轻量的现代方案
  • Baumer高防护相机如何通过YoloV8深度学习模型实现手势识别和指尖检测识别(C#代码UI界面版)
  • Java基础数据类型笔试面试中的“坑”
  • 第4章-04-用WebDriver页面元素操作
  • C 语言数据结构与算法的复杂度分析:从理论到实战的效率衡量指南
  • Qwen Code宣布每天免费调用2000次,且无Token限制