当前位置: 首页 > news >正文

Spark 优化全攻略:从 “卡成 PPT“ 到 “飞一般体验“

哈喽各位数据打工人~ 是不是总被 Spark 任务的 "慢" 折磨?咖啡续到天亮,任务还在 "龟速" 爬行?😭 别慌!今天把 Spark 优化的 "家底" 全抖出来,从资源到代码,从理论到实操,保证小白也能看懂~ 所有知识点一个不落,赶紧码住!

一、开局先给 "粮草":资源配置优化🍚(性能调优第一步)

Spark 任务跑不快,八成是 "饿" 的!就像打游戏没装备,再牛的操作也白搭~ 资源配置是优化的第一步,给够资源,性能直接翻倍!

1. 核心资源参数详解(提交脚本必看)

提交 Spark 任务时,这些参数决定了你的任务能调动多少 "兵力":

bin/spark-submit \--class com.bigdata.spark.Analysis \  # 主类路径--master yarn \  # 生产环境必用yarn!--deploy-mode cluster \  # 集群模式--num-executors 80 \  # Executor(小兵)数量--driver-memory 6g \  # Driver(指挥官)内存--executor-memory 6g \  # 每个小兵的内存--executor-cores 3 \  # 每个小兵的CPU核数/usr/opt/modules/spark/jar/spark.jar \  # 任务jar包

👉 这些参数控制着 "小兵数量"、"每个小兵的力气(CPU)" 和 "背包大小(内存)",直接影响并行能力!

2. 不同集群模式的资源分配技巧

Spark 有两种主要运行模式,资源分配套路不同:

  • Standalone 模式:直接看集群总资源!比如 15 台机器,每台 8G 内存 + 2 核 CPU,就配 15 个 Executor,每个 8G 内存 + 2 核(把资源用满)。
  • Yarn 模式:按资源队列分配!比如队列有 400G 内存 + 100 核,就配 50 个 Executor(400/8=50),每个 8G 内存 + 2 核(100/50=2)。

3. 资源调优的 "真香" 效果

  • 加 Executor 数量:4 个 Executor×2 核 → 8 个 task 并行;8 个 Executor×2 核 → 16 个 task 并行(直接翻倍!)。
  • 加 CPU 核数:4 个 Executor×2 核 → 8 个 task;4 个 Executor×4 核 → 16 个 task(并行能力翻倍!)。
  • 加内存:内存大了能缓存更多数据(少写磁盘)、给 shuffle 更多空间(少磁盘 IO)、减少 GC(避免频繁 "卡顿")。

4. 生产环境配置参考

--num-executors:50~100  # 小兵数量
--driver-memory:1G~5G  # 指挥官内存(够用就行)
--executor-memory:6G~10G  # 小兵背包大小
--executor-cores:3  # 小兵的手(核数)
--master:必须是yarn!  # 生产环境标配

二、RDD 优化:让数据 "少干活"💼(避免重复劳动)

RDD 是 Spark 的 "打工人",优化 RDD 就是让它们别做无用功~ 这部分细节超多,一个都不能漏!

1. RDD 复用:别让数据 "重复加班"

比如计算 "用户活跃率" 和 "用户留存率" 时,都需要 "用户登录记录"。如果每次都重新计算这份数据,相当于让员工重复做同一份报表 —— 纯纯浪费!😤
优化:把重复用的 RDD 存为一个变量,后续直接调用,一次计算多次复用~

2. RDD 持久化:给数据 "记笔记"📝

Spark 默认每次用 RDD 都会重新计算(从父 RDD 开始算),就像每次写作业都重新抄题,傻爆了!必须持久化!

  • 怎么做:用cache()persist()把 RDD 存到内存 / 磁盘,下次直接取~
  • 关键细节
    • 内存不够?用序列化!把数据压缩变小,塞到内存里(比如persist(StorageLevel.MEMORY_ONLY_SER))。
    • 怕数据丢了?开副本机制!每个数据存两个副本到不同节点,丢了一个还有备份(persist(StorageLevel.MEMORY_ONLY_2))。

3. 早过滤:提前 "扔垃圾"🗑️

拿到原始数据后,第一时间把没用的过滤掉!就像网购拆快递,先扔包装盒再摆东西,省空间又高效~
例子:分析用户订单时,先过滤掉测试订单、无效订单,再处理剩下的有效数据,内存占用直接减半!

三、并行度调节:让 CPU"不摸鱼"⚡(资源利用最大化)

并行度就是同时跑多少个 task(任务)。如果 CPU 核闲着,就是在浪费钱!💸

1. 并行度的核心逻辑

  • 并行度= 每个 Stage 的 task 数量。
  • 若并行度太低:比如 20 个 Executor×3 核 = 60 核,但只跑 40 个 task,20 个核摸鱼(资源浪费!)。
  • 理想状态:task 数量 = 总 CPU 核数 ×2~3 倍!这样核跑完一个 task 马上接下一个,不闲着~

2. 为什么是 2~3 倍?

task 执行速度有快有慢(比如有的数据多,有的少)。如果 task 数量 = 核数,快的核跑完就闲着;多设 2~3 倍,快的核能继续干活,整体效率飙升!

3. 默认并行度与 Stage 划分

  • 默认并行度:官方默认 200(别和 Flink 混了!)。
  • Stage 划分:遇到 shuffle 算子(如 reduceByKey)就会分 Stage。比如read→map→flatmap→reduceByKey→foreach,前三个算子一个 Stage,后面一个 Stage。

四、广播大变量:内存 "瘦身术"🧙‍♀️(减少重复存储)

普通变量在每个 task 里都存一份,内存直接爆炸!广播变量能让内存消耗 "断崖式下跌"~

1. 问题场景

比如一个 20M 的变量被 500 个 task 共用:

  • 普通变量:500 个副本→500×20M=10G 内存(血亏!)。
  • 广播变量:每个 Executor 存 1 份,20 个 Executor→20×20M=400M 内存(直接省 25 倍!)。

2. 广播变量工作原理

  • Driver 先存 1 份,task 运行时先从本地 Executor 的 BlockManager 取,没有就从 Driver 或其他节点拉取,之后所有 task 共用这一份。

五、Kryo 序列化:数据 "压缩包"📦(更快更小)

默认用 Java 序列化,但效率低!Kryo 序列化速度快 10 倍,数据体积更小~

1. 为什么用 Kryo?

  • Java 序列化:方便但慢,数据大(需要实现 Serializable 接口)。
  • Kryo 序列化:速度快 10 倍,体积小,但需要注册类型(Spark 2.0 后,简单类型 / 字符串默认用 Kryo 啦~)。

2. 配置方法

val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[YourClass]))  // 注册需要序列化的类

六、本地化等待时长:数据 "少跑腿"🏃‍♂️(减少网络传输)

Spark 希望 task 在数据所在节点跑(少传数据),但节点资源可能被占满,这时候要等一等!

1. 本地化等级(性能从高到低)

  • PROCESS_LOCAL:task 和数据在同一个 Executor(最好!)。
  • NODE_LOCAL:同一节点不同 Executor(数据进程内传)。
  • RACK_LOCAL:同一机架不同节点(网络传)。
  • ANY:任意节点(最差,跨机架传)。

2. 调节技巧

  • 默认等 3 秒,若日志里很多 NODE_LOCAL/ANY,延长等待时间(比如 5 秒),让 task 等节点资源释放。
  • 别太长!否则等待时间比省的传输时间还多,反而变慢~
  • 配置:spark.locality.wait=5s(测试用 client 模式看日志调优)。

七、算子调优:选对工具干得快🔧(每个算子都有讲究)

算子用错,努力白费!这部分是优化核心,细节拉满~

1. mapPartitions:批量处理 "加速器"

  • 普通 map:逐条处理数据(1 万条数据调用 1 万次函数,比如建 1 万次数据库连接)。
  • mapPartitions:按分区处理(1 个分区调用 1 次函数,1 万条数据建 1 次连接)。
  • 注意:数据量大别用!一次加载整个分区数据可能 OOM(内存溢出)~

2. foreachPartition:数据库写入 "神器"

和 mapPartitions 同理,写数据库时用它:一个分区建一次连接,批量写入,比 foreach(逐条建连接)快 10 倍!

// 优化写法
rdd.foreachPartition(iter => {val conn = getDBConnection()  // 一个分区建一次连接iter.foreach(data => conn.insert(data))  // 批量写conn.close()
})

3. filter + coalesce:解决数据 "肥瘦不均"

  • 问题:filter 后分区数据量差异大(比如一个分区 100 条,一个 800 条),导致 task 忙闲不均(数据倾斜)。
  • 解决:用 coalesce 重分区,让每个分区数据量均匀。
    • coalesce vs repartition
      • 减少分区(A>B):用 coalesce(默认不 shuffle,差距大时设shuffle=true)。
      • 增加分区(A<B):用 repartition(底层是 coalesce+shuffle)。

4. repartition:破解 SparkSQL"并行度锁死"

  • SparkSQL 的并行度由 HDFS 文件 split 数决定,用户改不了!如果数据量大 + 逻辑复杂,task 少就会很慢。
  • 解法:SQL 查出来的 RDD 立即用 repartition 重分区,后续 Stage 并行度就由你控制了~

5. reduceByKey:shuffle"预聚合王者"

  • groupByKey:map 端不聚合,全量 shuffle 到 reduce 端(网络传输量大)。
  • reduceByKey:map 端先本地聚合(比如先算每个单词在本节点的次数),再 shuffle(传输量骤减)。
  • 结论:能用 reduceByKey 就别用 groupByKey!

八、Shuffle 调优:数据 "搬家" 不堵车🚚(最容易踩坑的环节)

Shuffle 是数据在节点间传输的过程,最容易卡壳,这 5 个参数必调!

1. map 端缓冲区:小推车 "扩容"

默认 32KB,数据量大时会频繁溢写到磁盘(比如 640KB 数据要溢写 20 次)。调大到 64KB 减少 IO:

spark.conf.set("spark.shuffle.file.buffer", "64k")

2. reduce 端拉取缓冲区:货车 "扩容"

默认 48MB,调大到 96MB,一次拉更多数据,减少网络传输次数:

spark.conf.set("spark.reducer.maxSizeInFlight", "96m")

3. 拉取重试次数:网络差就 "多试几次"

默认重试 3 次,大数据量 shuffle 易失败。调大到 60 次提高稳定性:

spark.conf.set("spark.shuffle.io.maxRetries", "60")

4. 拉取等待间隔:失败了 "歇会儿再试"

默认等 5 秒,调大到 60 秒,给网络 / GC 留缓冲时间:

spark.conf.set("spark.shuffle.io.retryWait", "60s")

5. SortShuffle 排序阈值:不需要排序就 "跳过"

SortShuffleManager 默认当 reduce task<200 时不排序。若确定不需要排序,调大阈值(比如 500),减少排序开销:

spark.conf.set("spark.shuffle.sort.bypassMergeThreshold", "500")

九、JVM 调优:内存 "合理分家"🏠(避免频繁 "清理房间")

JVM 内存分块不合理,会频繁 GC(垃圾回收),导致任务卡顿~

1. 静态内存管理:给执行区 "腾空间"

  • 静态模式下:Storage(缓存 RDD / 广播数据)占 60%,Execution(shuffle 中间数据)占 40%,两者独立。
  • 若 Execution 内存不够(频繁 GC),调小 Storage 占比:
spark.conf.set("spark.storage.memoryFraction", "0.4")  // 降到40%

2. 统一内存管理:内存 "动态分配"

  • 统一模式下:Storage 和 Execution 各占 50%,支持动态占用(shuffle 内存不够时自动用 Storage 的),无需手动调!

3. 堆外内存:额外 "储物间" 扩容

若常报 "out of memory"、"executor lost",可能堆外内存不够(默认≈300MB)。调大到 1G 以上:

--conf spark.executor.memoryOverhead=2g  # 堆外内存设2G

总结:Spark 优化 "全地图"🗺️

  1. 资源配置:按模式分资源,Executor、内存、核给够。
  2. RDD 优化:复用、持久化(序列化 + 副本)、早过滤。
  3. 并行度:task 数量 = 核数 ×2~3 倍,避免资源浪费。
  4. 广播变量:减少变量副本,内存直降 25 倍 +。
  5. Kryo 序列化:比 Java 快 10 倍,数据更小。
  6. 本地化等待:根据日志调时长,减少网络传输。
  7. 算子调优:mapPartitions/foreachPartition 批量处理,coalesce 解决倾斜,reduceByKey 替代 groupByKey。
  8. Shuffle 调优:5 个参数(缓冲区、重试、阈值)全调对。
  9. JVM 调优:静态内存分比例,堆外内存不够就扩容。

按这个清单一步步优化,你的 Spark 任务绝对能从 "卡成 PPT" 变成 "飞一般体验"!🚀 赶紧收藏实践吧~ 有问题评论区问我哦~

http://www.dtcms.com/a/325688.html

相关文章:

  • Hadoop和Spark的区别
  • vscode新建esp32工程,没有sample_project怎么办?
  • Mysql——Sql的执行过程
  • Windows Git Bash 常用配置
  • 设计模式笔记_结构型_门面模式
  • 2020/12 JLPT听力原文 问题一 3番
  • VTK 标签中文
  • MFC C++ 使用ODBC方式调用Oracle数据库的详细步骤
  • Go 多进程编程-socket(套接字)
  • 今日项目之线程同步操作项目
  • 生成模型实战 | MuseGAN详解与实现
  • encoder-only / decoder-only / encoder-decoder架构分析
  • 云原生应用的DevOps2(Jenkins渗透场景)
  • Spring Boot 单元测试:@SpyBean 使用教程
  • Linux生成自签名 SSL 证书(适用于测试或内部使用)
  • CI/CD渗透测试靶场
  • cesium/resium 修改子模型材质
  • [Oracle] UNPIVOT 列转行
  • MySQL 数据操作全流程:创建、读取、更新与删除实战
  • openEuler、 CentOS、Ubuntu等 Linux 系统中,Docker 常用命令总结
  • FPGA+护理:跨学科发展的探索(一)
  • SAE J2716多协议网关的硬件架构与实时协议转换机制解析
  • 三种常见的菜单路由封装方式详解
  • rust编译过程的中间表现形式如何查看,ast,hir,mir
  • Rust学习笔记(一)|Rust初体验 猜数游戏
  • Excel 实战:基因表达矩阵前处理中测序符号的快速剥离方法
  • K210人脸识别系统
  • 在Linux中部署tomcat
  • 【Redis的安装与配置】
  • 如何理解Tomcat、Servlet、Catanalina的关系