大数据量下的数据修复与回写Spark on Hive 的大数据量主键冲突排查:COUNT(DISTINCT) 的陷阱
背景与问题概述
这一周(2025-05-26-2026-05-30)我在搞数据拟合修复优化的任务,有大量的数据需要进行数据处理及回写,大概一个表一天一分区有五六千万数据,大约一百多列的字段。 具体是这样的我先取档案,关联对应表hive对应分区的数据,然后进行算法一系列逻辑处理后,将结果输出到hive,然后再从hive回写一份到oracle里面。
spark资源大概我给了不小,数据大概一天40左右吧,大概12个excutor,每一个12G内存,2core吧,拟合完数据,将数据入hive时候,进行了整体去重。 包括且不限于如下操作
1、.distinct(),
2、对应主键的去重.dropDuplicates(id),
3、row_number对id,type主键字段开窗取first
4、对id,type主键字段开窗,取后续字段的max()
经过以上操作,我的数据得以在没有主键冲突的情况下顺利的入库到hive中,并且我对入库数据进行group by id,type having count(1) >1时数据也没有出现重复的情况。
OK。鬼知道我对上述数据验证进行多少次跑批总结出来的上面的操作。以上是我写入hive的操作。 下面即将是从hive入到oracle艰辛的探索之路。 正常来讲经过上面的数据操作,我从hive入到oracle是不应该出现主键冲突的情况了,因为我有一部分表已经处理入库了,但有一个表就是死活入不进去,我impala都快查烂了,资源监控的同事都给我致电了。
为什么调了一天呢,因为跑一个 程序就要个吧小时,代码都快被我调抑郁了。
Hive数据写入阶段的去重策略
经过多次实验和验证,我总结出一套有效的去重方法,确保数据在写入Hive时不出现主键冲突:
1. 整体去重 - distinct()
val distinctDF = originalDF.distinct()
这种方法简单直接,但性能开销较大,适合小数据集或初步去重。
2. 基于主键的去重 - dropDuplicates()
val dedupByKeyDF = originalDF.dropDuplicates("id")
比整体去重更高效,只针对指定列进行去重。
3. 开窗函数取第一条记录
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp") val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")
这种方法在有多条相同主键记录时,可以按指定排序条件保留一条。
4. 开窗函数取最大值记录
val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)
对于需要保留最大值的场景,这种聚合方式非常有效。
Hive到Oracle的数据的迁移问题结局
尽管Hive中的数据已经严格去重,但在迁移到Oracle时仍遇到了两个主要问题:
问题1:NULL值导致的主键冲突
-- 问题发现查询
SELECT id, type, COUNT(1)
FROM hive_table
WHERE id IS NULL
GROUP BY id, type
HAVING COUNT(1) > 1;
解决方案:
// 在写入Oracle前增加NULL值处理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接过滤
问题2:资源不足导致的作业失败
最初配置:
-
12个Executor
-
每个Executor 12G内存,2个核心
-
一个表一天的分区大概处理约40GB数据
作业在运行10-20分钟后失败,经过多次调整,最终稳定运行的配置:
-
每个Executor 45G内存,这个我觉得得看集群资源,我们集群资源很紧张,大概10TB的内存,都不太够用
-
适当增加核心数(根据集群情况)我一般都设置2
性能优化经验总结
1. 内存配置黄金法则
对于大规模数据处理,Executor内存配置应遵循:
-
基础内存 = 数据分区大小 × 安全系数(2-3)
-
考虑序列化开销和中间数据结构
2. 高效去重策略选择
方法 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
distinct() | 小数据集或全字段去重 | 简单 | 性能差 |
dropDuplicates() | 已知主键字段 | 高效 | 仅针对指定列 |
开窗函数 | 需要按条件保留记录 | 灵活可控 | 计算开销大 |
聚合函数 | 需要保留极值 | 高效 | 只能处理数值字段 |
3. NULL值处理最佳实践
-
在数据处理的早期阶段识别和处理NULL值
-
对于主键字段,NULL值应被替换或过滤
-
考虑使用COALESCE或NVL函数提供默认值
4. 资源监控与调优技巧
-
观察GC时间和频率,内存不足时GC会频繁发生
-
监控Executor心跳丢失情况
-
适当增加
spark.memory.fraction
(默认0.6) -
考虑启用
spark.memory.offHeap.enabled
使用堆外内存
优化Demo示例代码
/*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 读取原始数据
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分区过滤// 2. 多阶段去重处理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值处理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但确保不冲突// 4. 写入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置优化后写入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 调整批量大小.option("isolationLevel", "NONE") // 对于大数据量写入可提高性能.mode("append").save()
通过这次项目,总结了以下经验:
-
数据质量优先:在数据处理早期阶段解决NULL值、重复数据等问题
-
渐进式调优:从较小资源开始,逐步增加直至作业稳定运行
-
监控驱动:密切监控作业执行情况,特别是GC和内存使用指标
-
文档记录:记录每次调整的参数和效果,形成知识库
大数据处理中的问题往往不是单一因素导致的,需要综合考虑数据特性、处理逻辑和集群资源。希望诸君避免类似的"坑",更高效地完成大数据处理任务。
这个资源调优是真的恶心,代码没问题,就是和资源有问题,跑着跑着就突然报错了,唉,还好这个端午节前解决了