当前位置: 首页 > news >正文

Spark的Shuffle过程

Spark 的 Shuffle 是分布式计算中数据重分区的核心过程,目的是将不同 Executor 上的数据按照一定规则(通常是 Key 的哈希值)重新分配,确保相同 Key 的数据汇聚到同一节点进行聚合、连接等操作。其过程涉及数据分区、写入磁盘、网络传输、读取合并等多个步骤,逻辑复杂但可拆解为清晰的阶段。

一、Shuffle 的核心目标

假设有一个简单的计算任务:对 (Key, Value) 类型的数据集执行 reduceByKey(按 Key 聚合)。
例如,输入数据如下(分布在两个 Executor 上):

  • Executor 1[(A, 1), (B, 2), (A, 3)]
  • Executor 2[(B, 4), (C, 5), (A, 6)]

目标是将所有相同 Key 的数据汇总到一起,最终计算结果:
A: 1+3+6=10,B: 2+4=6,C:5

为实现这个目标,Shuffle 需要完成:

  1. 让所有 A 的数据到同一个节点,所有 B 的数据到同一个节点,以此类推。
  2. 确保数据在节点间高效传输和合并。

二、Shuffle 的完整过程(以 SortShuffleManager 为例)

Spark 2.0+ 默认使用 SortShuffleManager,其 Shuffle 过程分为 Map 阶段Reduce 阶段,具体步骤如下:

阶段 1:Map 阶段(数据分区与写入)

Map 阶段由所有 Map Task 执行(每个 Map Task 处理一个 RDD 分区),核心是将数据按 Key 分区并写入本地磁盘。

步骤 1.1:数据分区(确定每个 Key 归属的 Reduce 分区)

每个 Map Task 处理的数据中,每条记录 (Key, Value) 会通过 分区函数 确定归属的 Reduce 分区(即目标分区 ID)。

  • 默认分区函数:Key.hashCode % numPartitionsnumPartitions 是 Shuffle 并行度,由 spark.sql.shuffle.partitionsRDD.partitions.size 决定)。

    例子:假设 numPartitions=3(即 3 个 Reduce 分区),计算各 Key 的分区:

    • A.hashCode % 3 = 0 → 归属分区 0
    • B.hashCode % 3 = 1 → 归属分区 1
    • C.hashCode % 3 = 2 → 归属分区 2

    则 Executor 1 和 2 中的数据会被标记分区:

    • Executor 1 数据分区后:
      (A,1)→0,(B,2)→1,(A,3)→0
    • Executor 2 数据分区后:
      (B,4)→1,(C,5)→2,(A,6)→0
步骤 1.2:数据排序与合并(可选,SortShuffle 特性)

SortShuffle 会对每个 Map Task 的输出按 (分区 ID, Key) 排序:

  • 先按分区 ID 排序(确保同一分区的数据连续),再按 Key 排序(方便后续合并)。

    例子:

    • Executor 1 排序后:[(0, A,1), (0, A,3), (1, B,2)]
    • Executor 2 排序后:[(0, A,6), (1, B,4), (2, C,5)]
步骤 1.3:写入本地磁盘(生成 Map 输出文件)

每个 Map Task 将排序后的结果写入 一个输出文件,并生成 索引文件 记录每个 Reduce 分区在输出文件中的起始位置和长度。

  • 输出文件(.data):存储所有分区的键值对数据(按分区连续存储)。

  • 索引文件(.index):记录每个分区在 .data 文件中的偏移量(如分区 0 从 0 字节开始,长度 100 字节;分区 1 从 100 字节开始,长度 80 字节等)。

    例子:

    • Executor 1 生成的 .data 文件内容:(A,1)(A,3)(B,2)(分区 0 和 1 的数据连续存储),.index 文件记录分区 0 和 1 的偏移量。
    • Executor 2 生成的 .data 文件内容:(A,6)(B,4)(C,5),.index 文件记录分区 0、1、2 的偏移量。
阶段 2:Reduce 阶段(数据拉取与合并)

Reduce 阶段由所有 Reduce Task 执行(每个 Reduce Task 处理一个目标分区),核心是从所有 Map Task 拉取属于自己的分区数据,合并后进行计算。

步骤 2.1:获取 Map 输出位置(MapOutputTracker)

Driver 中的 MapOutputTracker 会记录所有 Map Task 的输出文件路径(存储在 Driver 内存或 ZooKeeper 中,视集群模式而定)。

  • Reduce Task 启动时,会向 MapOutputTracker 请求自己负责的分区(如分区 0)对应的所有 Map 输出文件路径。

    例子:负责分区 0 的 Reduce Task 会得知:需要从 Executor 1 的 .data 文件中读取分区 0 的数据,以及从 Executor 2 的 .data 文件中读取分区 0 的数据。

步骤 2.2:拉取数据(Shuffle Fetch)

Reduce Task 通过网络(通常是 HTTP)从各个 Map Task 的节点拉取属于自己的分区数据,过程称为 Fetch

  • 每个 Reduce Task 会启动多个线程并行拉取(由 spark.shuffle.fetchers 控制,默认 64 个)。

  • 拉取的数据先存入内存缓冲区(spark.shuffle.reduceside.buffer.percent 控制内存占比),缓冲区满后写入磁盘临时文件。

    例子:负责分区 0 的 Reduce Task 拉取到的数据:

    • 从 Executor 1 拉取:(A,1), (A,3)
    • 从 Executor 2 拉取:(A,6)
步骤 2.3:合并数据(Merge)

拉取完成后,Reduce Task 需要合并所有拉取到的数据(内存中的和磁盘上的):

  • 若数据已排序(SortShuffle 输出),合并过程类似“归并排序”,按 Key 拼接成连续的有序数据流。
  • 合并后的数据格式:(A,1), (A,3), (A,6)(同一 Key 的所有 Value 汇聚)。
步骤 2.4:执行计算(如 reduceByKey)

合并后,Reduce Task 对同一 Key 的 Value 执行聚合逻辑(如 reduceByKey 的求和操作):

  • 例子中,A 的 Value 为 1,3,6,聚合后结果为 (A, 10)

三、Shuffle 过程的关键细节

  1. 数据存储形式

    • Map 阶段输出:每个 Map Task 生成 1 个 .data 文件(所有分区数据)+ 1 个 .index 文件(分区偏移量),存储在本地磁盘(spark.local.dir 配置的目录)。
    • Reduce 阶段临时数据:拉取的数据先存内存,满后存磁盘临时文件,合并后删除。
  2. 网络传输优化

    • 拉取数据时会优先拉取本地数据(同一节点的 Map 输出),减少跨节点传输。
    • 数据传输前会压缩(spark.shuffle.compress 默认开启,用 Snappy 压缩),减少网络 IO。
  3. Shuffle 并行度

    • numPartitions 决定,即 Reduce Task 数量。例如 spark.sql.shuffle.partitions=200 表示 Shuffle 后生成 200 个分区。
    • 并行度过低:单个 Reduce Task 处理数据量大,容易倾斜;过高:小文件多,磁盘 IO 开销大。

四、总结 Shuffle 流程(图示简化)

输入数据(分布在 Executor 1、2)↓
Map 阶段:1. 分区:按 Key 哈希分配到 Reduce 分区(0、1、2)2. 排序:按 (分区 ID, Key) 排序3. 写入:生成 .data(数据)和 .index(索引)文件↓
Reduce 阶段:1. 定位:通过 MapOutputTracker 获取目标分区的 Map 输出路径2. 拉取:从所有 Map 节点 Fetch 属于自己的分区数据3. 合并:归并排序所有拉取的数据(按 Key 汇聚)4. 计算:对同一 Key 执行聚合操作,输出结果

通过这个过程,原本分散在不同节点的相同 Key 数据被汇聚,为后续的聚合、连接等操作提供了基础。Shuffle 是 Spark 性能的核心瓶颈之一,优化 Shuffle 通常需要调整并行度、解决数据倾斜、合理配置内存和压缩参数等。

http://www.dtcms.com/a/507790.html

相关文章:

  • 前端HTML常用基础标
  • 智能井盖传感器如何成为智慧城市“无声卫士”?
  • Django Web 开发系列(一):视图基础与 URL 路由配置全解析
  • 【python】在Django中,执行原生SQL查询
  • 5 个 Windows 故障排除工具
  • 云南网站建设招商交换友情链接的渠道
  • 在SCNet使用异构海光DCU 部署文心21B大模型报错HIP out of memory(未调通)
  • 嘉兴网站建设优化温州快速建站公司
  • 西安自助建站公司网站没有做404页面
  • 解决Vcenter告警datastore存储容量不足问题
  • 骆驼重链抗体免疫文库构建:从动物免疫到文库质控的关键技术解析
  • BearPi小熊派 鸿蒙开发入门笔记(1)
  • 湖州品牌网站设计wordpress侧栏导航栏
  • 使用EasyExcel生成下拉列表
  • 解密面向对象三大特征:封装、继承、多态
  • 未来之窗昭和仙君(二十六)复制指定元素内容到剪贴板——东方仙盟筑基期
  • nginx压缩包在windows下如何启动和停止使用nginx
  • 桐城住房和城乡建设局网站汶上网站建设多少钱
  • 一个外贸网站要多大的空间比较好帝国cms 网站地图插件
  • 国产DSP芯片FT6678的UART接口详解C++软件开发,嵌入式软件开发,Linux
  • 第十二周 waf绕过和前端加密绕过
  • 时间服务器练习
  • access数据库做网站互联网舆情忻州
  • php企业网站模板下载有没有做外贸免费网站
  • 关于Windows中PyExecjs库中文乱码的解决
  • 算法16.0
  • 卡码网语言基础课(Python) | 16.出现频率最高的字母
  • [优选算法专题四.前缀和——NO.28 除自身以外数组的乘积]
  • 垂直门户网站怎么做如何开设一个网站
  • 第一章 FreeRTOS简介