Spark专题-第一部分:Spark 核心概述(2)-Spark 应用核心组件剖析
这一篇依然是偏理论向的内容,用两篇理论搭建起Spark的框架,让读者有个基础的认知,下一篇就可以开始sql的内容了
第一部分:Spark 核心概述(2)
Spark 应用核心组件剖析
1. Job, Stage, Task 的三层架构
理解 Spark 的执行模型是掌握其性能调优的关键。Spark 采用了三层执行模型:
定义与关系:
-
Job(作业):
- 由 Action 操作触发的一个完整计算任务
- 一个应用可能包含多个 Job
- 例如:
count()
,collect()
,saveAsTextFile()
等操作会触发 Job
-
Stage(阶段):
- Job 被划分为多个 Stage,划分依据是 Shuffle 操作
- 每个 Stage 包含一系列可以在单个节点上并行执行的任务
- Stage 分为两种类型:
- Shuffle Map Stage:为后续 Stage 准备数据
- Result Stage:执行最终计算并输出结果
-
Task(任务):
- Stage 的基本执行单元,每个 Task 处理一个数据分区
- Task 的数量由数据分区数决定
- 在 Executor 上执行的实际计算代码
2. DAG(有向无环图):Spark 的调度大脑
DAG(Directed Acyclic Graph)是 Spark 的核心调度模型,它表示数据处理的逻辑流程。
DAG 的关键概念:
-
窄依赖(Narrow Dependency):
- 每个父分区最多被一个子分区使用
- 允许在单个 Stage 内进行流水线执行
- 例如:
map
,filter
等操作
-
宽依赖(Wide Dependency)/ Shuffle 依赖:
- 每个父分区可能被多个子分区使用
- 需要跨节点数据混洗(Shuffle)
- 导致 Stage 的划分
- 例如:
groupByKey
,reduceByKey
等操作
查看 DAG 的方法:
// 在代码中获取 RDD 的 lineage(血统)
rdd.toDebugString// 通过 Spark UI 查看可视化 DAG
// 访问 http://driver-node:4040
3. 分区(Partitioning):并行度的基础
分区是 Spark 实现并行计算的基础,合理设置分区数对性能至关重要。
分区的关键点:
-
分区数量:
- 决定了任务的并行度
- 默认值:
spark.default.parallelism
或父 RDD 的分区数 - 对于读取 HDFS 文件,通常等于 HDFS 块数
-
分区策略:
- Hash 分区:根据键的哈希值分配分区
- Range 分区:根据键的范围分配分区
- 自定义分区:实现 Partitioner 接口
-
分区操作:
repartition(numPartitions)
:增加或减少分区数,触发 Shufflecoalesce(numPartitions)
:只能减少分区数,避免 Shuffle
代码示例:分区操作
# 创建初始 DataFrame
df = spark.range(0, 1000, 1, 10) # 10个分区
print("初始分区数:", df.rdd.getNumPartitions())# 减少分区数(避免 Shuffle)
df_coalesced = df.coalesce(5)
print("Coalesce 后分区数:", df_coalesced.rdd.getNumPartitions())# 增加分区数(触发 Shuffle)
df_repartitioned = df.repartition(20)
print("Repartition 后分区数:", df_repartitioned.rdd.getNumPartitions())# 按列重新分区
df_repartitioned_by_col = df.repartition(10, "id")
print("按列 Repartition 后分区数:", df_repartitioned_by_col.rdd.getNumPartitions())
4. 执行流程全景图
5. 举例:观察 Job/Stage/Task
通过一个简单例子观察 Spark 的执行层次:
from pyspark.sql import SparkSession
from pyspark.sql.functions import colspark = SparkSession.builder.appName("JobStageTaskDemo").getOrCreate()# 创建示例数据
data = [("Alice", 28, "Engineer"), ("Bob", 35, "Manager"),("Charlie", 42, "Director"),("Diana", 29, "Engineer")]
df = spark.createDataFrame(data, ["name", "age", "job"])# 执行一系列操作
print("=== 开始执行操作 ===")# 第一个 Job:count 操作
print("员工总数:", df.count())# 第二个 Job:filter + groupBy + count
result = df.filter(col("age") > 30).groupBy("job").count()
result.show()# 第三个 Job:写入操作
result.write.mode("overwrite").csv("/tmp/job_demo_output")print("=== 操作完成 ===")
spark.stop()
6. 核心概念总结表
概念 | 描述 | 触发条件/决定因素 |
---|---|---|
Job | 由 Action 操作触发的完整计算任务 | count() , save() , collect() 等 |
Stage | Job 的子集,由 Shuffle 操作划分 | 宽依赖(Shuffle 操作) |
Task | Stage 的基本执行单元,处理一个分区 | 数据分区数量 |
Partition | 数据的基本划分单位,决定并行度 | 数据源特性、配置参数 |
写到最后突然意识到漏了shuffle这个专业词汇,就在这里单独补充吧,不去调整上面的结构,也不新开一篇文章了
深入理解 Shuffle:数据重分布的核心机制
1. Shuffle 的本质:为什么要"洗牌"?
想象一下扑克牌游戏中的洗牌过程:将牌打乱重新分配,确保每个玩家获得随机分布的牌。Spark 中的 Shuffle 也是类似的概念,但目的不同:为了将相同键的数据重新分组到同一台机器上,以便进行聚合、排序等操作。
Shuffle 的简单定义:
Shuffle 是 Spark 在不同 Executor 甚至不同节点之间重新分配数据的过程,使得所有相同键的数据能够聚集到同一个分区中。
2. 什么操作会触发 Shuffle?
以下操作通常会导致 Shuffle:
- 分组操作:
groupByKey()
,reduceByKey()
,combineByKey()
- 连接操作:
join()
,cogroup()
- 排序操作:
sortByKey()
,sortBy()
- 重分区:
repartition()
,coalesce()
(当增加分区时)
3. Shuffle 的物理实现:两阶段过程
Shuffle 过程分为两个主要阶段:Map 阶段和Reduce 阶段。
Map 阶段(Shuffle Write):
- 数据处理:每个 Map 任务处理输入分区的数据
- 分区划分:根据目标分区数(由Partitioner决定)对输出数据进行划分
- 排序和溢出:对每个分区的数据进行排序,可能溢出到磁盘
- 文件生成:将最终结果写入磁盘文件,并生成索引文件
Reduce 阶段(Shuffle Read):
- 获取元数据:了解需要从哪些Map任务获取数据
- 获取数据:通过网络从各个Map任务的输出中获取属于自己分区的数据
- 合并数据:将来自不同Map任务的相同键的数据合并在一起
- 排序数据:对合并后的数据进行排序(如果需要)
4. Shuffle 的物理存储与网络传输
关键物理细节:
-
磁盘 I/O:
- Map 任务将中间结果写入本地磁盘
- Reduce 任务从多个Map任务的磁盘读取数据
- 大量磁盘读写是Shuffle的主要开销来源
-
网络传输:
- 数据在不同节点的Executor之间传输
- 网络带宽可能成为瓶颈
- 机架感知(rack-aware)调度可以减少跨机架传输
-
内存使用:
- 使用内存缓冲区减少磁盘I/O
- 可能发生内存溢出,导致额外磁盘写入
5. Shuffle 的性能影响与优化
为什么 Shuffle 代价高昂:
- 磁盘 I/O:大量中间数据写入和读取
- 网络 I/O:数据在节点间传输
- 序列化/反序列化:数据在网络传输前需要序列化
- 内存压力:排序和合并操作消耗大量内存
优化策略:
# 1. 使用reduceByKey而不是groupByKey(减少Shuffle数据量)
# groupByKey: 将所有值发送到Reducer
rdd.groupByKey().mapValues(sum)# reduceByKey: 先在Map端进行局部聚合,减少Shuffle数据量
rdd.reduceByKey(lambda a, b: a + b)# 2. 使用合适的并行度
spark.conf.set("spark.sql.shuffle.partitions", "200") # 默认200,根据数据量调整# 3. 使用广播连接避免大表Shuffle
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")# 4. 使用合适的序列化格式
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
6. Shuffle 相关配置参数
配置项 | 默认值 | 说明 |
---|---|---|
spark.sql.shuffle.partitions | 200 | 设置Shuffle操作的分区数 |
spark.shuffle.compress | true | 是否压缩Shuffle输出 |
spark.shuffle.spill.compress | true | 是否压缩溢出的Shuffle数据 |
spark.shuffle.memoryFraction | 0.2 | Executor内存中用于Shuffle的比例 |
spark.reducer.maxSizeInFlight | 48m | Reduce任务一次获取数据的最大大小 |
7. 监控 Shuffle 性能
在 Spark UI 中监控 Shuffle:
- Stages 页面:查看每个Stage的Shuffle读写量
- Storage 页面:查看Shuffle数据的使用情况
- Executors 页面:查看Shuffle读写的时间和数据量
关键指标:
- Shuffle Read Size/Records:读取的数据量和记录数
- Shuffle Write Size/Records:写入的数据量和记录数
- Shuffle Spill (Memory):内存中溢出的数据量
- Shuffle Spill (Disk):磁盘上溢出的数据量
8. 总结:Shuffle 的双刃剑特性
Shuffle 的必要性:
- 实现跨节点的数据重新分配
- 支持基于键的聚合和连接操作
- 是分布式计算的基石
Shuffle 的代价:
- 高昂的磁盘和网络I/O开销
- 可能成为性能瓶颈
- 需要仔细调优以获得最佳性能