Spark的Shuffle过程
Spark 的 Shuffle 是分布式计算中数据重分区的核心过程,目的是将不同 Executor 上的数据按照一定规则(通常是 Key 的哈希值)重新分配,确保相同 Key 的数据汇聚到同一节点进行聚合、连接等操作。其过程涉及数据分区、写入磁盘、网络传输、读取合并等多个步骤,逻辑复杂但可拆解为清晰的阶段。
一、Shuffle 的核心目标
假设有一个简单的计算任务:对 (Key, Value)
类型的数据集执行 reduceByKey
(按 Key 聚合)。
例如,输入数据如下(分布在两个 Executor 上):
- Executor 1:
[(A, 1), (B, 2), (A, 3)]
- Executor 2:
[(B, 4), (C, 5), (A, 6)]
目标是将所有相同 Key 的数据汇总到一起,最终计算结果:
A: 1+3+6=10,B: 2+4=6,C:5
为实现这个目标,Shuffle 需要完成:
- 让所有
A
的数据到同一个节点,所有B
的数据到同一个节点,以此类推。 - 确保数据在节点间高效传输和合并。
二、Shuffle 的完整过程(以 SortShuffleManager 为例)
Spark 2.0+ 默认使用 SortShuffleManager
,其 Shuffle 过程分为 Map 阶段 和 Reduce 阶段,具体步骤如下:
阶段 1:Map 阶段(数据分区与写入)
Map 阶段由所有 Map Task 执行(每个 Map Task 处理一个 RDD 分区),核心是将数据按 Key 分区并写入本地磁盘。
步骤 1.1:数据分区(确定每个 Key 归属的 Reduce 分区)
每个 Map Task 处理的数据中,每条记录 (Key, Value)
会通过 分区函数 确定归属的 Reduce 分区(即目标分区 ID)。
-
默认分区函数:
Key.hashCode % numPartitions
(numPartitions
是 Shuffle 并行度,由spark.sql.shuffle.partitions
或RDD.partitions.size
决定)。例子:假设
numPartitions=3
(即 3 个 Reduce 分区),计算各 Key 的分区:A.hashCode % 3 = 0
→ 归属分区 0B.hashCode % 3 = 1
→ 归属分区 1C.hashCode % 3 = 2
→ 归属分区 2
则 Executor 1 和 2 中的数据会被标记分区:
- Executor 1 数据分区后:
(A,1)→0,(B,2)→1,(A,3)→0
- Executor 2 数据分区后:
(B,4)→1,(C,5)→2,(A,6)→0
步骤 1.2:数据排序与合并(可选,SortShuffle 特性)
SortShuffle 会对每个 Map Task 的输出按 (分区 ID, Key) 排序:
-
先按分区 ID 排序(确保同一分区的数据连续),再按 Key 排序(方便后续合并)。
例子:
- Executor 1 排序后:
[(0, A,1), (0, A,3), (1, B,2)]
- Executor 2 排序后:
[(0, A,6), (1, B,4), (2, C,5)]
- Executor 1 排序后:
步骤 1.3:写入本地磁盘(生成 Map 输出文件)
每个 Map Task 将排序后的结果写入 一个输出文件,并生成 索引文件 记录每个 Reduce 分区在输出文件中的起始位置和长度。
-
输出文件(.data):存储所有分区的键值对数据(按分区连续存储)。
-
索引文件(.index):记录每个分区在 .data 文件中的偏移量(如分区 0 从 0 字节开始,长度 100 字节;分区 1 从 100 字节开始,长度 80 字节等)。
例子:
- Executor 1 生成的 .data 文件内容:
(A,1)(A,3)(B,2)
(分区 0 和 1 的数据连续存储),.index 文件记录分区 0 和 1 的偏移量。 - Executor 2 生成的 .data 文件内容:
(A,6)(B,4)(C,5)
,.index 文件记录分区 0、1、2 的偏移量。
- Executor 1 生成的 .data 文件内容:
阶段 2:Reduce 阶段(数据拉取与合并)
Reduce 阶段由所有 Reduce Task 执行(每个 Reduce Task 处理一个目标分区),核心是从所有 Map Task 拉取属于自己的分区数据,合并后进行计算。
步骤 2.1:获取 Map 输出位置(MapOutputTracker)
Driver 中的 MapOutputTracker
会记录所有 Map Task 的输出文件路径(存储在 Driver 内存或 ZooKeeper 中,视集群模式而定)。
-
Reduce Task 启动时,会向
MapOutputTracker
请求自己负责的分区(如分区 0)对应的所有 Map 输出文件路径。例子:负责分区 0 的 Reduce Task 会得知:需要从 Executor 1 的 .data 文件中读取分区 0 的数据,以及从 Executor 2 的 .data 文件中读取分区 0 的数据。
步骤 2.2:拉取数据(Shuffle Fetch)
Reduce Task 通过网络(通常是 HTTP)从各个 Map Task 的节点拉取属于自己的分区数据,过程称为 Fetch:
-
每个 Reduce Task 会启动多个线程并行拉取(由
spark.shuffle.fetchers
控制,默认 64 个)。 -
拉取的数据先存入内存缓冲区(
spark.shuffle.reduceside.buffer.percent
控制内存占比),缓冲区满后写入磁盘临时文件。例子:负责分区 0 的 Reduce Task 拉取到的数据:
- 从 Executor 1 拉取:
(A,1), (A,3)
- 从 Executor 2 拉取:
(A,6)
- 从 Executor 1 拉取:
步骤 2.3:合并数据(Merge)
拉取完成后,Reduce Task 需要合并所有拉取到的数据(内存中的和磁盘上的):
- 若数据已排序(SortShuffle 输出),合并过程类似“归并排序”,按 Key 拼接成连续的有序数据流。
- 合并后的数据格式:
(A,1), (A,3), (A,6)
(同一 Key 的所有 Value 汇聚)。
步骤 2.4:执行计算(如 reduceByKey)
合并后,Reduce Task 对同一 Key 的 Value 执行聚合逻辑(如 reduceByKey
的求和操作):
- 例子中,
A
的 Value 为1,3,6
,聚合后结果为(A, 10)
。
三、Shuffle 过程的关键细节
-
数据存储形式:
- Map 阶段输出:每个 Map Task 生成 1 个 .data 文件(所有分区数据)+ 1 个 .index 文件(分区偏移量),存储在本地磁盘(
spark.local.dir
配置的目录)。 - Reduce 阶段临时数据:拉取的数据先存内存,满后存磁盘临时文件,合并后删除。
- Map 阶段输出:每个 Map Task 生成 1 个 .data 文件(所有分区数据)+ 1 个 .index 文件(分区偏移量),存储在本地磁盘(
-
网络传输优化:
- 拉取数据时会优先拉取本地数据(同一节点的 Map 输出),减少跨节点传输。
- 数据传输前会压缩(
spark.shuffle.compress
默认开启,用 Snappy 压缩),减少网络 IO。
-
Shuffle 并行度:
- 由
numPartitions
决定,即 Reduce Task 数量。例如spark.sql.shuffle.partitions=200
表示 Shuffle 后生成 200 个分区。 - 并行度过低:单个 Reduce Task 处理数据量大,容易倾斜;过高:小文件多,磁盘 IO 开销大。
- 由
四、总结 Shuffle 流程(图示简化)
输入数据(分布在 Executor 1、2)↓
Map 阶段:1. 分区:按 Key 哈希分配到 Reduce 分区(0、1、2)2. 排序:按 (分区 ID, Key) 排序3. 写入:生成 .data(数据)和 .index(索引)文件↓
Reduce 阶段:1. 定位:通过 MapOutputTracker 获取目标分区的 Map 输出路径2. 拉取:从所有 Map 节点 Fetch 属于自己的分区数据3. 合并:归并排序所有拉取的数据(按 Key 汇聚)4. 计算:对同一 Key 执行聚合操作,输出结果
通过这个过程,原本分散在不同节点的相同 Key 数据被汇聚,为后续的聚合、连接等操作提供了基础。Shuffle 是 Spark 性能的核心瓶颈之一,优化 Shuffle 通常需要调整并行度、解决数据倾斜、合理配置内存和压缩参数等。