
Core Components of Spark's Memory Design

1. Memory Region Layout

1. Driver

+---------------------------------------------------------------+
|                  Driver process total memory                  |
+---------------------------------------------------------------+
|  +----------------------+  +--------------------------------+ |
|  |  JVM on-heap memory  |  |        Off-heap region         | |
|  | (spark.driver.memory)|  +--------------------------------+ |
|  +----------------------+  | 1. Spark-managed off-heap pool | |
|  | +------------------+ |  |    (optional, usually off)     | |
|  | |   Unified pool   | |  +--------------------------------+ |
|  | |   (60% default)  | |  | 2. JVM/system overhead         | |
|  | | +--------------+ | |  | (spark.driver.memoryOverhead)  | |
|  | | |Storage memory| | |  |    - JVM overhead              | |
|  | | |(main region) | | |  |    - network buffers           | |
|  | | +--------------+ | |  |    - broadcast transfer buffers| |
|  | | [Execution mem]  | |  |    - collect() result buffers  | |
|  | | (tiny)           | |  +--------------------------------+ |
|  | +------------------+ |                                     |
|  | +------------------+ |                                     |
|  | |   User memory    | |                                     |
|  | |  (40% default)   | |                                     |
|  | |  - collected data| |                                     |
|  | |  - app state     | |                                     |
|  | +------------------+ |                                     |
|  +----------------------+                                     |
+---------------------------------------------------------------+

2. Executor

+---------------------------------------------------------------+
|                 Executor process total memory                 |
+---------------------------------------------------------------+
|  +----------------------+  +--------------------------------+ |
|  |  JVM on-heap memory  |  |        Off-heap region         | |
|  |   (spark.executor.   |  +--------------------------------+ |
|  |       memory)        |  | 1. Spark-managed off-heap pool | |
|  +----------------------+  |   (spark.memory.offHeap.size)  | |
|  | +------------------+ |  |    - Storage memory            | |
|  | |   Unified pool   | |  |    - Execution memory          | |
|  | |   (60% default)  | |  +--------------------------------+ |
|  | | +--------------+ | |  | 2. JVM/system overhead         | |
|  | | |Execution mem | | |  |(spark.executor.memoryOverhead) | |
|  | | |(50% default) | | |  |    - JVM overhead              | |
|  | | +--------------+ | |  |    - thread stacks             | |
|  | | +--------------+ | |  |    - native libs (NIO/Native)  | |
|  | | |Storage memory| | |  |    - memory-mapped files (mmap)| |
|  | | |(50% default) | | |  |    - PySpark/Python workers    | |
|  | | +--------------+ | |  |    - user off-heap allocations | |
|  | +------------------+ |  +--------------------------------+ |
|  | +------------------+ |                                     |
|  | |   User memory    | |                                     |
|  | |  (40% default)   | |                                     |
|  | |  - UDF objects   | |                                     |
|  | |  - user data     | |                                     |
|  | +------------------+ |                                     |
|  +----------------------+                                     |
+---------------------------------------------------------------+
Parameter                     | Typical problem it targets                                  | Key settings
spark.memory.offHeap.enabled  | GC pressure, very large heaps, heavy caching/shuffle        | spark.memory.offHeap.enabled, spark.memory.offHeap.size
spark.executor.memoryOverhead | PySpark, heavy shuffle, Kryo serialization, native libraries | spark.executor.memoryOverhead
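The two knobs above are usually set together. A hedged sketch of how they might look in a SparkConf; the sizes are placeholders for illustration, not recommendations, and the snippet assumes Spark is on the classpath:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; size these from your job's measured footprint.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")  // enable the Spark-managed off-heap pool
  .set("spark.memory.offHeap.size", "4g")       // must be set when off-heap is enabled
  .set("spark.executor.memoryOverhead", "2g")   // room for PySpark workers, native libs, buffers
```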

2. Determining Memory Consumption

1. The best way to estimate the memory a dataset needs is to create an RDD, put it into the cache, and look at the "Storage" page in the web UI. That page tells you how much memory the RDD occupies.

2. To estimate the memory consumption of a particular object, use the SizeEstimator's estimate method. This is useful when experimenting with different data layouts to trim memory usage, and for determining how much space a broadcast variable will occupy on each executor's heap.
https://www.infoworld.com/article/2077408/sizeof-for-java.html

3. Driver and Executor Memory

Role     | Responsibility                                       | Memory components
Driver   | Holds metadata (RDD lineage, task scheduling state)  | MemoryManager (Storage only)
Executor | Runs tasks; handles shuffle and cached data          | TaskMemoryManager + MemoryConsumer

Each task owns a dedicated TaskMemoryManager that manages all memory allocation over the task's lifetime:

// Executor.scala
override def run(): Unit = {
  ......
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)  // (1) create the task's manager
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStartTimeNs: Long = 0
  var taskStartCpu: Long = 0
  startGCTime = computeTotalGcTime()
  var taskStarted: Boolean = false
  try {
    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskDescription.properties)
    updateDependencies(
      taskDescription.artifacts.files,
      taskDescription.artifacts.jars,
      taskDescription.artifacts.archives,
      isolatedSession)
    // Always reset the thread class loader to ensure if any updates, all threads (not only
    // the thread that updated the dependencies) can update to the new class loader.
    Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    task.setTaskMemoryManager(taskMemoryManager)  // (2) attach it to the task
    ......
  } finally {
    cleanMDCForTask(taskName, mdcProperties)
    runningTasks.remove(taskId)
    if (taskStarted) {
      // This means the task was successfully deserialized, its stageId and stageAttemptId
      // are known, and metricsPoller.onTaskStart was called.
      metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
    }
  }
}

4. Core Components

1. MemoryManager → UnifiedMemoryManager

Unified memory pool with dynamic borrowing: it enforces a soft boundary between execution and storage so that either side can borrow memory from the other.

Parameter                    | Default | Purpose                                                          | Formula
spark.memory.fraction        | 0.6     | Share of usable heap (total heap minus 300 MB) in the unified pool | Spark Memory = (heap - 300 MB) × 0.6
spark.memory.storageFraction | 0.5     | Initial share of the unified pool reserved for Storage           | Initial Storage = Spark Memory × 0.5
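The two formulas can be checked with a small standalone sketch; the reserved constant and defaults mirror Spark's, while the object and method names are mine:

```scala
object UnifiedMemorySizing {
  // Spark subtracts 300 MB of reserved system memory before sizing the unified pool
  // (RESERVED_SYSTEM_MEMORY_BYTES in UnifiedMemoryManager).
  val ReservedBytes: Long = 300L * 1024 * 1024

  // Unified pool = (heap - 300 MB) * spark.memory.fraction (default 0.6)
  def unifiedPoolBytes(heapBytes: Long, memoryFraction: Double = 0.6): Long =
    ((heapBytes - ReservedBytes) * memoryFraction).toLong

  // Initial Storage region = unified pool * spark.memory.storageFraction (default 0.5)
  def initialStorageBytes(heapBytes: Long,
                          memoryFraction: Double = 0.6,
                          storageFraction: Double = 0.5): Long =
    (unifiedPoolBytes(heapBytes, memoryFraction) * storageFraction).toLong
}
```

For a 4 GB heap this gives a unified pool of about 2.2 GB, half of which starts out reserved for Storage.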

1. Interaction flow

  • When Execution memory runs short

It forcibly "borrows" from Storage (if Storage holds cached blocks, they are evicted or spilled to disk).

  • When Storage memory runs short

It may borrow idle Execution memory, but Execution can forcibly reclaim it when needed.

  • Fair scheduling across tasks

ExecutionMemoryPool guarantees each task at least totalExecutionMemory / (2 × numActiveTasks) and caps it at totalExecutionMemory / numActiveTasks.

private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  // Add this task to the taskMemory map just so we can keep an accurate count of the number
  // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // Keep looping until we're either sure that we don't want to grant this request (because this
  // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // In every iteration of this loop, we should first try to reclaim any borrowed execution
    // space from storage. This is necessary because of the potential race condition where new
    // storage blocks may steal the free execution memory that this task was waiting for.
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    val toGrant = math.min(maxToGrant, memoryFree)

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(log"TID ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} waiting for at least 1/2N of" +
        log" ${MDC(POOL_NAME, poolName)} pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}
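The per-task bounds enforced by this loop reduce to two one-line formulas; a minimal standalone sketch (the object and method names are mine, not Spark's):

```scala
object TaskFairShare {
  // Guaranteed floor: a task blocks in acquireMemory rather than proceed below this
  def minPerTask(poolSize: Long, numActiveTasks: Int): Long =
    poolSize / (2L * numActiveTasks)

  // Hard ceiling: no single task may hold more than this share of the (grown) pool
  def maxPerTask(maxPoolSize: Long, numActiveTasks: Int): Long =
    maxPoolSize / numActiveTasks
}
```

With a 1 GB execution pool and 4 active tasks, each task is guaranteed 128 MB and capped at 256 MB.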

2. Tuning suggestions

  1. Cache-heavy jobs (e.g. iterative algorithms)
    • Raise spark.memory.storageFraction (e.g. to 0.6) so more cached data stays in memory.
  2. Shuffle-heavy jobs (e.g. large joins)
    • Lower spark.memory.storageFraction (e.g. to 0.4) to favor Execution memory and reduce shuffle spills to disk.

2. MemoryPool

  • StorageMemoryPool: tracks Storage memory usage.
  • ExecutionMemoryPool: tracks Execution memory usage.

3. TaskMemoryManager

  • Responsibility: manages a single task's memory (on-heap and off-heap) through a page-based abstraction with uniform addressing.
  • Key mechanisms
    • Memory is divided into pages (represented by MemoryBlock).
    • A 64-bit address encodes a 13-bit page number plus a 51-bit in-page offset, addressing on-heap and off-heap memory uniformly.
    • This allows addressing 8192 × (2^31 − 1) × 8 bytes, roughly 140 TB of memory.
  • Relies on a MemoryAllocator (HeapMemoryAllocator / UnsafeMemoryAllocator) to allocate the physical memory.
Aspect        | On-heap                       | Off-heap
Allocation    | long[] arrays on the JVM heap | Unsafe.allocateMemory()
Address space | object reference + offset     | raw memory address
GC impact     | subject to GC pauses          | unaffected by GC
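The 13-bit page number / 51-bit offset split described above can be sketched with plain bit arithmetic. This is a toy model; Spark's real encoder lives in TaskMemoryManager and additionally validates page numbers and translates on-heap offsets:

```scala
object PageAddress {
  // Low 51 bits hold the offset within the page; the high 13 bits hold the page number.
  val OffsetBits = 51
  val OffsetMask: Long = (1L << OffsetBits) - 1

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

  def decodePageNumber(address: Long): Int = (address >>> OffsetBits).toInt

  def decodeOffset(address: Long): Long = address & OffsetMask
}
```

Because both halves round-trip losslessly, consumers can pass a single Long around and resolve it to either a heap array or a raw native address at the last moment.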

4. MemoryAllocator

Physical memory allocators:

  • HeapMemoryAllocator: allocates on-heap memory (backed by long[] arrays).
  • UnsafeMemoryAllocator: allocates off-heap memory directly via sun.misc.Unsafe (avoiding GC overhead).

5. MemoryConsumer

Abstract class representing a component that needs memory (e.g. shuffle, sort). Key methods:

  • spill(): spills data to disk when memory is short.
  • acquireMemory(): requests memory from the TaskMemoryManager.
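The spill-then-retry contract between a consumer and its memory manager can be modeled in a few lines. This is a toy model with invented names, not Spark's actual API; the real TaskMemoryManager may also ask *other* consumers to spill before falling back to the requester:

```scala
// A trivially small "pool" standing in for the task's execution memory.
class ToyPool(var free: Long) {
  def tryAcquire(n: Long): Boolean =
    if (n <= free) { free -= n; true } else false
}

// A consumer that spills its own pages when the pool cannot satisfy a request.
class ToyConsumer(pool: ToyPool) {
  var used: Long = 0L

  // Release everything we hold back to the pool; returns bytes freed.
  def spill(): Long = {
    val released = used
    pool.free += used
    used = 0L
    released
  }

  // Try to acquire n bytes; on failure, spill ourselves and retry once.
  def acquireMemory(n: Long): Long = {
    if (pool.tryAcquire(n)) { used += n; return n }
    spill()
    if (pool.tryAcquire(n)) { used += n; n } else 0L
  }
}
```

With a 100-byte pool, a second 60-byte request forces the consumer to spill its first 60 bytes before the retry succeeds, which mirrors why shuffle and sort operators write intermediate runs to disk under memory pressure.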
