当前位置: 首页 > news >正文

Spark Shuffle机制原理

文章目录

  • 1.什么是Shuffle?
  • 2.Shuffle解决什么问题?
  • 3.Shuffle Write与Shuffle Read
  • 4.Shuffle的计算需求
    • 4.1 计算需求表
    • 4.2 partitionby
    • 4.3 groupByKey
    • 4.4 reduceByKey
    • 4.5 sortByKey
  • 5.Shuffle Write框架设计与实现
    • 5.1 Shuffle Write框架实现的功能
    • 5.2 Shuffle Write的多种情况
      • 5.2.1 不需要combine和sort
        • 5.2.1.1 操作流程
        • 5.2.1.2 优缺点
        • 5.2.1.3 适用性
      • 5.2.2 不需要combine,需要sort
        • 5.2.2.1 操作流程
        • 5.2.2.2 优缺点
        • 5.2.2.3 适用性
      • 5.2.3 需要combile,需要/不需要sort
        • 5.2.3.1 操作流程
        • 5.2.3.2 优缺点
        • 5.2.3.3 适用性
  • 6.Shuffle Read框架设计与实现
    • 6.1 Shuffle Read框架实现的功能
    • 6.2 Shuffle Read的不同情况
      • 6.2.1 不需要combine和sort
        • 6.2.1.1 操作流程
        • 6.2.1.2 优缺点
        • 6.2.1.3 适用性
      • 6.2.2 不需要combine,需要sort
        • 6.2.2.1 操作流程
        • 6.2.2.2 优缺点
        • 6.2.2.3 适用性
      • 6.2.3 需要combine,需要/不需要sort
        • 6.2.3.1 操作流程
        • 6.2.3.2 优缺点
        • 6.2.3.3 适用性

阅读本篇文章前,需要阅读 Spark执行计划与UI分析

1.什么是Shuffle?

运行在不同stage、不同节点上的task间如何进行数据传递。这个数据传递过程通常被称为Shuffle机制。

2.Shuffle解决什么问题?

如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。如何支持这些不同类型的计算,如何提高Shuffle的性能都是Shuffle机制设计的难点问题。

3.Shuffle Write与Shuffle Read

  • Shuffle Write:上游stage预先将输出数据进行划分,按照分区存放,分区个数与下游task个数一致,这个过程被称为"Shuffle Write"。
  • Shuffle Read:上游数据按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合再一起处理,这个过程称为"Shuffle Read"。

4.Shuffle的计算需求

4.1 计算需求表

所谓计算需求,也就是Shuffle要解决具体算子的哪些计算需求:
在这里插入图片描述
这里我来分析几个例子:

4.2 partitionby

在这里插入图片描述
可以看到partitionby操作只进行了数据分区操作,并没有涉及到数据的聚合和排序操作。

4.3 groupByKey

在这里插入图片描述
可以看到groupByKey的操作既需要分区,又需要做聚合,并且在Shuffle Read阶段做的聚合。

4.4 reduceByKey

在这里插入图片描述
可以看到reduceByKey做了两步聚合,在Shuffle Write中先执行func聚合一次(由spark内部执行,不生成新的rdd),然后进行分区数据传输,最后再在每个分区聚合一次,执行相同的func函数。同时func需要满足交换律和结合律。两次聚合(多了Shuffle Write端聚合)的优点是优化Shuffle的性能,一是传输的数据量大大减少,二是降低Shuffle Read端的内存消耗。

4.5 sortByKey

在这里插入图片描述
分区后,在ShuffleRead端进行排序。sortByKey() 为了保证生成的RDD中的数据是全局有序(按照Key排序) 的, 采用Range划分来分发数据。 Range划分可以保证在生成的RDD中, partition 1中的所有record的Key小于(或大于) partition 2中所有的record的Key。
可以看到当前并没有算子需要在Shuffle Write端进行排序的,但不能保证用户实现的算子不会在Shuffle Write端进行排序,因此在spark实现Shuffle框架的时候保留了在Shuffle Write端进行排序的功能

5.Shuffle Write框架设计与实现

5.1 Shuffle Write框架实现的功能

如第四节中的图所示,每个数据操作只需要其中的一个或两个功能。Spark为了支持所有的情况,设计了一个通用的Shuffle Write框架,框架的计算顺序为“map()输出→数据聚合→排序→分区”输出。
在这里插入图片描述
map task每计算出一个record及其partitionId,就将record放入类似HashMap的数据结构中进行聚合;聚合完成后,再将HashMap中的数据放入类似Array的数据结构中进行排序,既可按照partitionId,也可以按照partitionId+Key进行排序;最后根据partitionId将数据写入不同的数据分区中,存放到本地磁盘上。partitionId=Hash(Key)% 下游分区数

5.2 Shuffle Write的多种情况

5.2.1 不需要combine和sort

在这里插入图片描述
这种Shuffle Write方式称为:BypassMergeSortShuffleWriter
这种情况最简单,只需要实现分区功能:

5.2.1.1 操作流程

map()依次输出KV record,并计算其partitionId(PID),Spark根据 partitionId,将record依次输出到不同的buffer中,每当buffer填满就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快,需要进行缓冲来减少磁盘I/O。

5.2.1.2 优缺点

该模式的优点是速度快,直接将record输出到不同文件中。缺点是资源消耗过高,每个分区都需要有一个buffer(默认大小为32KB,由spark.Shuffle.file.buffer进行控制),当分区数过大时,内存消耗会很高。

5.2.1.3 适用性

适用于Shuffle Write端不需要聚合和排序且分区个数较少(小于spark.Shuffle.sort.bypassMergeThreshold,默认值为200),例如groupBy(100),partitionBy(100),sortByKey(100)。

5.2.2 不需要combine,需要sort

在这里插入图片描述

这种Shuffle模式被命名为:SortShuffleWriter(KeyOrdering=true),使用的Array被命名为PartitionedPairBuffer

5.2.2.1 操作流程
  • 这种情况需要使用partitionId+key进行排序,Spark采用的实现方法是建立一个Array:PartitionedPairBuffer,来存放map()输出的record,并将每个<K,V>record转化为<(PartitionId,K),V>record,然后按照PartitionId+Key对record进行排序,最后将所有record写入写入一个文件中,通过建立索引来标示每个分区
  • 如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上,等待map()输出完以后,再将Array中的record与磁盘上已排序的record进行全局排序,得到最终有序的record,并写入文件中。
5.2.2.2 优缺点
  • 优点是只需要一个Array结构就可以支持按照partitionId+Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,支持从小规模到大规模数据的排序。同时,输出的数据已经按照partitionId进行排序,因此只需要一个分区文件存储,即可标示不同的分区数据,克服了BypassMergeSortShuffleWriter中建立文件数过多的问题,适用于分区个数很大的情况。缺点是排序增加计算时延。
5.2.2.3 适用性
  • map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。

最后,使用这种Shuffle如何解决BypassMergeSortShuffleWriter存在的buffer分配过多的问题?我们只需要将“按PartitionId+Key排序”改为“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key进行排序,分区个数过大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。

5.2.3 需要combile,需要/不需要sort

在这里插入图片描述
这种Shuffle模式被称为:sort-based Shuffle Write,哈希表为:PartitionedAppendOnlyMap

5.2.3.1 操作流程
  • 需要实现按Key进行聚合(combine)的功能,Spark采用的实现方法是建立一个类似HashMap的数据结构对map()输出的record进行聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是经过相同combine的聚合结果。在图中,combine()是sum()函数,那么Value中存放的是多个record对应的Value相加的结果。
  • 聚合完成后,Spark对HashMap中的record进行排序。如果不需要按Key进行排序,如上图所示,那么只按partitionId进行排序;如果需要按Key进行排序,如图6.7的下图所示,那么按partitionId+Key进行排序。最后,将排序后的record写入一个分区文件中。其中使用的hash表既可以实现聚合功能,也可以实现排序功能。
  • 如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对map()输出的record进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当map()输出完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合(merge),得到最终的record,输出到分区文件中。
5.2.3.2 优缺点
  • 优缺点同5.4.2
5.2.3.3 适用性
  • 适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。

6.Shuffle Read框架设计与实现

6.1 Shuffle Read框架实现的功能

在这里插入图片描述
reduce task不断从各个map task的分区文件中获取数据(Fetch records),然后使用类似HashMap的结构来对数据进行聚(aggregate),该过程是边获取数据边聚合。聚合完成后,将HashMap中的数据放入类似Array的数据结构中按照Key进行排序(sort byKey),最后将排序结果输出或者传递给下一个操作。

6.2 Shuffle Read的不同情况

6.2.1 不需要combine和sort

在这里插入图片描述

6.2.1.1 操作流程
  • 这种情况最简单,只需要实现数据获取功能即可。等待所有的map task结束后,reduce task开始不断从各个map task获取<K,V>record,并将record输出到一个buffer中(大小为spark.reducer.maxSizeInFlight=48MB),下一个操作直接从buffer中获取数据即可。
6.2.1.2 优缺点
  • 优点是逻辑和实现简单,内存消耗很小。缺点是不支持聚合、排序等复杂功能。
6.2.1.3 适用性
  • 适合既不需要聚合也不需要排序的应用,如partitionBy()等。

6.2.2 不需要combine,需要sort

在这里插入图片描述
使用的Array结构:PartitionedPairBuffer

6.2.2.1 操作流程
  • 获取数据后,将buffer中的record依次输出到一个Array结构(PartitionedPairBuffer)中。由于这里采用了本来用于Shuffle Write端的PartitionedPairBuffer结构,所以还保留了每个record的partitionId。然后,对Array中的record按照Key进行排序,并将排序结果输出或者传递给下一步操作。
  • 当内存无法存下所有的record时,PartitionedPairBuffer将record排序后spill到磁盘上,最后将内存中和磁盘上的record进行全局排序,得到最终排序后的record。
6.2.2.2 优缺点
  • 优点是只需要一个Array结构就可以支持按照Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。
6.2.2.3 适用性
  • 适合reduce端不需要聚合,但需要按Key进行排序的操作,如sortByKey()、sortBy()等。

6.2.3 需要combine,需要/不需要sort

在这里插入图片描述
哈希表:ExternalAppendOnlyMap

6.2.3.1 操作流程
  • 获取record后,Spark建立一个类似HashMap的数据结构(ExternalAppendOnlyMap)对buffer中的record进行聚合,HashMap中的Key是record中的Key,HashMap中的Value是经过相同聚合函数(func())计算后的结果。
  • 聚合函数是sum()函数,那么Value中存放的是多个record对应Value相加后的结果。之后,如果需要按照Key进行排序,如下图所示,则建立一个Array结构,读取HashMap中的record,并对record按Key进行排序,排序完成后,将结果输出或者传递给下一步操作。
  • 如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对buffer中的record进行聚合。如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当聚合完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合,得到最终的record,输出到分区文件中。
    注意,这里的排序和聚合依然使用的同一个数据结构。
6.2.3.2 优缺点
  • 同上一节。
6.2.3.3 适用性
  • 适合reduce端需要聚合、不需要或需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等。
http://www.dtcms.com/a/331574.html

相关文章:

  • STM32HAL 快速入门(七):GPIO 输入之光敏传感器控制蜂鸣器
  • 深入理解管道(下):括号命令 `()`、`-ExpandProperty` 与 AD/CSV 实战
  • Java 大视界 -- Java 大数据在智能家居能耗监测与节能优化中的应用探索(396)
  • 【漏洞复现】WinRAR 目录穿越漏洞(CVE-2025-8088)
  • JavaScript 解构赋值语法详解
  • iOS Sqlite3
  • Playwright初学指南 (3):深入解析交互操作
  • 【完整源码+数据集+部署教程】肾脏病变实例分割系统源码和数据集:改进yolo11-CARAFE
  • 基于机器学习的文本情感极性分析系统设计与实现
  • 华为宣布云晰柔光屏技术迎来重大升级
  • 生产环境sudo配置详细指南
  • 机器学习学习总结
  • 如何选择适合工业场景的物联网网关?
  • 相较于传统AR作战环境虚拟仿真系统,其优势体现在哪些方面?
  • Python小程序1.0版本
  • C++类与对象核心知识点全解析(中)【六大默认成员函数详解】
  • Perforce P4 Git 连接器
  • 随身 Linux 开发环境:使用 cpolar 内网穿透服务实现 VSCode 远程访问
  • Activity + fragment的页面结构,fragment始终无法显示问题
  • AI 赋能的软件工程全生命周期应用
  • 第16届蓝桥杯C++中高级选拔赛(STEMA)2024年10月20日真题
  • 【C#】PNG 和 JPG、JPEG的应用以及三种格式的区别?
  • Oracle commit之后做了什么
  • 【20-模型诊断调优】
  • BSCI认证对企业的影响,BSCI认证的重要性,BSCI审核的核心内容
  • 信息vs知识:人类学习与AI规则提取
  • 设计模式笔记_行为型_状态模式
  • Spring Boot + Redis Sentinel (一主两从)测试案例
  • Docker pull拉取镜像命令的入门教程
  • 蓝耘元生代上线 Baichuan-M2-32B,医疗推理能力卓越,高效部署,成本超低