当前位置: 首页 > news >正文

Spark核心Shuffle详解(一)ShuffleManager

1、ShuffleManager

是Spark Shuffle的可插拔式入口点,负责协调Shuffle过程的各个方面。在SparkEnv初始化时,会根据配置(spark.shuffle.manager参数)创建相应的ShuffleManager实例,驱动Driver和Executor上的Shuffle操作

1、SortShuffleManager

shuffleManager 的默认实现,通过排序合并机制有效控制了中间文件数量

2、ShuffleHandler

1、BaseShuffleHandle

使用BaseShuffleHandle的ShuffleWriter是SortShuffleWriter。

BaseShuffleHandle是默认的shuffle句柄,它对应的是SortShuffleManager中的常规shuffle路径。使用BaseShuffleHandle时,map端会对输出数据进行排序(可能包括聚合),然后写入一个数据文件和一个索引文件。reduce任务根据索引文件来获取自己需要的数据。

特点:

  • 支持map端的聚合(如果需要)和排序。
  • 输出文件数量少,每个map任务只生成一个数据文件和一个索引文件。
  • 适用于reduce任务数量多的情况,因为无论reduce任务多少,map端都只生成一个文件。

使用BaseShuffleHandleShuffleWriterSortShuffleWriter

2、BypassMergeSortShuffleHandle

优化reduce分区数较少的情况,避免不必要的排序开销。

BypassMergeSortShuffleHandle对应的是BypassMergeSortShuffleWriter。这种shuffle方式在reduce任务数量较少时使用,因为在这种情况下,排序的开销可能大于直接写入多个文件的开销。

特点:

  • 每个map任务为每个reduce任务创建一个临时文件,然后将这些文件合并成一个数据文件和一个索引文件。
  • 不需要在map端进行排序,因此如果shuffle不需要排序(比如没有聚合操作)且reduce任务数少,这种方式效率更高。
  • 但是,如果reduce任务数很多,创建大量临时文件会导致性能下降和资源消耗增加。

使用BypassMergeSortShuffleHandleShuffleWriterBypassMergeSortShuffleWriter

// 条件1: reduce分区数 <= spark.shuffle.sort.bypassMergeThreshold (默认200)
// 条件2: 没有map端聚合操作
// 条件3: 不能是排序的Shuffle
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {// We cannot bypass sorting if we need to do map-side aggregation.if (dep.mapSideCombine) {false} else {val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)dep.partitioner.numPartitions <= bypassMergeThreshold}}/*
Map任务执行过程:
1. 为每个reduce分区创建单独的文件
2. 直接将数据写入对应的分区文件
3. 不需要在内存中排序
4. 最后将所有分区文件合并成一个数据文件和索引文件Reduce任务执行过程:
1. 直接读取对应分区的数据块
2. 不需要额外的合并排序操作*/

3、SerializedShuffleHandle

Tungsten优化,序列化

SerializedShuffleHandle对应的是UnsafeShuffleWriter。它是Tungsten项目的一部分,旨在通过使用堆外内存和序列化来优化shuffle性能。

特点:

  • 数据在shuffle过程中始终保持序列化状态,减少了序列化和反序列化的开销。
  • 使用堆外内存管理,避免了GC的压力。
  • 支持溢出到磁盘,当内存不足时,会将数据溢出到磁盘。
  • 但是,这种shuffle方式不支持聚合操作,因为数据是序列化的,无法直接进行聚合。

使用SerializedShuffleHandleShuffleWriterUnsafeShuffleWriter

//条件1: 使用支持重定位的序列化器 e.g. UnsafeRowSerializer,KryoSerializer (需要无禁用autoReset)
// 条件2: 没有map聚合操作
// 条件3: 分区数不超过支持的最大值 16777215
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {val shufId = dependency.shuffleIdval numPartitions = dependency.partitioner.numPartitionsif (!dependency.serializer.supportsRelocationOfSerializedObjects) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +s"${dependency.serializer.getClass.getName}, does not support object relocation")false} else if (dependency.mapSideCombine) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +s"map-side aggregation")false} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")false} else {log.debug(s"Can use serialized shuffle for shuffle $shufId")true}}

shuffleId: Shuffle操作的唯一标识符
dependency: Shuffle依赖关系,包含分区器、序列化器等信息

3、ShuffleBlockResolver

ESS,块坐标检索随机块数据

默认实现 IndexShuffleBlockResolver 每个 Map Task 只生成一个数据文件 + 一个索引文件。

启用Push Based Shuffle时 , 不同Map Task 的 数据文件 + 索引文件有可能合并

4、Push Based Shuffle (YARN support on spark 3.1+)

  • 当启用Push Based Shuffle时,map任务不会将数据写入本地文件,而是将数据推送到运行在ESS上的MergedShuffleFileManager RemoteBlockPushResolver
  • RemoteBlockPushResolver负责接收数据,并将来自不同map任务的同一分区数据合并存储,生成数据文件和索引文件。
  • 在reduce阶段,reduce任务会通过ShuffleBlockResolver(此时可能是RemoteBlockPushResolver)来获取数据块的位置信息,并从ESS拉取数据。

Push Based Shuffle 的优势:

  • 减少连接数:在传统的Pull Based Shuffle中,每个reduce任务需要连接每个map任务,连接数为M*R(M为map任务数,R为reduce任务数)。而Push Based Shuffle中,reduce任务只需要连接ESS,连接数大大减少。
  • 减少小文件:Push Based Shuffle会在ESS端合并多个map任务输出的同一分区数据,生成更大的文件,从而减少小文件数量,提高读取效率。
  • 负载均衡:Push Based Shuffle可以将数据均匀地推送到多个ESS上,避免热点问题。

RemoteBlockPushResolver 的工作流程:

  • 注册App:在应用程序启动时,Driver会向ESS注册应用程序,并获取推送地址。
  • 推送数据:map任务将数据分区后,推送到ESS的RemoteBlockPushResolver
  • 合并数据:RemoteBlockPushResolver将接收到的数据块合并,并写入到本地磁盘,同时生成索引文件。
  • 提供服务:reduce任务通过ESS拉取合并后的数据。

e.g. spip https://issues.apache.org/jira/browse/SPARK-30602
在这里插入图片描述

http://www.dtcms.com/a/410002.html

相关文章:

  • Android 开发环境解析:从SDK、NDK到版本兼容性指南
  • 基于YOLO8+flask+layui的行人跌倒行为检测系统【源码+模型+数据集】
  • Mysql DBA学习笔记(日志)
  • 平替MongoDB:金仓多模数据库助力电子证照国产化实践
  • QT6中QGraphicsView功能与应用
  • WSL2搭建Hadoop伪分布式环境
  • 新闻媒体发稿平台排名Top5,聚合型新闻发稿服务平台推荐
  • Linux(4)|入门的开始:Linux基本指令(4)
  • (七)API 重构的艺术:打造优雅、可维护的 API
  • MAC idea 环境变量设置失效
  • 百度站长收录提交入口深圳设计网站源码
  • 2025Unity超详细《坦克大战3D》项目实战案例(上篇)——UI搭建并使用和数据持久化(附资源和源代码)
  • DenseNet:密集连接
  • 第一次学习Hardhat
  • 腾讯wordpress 建站自适应网站建设哪家便宜
  • 第八章 MyBatis及MyBatis-Plus
  • 5mins了解redis底层数据结源码
  • 华为云学习笔记(四):运维类服务与企业网站上云实践
  • 面向大模型输出的“耐脏” JSON 处理:从清洗到严格化的完整方案
  • 抢先注册网站域名卖掉英文网站建设服务合同
  • Apache、Nginx 和 Tomcat 的区别
  • 解决avue-input-tree组件重置数据不回显/重置失败
  • 苏州网站建设致宇网页设计制作手机网站
  • Kimi-VL:月之暗面开源的视觉语言模型
  • Buck电路项目实战:从原理到实战全解析
  • 如何读懂Mach-O:构建macOS和iOS应用安全的第一道认知防线
  • 远程录制新体验:Bililive-go与cpolar的无缝协作
  • 购物网站制作样例糖果网站建设策划书模板
  • 百度新闻源网站故乡网站开发的意义
  • PortSwigger靶场之Exploiting server-side parameter pollution in a query string通关秘籍