Spark中Shuffle阶段的优化方法
Shuffle的参数spark.shuffle.manager控制实现方式,Spark 1.x 有多种实现,2.x基本统一为SortShuffle:
- hash:HashShuffle,早期版本默认。每个 map task 对每个 reduce task 都会生成一个文件,小文件较多。
- sort:SortShuffle,2.x和3.x版本默认。每个 map task 只生成一个文件和index文件,避免了小文件过多问题。
SortShuffle在写出数据时有两种模式,用参数spark.shuffle.sort.bypassMergeThreshold控制,默认200,下面以200为例说明两种模式的区别:
- BypassMerge模式,直写模式,当reduce task 数量小于200时走该模式。具体为:map task 会为每个 reduce task 创建一个临时文件,总共nMap*nReduce个文件。数据不排序,直接写入对应的临时文件,因为 reduce 端只需要拿到属于自己的数据,不需要有序。Reduce端将属于自己的临时文件合并成一个文件,附带一个 index 文件。特点是跳过排序,减少CPU开销。
- Sort模式,,当reduce task 数量小于200时走该模式。具体为:map task 把所有 shuffle 数据写入内存缓冲区,在写出之前,先根据目标 reduceId 对数据排序(实际按分区 id 排序)。按顺序写入一个文件,并生成对应 index 文件,标记每个分区的数据偏移量。特点是减少文件数量,一个数据文件和一个 index 文件,但需要排序的CPU开销。
Shuffle中Map端的排序是为了通过外排(外部排序)降低内存使用量(写磁盘时排序,排好序写入磁盘),正常Reduce阶段需要分组,Key相同的进行归约,在Reduce阶段排序(内部排序)太耗费内存,其实目的就是降低Reduce端压力。如果 reduce 数很少(比如几十个),直写文件数量有限,跳过排序更快;如果 reduce 数很多,直写就会产生大量小文件,带来 I/O 压力;此时排序一次、写一个大文件更划算。配置如下:
sparkConf.set("spark.shuffle.sort.bypassMergeThreshold", "400") // 默认200个
增大输出流缓冲区可以减少写入磁盘的数据量,提高性能,但是增大会占用Execution内存空间。配置如下:
// 这是BufferedOutputStream的buffer缓冲大小
sparkConf.set("spark.shuffle.file.buffer", "64k") // 默认32KB,建议倍数扩大
Reduce端缓冲区大小:一次拉取一个缓冲区大小的数据。一边拉取数据一边聚合计算。调整缓冲区如下:
sparkConf.set("spark.reducer.maxSizeInFlight", "64m") // 默认48m,建议倍数扩大
调节Reduce端取数规则:包括重试次数和等待时长,例如拉取时可能遇到对方Executor宕机或者正在GC。使用:
sparkConf.set("spark.reducer.io.maxRetries", "4") // 默认3次
sparkConf.set("spark.reducer.retryWait", "10s") // 默认5秒