当前位置: 首页 > news >正文

大数据量下的数据修复与回写Spark on Hive 的大数据量主键冲突排查:COUNT(DISTINCT) 的陷阱

背景与问题概述

        这一周(2025-05-26-2026-05-30)我在搞数据拟合修复优化的任务,有大量的数据需要进行数据处理及回写,大概一个表一天一分区有五六千万数据,大约一百多列的字段。         具体是这样的我先取档案,关联对应表hive对应分区的数据,然后进行算法一系列逻辑处理后,将结果输出到hive,然后再从hive回写一份到oracle里面。         

        spark资源大概我给了不小,数据大概一天40左右吧,大概12个excutor,每一个12G内存,2core吧,拟合完数据,将数据入hive时候,进行了整体去重。 包括且不限于如下操作       

1、.distinct(),         

2、对应主键的去重.dropDuplicates(id),         

3、row_number对id,type主键字段开窗取first         

4、对id,type主键字段开窗,取后续字段的max()

        经过以上操作,我的数据得以在没有主键冲突的情况下顺利的入库到hive中,并且我对入库数据进行group by id,type having count(1) >1时数据也没有出现重复的情况。        

        OK。鬼知道我对上述数据验证进行多少次跑批总结出来的上面的操作。以上是我写入hive的操作。 下面即将是从hive入到oracle艰辛的探索之路。 正常来讲经过上面的数据操作,我从hive入到oracle是不应该出现主键冲突的情况了,因为我有一部分表已经处理入库了,但有一个表就是死活入不进去,我impala都快查烂了,资源监控的同事都给我致电了。         

        为什么调了一天呢,因为跑一个 程序就要个吧小时,代码都快被我调抑郁了。

Hive数据写入阶段的去重策略

经过多次实验和验证,我总结出一套有效的去重方法,确保数据在写入Hive时不出现主键冲突:

1. 整体去重 - distinct()

val distinctDF = originalDF.distinct()

这种方法简单直接,但性能开销较大,适合小数据集或初步去重。

2. 基于主键的去重 - dropDuplicates()

val dedupByKeyDF = originalDF.dropDuplicates("id")

比整体去重更高效,只针对指定列进行去重。

3. 开窗函数取第一条记录

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp")
val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")

这种方法在有多条相同主键记录时,可以按指定排序条件保留一条。

4. 开窗函数取最大值记录

val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)

对于需要保留最大值的场景,这种聚合方式非常有效。

Hive到Oracle的数据的迁移问题结局

尽管Hive中的数据已经严格去重,但在迁移到Oracle时仍遇到了两个主要问题:

问题1:NULL值导致的主键冲突

-- 问题发现查询
SELECT id, type, COUNT(1) 
FROM hive_table 
WHERE id IS NULL 
GROUP BY id, type 
HAVING COUNT(1) > 1;

解决方案

// 在写入Oracle前增加NULL值处理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接过滤

问题2:资源不足导致的作业失败

最初配置:

  • 12个Executor

  • 每个Executor 12G内存,2个核心

  • 一个表一天的分区大概处理约40GB数据

作业在运行10-20分钟后失败,经过多次调整,最终稳定运行的配置:

  • 每个Executor 45G内存,这个我觉得得看集群资源,我们集群资源很紧张,大概10TB的内存,都不太够用

  • 适当增加核心数(根据集群情况)我一般都设置2

性能优化经验总结

1. 内存配置黄金法则

对于大规模数据处理,Executor内存配置应遵循:

  • 基础内存 = 数据分区大小 × 安全系数(2-3)

  • 考虑序列化开销和中间数据结构

2. 高效去重策略选择

方法适用场景优点缺点
distinct()小数据集或全字段去重简单性能差
dropDuplicates()已知主键字段高效仅针对指定列
开窗函数需要按条件保留记录灵活可控计算开销大
聚合函数需要保留极值高效只能处理数值字段

3. NULL值处理最佳实践

  • 在数据处理的早期阶段识别和处理NULL值

  • 对于主键字段,NULL值应被替换或过滤

  • 考虑使用COALESCE或NVL函数提供默认值

4. 资源监控与调优技巧

  • 观察GC时间和频率,内存不足时GC会频繁发生

  • 监控Executor心跳丢失情况

  • 适当增加spark.memory.fraction(默认0.6)

  • 考虑启用spark.memory.offHeap.enabled使用堆外内存

优化Demo示例代码

  /*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 读取原始数据
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分区过滤// 2. 多阶段去重处理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值处理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但确保不冲突// 4. 写入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置优化后写入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 调整批量大小.option("isolationLevel", "NONE") // 对于大数据量写入可提高性能.mode("append").save()

通过这次项目,总结了以下经验:

  1. 数据质量优先:在数据处理早期阶段解决NULL值、重复数据等问题

  2. 渐进式调优:从较小资源开始,逐步增加直至作业稳定运行

  3. 监控驱动:密切监控作业执行情况,特别是GC和内存使用指标

  4. 文档记录:记录每次调整的参数和效果,形成知识库

        大数据处理中的问题往往不是单一因素导致的,需要综合考虑数据特性、处理逻辑和集群资源。希望诸君避免类似的"坑",更高效地完成大数据处理任务。

        这个资源调优是真的恶心,代码没问题,就是和资源有问题,跑着跑着就突然报错了,唉,还好这个端午节前解决了

相关文章:

  • 基本数据指针的解读-C++
  • Visual Studio中的宏变量
  • Python打卡训练营day40——2025.05.30
  • 每日算法 -【Swift 算法】将整数转换为罗马数字
  • JS手写代码篇---手写节流函数
  • allWebPlugin中间件VLC专用版之截图功能介绍
  • 评论功能开发全解析:从数据库设计到多语言实现-优雅草卓伊凡
  • 历年西安交通大学计算机保研上机真题
  • 异步并发控制代码详细分析
  • WEB3——什么是ABI
  • 《TCP/IP 详解 卷1:协议》第2章:Internet 地址结构
  • <PLC><socket><西门子>基于西门子S7-1200PLC,实现手机与PLC通讯(通过websocket转接)
  • 云原生微服务架构演进之路:理念、挑战与实践
  • 小型图书管理系统案例(用于spring mvc 实践)
  • MicroPython+L298N+ESP32控制电机转速
  • Wi-Fi 切换 5G 的时机
  • 公链地址生成曲线和算法
  • 【NLP入门系列一】NLP概述和独热编码
  • c/c++的opencv霍夫变换
  • LLM 使用 MCP 协议及其原理详解
  • 大理如何做百度的网站/软件公司
  • 有做全棉坯布的网站吗/网盘搜索引擎入口
  • 有没有IT做兼职的网站/淘数据官网
  • 厦门小型网站建设/鞍山网络推广
  • wordpress stats view counter/seo分析与优化实训心得
  • 鹤壁做网站多少钱/百度推广的价格表