当前位置: 首页 > news >正文

流批一体向量化引擎Flex

摘要:本文整理自蚂蚁分布式计算引擎技术专家,Calcite Comitter、Flink Contributor 刘勇老师,在 Flink Forward Asia 2024 核心技术 (一) 专场中的分享,内容分为以下三个部分:

1、向量化技术背景

2、架构

3、未来规划

01. 向量化技术背景

1.1 什么是向量化计算

1.1.1 并行数据处理 SIMD 指令

以下面循环代码为例,计算在 CPU 内完成需经三步:

  • 加载(Load),从内存加载2个源操作数(a[i]和b[i])到2个寄存器。

  • 计算(Compute),执行加法指令,作用于2个寄存器里的源操作数副本,结果产生到目标寄存器。

  • 存储(Store),将目标寄存器的数据存入(拷贝)到目标内存位置(c[i])。

void addArrays(const int* a, const int* b, int* c, int num) {for (int i = 0; i < num; ++i) {c[i] = a[i] + b[i];}
}

该流程即对应传统的计算架构:单指令单数据(SISD)顺序架构,任意时间点只有一条指令作用于一条数据流。如果有更宽的寄存器(超机器字长,比如256位16字节),一次性从源内存同时加载更多的数据到寄存器,一条指令作用于寄存器x和y,在x和y的每个分量(比如32位4字节)上并行进行加,并将结果存入寄存器z的各对应分量,最后一次性将寄存器z里的内容存入目标内存,那么就能实现单指令并行处理数据的效果,这就是单指令多数据(SIMD)。

image

1.1.2 向量化执行框架具有的特性

执行引擎常规按行处理的方式,存在以下问题:

  • **CPU Cache 命中率差。**一行的多列(字段)数据的内存紧挨在一起,哪怕只对其中的一个字段做操作,其他字段所占的内存也需要加载进来,这会抢占稀缺的 Cache 资源。Cache 命失会导致被请求的数据从内存加载进Cache,等待内存操作完成会导致 CPU 执行指令暂停(Memory Stall),这会增加延时,还可能浪费内存带宽。

  • **变长字段影响计算效率。**假设一行包括 int、string、int 三列,其中 int 类型是固定长度,而 string 是变长的(一般表示为int len + bytes content),变长列的存在会导致无法通过行号算 offset 做快速定位。

  • 虚函数开销。对一行的多列进行处理通常会封装在一个循环里,会抽象出一个类似 handle 的接口(C++虚函数)用于处理某类型数据,各字段类型会 override 该 handle 接口。虚函数的调用多一步查表,且无法被内联,循环内高频调用虚函数的性能影响不可忽视。

image

因此,要让向量化计算发挥威力,只使用 SIMD 指令还不够,还需要对框架层面进行改造,数据按列组织

  • 数据按列组织将提高数据局部性。参与计算的列的多行数据会内存紧凑的保存在一起,CPU 可以通过预取指令将接下来要处理的数据加载进 Cache,从而减少 Memory Stall。不参与计算的列的数据不会与被处理的列竞争 Cache,这种内存交互的隔离能提高 Cache 亲和性。

  • 同一列数据在循环里被施加相同的计算。批量迭代将减少函数调用次数,通过模版能减少虚函数调用,降低运行时开销。针对固定长度类型的列很容易被并行处理(通过行号 offset 到数据),这样的执行框架也有利于让编译器做自动向量化代码生成,显著减少分支,减轻预测失败的惩罚。结合模板,编译器会为每个实参生成特定实例化代码,避免运行时查找虚函数表,并且由于编译器知道了具体的类型信息,可以对模板函数进行内联展开。

image

1.2 向量化计算在分布式计算领域的现状

随着技术的发展,向量化技术在硬件、指令集、配套工具、类库等得到多方位协同发展。

  • 在指令集层面,当前大多数机器都支持 SIMD 指令集,比如 x86 平台的 SSE、AVX 指令集,ARM 平台的 NEON指令集。

  • 在编译器层面,现代编译器如 GCC、LLVM 可以自动将代码中可向量化的部分转成SIMD指令。比如开源有一个基于 arrow 的表达式计算框架 gandiva 就是基于 LLVM 生成 SIMD 指令。

  • 在类库层面,有支持跨平台的库 XSIMD,可同时运行在 x8 6和 arm 架构。

image.png

上面讲述了向量化的技术原理和可供使用的方案,我们再看下大数据领域在向量化方向做了哪些探索。首先看下 Photon, Photon 是 Databricks 闭源的一个全新的向量化引擎,完全用 C++ 编写。Databricks 在过去几年里,一直通过各种技术手段对内部的 Spark 版本 Databricks Runtimes 进行优化。左下图展示了不同 DBR 版本相对于2.1版本的性能提升情况,可以看到未开启 Photon 的版本性能提升在1-2倍,自从上了 Photon 后,性能提升可以达到3-8倍。自从 Databricks 论文公布其方案和效果后,大数据领域进行了各种尝试和探索。主流分布式计算引擎都在往向量化方向发力且效果显然,Flink 作为流批一体实时计算引擎在湖仓一体中起关键核心作用,但缺少核心的向量化能力,因此我们蚂蚁向这块发起了挑战。

image.png

1.3 流批一体向量化引擎在蚂蚁的探索

今年我们在Flink 1.18中首次引入了流批一体向量化计算,通过使用C++开发,利用 SIMD、向量化、列计算等技术,在完全兼容 Flink SQL 语法的基础之上,可以提供比原生的执行引擎更高的吞吐,并可以降低用户 Flink 作业的执行成本。同时,SQL语义对齐,对用户来说,仅仅需要增加配置即可开启,降低存量用户使用成本。

02. 架构

2.1 引擎选型

image.png

由于目前开源社区已经有比较成熟的 Native Engine,如 ClickHouse、Velox,具备了优秀的向量化、批处理、列计算等能力,并经过业界广泛验证和实践。因此,我们不打算重头造轮子,而是站在巨人的肩膀上。采用了类似 gluten 的实现方式,基于velox上构建。同时,为了防止闭门造车,在立项之出选择和 gluten 社区合作,讨论构建方案以及加强紧密型合作。内部代号 Flex,Flex 是 Fink 和 Velox 的全称,也是 Flexible 的前缀,我们希望它能够做到灵活可插拔,像胶水一样扮演其工作。

同时,velox 算子面向传统批处理场景,算子本身无法处理回撤消息,即天然和流式计算场景想违背,因此无法直接基于 gluten 的 substrain plan 方案。改方案实现复杂工作量较大,也没法支持状态算子。同时,为了复用 velox 一些现有能力,不必重复造轮子,因此我们另辟蹊径采用了新的方案,新方案可以做到真正的流批一体,同时,向量化核心效果在于表达式计算。因此非常适合 Calc 算子,而且 Calc 算子不区分流和批,也就是说流任务和批任务都可以享受到向量化带来的技术红利,也可以使用到 velox 的表达式计算能力。

最后,我们在内部完成相应的 POC 之后,通过使用基准测试函数进行测试,发现端到端的性能TPS能够提升四倍以上。所以我们开始立项,代号叫 Flex,它是 Flink 加 Velox 的全称,也是 Flexible(灵活的)的前缀,我们希望它能够做到灵活可插拔,做好中间层的工作。

2.2 Flex引擎架构

image.png

Flex的架构主要分为六个模块。第一层是JNI胶水层,其主要负责实现一套通用的高性能 JIN 库和 Velox 进行交互。第二层是 native 算子层,当前支持 native source、native sink、native calc。第三层是 plan conversion,负责将 Flink 的算子、数据等转换成 Velox 能处理的形式。第四是数据转换层,因为 Flink SQL 是面向 RowData 的数据结构,需要把它转变成面向列的 Velox RowVector 数据结构。第五层是 Fallback 层,由于状态算子当前还不支持需要进行 fallback 处理。第六层是统一的管理层,对内存等进行统一管理。

2.3 关键工作

然后我们从功能性、正确性、易用性和稳定性四个方面进行全方位建设。

image.png

2.3.1 功能性建设

image.png

2.3.1.1 Native 算子层优化

NativeCalc 算子优化

在用户向量化作业验证过程中,发现对于任务中含有很多 RexInputRef 引用字段,这些字段数据内容长且没有加工逻辑,如果把使用到的 Scheme 字段全部做攒批和数据转换,转换开销将近11%。我们可以将引用字段单独拆出来,拆成两个 Calc 算子,在 NativeCalc 算子层仅仅将表达式中使用的字段进行数据转换,其他字段仅仅 forward到原有费 Calc codegen 计算进行取值,最后再拼接 JoinedRowData。为了高效的拼接 JoinedRowData,自然需要插入Calc 算子对字段重排序,因此需要增加规则支持 projection reorder。通过该优化数据转换层开销降低到6.68%

同时对于表达式里面含有 udf,复用此方案把 udf 也拆到引用字段的 Calc 里面,就不需要整个 Calc 算子 fallback。

image

支持 NativeSource/NativeSink

对于蚂蚁内部有类似 kafka 的消息队列 sls, sls source/sink 当前支持 Arrow 模式,为了避免冗余的数据行转列开销,插件层直接读写 Arrow 格式数据,从而避免额外的行转列开销。

2.3.1.2 Plan层优化

为了尽可能多的将 DAG 中各种算子中包含 projection 和 condition 的逻辑中包含 SIMD 函数的计算逻辑委托给velox进行计算,我们在 plan 层增加了多种基于规则的 Rule 对算子进行拆分合并和交互,全方位的发挥性能极致。目前 Calc、Join、Correlate 算子都已经支持 Native 执行

由于流式计算场景需要攒批,发挥列计算的优势,但是攒批和转换数据层存在一定开销,因此对于 calc 算子中整个表达式 tree 都没有使用到 SIMD 函数,将不翻译成 native 算子

下面的 sql RexCall以blink_json_value 为例进行阐述优化规则

projection reorder

以下面 sql 为例,该规则将RexInputRef引用字段排在 RexCall 前面,通过一个非 Native 的 Cal 记录下引用关系。主要用于和算子层优化结合。

SELECT a, blink_json_value(a, c), b FROM MyTable
Calc(select=[a, f0 AS EXPR$1, b])
+- NativeCalc(select=[a, b, blink_json_value(a, c) AS f0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
projections 包含 SIMD 函数

该 sql 仅仅 projections 含有 SIMD 函数,condition 没有使用,因此 filter 逻辑还是使用非 native 的算子。

SELECT a, blink_json_value(b, d) FROM MyTable
WHERE concat(a, '1') is not null

经过优化后的执行计划如下所示

NativeCalc(select=[a, blink_json_value(b, d) AS EXPR$1])
+- Calc(select=[a, b, d], where=[CONCAT(a, '1') IS NOT NULL])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
condition 包含 SIMD 函数

该 sql projections 没有使用 SIMD 函数,condition 含有,因此 condition 部分使用 native 算子执行。

SELECT a, b, concat(a, '1') FROM MyTable
WHERE blink_json_value(a, c) is not null

经过优化后的执行计划如下所示

Calc(select=[a, b, concat(a, '1') AS EXPR$1], where=[f0])
+- NativeCalc(select=[a, b, blink_json_value(a, c) IS NOT NULL AS f0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
projections 和 condition 都包含 SIMD 函数
  • 该 sql projections 和 condition 都含有 SIMD 函数,因此拆成两个 Native 算子执行。
SELECT blink_json_value(a, b), concat(c, '1') FROM MyTable
WHERE blink_json_value(a, c) is not null

经过优化后的执行计划如下所示

NativeCalc(select=[blink_json_value(a, b) AS EXPR$0, CONCAT(c, '1') AS EXPR$1])
+- Calc(select=[a, b, c], where=[f0])+- NativeCalc(select=[a, b, c, blink_json_value(a, c) IS NOT NULL AS f0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
Inner Join 的 condition 中包含 SIMD 函数

image.png

下面的双流 join 例子,不仅仅 On 条件中包含 SIMD 函数,后面的 Where 逻辑也有。

    SELECT a, concat(d, '1') FROM(SELECT a, d FROM(SELECT a, dFROM leftTable JOIN rightTable ONa = d and blink_json_value(a, a) = concat(a, d))WHERE blink_json_value(a, d) = concat(a, d))

calcite 解析后的 AST 树如下

LogicalProject(a=[$0], EXPR$1=[CONCAT($1, _UTF-16LE'1')])
+- LogicalProject(a=[$0], d=[$1])+- LogicalFilter(condition=[=(blink_json_value($0, $1), CONCAT($0, $1))])+- LogicalProject(a=[$0], d=[$3])+- LogicalJoin(condition=[AND(=($0, $3), =(blink_json_value($0, $0), CONCAT($0, $3)))], joinType=[inner]):- LogicalTableScan(table=[[default_catalog, default_database, leftTable]])+- LogicalTableScan(table=[[default_catalog, default_database, rightTable]])

通过 logical plan 优化,可以看到两个 SIMD 函数,已经抽取到单独的 Calc 算子,跟在 Join 后面

FlinkLogicalCalc(select=[a, CONCAT(d, '1') AS EXPR$1], where=[f0])
+- FlinkLogicalCalc(select=[a, d, AND(=(blink_json_value(a, a), CONCAT(a, d)), =(blink_json_value(a, d), CONCAT(a, d))) AS f0])+- FlinkLogicalJoin(condition=[=($0, $1)], joinType=[inner]):- FlinkLogicalCalc(select=[a]):  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, leftTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])+- FlinkLogicalCalc(select=[d])+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, rightTable, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

如果不开启 native 能力,翻译后的 ExecPlan 如下,其中 Calc 推到了下面的 Join 算子

Calc(select=[a, CONCAT(d, '1') AS EXPR$1])
+- Join(joinType=[InnerJoin], where=[((a = d) AND (blink_json_value(a, a) = CONCAT(a, d)) AND (blink_json_value(a, d) = CONCAT(a, d)))], select=[a, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]):- Exchange(distribution=[hash[a]]):  +- Calc(select=[a]):     +- TableSourceScan(table=[[default_catalog, default_database, leftTable]], fields=[a, b, c])+- Exchange(distribution=[hash[d]])+- Calc(select=[d])+- TableSourceScan(table=[[default_catalog, default_database, rightTable]], fields=[d, e, f])

开启 native 能力后,SIMD 表达式是委托给了 native 计算

Calc(select=[a, CONCAT(d, '1') AS EXPR$1], where=[f0])
+- NativeCalc(select=[a, d, ((blink_json_value(a, a) = CONCAT(a, d)) AND (blink_json_value(a, d) = CONCAT(a, d))) AS f0])+- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]):- Exchange(distribution=[hash[a]]):  +- Calc(select=[a]):     +- TableSourceScan(table=[[default_catalog, default_database, leftTable, nativeOperator=[false]]], fields=[a, b, c])+- Exchange(distribution=[hash[d]])+- Calc(select=[d])+- TableSourceScan(table=[[default_catalog, default_database, rightTable, nativeOperator=[false]]], fields=[d, e, f])
Correlate 节点对应物理算子 condition 中包含 SIMD 函数

image.png

同理,对于 Flink 内置 udtf 函数物理算子实现,由于包含 condition,对于该场景也是可以使用 native 进行加速

Calc(select=[a, b, c, f0, f1], where=[((CAST(f1 AS BIGINT) = a) AND (c = f0))])
+- Correlate(invocation=[func($cor0.c)], correlate=[table(func($cor0.c))], select=[a,b,c,f0,f1], joinType=[INNER], condition=[AND(=(blink_json_value($0, _UTF-16LE'$.id'), 2), =(+($1, 1), *($1, $1)))])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])

经过优化后的执行计划如下所示

Calc(select=[a, b, c, f0, f1], where=[f00])
+- NativeCalc(select=[a, b, c, f0, f1, ((blink_json_value(f0, '$.id') = 2) AND (CAST(f1 AS BIGINT) = a) AND (c = f0)) AS f00])+- Correlate(invocation=[func($cor0.c)], correlate=[table(func($cor0.c))], select=[a,b,c,f0,f1], joinType=[INNER], condition=[=(+($1, 1), *($1, $1))])+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
2.3.1.3 Native 层

在 native 层,我们支持了18个 SIMD 函数,其中字符串函数15个,数学函数3个,也补齐了大量 velox 不支持的 Flink 内置函数。

2.3.1.4 细粒度 Fallback 机制
  • 支持细粒度的 fallback 机制,全部做到可配置

  • 仅含有SIMD 函数的表达式才翻译成 NativeCalc

  • 支持细粒度的函数签名级黑名单机制

  • 对于 SQL timestamp/decimal 类型 fallback 机制,对这种容易出现正确性问题的类型先 fallback

2.3.1.5 其他
  • 支持配置化的函数映射机制,函数覆盖优先级机制

  • 支持配置化的函数映射机制对于 Flink velox spark/presto 函数语义一直但是函数名不一样,可以不用改动c++代码,修改配置即可,函数覆盖优先级机制持配置化的函数映射机制。新引入的函数或者修复 velox 函数 bug 无需加入 velox,导致整个编译时间很久。在 flex 内部加入即可,编译时间从2个小时到2分钟。

2.3.2 正确性建设

image.png

这个是重中之重。两套引擎函数行为语义无法保证是对齐的,如何通过自动化手段发现二者行为上的差异?因此我们支持了函数级和作业级两套自动化比对框架。

如何复用Flink原有函数单元测试代码,而不需要改动和重写测试逻辑。然后当前 Flink 内置函数有两套机制 oldstack/newstack,测试框架也不同。为此我们改造原有引擎2套测试框架逻辑,通过反射方式将这些单元测试自动注入向量化配置方式进行自动化比对。每天定时跑脚本将语义不一致的函数输出出来。通过这个工具,我们发现了Flink 引擎本身函数正确性问题4个,都已经提给社区,Flink 和 Velox 函数语义不对齐问题15个。

也支持了作业级端到端比对框架,本质就是上线前双跑比对。对重要作业 mock 两个作业消费固定的数据集输出到不同表,每天定时跑并输出比对结果报告到钉钉群。

2.3.3 易用性建设

image.png

为了全方位提升易用性,我们在引擎层,尽最大程度将简单交给用户,复杂留给自己

  • 如何提前发现用户作业中使用的函数 velox 是否支持,可以翻译成 Native 算子?我们开发了一套自动化编译工具捞取线上作业自动化执行,将各种函数不支持问题问题提前解决掉,从而减少用户干扰。

  • 如何提前知道开发的 SIMD 函数效果怎么样,是否有性能回退?因此我们基于 JMH 框架实现了一套端到端的性能测试框架,因为数据转换层有一定开销,因此需要对比整体性能更合理,目前支持 GenericRow 和 ColumnRow。下图是函数使用 SIMD 实现端到端的性能效果数据。

image.png

还搭建了向量化大盘,可以看到作业级别的效果数据。

易用的 DAG 中 native calc/source/sink 算子展示

2.3.4 稳定性建设

image.png

对向量化作业配置监控告警,提前发现问题。同时,由于 Native 算子需要额外的 native 内存,我们 plan 层自动注入额外资源。

2.3.5 效果

我们从线上捞取符合向量化场景的部分作业跑成向量化方式,下面这张图是真实的效果数据,有13%的作业端到端 TPS 可以提升1被以上,37%的作业可以提升40%以上,作业平均提升了75%。其中效果最好的作业提升了14倍。

image.png

03. 未来规划

最后我来展望一下未来规划:

image.png

  • 全新的数据转换层支持 RowData 直接转 velox RowVector,减少转换层数据拷贝开销

    • 目前 Flink 的数据转换是基于 Arrow 的数据转换,存在额外的数据转换开销
  • 和 Paimon 结合,支持 Native Parquet/Orc Reader

  • 支持更多算子,非状态算子如维表算子,状态算子等

  • 支持更多 SIMD,支持 SQL 全类型,对齐 Flink 所有内置函数

相关文章:

  • 前端面试六之axios
  • 黑马教程强化day2-2
  • markdown文本转换时序图
  • 深入理解 TCP 套接字:Socket 编程入门教程
  • 数组方法_push()/pop()/数组方法_shift()/unshift()
  • 滚动—横向滚动时,如何直接滚动到对应的内容板块
  • `document.domain` API 的废弃与现代 Web 开发的转型
  • 从 8 秒到 1 秒:前端性能优化的 12 个关键操作
  • Maven 构建性能优化深度剖析:原理、策略与实践
  • CKA考试知识点分享(10)---NetworkPolicy
  • 深入浅出:C++深拷贝与浅拷贝
  • Web防火墙深度实战:从漏洞修补到CC攻击防御
  • 重拾前端基础知识:CSS预处理器
  • 基于AI智能体的医疗AI工具库构建路径分析
  • Python爬虫(54)Python数据治理全攻略:从爬虫清洗到NLP情感分析的实战演进
  • 第七章: SEO与渲染方式 三
  • C#接口代码记录
  • 第七章: SEO与渲染方式
  • Scrapy爬虫框架:数据采集的瑞士军刀(附实战避坑指南)!!!
  • ( github actions + workflow 01 ) 实现爬虫自动化,每2小时爬取一次澎湃新闻
  • 哪个网站可以直接做ppt/杭州seook优屏网络
  • 网站文章更新怎么做/百度手机端排名如何优化
  • wordpress gb2312/搜索引擎优化的名词解释
  • 网站代码 输入文字 跳出内容/怎么给网站做优化
  • 中企动力网站模板/好的推广平台
  • 上海建网站价格/上海网站搜索引擎优化