当前位置: 首页 > news >正文

Spark核心Shuffle详解(二)ShuffleHandler

1、ShuffleWriter

ShuffleWriter 是一个抽象类,定义了 map 任务将中间结果输出到磁盘上的规范。ShuffleWriter 有三个实现类:SortShuffleWriter、BypassMergeSortShuffleWriter 和 UnsafeShuffleWriter。

1、BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter是Bypass机制的具体实现,其设计目标是在适当场景下消除排序开销

1、实现原理

BypassMergeSortShuffleWriter的核心工作流程分为三个阶段:

  1. 分区缓冲区初始化:为每个Reduce分区(分区数N≤200)创建独立的磁盘写缓冲区和临时文件。
  2. 数据分发与写入:对于每条记录,计算其目标分区ID(通过key的hashcode对N取模),然后直接追加到对应分区的缓冲区。缓冲区满时,同步将数据溢写到对应分区的临时文件中。
  3. 文件合并:处理完所有数据后,将N个临时分区文件合并为一个数据文件,并创建索引文件。这一步是关键创新,既保持了Hash Shuffle避免排序的优点,又通过文件合并解决了小文件问题。

2、UnsafeShuffleWriter

UnsafeShuffleWriter是Spark Tungsten项目的重要组成部分,直接操作堆外内存,旨在消除JVM开销并提升内存使用效率。直接通过Java Unsafe API管理内存,避免JVM对象开销和GC压力。

3、SortShuffleWriter

SortShuffleWriter是SortShuffleManager的默认实现,采用了"内存聚合→排序→溢写→合并"。

1、内存管理与数据结构

SortShuffleWriter根据算子类型选择不同的内存数据结构:

  • Map数据结构(PartitionedAppendOnlyMap:用于聚合类算子(如reduceByKey)。该结构结合了Map和Array的特性,支持高效的数据更新和聚合操作。其核心优势包括:
    • 使用线性探测法解决哈希冲突,减少指针开销
    • 支持原地更新聚合值,避免创建新对象
    • 在内存不足时能够扩展并溢出到磁盘
  • Array数据结构(PartitionedPairBuffer:用于非聚合类算子。该结构简单追加(key, value)对,内存效率更高。当数据量超过阈值时,触发排序和溢写操作。
//ExternalSorter
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]

2、 溢出与合并机制

SortShuffleWriter采用多级溢出-合并策略平衡内存使用和磁盘I/O:

  1. 内存填充与排序:数据不断添加到内存数据结构,当达到阈值(spark.shuffle.spill.initialMemoryThreshold默认5MB)时,根据分区ID和Key进行排序。
  2. 批次溢写:排序后的数据以批次(默认10,000条)形式写入临时磁盘文件,减少磁盘I/O次数。
  3. 全局合并:最终将内存中剩余数据与所有临时磁盘文件进行归并排序,生成最终输出。这一过程使用高效的最小堆算法,只需一次扫描即可完成多路合并。
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {var shouldSpill = falseif (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {// Claim up to double our current memory from the shuffle memory poolval amountToRequest = 2 * currentMemory - myMemoryThresholdval granted = acquireMemory(amountToRequest)myMemoryThreshold += granted// If we were granted too little memory to grow further (either tryToAcquire returned 0,// or we already had more memory than myMemoryThreshold), spill the current collectionshouldSpill = currentMemory >= myMemoryThreshold}shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold// Actually spillif (shouldSpill) {_spillCount += 1logSpillage(currentMemory)spill(collection)_elementsRead = 0_memoryBytesSpilled += currentMemoryreleaseMemory()}shouldSpill}
//spark.shuffle.spill.batchSize
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)]): DiskMapIterator = {val (blockId, file) = diskBlockManager.createTempLocalBlock()val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics)var objectsWritten = 0// List of batch sizes (bytes) in the order they are written to diskval batchSizes = new ArrayBuffer[Long]// Flush the disk writer's contents to disk, and update relevant variablesdef flush(): Unit = {val segment = writer.commitAndGet()batchSizes += segment.length_diskBytesSpilled += segment.lengthobjectsWritten = 0}var success = falsetry {while (inMemoryIterator.hasNext) {val kv = inMemoryIterator.next()writer.write(kv._1, kv._2)objectsWritten += 1if (objectsWritten == serializerBatchSize) {flush()}}if (objectsWritten > 0) {flush()writer.close()} else {writer.revertPartialWritesAndClose()}success = true} finally {if (!success) {// This code path only happens if an exception was thrown above before we set success;// close our stuff and let the exception be thrown furtherwriter.closeAndDelete()}}new DiskMapIterator(file, blockId, batchSizes)}
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]): Iterator[Product2[K, C]] = {val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)type Iter = BufferedIterator[Product2[K, C]]// Use the reverse order (compare(y,x)) because PriorityQueue dequeues the maxval heap = new mutable.PriorityQueue[Iter]()((x: Iter, y: Iter) => comparator.compare(y.head._1, x.head._1))heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = truenew Iterator[Product2[K, C]] {override def hasNext: Boolean = heap.nonEmptyoverride def next(): Product2[K, C] = {if (!hasNext) {throw new NoSuchElementException}val firstBuf = heap.dequeue()val firstPair = firstBuf.next()if (firstBuf.hasNext) {heap.enqueue(firstBuf)}firstPair}}}

2、ShuffleReader

1、BlockStoreShuffleReader

Shuffle Read阶段负责从各个Map Task的输出中获取属于当前Reduce分区的数据,并进行必要的处理(如聚合、排序),为后续计算阶段提供输入。ShuffleReader的设计直接影响Reduce Task的性能和稳定性。

1、实现原理

第1步:获取Map输出状态 (SortShuffleManager##getReader

  • Reduce Task首先通过 MapOutputTracker(主节点上的 MapOutputTrackerMaster)获取当前Stage中所有Map Task的输出状态信息(MapStatus)。
  • MapStatus 包含了每个Map Task输出的数据位置(Executor的Host和Port)以及每个Result Partition的大小。

第2步:创建拉取迭代器 - ShuffleBlockFetcherIterator (BlockStoreShuffleReader##read)

这是数据拉取的核心。它会:

  1. 划分请求: 根据 MapStatus,将需要拉取的Block分为两组:
    • 本地请求: 如果Map Task和Reduce Task在同一个Executor上,则直接通过 BlockManager 从本地磁盘读取。
    • 远程请求: 需要通过网络从其他Executor拉取。
  2. 并发拉取: 它会并发的发起多个远程请求(默认并行度为 spark.reducer.maxReqsInFlight,通常为1,但可能不是最优配置),以加速数据拉取过程。
  3. 流式解压与反序列化: 拉取到的数据是以序列化、压缩的字节流形式存在的。ShuffleBlockFetcherIterator 会边拉取边进行解压和反序列化,将其转换为一条条的 (Key, Value) 记录。
  4. 容错与重试: 如果某个Block拉取失败,它会进行重试。

第3步:聚合与排序 - 关键的“组合器”(BlockStoreShuffleReader##read)

反序列化后的记录流会被传递给一个“组合器”进行处理。根据Shuffle的依赖类型(AggregatorOrdering),这里的行为有所不同:

  • mapSideCombine = false(默认/未在Map端聚合):
    • 无聚合、无排序: 直接返回记录流。例如 groupBy() 的某些情况。
    • 有聚合: 使用 Aggregator。Spark会使用一个哈希表(ExternalAppendOnlyMap)来在内存中维护聚合状态。当内存不足时,它会将内存中的数据溢写(Spill) 到磁盘,最后再合并所有内存和磁盘文件。这就是“外部聚合”。
    • 有排序: 使用 Ordering。Spark会使用一个堆排序器(ExternalSorter)。同样,当内存不足时,它会进行溢写和归并排序。
  • mapSideCombine = true(在Map端已聚合):
    • Reduce端接收到的已经是部分聚合的结果。此时,Reduce端的聚合器只需要对这些“部分聚合结果”进行最终合并,逻辑相同,但数据量更小,效率更高。

第4步:输出最终结果

经过聚合和排序后的最终结果,会以一个迭代器的形式返回给ShuffleReader的调用者(通常是Result Task的 runTask 方法)。Task会遍历这个迭代器,并将最终结果写入到自己的输出存储中(或直接返回给Driver)。

3、Shuffle状态跟踪管理

1、MapOutputTracker

MapOutputTracker 是 Spark Shuffle 的核心元数据管理组件,负责跟踪和管理所有 Map 任务的输出位置信息,为 Reduce 任务提供数据位置服务。

2、MapOutputTrackerMaster (org.apache.spark)

  • 维护所有shuffle的map输出元数据,包括每个shuffle的map任务数量、每个map任务的输出位置(MapStatus)等。
  • 响应Executor的MapOutputTrackerWorker的请求,提供map输出位置信息。
  • 当shuffle完成后,清理对应的元数据。
// HashMap for storing shuffleStatuses in the driver.
// Statuses are dropped only by explicit de-registering.
// Exposed for testing// Shuffleid -> ShuffleStatus
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala// ShuffleStatus
// MapStatus 索引为map任务ID,值为MapStatus(包含每个map任务输出的位置和大小信息)
val mapStatuses = new Array[MapStatus](numPartitions)

3、MapOutputTrackerWorker

  • 与Driver上的MapOutputTrackerMaster通信,获取shuffle的map输出位置信息。
  • 缓存获取到的map输出位置信息,避免重复请求。
  • 当缓存的信息过期时(比如Executor丢失),清理缓存。
val mapStatuses: Map[Int, Array[MapStatus]] =new ConcurrentHashMap[Int, Array[MapStatus]]().asScalaval mergeStatuses: Map[Int, Array[MergeStatus]] =new ConcurrentHashMap[Int, Array[MergeStatus]]().asScala

4、MapStatus

MapStatus是ShuffleMapTask执行完成后向Driver报告的核心元数据,包含了两类关键信息:

  • Task运行所在的BlockManager地址
  • 该Task输出中每个Reduce分区的大小估算信息

压缩的 MapStatus CompressedMapStatus 默认

高度压缩的 MapStatus(用于大量 Reduce 分区) 当分数数>2000使用 。 当size 小于阈值 reduce size 取平均值
在这里插入图片描述

http://www.dtcms.com/a/418712.html

相关文章:

  • React Native启动性能优化实战:Hermes + RAM Bundles + 懒加载
  • 怎么做淘宝客个人网站wordpress可视化编辑插件下载
  • [C++项目框架]gflags和gtest的简单介绍
  • Vue2 和 Vue3 中使用 Vue Router 的详细过程
  • 微服务项目->在线oj系统(Java-Spring)-后台管理(2)
  • 【MySQL体系】第2篇:MySQL索引类型和原理
  • flash型的著名网站网站开发公司怎么接单
  • 【第五章:计算机视觉-项目实战之图像分割实战】2.图像分割实战:人像抠图-(1)人像抠图Image Matting算法详解
  • 使用 PyTorch 构建并训练 CNN 模型
  • 如何做电影网站狼视听seo外包优化服务商
  • blender布局工作区突然变得很卡
  • 【计算机视觉】图像去雾技术
  • 工信部网站icp备案号文艺范wordpress主题
  • 树莓派无法播放哔哩哔哩等视频
  • 华为芯片泄密案警示:用Curtain e-locker阻断内部数据泄露
  • 记一次达梦数据库的查询异常
  • 泸州市建设工程管理局网站58网站怎么做品牌推广
  • 个人主题网站设计论文北京seo推广系统
  • AI编程开发系统001-基于SpringBoot+Vue的旅游民宿租赁系统
  • 通用人工智能(AGI):从技术探索到社会重构的 2025 展望
  • 【Web前端|第五篇】Vue进阶(一):Axios工具和前端工程化
  • RISE论文阅读
  • LeetCode 416 分割等和子集
  • web开发,在线%车辆管理%系统,基于Idea,html,css,vue,java,springboot,mysql
  • 《安富莱嵌入式周报》第358期:USB4雷电开源示波器,2GHz带宽,3.2Gsps采样率,开源亚微米级精度3D运动控制平台,沉浸式8声道全景声音频录制
  • Axure: 多级多选可交互树状列表
  • 打破线制,告别电脑:积木易搭发布无线一体式3D扫描仪Toucan
  • 做电影网站的资源从哪里换wordpress新建音乐界面
  • Conda环境激活全指南:bash、conda activate与source activate详解
  • 英国网站后缀爱做的小说网站吗