Spark核心Shuffle详解(一)ShuffleManager
1、ShuffleManager
是Spark Shuffle的可插拔式入口点,负责协调Shuffle过程的各个方面。在SparkEnv初始化时,会根据配置(spark.shuffle.manager
参数)创建相应的ShuffleManager实例,驱动Driver和Executor上的Shuffle操作
1、SortShuffleManager
shuffleManager 的默认实现,通过排序合并机制有效控制了中间文件数量
2、ShuffleHandler
1、BaseShuffleHandle
使用BaseShuffleHandle的ShuffleWriter是SortShuffleWriter。
BaseShuffleHandle是默认的shuffle句柄,它对应的是SortShuffleManager中的常规shuffle路径。使用BaseShuffleHandle时,map端会对输出数据进行排序(可能包括聚合),然后写入一个数据文件和一个索引文件。reduce任务根据索引文件来获取自己需要的数据。
特点:
- 支持map端的聚合(如果需要)和排序。
- 输出文件数量少,每个map任务只生成一个数据文件和一个索引文件。
- 适用于reduce任务数量多的情况,因为无论reduce任务多少,map端都只生成一个文件。
使用BaseShuffleHandle
的ShuffleWriter
是SortShuffleWriter
。
2、BypassMergeSortShuffleHandle
优化reduce分区数较少的情况,避免不必要的排序开销。
BypassMergeSortShuffleHandle对应的是BypassMergeSortShuffleWriter。这种shuffle方式在reduce任务数量较少时使用,因为在这种情况下,排序的开销可能大于直接写入多个文件的开销。
特点:
- 每个map任务为每个reduce任务创建一个临时文件,然后将这些文件合并成一个数据文件和一个索引文件。
- 不需要在map端进行排序,因此如果shuffle不需要排序(比如没有聚合操作)且reduce任务数少,这种方式效率更高。
- 但是,如果reduce任务数很多,创建大量临时文件会导致性能下降和资源消耗增加。
使用BypassMergeSortShuffleHandle
的ShuffleWriter
是BypassMergeSortShuffleWriter
。
// 条件1: reduce分区数 <= spark.shuffle.sort.bypassMergeThreshold (默认200)
// 条件2: 没有map端聚合操作
// 条件3: 不能是排序的Shuffle
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {// We cannot bypass sorting if we need to do map-side aggregation.if (dep.mapSideCombine) {false} else {val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)dep.partitioner.numPartitions <= bypassMergeThreshold}}/*
Map任务执行过程:
1. 为每个reduce分区创建单独的文件
2. 直接将数据写入对应的分区文件
3. 不需要在内存中排序
4. 最后将所有分区文件合并成一个数据文件和索引文件Reduce任务执行过程:
1. 直接读取对应分区的数据块
2. 不需要额外的合并排序操作*/
3、SerializedShuffleHandle
Tungsten优化,序列化
SerializedShuffleHandle对应的是UnsafeShuffleWriter。它是Tungsten项目的一部分,旨在通过使用堆外内存和序列化来优化shuffle性能。
特点:
- 数据在shuffle过程中始终保持序列化状态,减少了序列化和反序列化的开销。
- 使用堆外内存管理,避免了GC的压力。
- 支持溢出到磁盘,当内存不足时,会将数据溢出到磁盘。
- 但是,这种shuffle方式不支持聚合操作,因为数据是序列化的,无法直接进行聚合。
使用SerializedShuffleHandle
的ShuffleWriter
是UnsafeShuffleWriter
。
//条件1: 使用支持重定位的序列化器 e.g. UnsafeRowSerializer,KryoSerializer (需要无禁用autoReset)
// 条件2: 没有map聚合操作
// 条件3: 分区数不超过支持的最大值 16777215
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {val shufId = dependency.shuffleIdval numPartitions = dependency.partitioner.numPartitionsif (!dependency.serializer.supportsRelocationOfSerializedObjects) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +s"${dependency.serializer.getClass.getName}, does not support object relocation")false} else if (dependency.mapSideCombine) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +s"map-side aggregation")false} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")false} else {log.debug(s"Can use serialized shuffle for shuffle $shufId")true}}
• shuffleId: Shuffle操作的唯一标识符
• dependency: Shuffle依赖关系,包含分区器、序列化器等信息
3、ShuffleBlockResolver
ESS,块坐标检索随机块数据
默认实现 IndexShuffleBlockResolver
每个 Map Task 只生成一个数据文件 + 一个索引文件。
启用Push Based Shuffle时 , 不同Map Task 的 数据文件 + 索引文件有可能合并
4、Push Based Shuffle (YARN support on spark 3.1+)
- 当启用Push Based Shuffle时,map任务不会将数据写入本地文件,而是将数据推送到运行在ESS上的
MergedShuffleFileManager
RemoteBlockPushResolver
。 - RemoteBlockPushResolver负责接收数据,并将来自不同map任务的同一分区数据合并存储,生成数据文件和索引文件。
- 在reduce阶段,reduce任务会通过ShuffleBlockResolver(此时可能是RemoteBlockPushResolver)来获取数据块的位置信息,并从ESS拉取数据。
Push Based Shuffle 的优势:
- 减少连接数:在传统的Pull Based Shuffle中,每个reduce任务需要连接每个map任务,连接数为M*R(M为map任务数,R为reduce任务数)。而Push Based Shuffle中,reduce任务只需要连接ESS,连接数大大减少。
- 减少小文件:Push Based Shuffle会在ESS端合并多个map任务输出的同一分区数据,生成更大的文件,从而减少小文件数量,提高读取效率。
- 负载均衡:Push Based Shuffle可以将数据均匀地推送到多个ESS上,避免热点问题。
RemoteBlockPushResolver 的工作流程:
- 注册App:在应用程序启动时,Driver会向ESS注册应用程序,并获取推送地址。
- 推送数据:map任务将数据分区后,推送到ESS的
RemoteBlockPushResolver
。 - 合并数据:
RemoteBlockPushResolver
将接收到的数据块合并,并写入到本地磁盘,同时生成索引文件。 - 提供服务:reduce任务通过ESS拉取合并后的数据。
e.g. spip https://issues.apache.org/jira/browse/SPARK-30602