当前位置: 首页 > news >正文

Spark的shuffle类型与对比

1. Broadcast 广播机制(避免 Shuffle 的优化)

  • 原理:将小表全量复制到所有 Executor 内存,与大表在本地执行 Join(无需跨节点数据传输),用内存换 Shuffle 开销。
  • 触发条件
    1. 操作类型:仅适用于Join(内连接为主;左 / 右外连接中,广播表需为驱动表,否则失效);
    2. 表大小:小表(被广播方)大小 ≤ spark.sql.autoBroadcastJoinThreshold(默认 10MB);
    3. 配置开关:spark.sql.autoBroadcastJoinThreshold > 0(默认开启自动广播);
    4. 手动触发:broadcast()函数强制广播(无视大小阈值);
    5. 排除场景:禁用广播(spark.sql.autoBroadcastJoinThreshold = -1)、笛卡尔积cross join(默认不广播)。
  • 优点无 Shuffle 开销,Join 效率极高;适合小表与大表关联。
  • 缺点:小表过大会占用大量 Executor 内存,可能引发 OOM;仅支持 Join 场景。
  • 配置项
    • spark.sql.autoBroadcastJoinThreshold(默认 10MB);
    • spark.sql.broadcastTimeout(默认 300 秒,广播超时时间)。
  • 产生文件个数:无 Shuffle 文件(小表以广播变量形式在内存中传输,不落地)。

2. Hash Shuffle(原始版,已过时)

  • 原理
    • Mapper 端:每个 Map 任务按hash(key) % R分区,为每个 Reducer 生成 1 个独立文件(无合并 / 排序,直接写入);
    • Reducer 端:拉取所有 Map 任务中对应分区的文件,聚合后处理。
  • 触发条件
    1. 操作类型:宽依赖操作(group bydistinct、非广播Joinrepartition等);
    2. 配置与版本:仅支持 Spark 2.0 之前版本,且显式配置spark.shuffle.manager=hash
    3. 禁用文件合并:spark.shuffle.consolidateFiles=false(默认值)。
  • 优点:实现简单,无排序开销,小数据量场景下效率尚可。
  • 缺点文件数量爆炸MT×R),磁盘 IO 压力大、元数据管理开销高;大数据量下易 OOM 或超时。
  • 配置项
    • spark.shuffle.manager=hash(仅旧版本支持);
    • spark.shuffle.consolidateFiles=false
  • 产生文件个数公式:总文件数 = MT × RMT为 Map 任务数,R为 Reducer 数)。

3. Consolidated Hash Shuffle(Hash Shuffle 优化版,已过时)

  • 原理:同一 Executor 内的所有 Map 任务共享 Reducer 分区文件 —— 每个 Executor 为每个 Reducer 生成 1 个文件,Executor 内所有 Map 任务的对应分区数据均写入此文件。
  • 触发条件
    1. 操作类型:同 Hash Shuffle(宽依赖操作);
    2. 配置与版本:spark.shuffle.manager=hash(旧版本) + spark.shuffle.consolidateFiles=true(启用文件合并)。
  • 优点文件数量减少C×RC为 Map端的Executor 数),缓解磁盘 IO 压力。
  • 缺点无排序导致聚合效率低;仅支持旧版本,已被 Sort Shuffle 替代。
  • 配置项
    • spark.shuffle.manager=hash
    • spark.shuffle.consolidateFiles=true
  • 产生文件个数公式:总文件数 = C × R

4. Sort Merge Shuffle(当前默认实现)

  • 原理
    • Mapper 端:先按分区键分区(分区数 = R),再对每个分区内数据排序,最终生成 1 个合并数据文件(含所有分区)和 1 个索引文件(记录各分区偏移量);
    • Reducer 端:根据索引拉取对应分区数据,合并后二次排序(若需)并聚合。
  • 触发条件
    1. 操作类型:宽依赖操作(group bydistinct、非广播JoinorderByrepartition等);
    2. 配置与版本:Spark 2.0 + 默认启用(spark.shuffle.manager=sort);
    3. 排除 Bypass 场景:当R > spark.shuffle.sort.bypassMergeThreshold(默认 200),或存在自定义排序器时,执行完整排序流程。
  • 优点文件数量极少(2×M),适合大数据量;排序后聚合 / Join 效率高;支持自定义分区和排序。
  • 缺点:Mapper 端排序有额外开销;小数据量场景下略慢于 Bypass 机制。
  • 配置项
    • spark.shuffle.manager=sort(默认);
    • spark.shuffle.sort.bypassMergeThreshold(默认 200,触发 Bypass 的 Reducer 数阈值);
    • spark.sql.shuffle.partitions(默认 200,控制 Reducer 数)。
  • 产生文件个数公式:总文件数 = 2 × M(每个 Mapper端 1 个数据文件 + 1 个索引文件)。

5. Bypass Merge Shuffle(Sort Merge Shuffle 的子机制)

  • 原理:Sort Merge Shuffle 的 “无排序” 优化 —— 当 Reducer 数较少时,Mapper 端跳过排序步骤,直接为每个分区写临时文件,最后合并为 1 个数据文件和 1 个索引文件(与 Sort Merge Shuffle 文件结构一致,但无排序开销)。
  • 触发条件
    1. 基础条件:启用 Sort Shuffle(spark.shuffle.manager=sort);
    2. Reducer 数阈值:R ≤ spark.shuffle.sort.bypassMergeThreshold(默认 200);
    3. 无自定义排序/下游无聚合算子:未指定mapSideCombine或自定义排序器(否则必须排序)。
  • 优点跳过排序步骤小 Reducer 数场景下比普通 Sort Merge Shuffle 快;文件数量仍为2×M,避免 Hash Shuffle 的文件爆炸。
  • 缺点:数据未排序,若 Reducer 端需排序则需额外开销;仅适用于 Reducer 数少的场景。
  • 配置项
    • 继承 Sort Merge Shuffle 的所有配置;
    • 核心控制:spark.shuffle.sort.bypassMergeThreshold(默认 200)。
  • 产生文件个数公式:总文件数 = 2 × M(与 Sort Merge Shuffle 一致,因最终合并为 1 个数据文件 + 1 个索引文件)。

6. Sort Tungsten Shuffle(Sort Shuffle 的 Tungsten 优化版)

  • 原理:基于 Sort Merge Shuffle,集成 Tungsten 内存管理引擎 —— 使用二进制存储、堆外内存(Off-Heap)减少 GC,直接基于序列化数据进行排序,达到优化排序、提升序列化效率
  • 触发条件
    1. Tungsten 启用:Spark 1.3 + 默认集成,可通过spark.memory.offHeap.enabled=true开启堆外内存。
    2. Shuffle 过程中的输出分区个数少于 16777216 个
    3. Shuffle 的序列化器支持序列化值的重定位(Kryo)
    4. 下游算子无聚合需要
  • 优点内存利用率更高,减少 GC 卡顿;排序和 IO 性能优于普通 Sort Shuffle;适合超大数据量。
  • 缺点:对自定义数据类型兼容性略弱(需实现 Tungsten 序列化接口);排序开销仍存在。
  • 配置项
    • 继承 Sort Merge Shuffle 的所有配置;
    • spark.memory.offHeap.enabled(默认 false,是否启用堆外内存);
    • spark.memory.offHeap.size(默认 0,堆外内存大小)。
  • 产生文件个数公式:总文件数 = 2 × M

核心差异总结

类型核心特征适用场景文件数规模关键区分点
Broadcast 机制无 Shuffle,小表广播小表与大表 Join0仅 Join,用内存换开销
Hash Shuffle无排序,文件爆炸旧版本 + 极小数据量MT×R(极大)已淘汰,文件管理缺陷明显
Consolidated HashExecutor 内文件合并旧版本 + 中小数据量C×R(较大)部分优化但仍无排序
Sort Merge Shuffle排序 + 少文件大数据量(默认)2×M(极小)全量排序,适合大 Reducer 数
Bypass Merge Shuffle无排序 + 少文件(Sort 的子机制)小 Reducer 数(≤200)2×M跳过排序,效率更高但适用范围窄
Sort Tungsten ShuffleTungsten 优化,堆外内存超大数据量,内存敏感2×M内存效率提升,依赖 Tungsten 引擎

实际应用中,Spark 2.0 + 默认优先使用 Sort Merge Shuffle(Reducer 数多)或其 Bypass 子机制(Reducer 数少),小表 Join 自动触发 Broadcast 机制,无需关注 Hash 类 Shuffle(已淘汰)。

(欢迎订阅、讨论、转载)

推荐内容: 

大数据计算引擎-Catalyst 优化器:Spark SQL 的 “智能翻译官 + 效率管家”

大数据计算引擎-从源码看Spark AQE对于倾斜的处理

深入starrocks-怎样实现多列联合统计信息

深入starrocks-多列联合统计一致性探查与策略(YY一下)

大数据计算引擎-全阶段代码生成(Whole-stage Code Generation)与火山模型(Volcano)对比

http://www.dtcms.com/a/511205.html

相关文章:

  • 【 论文精读】VIDM:基于扩散模型的视频生成新范式
  • CentOS 7 安装指定内核版本与切换内核版本
  • Spring MVC 拦截器interceptor
  • 如何在 CentOS、Ubuntu 和 Debian 云服务器上安装 Python 3
  • 《金融电子化》:构建金融韧性运行安全体系:从灾备管理到主动防御新范式​​
  • spark组件-spark core(批处理)
  • 进行网站建设视频教程装修网站cms
  • 解决Kali虚拟机中VMnet1(仅主机模式)网卡无法获取IP地址的问题
  • Linux驱动开发笔记(十一)——阻塞和非阻塞IO
  • Docker----快速入门
  • 深度学习8-卷积神经网络-CNN概述-卷积层-池化层-深度卷积神经网络-案例:服装分类
  • 厦门做外贸网站国内十大咨询公司排名
  • 架构设计过去十年与未来十年
  • Nginx 日志轮转
  • 《Linux运维总结:基于ARM64+X86_64架构CPU使用docker-compose一键离线部署mongodb 7.0.22容器版副本集群》
  • 《Linux运维总结:基于ARM64+X86_64架构CPU使用docker-compose一键离线部署mongodb 7.0.22容器版分片集群》
  • MongoDB基础与Mongoose ODM
  • 做定制网站价格教做flash的网站
  • 【流量控制】算不对 GBN 窗口?分不清 SR 重传?滑动窗口 + 3 大协议一篇吃透
  • 临时插入的紧急任务如何影响整体进度
  • 国内net开发的网站建设网站建设费如何会计处理
  • Melos 使用指南:Flutter / Dart 多包管理工具!
  • React组件完全指南
  • TypeScript:npm的types、typings、@type的区别
  • 我的第一份开源贡献:小米工程师程赛的社区之旅
  • Python 基础 | 第八课:函数详解与应用
  • 火狐浏览器替换js脚本
  • 车载诊断架构 --- 由一个售后问题引发对P4时间的思考
  • 第3章 SQL数据定义语句
  • phpcms 网站m8 wordpress主题