Chukonu 阅读笔记
Chukonu:一个将原生计算引擎集成到 Spark 中的全功能高性能大数据框架
摘要
Apache Spark 是一种广泛部署的大数据分析框架,它提供了诸如弹性、负载均衡和丰富的生态系统等吸引人的特性。然而,其性能仍有很大的改进空间。尽管用原生编程语言编写的数据并行系统可以显著提高性能,但它可能需要重新实现 Spark 的许多功能才能成为一个功能齐全的系统。原生大数据系统最好只用原生语言编写计算引擎以确保高效率,并重用 Spark 提供的其他成熟功能,而不是重新实现所有内容。
希望提升spark的性能,原来为了提高性能,需要掀了spark,自己实现一些功能,本文提出的方法复用了spark。
但是 JVM 和原生世界之间的交互可能会成为瓶颈。
本文提出了一种名为Chukonu的原生大数据框架,该框架复用了Spark提供的关键大数据特性。由于我们提出的新型DAG分割方法,潜在的Spark集成开销得以减轻,甚至优于现有的纯原生大数据框架。Chukonu将DAG程序分割为运行时部分和编译时部分:运行时部分委托给Spark,以减轻由于特性实现而产生的复杂性。编译时部分进行原生编译。我们提出了一系列优化技术,应用于编译时部分,例如算子融合、向量化和压缩,以显著降低Spark集成开销。评估结果表明,在六个常用的的大数据应用上,Chukonu的加速比高达71.58倍(几何平均数为6.09倍),超过Apache Spark,并且高达7.20倍(几何平均数为2.30倍),超过纯原生框架。通过将SparkSQL生成的物理计划转换为Chukonu程序,Chukonu将SparkSQL的TPC-DS性能提高了2.29倍。
引言
大数据指的是以异构格式快速增长的大量数据集,可以从中提取有价值的信息 [31]。Apache Hadoop [2] 是一个早期的开源大数据解决方案,它包括一个分布式文件系统 (HDFS) 用于持久存储大数据,以及一个基于 MapReduce 抽象的分析框架 [13]。最近,Apache Spark [44] 引入了一种名为弹性分布式数据集 (RDD) [42] 的新抽象,以实现容错数据重用,从而支持迭代工作负载。与 Hadoop MapReduce 相比,它可以实现数量级上的性能提升。Spark 提供了一个富有表现力且易于使用的 API,可以自然地表达函数式转换、MapReduce 和连接,从而可以构建支持图计算 [19]、流处理 [9, 43]、机器学习 [25] 和 SQL 查询 [10] 的支持库。如今,Spark 已被广泛部署,用于提供大数据分析服务 [3]。
背景,大数据服务中spark很重要。
尽管Spark在内存数据集重用方面具有优势,但最近的研究表明,其性能可以得到显著提升。例如,通过使用C++构建大数据分析框架,Thrill [11] 在典型的大数据工作负载上实现了相对于Spark的3.26倍的几何平均加速。实际上,性能提升仍有很大空间:对于Java矩阵乘法内核,切换到C语言可以带来4.4倍的加速,并且通过C编译器提供的向量化和AVX内部函数,性能可以进一步提高9.45倍 [23]。
重用的概念应该更多的是用的少,性能的显著提升。
然而,性能只是大数据处理的一个方面。Spark 提供了许多其他关键特性,例如其基于血统的弹性:大数据分析通常在多租户商品集群中执行,由于机器故障、网络抖动和抢占式调度,任务失败非常常见,这使得检查点对于处理如此频繁的故障效率不高。
跟检查点也带点关系
Spark 基于血统的容错机制允许仅重新计算部分数据,而不是全部数据。
到底什么是血统容错
lineagebased resiliency
大概意思是原生容错,或者说设计的容错。
其弹性还支持其他特性,例如负载均衡、落后者缓解和自动缩放,从而提高了集群的资源利用率。此外,Spark 的生态系统,例如带有 Web UI 的性能分析器以及与各种资源管理器的集成,使其易于在各种私有或公共云上部署、监控和分析应用程序。
webUI好评,能够借用spark生态系统好评
Thrill [11] 具有一种原生的类似 RDD 的抽象,称为 DIA,但它将数据分布紧密地耦合到物理机器,这使得弹性失效。
看来弹性意味着容易移植的特性
Husky [41] 使用上游消息日志记录容错机制,即使在没有故障的情况下也会产生不可忽略的开销 [40, 42]。与 Spark 相比,它们缺乏成为功能完备的系统所需的许多基本特性。
显然,拥有一个功能齐全的原生大数据框架是可取的。
这是目标
一个直接的解决方案是用原生编程语言重新实现一个新的纯原生框架,这既直接又在理论上可行,但成本可能高得令人望而却步,而且是不必要的:Spark 3.0.1 的核心组件有 74K 行代码1,其中只有 9K 行与编程框架直接相关,包括 RDD API 和算子实现,其余是为各种大数据功能服务的组件。Ousterhout 等人 [32] 已经揭示,计算是 Spark 的瓶颈。
这启发我们设计一个高效的大数据框架,通过构建一个可以重用 Spark 完善的大数据功能(超过 Spark 核心 LoC 的 87%)的原生编程框架,如图 1 所示。
复用了87%
然而,特征重用方法带来了一些挑战。首先,现有的纯原生大数据框架与Spark的执行模型不兼容,使得将它们集成到Spark中变得不可行。例如,Thrill将每个数据集分区耦合到特定的机器,而Husky依赖于有状态的任务执行,这违反了Spark的动态调度和无状态假设。
大概感觉是,我要从thrill和husky借一点东西放到spark里边,但是借用没有那么容易
其次,JVM和原生世界之间细粒度的交互,无论是通过JNI还是JNA,都会产生很高的开销[15, 35],这有可能成为一个新的性能瓶颈。因此,需要一个尊重Spark执行模型的原生编程框架,并且最大限度地减少与Spark集成相关的开销至关重要。
减少兼容性带来的问题
本文介绍了Chukonu,一个原生的、具有完整功能和高性能的大数据框架,它同时采用了特征重用的方法。它通过集成到Spark并重用其特性来支持关键的大数据功能。Chukonu的开发投入了合理的人力,包括9K行的C++代码和1.5K行的Scala代码,因为它重用了已充分开发的大数据功能。尽管存在高集成开销的风险,Chukonu已成功克服了这一挑战:其性能甚至超越了为性能而设计的现有纯原生大数据框架。
这归功于我们新颖的方法,该方法将DAG程序拆分为编译时部分和运行时部分:编译时部分处理诸如转换、数据过滤和按分区排序等简单数据流行为。运行时部分处理诸如调度、缓存和通信等复杂的数据流行为。Chukonu将运行时部分委托给Spark,从而减轻了复杂性。
转换,数据过滤,按分区排序,是开发最核心的内容。
为了减少Spark集成的开销,Chukonu应用了一系列针对编译时部分的优化:算子融合消除了不必要的JNA调用。向量化将每个元素的数据处理分批进行,以减少JNA调用的次数。压缩将一批元素紧凑地存储在几个缓冲区中,以便在JNA调用中消除数据复制。
为了在Spark中执行运行时部分,Chukonu实现了对Spark的轻量级封装。它是非侵入式的,可以轻松提交到现有环境,而无需重新配置现有集群资源管理器或重新编译现有Spark。
不需要重新编译spark,是一个即插即用的模块。
它还进行了一些增强,以进一步减少Spark集成开销,包括优化的序列化快速路径、显式指针传递和高效的数据加载。
尽管Chukonu的性能更优越,但编译时部分所需的优化会延长编译时间。这使得Chukonu不适合即席分析,因为缩短的执行时间不足以弥补额外编译时间带来的损失。
复杂的编译过程,不适合在线
本文做出以下贡献:
(1)我们提出了一种将原生计算引擎以低开销集成到 Spark 中的方法,通过将 DAG 程序拆分为编译时部分和运行时部分,分别用于原生编译和 Spark 执行。
什么是原生计算引擎->就是一些加速运行的组件,转换,数据过滤,按分区排序,这样的思路。
(2)我们开发了一系列优化措施,以减少JNA调用次数并减轻JNA调用开销,包括算子融合、向量化和压缩。
JNA
JNA(Java Native Access)是 Java 平台上的一个开源库,用于实现 Java 代码与本地代码(如 C、C++ 编写的代码或动态链接库)的交互。
简单来说,Java 语言本身运行在 JVM(Java 虚拟机)中,受限于虚拟机的沙箱机制,直接调用操作系统底层的本地库(如 Windows 的.dll 文件、Linux 的.so 文件)并不方便。而 JNA 提供了一种简化的方式,让 Java 程序可以直接访问本地代码的函数和数据结构,无需像传统的 JNI(Java Native Interface)那样编写大量繁琐的中间适配代码。
在实际开发中,JNA 常被用于:
调用操作系统提供的底层 API(如系统硬件操作、特殊系统功能);
复用已有的 C/C++ 库,避免重复开发;
优化性能敏感的模块(某些场景下本地代码执行效率更高)。
不过,JNA 调用本地代码时存在一定的开销(如数据类型转换、跨虚拟机上下文切换等),因此在高频率调用场景下,需要像你提到的 “算子融合、向量化和压缩” 等优化措施来减少调用次数、降低开销。
(3)据我们所知,这是首个基于有向无环图(DAG)拆分方法构建的大数据框架的实现,它具有Spark核心的全部功能,并且能够提供与纯原生框架相媲美的性能。
(4)我们通过将Chukonu与Spark和纯原生基线框架进行比较,从而对其进行了仔细的评估。
我们使用 YARN 管理的内部 Hadoop 集群评估了 Chukonu,并将 Chukonu 与 Spark RDD、Spark Tungsten [7]、Husky [41] 和 Thrill [11] 进行了比较。选择了六个大数据应用程序来评估这些系统。我们还实现了一个代码生成器,可以将 SparkSQL 生成的物理计划转换为 Chukonu 程序。TPC-DS 基准测试的所有查询都用于评估 Spark 和 Chukonu 的结构化分析能力。**尽管它站在 Apache Spark 的肩膀上,但结果表明 Chukonu 提供了卓越的性能。**在六个大数据应用程序上,与 Spark、Husky 和 Thrill 相比,其平均加速比分别为 6.09×(最高 71.58×)、2.53×(最高 7.20×)和 2.08×(最高 3.45×)。对于 TPC-DS 基准测试,与 Spark 相比,其总查询执行时间的加速比为 2.29×(Q67 最高可达 5.14×)。这表明 Chukonu 的性能明显优于 Spark,并且具有与纯原生框架相媲美的性能,这证明了 DAG 分割方法的合理性。
背景
数据并行系统。数据并行系统[1, 2, 13, 20, 22, 34, 44]为用户提供了一种编程抽象,以隐藏分布式处理的复杂细节。MapReduce [13] 是一种用户友好的编程模型,它能够基于重新执行实现低开销的容错。集群计算中需要这样做以抵抗服务器故障或抢占。Dryad [22] 引入了数据流图以促进多阶段管道。随后提出了 RDD [42] 和 Apache Spark [44],以实现迭代工作负载的内存数据重用。这提供了基于谱系的容错,以提供低开销的弹性。Piccolo [34] 提供了一种替代方法,该方法将用户暴露于分布式共享可变 KV 接口。它依赖于检查点来实现容错。
反正都是为了隐藏分布式细节,对于用户高度抽象,对于底层高度优化,包括对于效率的考量和对于容错/复用的优化
尽管Spark在内存数据集重用方面具有优势,但最近的研究表明,其性能可以得到显著提升。例如,通过使用C++构建大数据分析框架,Thrill [11] 在典型的大数据工作负载上实现了相对于Spark的3.26倍的几何平均加速。实际上,性能提升仍有很大空间:对于Java矩阵乘法内核,切换到C语言可以带来4.4倍的加速,并且通过C编译器提供的向量化和AVX内部函数,性能可以进一步提高9.45倍 [23]。
把spark换掉,性能显著提升,把某个算子换掉,性能还是显著提升。
在JVM中优化大数据系统。一些研究已经考察了在JVM中优化现有大数据系统的性能。在JVM中进行优化允许API保持不变,从而用户应用程序无需更改。Facade [30]、Lu等[24]和Gerenuk [27]提出了用户自定义函数的自动转换,使其能够支持以序列化形式访问数据,并通过连续存储序列化数据来减少垃圾回收开销。NumaGiC [18]提出了一个NUMA感知的JVM垃圾回收器。Yak [29]提出了一个大数据友好的垃圾回收器,该回收器能够感知大数据的对象生命周期模式。
原生集成到 Apache Spark 中。利用原生编译的能力有助于消除 JVM 的开销,并实现显著的加速 [8, 15, 25, 35, 38]。
java虚拟机层次带来的灵活性,同时也意味着开销
Rosenfeld 等人 [35] 在 C++ 原生引擎中加速了 SparkSQL,但将 Java UDF 留在嵌入式 JVM 中。Flare [15] 通过将整个查询计划原生编译到单机原生后端,与 SparkSQL 相比,展现了数量级的加速,但放弃了 Spark 对分布式执行的支持。Anderson 等人 [8] 通过将 MPI [36] 程序集成到 Spark 中,提供了数量级的加速。然而,MPI 不支持商品服务器上大数据分析所需的低开销容错 [13]。MLlib [25] 和 SparkJNI [38] 通过卸载到原生内核函数来加速性能关键的计算,但这会导致在 JVM 世界中没有原生内核的计算,这可能成为一个新的瓶颈,并产生跨语言的数据编组开销。
这些优化都要舍弃一些事情。
纯原生大数据系统。一些研究已经从头开始构建原生系统以提高性能[11, 21, 41]。Thrill [11] 提出了一种基于分布式不可变数组 (DIA) 内存抽象的原生数据并行系统,该系统与 MPI 的执行模型紧密耦合,并且不支持像 RDD 那样基于血统的容错。Husky [41] 和 Tangram [21] 引入了数据突变以实现细粒度的访问和异步执行,但代价是昂贵的数据日志记录和用于容错的检查点。