Spark教程6:Spark 底层执行原理详解
文章目录
- 一、整体架构概述
- 二、核心组件详解
- 1. SparkContext
- 2. DAG Scheduler
- 3. Task Scheduler
- 4. Executor
- 三、作业执行流程
- 1. DAG 生成与 Stage 划分
- 2. Task 调度与执行
- 3. 内存管理
- 四、Shuffle 机制详解
- 1. Shuffle 过程
- 2. Shuffle 优化
- 五、内存管理机制
- 1. 统一内存管理(Unified Memory Management)
- 2. Tungsten 优化
- 六、容错机制
- 1. Lineage(血统)
- 2. Checkpoint
- 3. 任务重试
- 七、调度策略
- 1. 任务调度
- 2. 推测执行
- 八、性能优化关键点
- 1. 数据本地性
- 2. 并行度调整
- 3. 内存调优
- 九、高级特性
- 1. Catalyst 优化器
- 2. Tungsten 项目
- 十、监控与调试工具
- 1. Spark UI
- 2. 事件日志
- 3. Spark 性能调优工具
一、整体架构概述
Spark
采用主从架构(Master-Slave
),主要组件包括:
- Driver Program:运行用户应用的 main 函数,负责创建
SparkContext
、分析作业、调度任务。 - Cluster Manager:资源管理器,如
YARN、Mesos、Standalone
。 - Worker Node:集群中的工作节点,负责执行具体任务。
- Executor:
Worker
节点上的进程,负责运行任务并缓存数据。
执行流程:
- 用户提交应用,
Driver
启动并创建SparkContext
。 SparkContext
连接Cluster Manager
,请求资源。Cluster Manager
分配资源,在Worker
节点上启动Executor
。Driver
将任务分发给Executor
执行。Executor
向Driver
汇报任务状态和结果。
二、核心组件详解
1. SparkContext
- 是
Spark
应用的入口,负责与Cluster Manager
通信,协调资源分配。 - 管理
RDD
的依赖关系(血统图),并生成DAG
(有向无环图)。
2. DAG Scheduler
- 将作业(
Job
)分解为多个阶段(Stage
),每个阶段包含多个任务(Task
)。 - 根据
RDD
的依赖关系划分Stage
:- 宽依赖(如
shuffle
)会触发新的Stage
。 - 窄依赖(如
map、filter
)会被合并到同一个Stage
。
- 宽依赖(如
3. Task Scheduler
- 将
Task
分配给具体的Executor
执行。 - 负责任务调度、重试失败的任务,以及处理推测执行(
Speculative Execution
)。
4. Executor
- 负责执行
Task
,并将结果返回给Driver
。 - 维护内存缓存,存储
RDD
分区数据。
三、作业执行流程
1. DAG 生成与 Stage 划分
# 示例代码
rdd = sc.textFile("data.txt") # 读取文件,创建 RDD
words = rdd.flatMap(lambda line: line.split()) # 转换操作
pairs = words.map(lambda word: (word, 1)) # 转换操作
counts = pairs.reduceByKey(lambda a, b: a + b) # 触发 Shuffle
counts.collect() # 动作操作,触发作业执行
执行流程:
collect()
触发作业提交。DAG Scheduler
将作业划分为两个Stage
:Stage 1
:执行textFile、flatMap、map
操作。Stage 2
:执行reduceByKey
和collect
操作,依赖于Stage 1
的输出。
2. Task 调度与执行
- ShuffleMapTask:执行
Stage 1
的任务,输出中间结果(Shuffle
文件)。 - ResultTask:执行
Stage 2
的任务,读取Shuffle
文件并聚合结果。
3. 内存管理
- Storage Memory:存储缓存的
RDD
和DataFrame
。 - Execution Memory:执行
Shuffle
、聚合、排序等操作的内存。 - User Memory:用户代码使用的内存。
四、Shuffle 机制详解
1. Shuffle 过程
-
Map 端:
- 将数据分区并写入内存缓冲区。
- 缓冲区满时溢写到磁盘,生成多个小文件。
- 最终合并所有小文件为一个大文件,并生成索引。
-
Reduce 端:
- 从各个
Map
任务拉取属于自己的数据。 - 合并数据并按
key
排序。 - 执行聚合或其他操作。
- 从各个
2. Shuffle 优化
- Sort Shuffle:默认实现,减少文件数量。
- Tungsten-Sort Shuffle:基于内存管理框架
Tungsten
,提高效率。 - 自适应执行(Spark 3.0+):动态调整
Shuffle
分区数。
五、内存管理机制
1. 统一内存管理(Unified Memory Management)
Spark 1.6+
引入,Storage
和Execution
内存可相互借用:# 内存配置参数 spark.memory.fraction = 0.6 # 统一内存占堆内存的比例 spark.memory.storageFraction = 0.5 # Storage 内存占统一内存的比例
2. Tungsten 优化
- 堆外内存:减少
GC
压力,提高内存访问效率。 - 二进制格式:直接操作二进制数据,避免
Java
对象开销。
六、容错机制
1. Lineage(血统)
RDD
记录其创建过程(依赖关系),当部分分区丢失时,可通过重新计算恢复。
2. Checkpoint
- 将
RDD
写入可靠存储(如HDFS
),切断血统关系,用于长依赖链的RDD
。rdd.checkpoint() # 设置检查点
3. 任务重试
Task
失败时,Task Scheduler
会自动重试(默认 4 次)。
七、调度策略
1. 任务调度
- FIFO(默认):先进先出。
- FAIR:公平调度,支持多作业共享资源。
# 启用公平调度 spark.conf.set("spark.scheduler.mode", "FAIR")
2. 推测执行
- 当某个任务执行缓慢时,会在其他节点启动副本任务,取最先完成的结果。
# 启用推测执行 spark.conf.set("spark.speculation", "true")
八、性能优化关键点
1. 数据本地性
- PROCESS_LOCAL:数据在同一
JVM
内,最快。 - NODE_LOCAL:数据在同一节点,但需跨进程传输。
- RACK_LOCAL:数据在同一机架的不同节点。
- ANY:数据在任意位置。
2. 并行度调整
- 根据集群资源设置合理的并行度:
# 设置默认并行度 spark.conf.set("spark.default.parallelism", 200)
3. 内存调优
- 调整
Executor
内存和堆外内存:spark.executor.memory = 8g spark.memory.offHeap.enabled = true spark.memory.offHeap.size = 2g
九、高级特性
1. Catalyst 优化器
Spark SQL
的查询优化器,将SQL
查询转换为高效的物理执行计划:- 分析:解析
SQL
语句,检查表和列是否存在。 - 逻辑优化:应用规则优化逻辑计划(如谓词下推、投影修剪)。
- 物理计划生成:生成多个物理计划并选择最优。
- 代码生成:将执行计划编译为
Java
字节码。
- 分析:解析
2. Tungsten 项目
- 优化内存和
CPU
利用率:- 二进制数据处理,减少内存占用。
- 避免
Java
对象开销,直接操作内存。
十、监控与调试工具
1. Spark UI
- 查看作业、阶段、任务的执行情况,内存使用等指标。
2. 事件日志
- 记录作业执行的详细信息,可用于离线分析:
# 启用事件日志 spark.eventLog.enabled = true spark.eventLog.dir = "hdfs:///spark-logs"
3. Spark 性能调优工具
- Shuffle 调优:分析
Shuffle
性能瓶颈。 - SQL 执行计划分析:查看
SQL
查询的优化过程。