Core Components of Spark's Memory Design
1. Memory Region Layout
1. Driver
+---------------------------------------------------------+
|               Driver process total memory               |
+---------------------------------------------------------+
| +------------------+  +------------------------------+  |
| | JVM on-heap      |  | Off-heap region              |  |
| | (spark.driver.   |  |                              |  |
| |  memory)         |  +------------------------------+  |
| +------------------+  | 1. Spark-managed off-heap    |  |
| | +--------------+ |  |    pool (optional, usually   |  |
| | | Unified pool | |  |    disabled)                 |  |
| | | (default 60%)| |  +------------------------------+  |
| | | +----------+ | |  | 2. JVM/system overhead       |  |
| | | | Storage  | | |  |    (spark.driver.            |  |
| | | | (main    | | |  |     memoryOverhead)          |  |
| | | |  region) | | |  |    - JVM overhead            |  |
| | | +----------+ | |  |    - network buffers         |  |
| | | [Execution]  | |  |    - broadcast transfer      |  |
| | | (tiny)       | |  |      buffers                 |  |
| | +--------------+ |  |    - collect() result        |  |
| | +--------------+ |  |      buffers                 |  |
| | | User memory  | |  +------------------------------+  |
| | | (default 40%)| |                                    |
| | | - collected  | |                                    |
| | |   data       | |                                    |
| | | - app state  | |                                    |
| | +--------------+ |                                    |
| +------------------+                                    |
+---------------------------------------------------------+
2. Executor
+---------------------------------------------------------+
|              Executor process total memory              |
+---------------------------------------------------------+
| +------------------+  +------------------------------+  |
| | JVM on-heap      |  | Off-heap region              |  |
| | (spark.executor. |  |                              |  |
| |  memory)         |  +------------------------------+  |
| +------------------+  | 1. Spark-managed off-heap    |  |
| | +--------------+ |  |    pool (spark.memory.       |  |
| | | Unified pool | |  |    offHeap.size)             |  |
| | | (default 60%)| |  |    - Storage memory          |  |
| | | +----------+ | |  |    - Execution memory        |  |
| | | | Execution| | |  +------------------------------+  |
| | | | (def 50%)| | |  | 2. JVM/system overhead       |  |
| | | +----------+ | |  |    (spark.executor.          |  |
| | | +----------+ | |  |     memoryOverhead)          |  |
| | | | Storage  | | |  |    - JVM's own overhead      |  |
| | | | (def 50%)| | |  |    - thread stacks           |  |
| | | +----------+ | |  |    - native libs (NIO)       |  |
| | +--------------+ |  |    - memory-mapped files     |  |
| | +--------------+ |  |    - PySpark/Python procs    |  |
| | | User memory  | |  |    - user code off-heap      |  |
| | | (default 40%)| |  |      allocations             |  |
| | | - UDF objects| |  +------------------------------+  |
| | | - user data  | |                                    |
| | +--------------+ |                                    |
| +------------------+                                    |
+---------------------------------------------------------+
Parameter | Problem it targets | Key settings |
---|---|---|
spark.memory.offHeap.enabled | GC pressure, very large heaps, heavy caching/shuffle | spark.memory.offHeap.enabled, spark.memory.offHeap.size |
spark.executor.memoryOverhead | PySpark or native libraries, heavy shuffle, Kryo serialization | spark.executor.memoryOverhead |
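As a concrete reference, the settings in the table map onto a SparkConf like this (a minimal sketch; the sizes are made-up examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Illustrative sizes only; tune them against your own workload.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")            // JVM on-heap region
  .set("spark.memory.offHeap.enabled", "true")   // opt in to the Spark-managed off-heap pool
  .set("spark.memory.offHeap.size", "2g")        // off-heap pool shared by Storage and Execution
  .set("spark.executor.memoryOverhead", "1g")    // JVM/system overhead outside both pools
```

Note that on YARN or Kubernetes the container request is roughly the sum of all three regions, so enabling off-heap memory grows the total footprint rather than moving memory around.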
2. Determining Memory Consumption
1. The best way to gauge how much memory a dataset requires is to create an RDD, put it into the cache, and check the "Storage" page in the web UI. The page shows how much memory the RDD occupies.
2. To estimate the memory consumption of a particular object, use SizeEstimator's estimate method. This is useful both for experimenting with different data layouts to trim memory usage and for sizing how much space a broadcast variable will take on each executor's heap.
*https://www.infoworld.com/article/2077408/sizeof-for-java.html*
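A quick way to try this from a spark-shell (the Point class and array sizes are made up for illustration):

```scala
import org.apache.spark.util.SizeEstimator

// Compare the estimated footprint of two candidate layouts.
case class Point(x: Double, y: Double)
val boxed: Array[Point]  = Array.fill(100000)(Point(1.0, 2.0))
val flat: Array[Double]  = Array.fill(200000)(1.0)

println(SizeEstimator.estimate(boxed)) // object header + two fields per element
println(SizeEstimator.estimate(flat))  // a single flat primitive array
```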
3. Driver and Executor Memory
Process | Role | Memory management |
---|---|---|
Driver | stores metadata (RDD lineage, task scheduling state) | MemoryManager (Storage only) |
Executor | runs tasks, processes shuffle and cached data | TaskMemoryManager + MemoryConsumer |
Each task owns a dedicated TaskMemoryManager, which manages all memory allocated during the task's lifetime:
```scala
// Executor.scala
override def run(): Unit = {
  // ......
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) // one per task
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStartTimeNs: Long = 0
  var taskStartCpu: Long = 0
  startGCTime = computeTotalGcTime()
  var taskStarted: Boolean = false
  try {
    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskDescription.properties)
    updateDependencies(
      taskDescription.artifacts.files,
      taskDescription.artifacts.jars,
      taskDescription.artifacts.archives,
      isolatedSession)
    // Always reset the thread class loader to ensure if any updates, all threads (not only
    // the thread that updated the dependencies) can update to the new class loader.
    Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    task.setTaskMemoryManager(taskMemoryManager) // bind the manager to the deserialized task
    // ......
  } finally {
    cleanMDCForTask(taskName, mdcProperties)
    runningTasks.remove(taskId)
    if (taskStarted) {
      // This means the task was successfully deserialized, its stageId and stageAttemptId
      // are known, and metricsPoller.onTaskStart was called.
      metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
    }
  }
}
```
4. Core Components
1. MemoryManager → UnifiedMemoryManager
**A unified memory pool with dynamic borrowing.** It enforces a soft boundary between Execution and Storage so that either side can borrow memory from the other.
Parameter | Default | Purpose | Formula |
---|---|---|---|
spark.memory.fraction | 0.6 | share of the usable heap (total heap minus the fixed 300 MB reserve) given to the unified pool | Spark Memory = (total heap − 300 MB) × 0.6 |
spark.memory.storageFraction | 0.5 | initial share of the unified pool set aside for Storage | initial Storage memory = Spark Memory × 0.5 |
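Plugging the defaults into the formulas, assuming an illustrative 4 GiB executor heap:

```scala
// Back-of-the-envelope sizing with the default fractions (the 4 GiB heap is assumed).
val heapMiB             = 4096L
val usableMiB           = heapMiB - 300L                     // 3796 MiB after the fixed reserve
val unifiedPoolMiB      = (usableMiB * 0.6).toLong           // spark.memory.fraction        -> 2277 MiB
val initialStorageMiB   = (unifiedPoolMiB * 0.5).toLong      // spark.memory.storageFraction -> 1138 MiB
val initialExecutionMiB = unifiedPoolMiB - initialStorageMiB // 1139 MiB
val userMemoryMiB       = usableMiB - unifiedPoolMiB         // remaining ~40%               -> 1519 MiB
```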
1. Interaction flow
- When Execution memory runs short: it forcibly "borrows" from Storage (if Storage holds cached blocks, they are evicted or spilled to disk).
- When Storage memory runs short: it may borrow idle Execution memory, but Execution can forcibly reclaim that memory when it needs it back.
- Fair scheduling across tasks: ExecutionMemoryPool guarantees each task at least total execution memory / (2 × N) and at most total execution memory / N, where N is the number of concurrently active tasks.
```scala
// ExecutionMemoryPool.scala
private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  // Add this task to the taskMemory map just so we can keep an accurate count of the number
  // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // Keep looping until we're either sure that we don't want to grant this request (because this
  // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // In every iteration of this loop, we should first try to reclaim any borrowed execution
    // space from storage. This is necessary because of the potential race condition where new
    // storage blocks may steal the free execution memory that this task was waiting for.
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    val toGrant = math.min(maxToGrant, memoryFree)

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(log"TID ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} waiting for at least 1/2N of" +
        log" ${MDC(POOL_NAME, poolName)} pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L // Never reached
}
```
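Plugging illustrative numbers into the two bounds above:

```scala
// Illustrative only: a 2 GiB execution pool with 4 concurrently active tasks.
val poolSizeMiB    = 2048L
val numActiveTasks = 4
val minPerTaskMiB  = poolSizeMiB / (2 * numActiveTasks) // 256 MiB: below this, the task wait()s
val maxPerTaskMiB  = poolSizeMiB / numActiveTasks       // 512 MiB: per-task cap
```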
2. Tuning suggestions (see the sketch after this list)
- Cache-heavy workloads (e.g. iterative algorithms): raise spark.memory.storageFraction (e.g. to 0.6) so that more cached data stays resident in memory.
- Shuffle-heavy workloads (e.g. large joins): lower spark.memory.storageFraction (e.g. to 0.4) to prioritize Execution memory and avoid spilling shuffle data to disk.
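As a sketch, the two presets above would look like this on a SparkConf (the fractions are examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Cache-heavy preset: protect more of the unified pool for cached blocks.
val cacheHeavy = new SparkConf()
  .set("spark.memory.storageFraction", "0.6")

// Shuffle-heavy preset: leave more of the pool to Execution by default.
val shuffleHeavy = new SparkConf()
  .set("spark.memory.storageFraction", "0.4")
```

Keep in mind that spark.memory.storageFraction only sets the Storage region that is immune to eviction; the boundary between the two halves stays soft either way.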
2. MemoryPool
- StorageMemoryPool: tracks Storage memory usage.
- ExecutionMemoryPool: tracks Execution memory usage.
3. TaskMemoryManager
- Role: manages a single task's memory (on-heap and off-heap) through a paged memory abstraction that gives both kinds a uniform address space.
- Key mechanisms:
  - Memory is divided into pages (represented by MemoryBlock).
  - A 64-bit address encodes a 13-bit page number plus a 51-bit in-page offset, so on-heap and off-heap memory are addressed uniformly (see the sketch below).
  - This allows addressing 8192 × (2^31 − 1) × 8 bytes, roughly 140 TB of memory.
- Dependencies: MemoryAllocator performs the physical allocation, via HeapMemoryAllocator / UnsafeMemoryAllocator.
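The encoding itself is compact enough to sketch (the constants mirror TaskMemoryManager; the object and method names here are simplified for illustration):

```scala
// Sketch of the 13-bit page number / 51-bit offset address encoding.
object PageAddress {
  val PageNumberBits  = 13
  val OffsetBits      = 51                  // 64 - PageNumberBits
  val PageTableSize   = 1 << PageNumberBits // 8192 addressable pages
  val Lower51BitsMask = (1L << OffsetBits) - 1

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & Lower51BitsMask)

  def pageNumber(address: Long): Int = (address >>> OffsetBits).toInt
  def offset(address: Long): Long    = address & Lower51BitsMask
}
```

In on-heap mode the decoded page number is mapped through the page table to a base object plus an in-object offset; in off-heap mode the offset is used as a raw address directly.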
Aspect | On-heap memory | Off-heap memory |
---|---|---|
Allocation | long[] arrays inside the JVM heap | Unsafe.allocateMemory() |
Address space | object reference + offset | raw memory address |
GC impact | subject to GC pauses | untouched by GC |
4. MemoryAllocator
Physical memory allocators:
- HeapMemoryAllocator: allocates on-heap memory (backed by long[] arrays).
- UnsafeMemoryAllocator: allocates off-heap memory directly through sun.misc.Unsafe (avoiding GC overhead).
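A simplified sketch of the two allocation paths (the real classes in org.apache.spark.unsafe.memory add pooling, alignment, and leak detection on top of this idea; the Block type here is an invented stand-in for MemoryBlock):

```scala
import sun.misc.Unsafe

// A stripped-down stand-in for Spark's MemoryBlock: base object + offset + length.
final case class Block(baseObject: AnyRef, baseOffset: Long, length: Long)

object AllocatorSketch {
  // Unsafe has no public constructor; grab the singleton reflectively.
  private val unsafe: Unsafe = {
    val f = classOf[Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    f.get(null).asInstanceOf[Unsafe]
  }

  def allocateOnHeap(size: Long): Block = {
    // On-heap: back the block with a long[]; the GC manages its lifetime.
    val array = new Array[Long](((size + 7) / 8).toInt)
    Block(array, unsafe.arrayBaseOffset(classOf[Array[Long]]).toLong, size)
  }

  def allocateOffHeap(size: Long): Block =
    // Off-heap: no base object, just a raw address; the caller must eventually
    // call unsafe.freeMemory(baseOffset) because the GC never will.
    Block(null, unsafe.allocateMemory(size), size)
}
```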
5. MemoryConsumer
An abstract class representing a component that needs memory (e.g. shuffle, sorting). Key methods:
- spill(): spills data to disk when memory is insufficient.
- acquireMemory(): requests memory from the TaskMemoryManager.
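To make the contract concrete, here is a hedged sketch of a minimal consumer (based on the MemoryConsumer and TaskMemoryManager signatures in Spark 3.x; the buffer logic is invented for illustration):

```scala
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

// A toy consumer that reserves memory for an in-memory buffer and can give it back.
class SortBufferConsumer(tmm: TaskMemoryManager)
    extends MemoryConsumer(tmm, tmm.pageSizeBytes(), MemoryMode.ON_HEAP) {

  private var held: Long = 0L

  // Invoked by the TaskMemoryManager when memory is tight, possibly on behalf of
  // another consumer; release what we can and report the bytes actually freed.
  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    val released = math.min(size, held)
    // ... a real consumer would write its buffered data to disk here ...
    freeMemory(released)
    held -= released
    released
  }

  def reserve(bytes: Long): Unit = {
    // acquireMemory may grant fewer bytes than requested, and may itself trigger
    // spill() on this or other consumers to satisfy the request.
    held += acquireMemory(bytes)
  }
}
```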