当前位置: 首页 > news >正文

Spark中的堆外和堆内内存以及内部行数据表示UnsafeRow

背景

本文基于 Spark v4.0.0
写此文章的目的是为了说明 Spark计算引擎在堆内和堆外内存的处理,以及在内部行处理的优化。

分析

堆外和堆内内存

Spark中的内存分为执行内存和存储内存,
执行内存用来在用来缓存在shufle过程中的一些数据,比如说 shuffle 用来排序的内存。
存储内存用来缓存RDD数据以及broadcast的数据等。

具体的代码在 MemoryManager 中:

 private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {tungstenMemoryMode match {case MemoryMode.ON_HEAP => MemoryAllocator.HEAPcase MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE}}
...MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();MemoryAllocator HEAP = new HeapMemoryAllocator();
  • 执行内存中如何被使用
    ShuffleExternalSorter举例,在进行shuffle数据写入的时候,会经过如下数据流:
UnsafeShuffleWriter.write||\/
insertRecordIntoSorter||\/
ShuffleExternalSorter.insertRecord||\/
acquireNewPageIfNecessary||\/
allocatePage||\/
TaskMemoryManager.allocatePage||\/
memoryManager.tungstenMemoryAllocator().allocate(acquired)||\/
MemoryAllocator.allocate // UnsafeMemoryAllocator 或者 HeapMemoryAllocator
  • 存储内存如何被使用
    以缓存RDD的结果为例,经过的数据流如下:
 Executor.run||\/env.blockManager.putByte||\/BlockStoreUpdater.save()||\/saveSerializedValuesToMemoryStore||\/MemoryManager.putBytes||\/MemoryManager.acquireStorageMemory||\/storagePool.acquireMemory // StorageMemoryPool(this, MemoryMode.ON_HEAP)或者StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  • 堆内堆外内存的分配和释放

UnsafeMemoryAllocator.allocate 的方法调用Unsafe.allocateMemory的方法,得到一个native的地址offeset:

 long address = Platform.allocateMemory(size);MemoryBlock memory = new MemoryBlock(null, address, size);

UnsafeMemoryAllocator.free直接调用 Platform.freeMemory(memory.offset);进行内存的释放:

Platform.freeMemory(memory.offset);

HeapMemoryAllocator.allocate的方法直接new一个Long类型的数组:

long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

HeapMemoryAllocator.free的方法是通过释放java对象的引用来达到释放该内存对象的目的:

 memory.setObjAndOffset(null, 0);LinkedList<WeakReference<long[]>> pool =bufferPoolsBySize.computeIfAbsent(alignedSize, k -> new LinkedList<>());pool.add(new WeakReference<>(array));

并且通过弱引用来引用该对象,方便二次利用(如果该对象没有被gc释放的话)。

UnsafeRow

这个UnsafeRow是Spark InternalRow的一种实现,他的作用

  1. 是减少 JVM GC 的压力
  2. 将 JVM 对象序列化为字节数组进行存储,且不影响正常的数据操作,减少了数据存储内存,可以精确计算内存的使用情况
  3. 减少了 Shuffle 过程中序列化和反序列化的的消耗
  • 减少GC压力
    相比SpecificInternalRow / GenericInternalRow 都是以Array进行存储的的,而且Scala中所有的数据类型都是对象,没有java中原生类型的概念。
    比如说 Int final abstract class Int extends _root_.scala.AnyVal 其实是一个抽象类。
    这种采用了对象存储的方式,会天然存在GC的问题
    而 UnsafeRow 采用字节数组的形式,只有单个字节数组的对象,不像SpecificInternalRow 中的的每一个字段都是可以GC的对象,这样在GC标记可达对象阶段,可以减少时间

  • 不影响正常的数据操作,减少了数据存储内存,精确计算内存的使用情况
    BaseGenericInternalRow都有对应的 setXXXgetXXX方法,用来设置或者获取对应数据类型的值,
    UnsafeRow 也有 setXXXgetXXX方法,背后是对应Unsafe.putXXXUnsafe.getXXX方法,所以说不影响正常的数据操作
    正常的jvm对象在堆中会包含对象头,实例数据,对齐填充等信息,而unsafeRow只存储对应的数据本身,节约了额外信息
    由于UnsafeRow只存储了数据,对应的占用大小就可以直接计算出来; 而jvm对象不容易计算,因为对象中的字段可能还会包含其他的引用对象

  • 减少了 Shuffle 过程中序列化和反序列化的的消耗
    在Shuffle write写磁盘阶段,会将对应的Row数据序列化,对于是 UnsafeRow 这种序列化的话,直接把当前的byte数组的数据写入就行(参考UnsafeShuffleWriter.write方法实现),用的 ShuffleExchangeExec.serializer 的序列化 ,也就是 UnsafeRowSerializer
    而不像 JavaSerializer 这种序列化,还需要重新序列化整个对象

在shuffle 读阶段,会将数据反序列化为 内部Row, 对于 UnsafeRow 的话,直接读取对应的的字节数据到 UnsafeRow 就行了(参考 ShuffleManager.getReader 实现),用的是ShuffleExchangeExec.serializer.deserializeStream.asKeyValueIterator方法.

对于转换的问题可以参考SparkSQL InternalRow


文章转载自:

http://htREuFWc.crqpL.cn
http://ncmweZye.crqpL.cn
http://H8TWHuQ0.crqpL.cn
http://ellgSR83.crqpL.cn
http://ZO3AJj74.crqpL.cn
http://BrTZm4s5.crqpL.cn
http://j7qs4nHU.crqpL.cn
http://qmSWNxBB.crqpL.cn
http://YOZNVbbv.crqpL.cn
http://Yh6qSnn5.crqpL.cn
http://ujA5Hbeu.crqpL.cn
http://GYedOWHw.crqpL.cn
http://raV0vvWY.crqpL.cn
http://yCvdr6pc.crqpL.cn
http://J0FFVHvv.crqpL.cn
http://jFT2IVk9.crqpL.cn
http://wZzlQZEy.crqpL.cn
http://HmeYBFH9.crqpL.cn
http://iOyuMDor.crqpL.cn
http://cGFcTnC7.crqpL.cn
http://GVjt4FS4.crqpL.cn
http://pbJ0dxvl.crqpL.cn
http://iV6rd3gS.crqpL.cn
http://BPGRPxsP.crqpL.cn
http://wb95hYKu.crqpL.cn
http://hEIQeFQ3.crqpL.cn
http://nW8QUcjq.crqpL.cn
http://vmRljhCC.crqpL.cn
http://tpCjp6mF.crqpL.cn
http://XSZv92OH.crqpL.cn
http://www.dtcms.com/a/366305.html

相关文章:

  • 打造大师级渲染:10个高效工作流技巧,质效双升
  • 【混合开发】Android+Webview+VUE播放视频之视频解析工具mediainfo-Macos
  • 【EasyExcel】Excel工具类2.0
  • 基于 EasyExcel + 线程池 解决 POI 导出时的内存溢出与超时问题
  • 【CouponHub项目开发】EasyExcel解析Excel并使用线程池异步执行和延时队列兜底
  • 鸿蒙:从图库选择图片并上传到服务器
  • sqlserver2008导入excel表数据遇到的问题
  • 【MFC中OnInitDialog虚函数详解:哪个是虚函数?两个OnInitDialog的关系】
  • 算法-根据前序+中序遍历打印树的右视图
  • vite与webpack对比
  • 用AI做TikTok影视解说,全流程全自动成片,不懂外语也能做全球矩阵!
  • 开源混合专家大语言模型(DBRX)
  • GitHub 热榜项目 - 日榜(2025-09-04)
  • openEuler2403安装部署Kafka
  • CDN加速的安全隐患与解决办法
  • (E题|AI 辅助智能体测)2025年高教杯全国大学生数学建模国赛解题思路|完整代码论文集合
  • Process Explorer 学习笔记(第三章3.1.2):管理权利与提权机制解析)
  • SQL Server服务管理
  • OpenAI开放ChatGPT Projects功能,免费用户也能用了!
  • 【已更新文章+代码】2025数学建模国赛A题思路代码文章高教社杯全国大学生数学建模-烟幕干扰弹的投放策略
  • Java集合---Collection接口和Map接口
  • 应对反爬:使用Selenium模拟浏览器抓取12306动态旅游产品
  • PDF.AI-与你的PDF文档对话
  • Apache PDFBox 与 spire.pdf for java 使用记录
  • Access开发导出PDF的N种姿势,你get了吗?
  • 那些年我们一起追过的Java技术,现在真的别再追了!
  • 记一次 Nuxt 3 + pnpm Monorepo 中的依赖地狱:`@unhead/vue` 引发的致命错误
  • 前端基础(四十三):文本数据解析为键值对
  • vue3入门- script setup详解上
  • JS(DOM对象)