Spark的shuffle类型与对比
1. Broadcast 广播机制(避免 Shuffle 的优化)
- 原理:将小表全量复制到所有 Executor 内存,与大表在本地执行 Join(无需跨节点数据传输),用内存换 Shuffle 开销。
- 触发条件:
- 操作类型:仅适用于
Join
(内连接为主;左 / 右外连接中,广播表需为驱动表,否则失效); - 表大小:小表(被广播方)大小 ≤
spark.sql.autoBroadcastJoinThreshold
(默认 10MB); - 配置开关:
spark.sql.autoBroadcastJoinThreshold > 0
(默认开启自动广播); - 手动触发:
broadcast()
函数强制广播(无视大小阈值); - 排除场景:禁用广播(
spark.sql.autoBroadcastJoinThreshold = -1
)、笛卡尔积cross join
(默认不广播)。
- 操作类型:仅适用于
- 优点:无 Shuffle 开销,Join 效率极高;适合小表与大表关联。
- 缺点:小表过大会占用大量 Executor 内存,可能引发 OOM;仅支持 Join 场景。
- 配置项:
spark.sql.autoBroadcastJoinThreshold
(默认 10MB);spark.sql.broadcastTimeout
(默认 300 秒,广播超时时间)。
- 产生文件个数:无 Shuffle 文件(小表以广播变量形式在内存中传输,不落地)。
2. Hash Shuffle(原始版,已过时)
- 原理:
- Mapper 端:每个 Map 任务按
hash(key) % R
分区,为每个 Reducer 生成 1 个独立文件(无合并 / 排序,直接写入); - Reducer 端:拉取所有 Map 任务中对应分区的文件,聚合后处理。
- Mapper 端:每个 Map 任务按
- 触发条件:
- 操作类型:宽依赖操作(
group by
、distinct
、非广播Join
、repartition
等); - 配置与版本:仅支持 Spark 2.0 之前版本,且显式配置
spark.shuffle.manager=hash
; - 禁用文件合并:
spark.shuffle.consolidateFiles=false
(默认值)。
- 操作类型:宽依赖操作(
- 优点:实现简单,无排序开销,小数据量场景下效率尚可。
- 缺点:文件数量爆炸(
MT×R
),磁盘 IO 压力大、元数据管理开销高;大数据量下易 OOM 或超时。 - 配置项:
spark.shuffle.manager=hash
(仅旧版本支持);spark.shuffle.consolidateFiles=false
。
- 产生文件个数公式:总文件数 =
MT × R
(MT
为 Map 任务数,R
为 Reducer 数)。
3. Consolidated Hash Shuffle(Hash Shuffle 优化版,已过时)
- 原理:同一 Executor 内的所有 Map 任务共享 Reducer 分区文件 —— 每个 Executor 为每个 Reducer 生成 1 个文件,Executor 内所有 Map 任务的对应分区数据均写入此文件。
- 触发条件:
- 操作类型:同 Hash Shuffle(宽依赖操作);
- 配置与版本:
spark.shuffle.manager=hash
(旧版本) +spark.shuffle.consolidateFiles=true
(启用文件合并)。
- 优点:文件数量减少(
C×R
,C
为 Map端的Executor 数),缓解磁盘 IO 压力。 - 缺点:无排序导致聚合效率低;仅支持旧版本,已被 Sort Shuffle 替代。
- 配置项:
spark.shuffle.manager=hash
;spark.shuffle.consolidateFiles=true
。
- 产生文件个数公式:总文件数 =
C × R
。
4. Sort Merge Shuffle(当前默认实现)
- 原理:
- Mapper 端:先按分区键分区(分区数 = R),再对每个分区内数据排序,最终生成 1 个合并数据文件(含所有分区)和 1 个索引文件(记录各分区偏移量);
- Reducer 端:根据索引拉取对应分区数据,合并后二次排序(若需)并聚合。
- 触发条件:
- 操作类型:宽依赖操作(
group by
、distinct
、非广播Join
、orderBy
、repartition
等); - 配置与版本:Spark 2.0 + 默认启用(
spark.shuffle.manager=sort
); - 排除 Bypass 场景:当
R > spark.shuffle.sort.bypassMergeThreshold
(默认 200),或存在自定义排序器时,执行完整排序流程。
- 操作类型:宽依赖操作(
- 优点:文件数量极少(
2×M
),适合大数据量;排序后聚合 / Join 效率高;支持自定义分区和排序。 - 缺点:Mapper 端排序有额外开销;小数据量场景下略慢于 Bypass 机制。
- 配置项:
spark.shuffle.manager=sort
(默认);spark.shuffle.sort.bypassMergeThreshold
(默认 200,触发 Bypass 的 Reducer 数阈值);spark.sql.shuffle.partitions
(默认 200,控制 Reducer 数)。
- 产生文件个数公式:总文件数 =
2 × M
(每个 Mapper端 1 个数据文件 + 1 个索引文件)。
5. Bypass Merge Shuffle(Sort Merge Shuffle 的子机制)
- 原理:Sort Merge Shuffle 的 “无排序” 优化 —— 当 Reducer 数较少时,Mapper 端跳过排序步骤,直接为每个分区写临时文件,最后合并为 1 个数据文件和 1 个索引文件(与 Sort Merge Shuffle 文件结构一致,但无排序开销)。
- 触发条件:
- 基础条件:启用 Sort Shuffle(
spark.shuffle.manager=sort
); - Reducer 数阈值:
R ≤ spark.shuffle.sort.bypassMergeThreshold
(默认 200); - 无自定义排序/下游无聚合算子:未指定
mapSideCombine
或自定义排序器(否则必须排序)。
- 基础条件:启用 Sort Shuffle(
- 优点:跳过排序步骤,小 Reducer 数场景下比普通 Sort Merge Shuffle 快;文件数量仍为
2×M
,避免 Hash Shuffle 的文件爆炸。 - 缺点:数据未排序,若 Reducer 端需排序则需额外开销;仅适用于 Reducer 数少的场景。
- 配置项:
- 继承 Sort Merge Shuffle 的所有配置;
- 核心控制:
spark.shuffle.sort.bypassMergeThreshold
(默认 200)。
- 产生文件个数公式:总文件数 =
2 × M
(与 Sort Merge Shuffle 一致,因最终合并为 1 个数据文件 + 1 个索引文件)。
6. Sort Tungsten Shuffle(Sort Shuffle 的 Tungsten 优化版)
- 原理:基于 Sort Merge Shuffle,集成 Tungsten 内存管理引擎 —— 使用二进制存储、堆外内存(Off-Heap)减少 GC,直接基于序列化数据进行排序,达到优化排序、提升序列化效率。
- 触发条件:
- Tungsten 启用:Spark 1.3 + 默认集成,可通过
spark.memory.offHeap.enabled=true
开启堆外内存。 - Shuffle 过程中的输出分区个数少于 16777216 个
- Shuffle 的序列化器支持序列化值的重定位(Kryo)
- 下游算子无聚合需要
- Tungsten 启用:Spark 1.3 + 默认集成,可通过
- 优点:内存利用率更高,减少 GC 卡顿;排序和 IO 性能优于普通 Sort Shuffle;适合超大数据量。
- 缺点:对自定义数据类型兼容性略弱(需实现 Tungsten 序列化接口);排序开销仍存在。
- 配置项:
- 继承 Sort Merge Shuffle 的所有配置;
spark.memory.offHeap.enabled
(默认 false,是否启用堆外内存);spark.memory.offHeap.size
(默认 0,堆外内存大小)。
- 产生文件个数公式:总文件数 =
2 × M
。
核心差异总结
类型 | 核心特征 | 适用场景 | 文件数规模 | 关键区分点 |
---|---|---|---|---|
Broadcast 机制 | 无 Shuffle,小表广播 | 小表与大表 Join | 0 | 仅 Join,用内存换开销 |
Hash Shuffle | 无排序,文件爆炸 | 旧版本 + 极小数据量 | MT×R (极大) | 已淘汰,文件管理缺陷明显 |
Consolidated Hash | Executor 内文件合并 | 旧版本 + 中小数据量 | C×R (较大) | 部分优化但仍无排序 |
Sort Merge Shuffle | 排序 + 少文件 | 大数据量(默认) | 2×M (极小) | 全量排序,适合大 Reducer 数 |
Bypass Merge Shuffle | 无排序 + 少文件(Sort 的子机制) | 小 Reducer 数(≤200) | 2×M | 跳过排序,效率更高但适用范围窄 |
Sort Tungsten Shuffle | Tungsten 优化,堆外内存 | 超大数据量,内存敏感 | 2×M | 内存效率提升,依赖 Tungsten 引擎 |
实际应用中,Spark 2.0 + 默认优先使用 Sort Merge Shuffle(Reducer 数多)或其 Bypass 子机制(Reducer 数少),小表 Join 自动触发 Broadcast 机制,无需关注 Hash 类 Shuffle(已淘汰)。
(欢迎订阅、讨论、转载)
推荐内容:
大数据计算引擎-Catalyst 优化器:Spark SQL 的 “智能翻译官 + 效率管家”
大数据计算引擎-从源码看Spark AQE对于倾斜的处理
深入starrocks-怎样实现多列联合统计信息
深入starrocks-多列联合统计一致性探查与策略(YY一下)
大数据计算引擎-全阶段代码生成(Whole-stage Code Generation)与火山模型(Volcano)对比