-- Recommended: name the target partition explicitly and read only the matching source partition
INSERT OVERWRITE TABLE dwd.order_detail_di PARTITION (dt = '${biz_date}')
SELECT
    ...
FROM ods.order_info_di
WHERE dt = '${biz_date}';

-- Forbidden: no partition filter on the source (full table scan) and no target partition;
-- depending on engine settings this may fail or overwrite more than the intended partition
INSERT OVERWRITE TABLE dwd.order_detail_di
SELECT
    ...
FROM ods.order_info_di;
分区字段统一:时间分区用dt(天),格式yyyy-MM-dd;小时级分区加hr(如dt='2025-09-06' AND hr='08');
禁止跨分区覆盖:一次任务只处理单个分区(如dt='${biz_date}'),避免误删历史数据。
语法规范
**禁止 `SELECT *`**:只查询需要的字段,减少数据传输;
-- Recommended: list only the columns that are needed
-- (the partition filter is kept so the example also follows the partition rule)
SELECT
    order_id,
    user_id,
    create_time
FROM ods.order_info_di
WHERE dt = '${biz_date}';

-- Forbidden: SELECT * reads every column of the table
SELECT *
FROM ods.order_info_di
WHERE dt = '${biz_date}';
使用显式字段关联:JOIN 时在 ON 子句中明确写出关联条件,禁止无关联条件的 JOIN 及 CROSS JOIN(笛卡尔积);
-- Recommended: join condition in ON, partition filter in WHERE
SELECT
    a.order_id,
    b.user_name
FROM dwd.order_detail_di AS a
LEFT JOIN dim.user_info AS b
    ON a.user_id = b.user_id  -- explicit join key
WHERE a.dt = '${biz_date}';

-- Forbidden: comma join without any join condition (Cartesian product)
SELECT
    a.order_id,
    b.user_name
FROM dwd.order_detail_di a, dim.user_info b;
合理使用 CTE(公用表表达式):复杂逻辑拆分为 CTE,提高可读性;
WITH valid_orders AS (
    -- Step 1: keep only valid orders from the day's partition
    SELECT
        order_id,
        user_id,
        order_amount,
        status_code                  -- required by the status join in step 2
    FROM ods.order_info_di
    WHERE dt = '${biz_date}'
      AND order_id IS NOT NULL       -- primary key must be present
      AND order_amount > 0           -- amount must be positive
),
order_with_status AS (
    -- Step 2: enrich with the order-status dimension
    SELECT
        a.order_id,
        a.user_id,
        a.order_amount,
        b.status_name
    FROM valid_orders AS a
    LEFT JOIN dim.order_status AS b
        ON a.status_code = b.code
)
-- Final step: load the target partition
INSERT OVERWRITE TABLE dwd.order_detail_di PARTITION (dt = '${biz_date}')
SELECT
    order_id,
    user_id,
    order_amount,
    status_name
FROM order_with_status;
-- Salting a hot key for aggregation (assumes user_id = '0' is the hot key)
WITH salted_orders AS (
    -- Spread the hot key over 10 buckets (0-9); every other key stays in bucket 0.
    -- RAND() returns a value in [0, 1), so FLOOR(RAND() * 10) is needed to get integer buckets.
    -- Keeping user_id and salt as separate columns avoids having to parse a concatenated key
    -- later (in LIKE patterns '_' is a wildcard, so prefix matching would catch real ids).
    SELECT
        user_id,
        CASE
            WHEN user_id = '0' THEN CAST(FLOOR(RAND() * 10) AS INT)
            ELSE 0
        END AS salt,
        order_amount
    FROM dwd.order_detail_di
    WHERE dt = '${biz_date}'
),
partial_agg AS (
    -- Stage 1: aggregate per (user_id, salt) so the hot key is split across tasks
    SELECT
        user_id,
        salt,
        SUM(order_amount) AS total
    FROM salted_orders
    GROUP BY user_id, salt
)
-- Stage 2: merge the buckets back into one row per user_id
SELECT
    user_id,
    SUM(total) AS total_amount
FROM partial_agg
GROUP BY user_id;
-- Broadcast the small user dimension table; the hint names the alias (b).
-- In a LEFT JOIN only the right-hand table can be the broadcast side, which b is.
SELECT /*+ BROADCAST(b) */
    a.order_id,
    b.user_name
FROM dwd.order_detail_di AS a
LEFT JOIN dim.user_info AS b
    ON a.user_id = b.user_id
WHERE a.dt = '${biz_date}';
小文件处理
写入时控制文件数量:通过distribute by均匀分配数据,避免过多小文件;
sql
-- Hive: each reducer of the final stage writes one file per partition,
-- so fix the reducer count at 100 and hash rows by user_id across those reducers
SET mapreduce.job.reduces=100;

INSERT OVERWRITE TABLE dws.order_agg_dd PARTITION (dt = '${biz_date}')
SELECT
    user_id,
    SUM(order_amount) AS total
FROM dwd.order_detail_di
WHERE dt = '${biz_date}'
GROUP BY user_id
DISTRIBUTE BY user_id;  -- hash by user_id; distribution is even only if user_id is not skewed
-- Hive: map join. The small table is loaded into memory and joined on the map side.
-- The hint must name the alias used in FROM (b), not the original table name.
-- NOTE: since Hive 0.11, hive.ignore.mapjoin.hint defaults to true, so the hint is ignored
-- and map joins are chosen automatically when hive.auto.convert.join is enabled.
SET hive.auto.convert.join=true;

SELECT /*+ MAPJOIN(b) */
    a.user_id,
    b.user_name
FROM large_table AS a
INNER JOIN small_table AS b
    ON a.user_id = b.user_id;
-- Spark SQL: automatic broadcast-join size threshold (a config setting, not a broadcast variable).
-- Tables whose estimated size is below this many bytes are broadcast automatically.
-- 10485760 bytes = 10 MB, which is also Spark's default; raise it if the small table is larger.
SET spark.sql.autoBroadcastJoinThreshold=10485760;

SELECT
    a.user_id,
    b.user_name
FROM large_table AS a
INNER JOIN small_table AS b
    ON a.user_id = b.user_id;
避免大表 JOIN 大表:如必须执行,考虑分治策略(如先聚合再 JOIN)。
数据倾斜处理
热点键处理:对倾斜键(如 user_id IS NULL 的记录)附加随机桶号进行打散:
-- Example: order-amount totals where all rows with NULL user_id would land in one task
WITH keyed_orders AS (
    -- Spread NULL keys over 10 random buckets (0-9); non-NULL keys stay in bucket 0.
    -- The bucket is computed once here. Writing RAND() separately in SELECT and GROUP BY
    -- would produce two independent values that no longer match each other.
    SELECT
        user_id,
        CASE
            WHEN user_id IS NULL THEN CAST(FLOOR(RAND() * 10) AS INT)
            ELSE 0
        END AS salt,
        order_amount
    FROM dw_order_detail
    WHERE dt = '${biz_date}'
),
partial_agg AS (
    -- Stage 1: aggregate per (user_id, salt) so the NULL key is split across tasks
    SELECT
        user_id,
        salt,
        SUM(order_amount) AS total_amount
    FROM keyed_orders
    GROUP BY user_id, salt
)
-- Stage 2: merge the buckets; all NULL user_id rows end up in a single group again
SELECT
    user_id,
    SUM(total_amount) AS total_amount
FROM partial_agg
GROUP BY user_id;
使用 DISTRIBUTE BY:在 Spark SQL 中手动指定分区键:
-- Spark SQL: set the number of shuffle partitions, then choose the shuffle key explicitly
SET spark.sql.shuffle.partitions=200;

SELECT
    user_id,
    SUM(order_amount) AS total_amount
FROM dw_order_detail
WHERE dt = '${biz_date}'
GROUP BY user_id
-- DISTRIBUTE BY must come after GROUP BY; it repartitions the result by user_id.
-- Distributing by the skewed key itself does not remove skew on that key.
DISTRIBUTE BY user_id;
聚合优化
先过滤再聚合:减少聚合数据量:
-- Correct: filter rows in WHERE before grouping (dt uses the standard yyyy-MM-dd format)
SELECT
    user_id,
    COUNT(*) AS order_count
FROM dw_order_detail
WHERE dt = '2024-01-01'
  AND status = 'completed'
GROUP BY user_id;
-- Wrong: tries to filter raw columns after aggregation. The query is rejected:
-- HAVING may only reference grouping columns or aggregates, and dt / status are neither.
SELECT
    user_id,
    COUNT(*) AS order_count
FROM dw_order_detail
GROUP BY user_id
HAVING dt = '2024-01-01'
   AND status = 'completed';
避免使用 DISTINCT
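优先使用 GROUP BY 替代 DISTINCT,统一去重写法(说明:Hive/Spark 的优化器通常会把 SELECT DISTINCT 改写为等价的 GROUP BY,二者开销相近;大数据量下真正需要警惕的是 COUNT(DISTINCT),它可能集中在少数 reducer 上执行):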
-- Correct: deduplicate with GROUP BY (partition filter per the partition rule)
SELECT user_id
FROM dw_order_detail
WHERE dt = '${biz_date}'
GROUP BY user_id;
-- Not recommended by this standard: DISTINCT (use the GROUP BY form instead)
SELECT DISTINCT user_id
FROM dw_order_detail
WHERE dt = '${biz_date}';
五、数据质量规范
数据校验
每个任务必须包含数据校验逻辑,校验不通过时任务失败并告警(可使用调度系统提供的数据校验模板)。
数据清洗规则
去重:基于唯一键(如order_id)去重,保留最新记录;
-- Keep only the latest record per order_id (ordered by create_time)
INSERT OVERWRITE TABLE dwd.order_detail_di PARTITION (dt = '${biz_date}')
SELECT
    order_id,
    user_id,
    order_amount,
    create_time
FROM (
    SELECT
        order_id,
        user_id,
        order_amount,
        create_time,
        -- NOTE(review): rows with equal create_time are ranked arbitrarily;
        -- add a tie-breaker column to ORDER BY if one exists
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY create_time DESC) AS rn
    FROM ods.order_info_di
    WHERE dt = '${biz_date}'
) AS ranked
WHERE rn = 1;
脱敏:敏感字段(手机号、身份证号)按规则脱敏;
-- Mask mobile numbers: 13812345678 -> 138****5678
-- With Hive / Spark default string-literal escaping, '\\d' reaches the regex engine as \d.
-- NOTE(review): the pattern is unanchored. Values that are not plain 11-digit numbers are
-- either masked at a different position (e.g. with a country code) or left unmasked
-- (e.g. with separators such as '138-1234-5678'). Confirm this matches the masking policy.
SELECT
    user_id,
    regexp_replace(phone, '(\\d{3})\\d{4}(\\d{4})', '$1****$2') AS phone
FROM ods.user_info_di
WHERE dt = '${biz_date}';