大数据计算引擎-Hudi对Spark Catalyst 优化器的RBO、CBO做了什么
要理解 Hudi 如何基于 RBO(基于规则的优化器)和 CBO(基于代价的优化器)进行改造,需要结合其与查询引擎(如 Spark、Flink)的集成逻辑,以及对数据源层优化的源码实现。Hudi 的核心目标是让查询引擎能更好地利用其数据布局(如分区、文件结构)、索引和元数据特性,从而优化查询性能。
一、基于 RBO(Rule-Based Optimizer)的改造
RBO 通过预定义的规则优化查询计划(如过滤下推、分区修剪等)。Hudi 的 RBO 改造主要体现在将查询引擎的优化规则与 Hudi 的数据源特性结合,让优化器能识别并利用 Hudi 的文件结构、索引、分区等信息,减少不必要的数据扫描。
1. 过滤下推(Filter Pushdown)
过滤下推是 RBO 的核心规则之一:将查询中的过滤条件(如where id = 1
)尽可能下推到数据源层,在数据读取阶段就过滤掉无关数据,减少后续处理的数据量。
Hudi 通过实现查询引擎的过滤下推接口,让过滤条件作用于文件扫描前。以 Spark 为例:
- 源码中,
HoodieSparkFilterPushDown
类(位于org.apache.hudi.spark.optimize
)实现了 Spark 的过滤下推逻辑。该类会解析查询中的Filter
条件,判断哪些条件可以下推到 Hudi 数据源。 - 对于可下推的条件(如等值、范围过滤),Hudi 会在文件扫描前通过
HoodieFileIndex
过滤文件:利用文件元数据(如HoodieLogFile
/HoodieDataFile
的键范围、Bloom 索引信息),只保留可能包含符合条件数据的文件。 - 关键代码逻辑:
HoodieFileIndex.listFiles
方法会接收下推的过滤条件,结合 Hudi 的分区信息和文件级元数据(如minKey
/maxKey
),过滤掉不可能满足条件的文件。
2. 分区修剪(Partition Pruning)
Hudi 支持分区表(如按时间分区),RBO 会通过分区过滤条件(如dt = '2023-01-01'
)修剪掉无关分区,仅扫描目标分区的文件。
源码中,HoodieFileIndex
(Hudi 在 Spark 中的文件索引实现)是分区修剪的核心:
HoodieFileIndex
会解析查询中的分区列过滤条件(如dt
字段的过滤),通过getPartitionSpec
获取表的分区结构,然后计算需要扫描的分区路径。- 例如,
HoodieFileIndex.prunePartitions
方法会根据分区过滤条件,从所有分区中筛选出符合条件的分区,避免扫描全部分区。
3. 投影下推(Projection Pushdown)
投影下推仅读取查询所需的列(如select id, name
只读id
和name
列),减少 IO 开销。Hudi 利用列存文件(如 Parquet)的特性实现投影下推。
源码中,HoodieParquetFileFormat
(Hudi 的 Parquet 文件读取器)会接收查询的投影列信息:
- 在
buildReaderWithPartitionValues
方法中,Hudi 会将投影列传递给 Parquet 的ParquetReader
,仅读取指定列的数据,而非全表列。 - 对于合并后的
HoodieMergeOnReadFile
(包含 Base File 和 Log File),投影下推会同时作用于 Base File(列存)和 Log File(只解析所需列的变更记录)。
4. 索引利用规则
Hudi 的索引(如 Bloom 索引、HBase 索引)用于快速定位包含目标记录的文件。RBO 会通过规则判断是否可利用索引加速过滤:
- 当过滤条件涉及索引键(如
record_key
)时,HoodieIndex
的filter
方法会先通过索引定位到可能包含目标记录的文件,再进行扫描(而非全表扫描)。 - 源码中,
BloomIndex.filter
会利用文件的 Bloom 过滤器,快速排除不包含目标record_key
的文件,减少扫描范围。
二、基于 CBO(Cost-Based Optimizer)的改造
CBO 通过统计信息(如数据量、列基数)计算不同查询计划的代价,选择代价最小的计划。Hudi 的 CBO 改造主要体现在收集和暴露统计信息,让查询引擎的 CBO 能准确估计代价。
1. 统计信息的收集与存储
Hudi 会在数据写入(如upsert
、insert
)或压缩(Compaction)时收集文件级和列级统计信息,并存储在元数据中:
- 文件级统计:每个文件的记录数、大小、最小 / 最大键值等,存储在
HoodieDataFile
的元数据(HoodieFileMetadata
)中,或通过HoodieTimeline
的元数据文件(如.hoodie/metadata
)管理。 - 列级统计:对于 Parquet 文件,Hudi 会解析其 Footer 中的列统计(如
min
/max
/null_count
),并关联到 Hudi 的元数据中。
源码中,HoodieTableMetadata
类负责管理元数据统计信息,Compaction
过程中会通过StatsCollector
更新文件统计。
2. 向查询引擎暴露统计信息
Hudi 将收集的统计信息传递给查询引擎的 CBO,帮助其估计扫描代价、Join 顺序等。以 Spark 为例:
HoodieRelation
(Hudi 在 Spark 中的逻辑关系表示)重写了statistics
方法,返回表级统计信息:通过HoodieTable
获取所有文件的统计信息(如总记录数、总大小),汇总后作为表的统计数据。- 对于分区表,
HoodieFileIndex
会为每个分区提供单独的统计信息(如分区内的记录数、文件大小),供 Spark CBO 估计分区扫描的代价。 - 关键代码:
HoodieRelation.statistics
调用HoodieTable.getTotalRecords
和HoodieTable.getTotalSize
获取总记录数和总大小,传递给 Spark CBO。
3. 代价估计的优化
Hudi 的统计信息帮助 CBO 更准确地估计以下代价:
- 扫描代价:基于文件大小和记录数,CBO 能判断扫描某个分区或文件的 IO 和 CPU 代价(如大文件扫描代价高于小文件)。
- Join 代价:通过列的基数(distinct 值数量)统计,CBO 能估计 Join 的结果集大小,选择更优的 Join 策略(如 Broadcast Join vs Shuffle Join)。
- 过滤选择性:基于列的
min
/max
统计,CBO 能估计过滤条件保留的数据比例,优化过滤后的操作顺序。
三、总结
Hudi 对 RBO 和 CBO 的改造均围绕 “让查询引擎更好地理解 Hudi 的特性” 展开:
- RBO:通过实现过滤下推、分区修剪、投影下推等规则,利用 Hudi 的文件结构、索引和分区特性,减少数据扫描范围;
- CBO:通过收集和暴露文件级 / 列级统计信息,帮助查询引擎的 CBO 准确估计代价,选择最优查询计划。
这些改造最终目标是降低查询的 IO 和计算开销,提升 Hudi 表的查询性能。
推荐内容:
大数据计算引擎-Catalyst 优化器:Spark SQL 的 “智能翻译官 + 效率管家”
大数据计算引擎-从源码看Spark AQE对于倾斜的处理
深入starrocks-怎样实现多列联合统计信息
深入starrocks-多列联合统计一致性探查与策略(YY一下)
大数据计算引擎-全阶段代码生成(Whole-stage Code Generation)与火山模型(Volcano)对比