Apache Paimon 查询全流程深度分析
基于 Apache Paimon 源码,本文详细分析从用户发起查询到返回结果的完整流程。
1. 整体架构图
1.1 数据读取分层架构
1.2 关键组件职责
API 层组件:
- Table: 表接口,提供 newReadBuilder() 方法创建读取构建器 1
- ReadBuilder: 构建扫描和读取对象,配置过滤条件和投影 2
- TableScan: 扫描表快照,生成 Split 计划
- TableRead: 读取接口,从 Split 创建 RecordReader 3
执行层组件:
- FileStoreScan: 文件存储扫描实现,读取 Manifest 文件 4
- SnapshotReader: 快照读取器,协调扫描和 Split 生成 5
- KeyValueTableRead: 主键表读取实现,支持多种读取模式 6
- MergeFileSplitRead: 处理需要合并的文件读取 7
存储层组件:
- KeyValueFileReaderFactory: 创建 KeyValue 文件读取器的工厂 8
- RecordReader: 批量读取记录的接口 9
- DataFileMeta: 数据文件元数据,包含统计信息和索引引用
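下面用一段简化的 Java 代码示意这几层组件在分布式读取中的配合方式(参照上文引用的 ReadBuilder 接口注释整理;Catalog、表标识等均为假设的示例,并非唯一用法):

```java
import java.util.List;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;

public class DistributedReadSketch {

    // 示意方法:真实作业中第 2 步发生在 Coordinator,第 3 步发生在各个 Task
    public static void scanAndRead(Catalog catalog, Identifier identifier) throws Exception {
        // 1. 创建可序列化的 ReadBuilder
        Table table = catalog.getTable(identifier);
        ReadBuilder builder = table.newReadBuilder();

        // 2. Coordinator 端:扫描快照与 Manifest,规划出 Split 列表
        List<Split> splits = builder.newScan().plan().splits();

        // 3. Task 端:对分发到本任务的每个 Split 创建 RecordReader 读取数据
        TableRead read = builder.newRead();
        for (Split split : splits) {
            try (RecordReader<InternalRow> reader = read.createReader(split)) {
                // 逐批消费 reader.readBatch() 返回的记录
            }
        }
    }
}
```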
2. 核心类图
2.1 TableRead 接口体系
```mermaid
classDiagram
    class TableRead {
        <<interface>>
        +withMetricRegistry(MetricRegistry) TableRead
        +executeFilter() TableRead
        +withIOManager(IOManager) TableRead
        +createReader(Split) RecordReader~InternalRow~
        +createReader(List~Split~) RecordReader~InternalRow~
        +createReader(Plan) RecordReader~InternalRow~
    }
    class InnerTableRead {
        <<interface>>
        +withFilter(Predicate) InnerTableRead
        +withFilter(List~Predicate~) InnerTableRead
        +withReadType(RowType) InnerTableRead
        +withTopN(TopN) InnerTableRead
        +withLimit(int) InnerTableRead
        +forceKeepDelete() InnerTableRead
    }
    class KeyValueTableRead {
        -List~SplitReadProvider~ readProviders
        -RowType readType
        -boolean forceKeepDelete
        -Predicate predicate
        -TopN topN
        -Integer limit
        +reader(Split) RecordReader~InternalRow~
        -createMergeReader() RecordReader
        -createNoMergeReader() RecordReader
    }
    class AppendTableRead {
        -SplitRead read
        -RowType readType
        +reader(Split) RecordReader~InternalRow~
    }
    class AbstractDataTableRead {
        <<abstract>>
        -TableSchema schema
        -MetricRegistry metricRegistry
        #applyReadType(RowType) void
        #innerWithFilter(Predicate) InnerTableRead
        +createReader(Split) RecordReader~InternalRow~
    }
    TableRead <|-- InnerTableRead
    InnerTableRead <|-- AbstractDataTableRead
    AbstractDataTableRead <|-- KeyValueTableRead
    AbstractDataTableRead <|-- AppendTableRead
```
[10](#0-9) [11](#0-10) [12](#0-11)

2.2 Split 机制类图

```mermaid
classDiagram
    class Split {
        <<interface>>
        +rowCount() long
        +deletionFiles() Optional~List~DeletionFile~~
        +convertToRawFiles() Optional~List~RawFile~~
        +indexFiles() Optional~List~IndexFile~~
    }
    class DataSplit {
        -long snapshotId
        -BinaryRow partition
        -int bucket
        -String bucketPath
        -Integer totalBuckets
        -List~DataFileMeta~ beforeFiles
        -List~DeletionFile~ beforeDeletionFiles
        -List~DataFileMeta~ dataFiles
        -List~DeletionFile~ dataDeletionFiles
        -boolean isStreaming
        -boolean rawConvertible
        +snapshotId() long
        +partition() BinaryRow
        +bucket() int
        +dataFiles() List~DataFileMeta~
        +beforeFiles() List~DataFileMeta~
        +convertToRawFiles() Optional~List~RawFile~~
        +mergedRowCount() long
    }
    class DataFileMeta {
        -String fileName
        -long fileSize
        -long rowCount
        -InternalRow minKey
        -InternalRow maxKey
        -SimpleStats keyStats
        -SimpleStats valueStats
        -long minSequenceNumber
        -long maxSequenceNumber
        -int level
        -List~String~ extraFiles
    }
    class SplitGenerator {
        <<interface>>
        +splitForBatch(List~DataFileMeta~) List~DataSplit~
        +splitForStreaming(List~DataFileMeta~) List~DataSplit~
    }
    class AppendOnlySplitGenerator {
        -long targetSplitSize
        +splitForBatch(List~DataFileMeta~) List~DataSplit~
    }
    Split <|.. DataSplit
    DataSplit o-- DataFileMeta
    SplitGenerator <|.. AppendOnlySplitGenerator
```
[13](#0-12)

2.3 RecordReader 体系

```mermaid
classDiagram
    class RecordReader~T~ {
        <<interface>>
        +readBatch() RecordIterator~T~
        +close() void
        +transform(Function) RecordReader~R~
        +filter(Filter) RecordReader~T~
    }
    class FileRecordReader~T~ {
        <<interface>>
        +readBatch() FileRecordIterator~T~
    }
    class KeyValueFileReaderFactory {
        -FileIO fileIO
        -SchemaManager schemaManager
        -TableSchema schema
        -RowType keyType
        -RowType valueType
        -FormatReaderMapping.Builder formatReaderMappingBuilder
        -long asyncThreshold
        +createRecordReader(DataFileMeta) RecordReader~KeyValue~
    }
    class DataFileRecordReader {
        -RowType tableRowType
        -FormatReaderFactory readerFactory
        -FormatReaderContext context
        -int[] indexMapping
        -CastFieldGetter[] castMapping
        +readBatch() FileRecordIterator~InternalRow~
    }
    class KeyValueDataFileRecordReader {
        -FileRecordReader~InternalRow~ reader
        -RowType keyType
        -RowType valueType
        -int level
        +readBatch() FileRecordIterator~KeyValue~
    }
    class ApplyDeletionVectorReader {
        -FileRecordReader reader
        -DeletionVector deletionVector
        +readBatch() FileRecordIterator
    }
    RecordReader <|-- FileRecordReader
    FileRecordReader <|.. DataFileRecordReader
    FileRecordReader <|.. KeyValueDataFileRecordReader
    FileRecordReader <|.. ApplyDeletionVectorReader
    KeyValueFileReaderFactory ..> KeyValueDataFileRecordReader
    KeyValueDataFileRecordReader o-- DataFileRecordReader
```
[14](#0-13) [15](#0-14)

2.4 文件扫描层

```mermaid
classDiagram
    class FileStoreScan {
        <<interface>>
        +withPartitionFilter(Predicate) FileStoreScan
        +withBucket(int) FileStoreScan
        +withSnapshot(long) FileStoreScan
        +withKind(ScanMode) FileStoreScan
        +enableValueFilter() FileStoreScan
        +plan() Plan
        +readManifest(ManifestFileMeta) List~ManifestEntry~
    }
    class AbstractFileStoreScan {
        <<abstract>>
        -ManifestsReader manifestsReader
        -SnapshotManager snapshotManager
        -Snapshot specifiedSnapshot
        -ScanMode scanMode
        +plan() Plan
        #withPartitionFilter(Predicate) FileStoreScan
    }
    class KeyValueFileStoreScan {
        -RowType keyType
        -RowType valueType
        -Predicate keyFilter
        -Predicate valueFilter
        +withKeyFilter(Predicate) FileStoreScan
        +withValueFilter(Predicate) FileStoreScan
        #filterByStats(ManifestEntry) boolean
    }
    class ManifestsReader {
        -ManifestFile manifestFile
        -Predicate partitionFilter
        +read(List~ManifestFileMeta~) List~ManifestEntry~
    }
    FileStoreScan <|-- AbstractFileStoreScan
    AbstractFileStoreScan <|-- KeyValueFileStoreScan
    AbstractFileStoreScan o-- ManifestsReader
```
[4](#0-3) [16](#0-15)

3. 详细流程图

3.1 查询初始化流程

```mermaid
sequenceDiagram
    participant User
    participant Table
    participant ReadBuilder
    participant TableScan
    participant FileStoreScan
    participant TableRead
    participant KeyValueTableRead
    participant RecordReader
    User->>Table: newReadBuilder()
    Table->>ReadBuilder: create ReadBuilderImpl
    User->>ReadBuilder: withFilter(predicate)
    User->>ReadBuilder: withProjection(projection)
    User->>ReadBuilder: newScan()
    ReadBuilder->>TableScan: create scan instance
    TableScan->>FileStoreScan: create KeyValueFileStoreScan
    User->>TableScan: plan()
    FileStoreScan->>FileStoreScan: readManifests()
    FileStoreScan->>FileStoreScan: filterByStats()
    FileStoreScan->>TableScan: return Plan with splits
    User->>ReadBuilder: newRead()
    ReadBuilder->>TableRead: create KeyValueTableRead
    TableRead->>KeyValueTableRead: configure filters & projections
    User->>TableRead: createReader(split)
    KeyValueTableRead->>KeyValueTableRead: select SplitReadProvider
    KeyValueTableRead->>RecordReader: create MergeReader or NoMergeReader
    RecordReader-->>User: return RecordReader
```
关键调用路径:
- Table.newReadBuilder() → ReadBuilderImpl 创建 1
- ReadBuilder.newScan() → Table.newScan() → KeyValueFileStore.newScan()
- FileStoreScan.plan() → AbstractFileStoreScan.plan() → 读取 Manifest 并过滤 17
- ReadBuilder.newRead() → Table.newRead() → 创建 KeyValueTableRead 18
- TableRead.createReader(split) → KeyValueTableRead.reader(split) 19
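过滤条件和投影在进入扫描/读取之前就已配置在 ReadBuilder 上(ReadBuilderImpl 会用 PredicateBuilder.and 合并多次传入的过滤条件)。下面是一个构造 Predicate 并配置 ReadBuilder 的简化示例,其中列下标与列名均为假设:

```java
import java.util.Arrays;

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;

public class ReadBuilderConfigSketch {

    public static ReadBuilder configure(Table table) {
        RowType rowType = table.rowType();
        PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);

        // 假设第 0 列为 id、第 2 列为 price
        Predicate filter =
                PredicateBuilder.and(
                        predicateBuilder.equal(0, 100),            // id = 100
                        predicateBuilder.greaterOrEqual(2, 10.0)); // price >= 10.0

        return table.newReadBuilder()
                .withFilter(filter)                                            // 过滤条件下推
                .withReadType(rowType.project(Arrays.asList("id", "price"))); // 投影下推
    }
}
```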
3.2 文件扫描流程
关键调用路径:
- FileStoreScan.plan() → 读取 Snapshot 元数据
- ManifestsReader.read() → 并行读取 Manifest 文件
- KeyValueFileStoreScan.filterByStats() → 基于统计信息过滤文件(判断思路见下方示意)
- SplitGenerator.splitForBatch() → 使用 Bin-packing 算法合并小文件
- SnapshotReaderImpl.generateSplits() → 生成最终的 DataSplit 列表 20
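filterByStats 的判断思路可以用下面这个假设的辅助方法示意(方法名 keepFile 仅为说明用途;真实实现还要处理 schema 演化、键过滤与值过滤的区分等细节):

```java
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;

public class StatsFilterSketch {

    // 返回 false 表示该文件中不可能存在满足条件的记录,可以在扫描阶段直接跳过
    public static boolean keepFile(
            Predicate filter,
            long rowCount,
            InternalRow minValues,
            InternalRow maxValues,
            InternalArray nullCounts) {
        return filter == null || filter.test(rowCount, minValues, maxValues, nullCounts);
    }
}
```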
3.3 LSM Tree 读取流程
关键调用路径:
- MergeFileSplitRead.createMergeReader() → 需要合并的读取路径 21
- IntervalPartition.partition() → 将文件按键范围重叠分组
- MergeTreeReaders.readerForSection() → 为每个 Section 创建合并读取器 22
- KeyValueFileReaderFactory.createRecordReader() → 创建文件读取器 23
- MergeSorter.mergeSort() → 多路归并排序
- MergeFunctionWrapper.reset() → 应用合并逻辑处理相同 Key(deduplicate 语义的简化示意见下文)
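以 deduplicate 合并引擎为例,"处理相同 Key"的语义可以用下面的简化类示意:同一主键的多条候选记录中,只保留 sequenceNumber 最大(最新)的一条。类名与方法组织为示意写法,真实实现位于 DeduplicateMergeFunction,并由 MergeFunctionWrapper 在归并过程中调用:

```java
import org.apache.paimon.KeyValue;

// 简化示意:同一主键的候选记录依次传入 add(),最终只保留最新版本
public class DeduplicateSketch {

    private KeyValue latest;

    public void reset() {
        latest = null;
    }

    public void add(KeyValue kv) {
        if (latest == null || kv.sequenceNumber() > latest.sequenceNumber()) {
            latest = kv;
        }
    }

    public KeyValue getResult() {
        return latest;
    }
}
```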
Level 0 特殊处理:
Level 0 文件按照 maxSequenceNumber 倒序存储在 TreeSet 中,读取时优先读取序列号较大的文件 24
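对应 Levels 构造函数中 level0 集合的排序逻辑,按上文引用的源码节选整理如下(保留核心比较逻辑并加注释,变量来自该构造函数):

```java
// 节选整理自 org.apache.paimon.mergetree.Levels 构造函数
TreeSet<DataFileMeta> level0 =
        new TreeSet<>(
                (a, b) -> {
                    if (a.maxSequenceNumber() != b.maxSequenceNumber()) {
                        // maxSequenceNumber 较大的文件排在前面,读取时优先处理
                        return Long.compare(b.maxSequenceNumber(), a.maxSequenceNumber());
                    }
                    // 多个作业写同一 merge tree 时 maxSequenceNumber 可能相同,
                    // 继续比较其他字段,避免被 TreeSet 当作重复元素丢弃
                    int minSeqCompare = Long.compare(a.minSequenceNumber(), b.minSequenceNumber());
                    if (minSeqCompare != 0) {
                        return minSeqCompare;
                    }
                    int timeCompare = a.creationTime().compareTo(b.creationTime());
                    if (timeCompare != 0) {
                        return timeCompare;
                    }
                    return a.fileName().compareTo(b.fileName());
                });
```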
3.4 索引使用流程
索引使用时机:
- Bloom Filter 索引 - 点查询优化:
  - 配置:file-index.bloom-filter.columns(建表配置示例见本节末尾)
  - 使用时机:等值查询 (=) 条件
  - 工作原理:判断值是否可能存在于文件中 25
- Bitmap 索引 - 低基数列优化:
  - 配置:file-index.bitmap.columns(建表配置示例见本节末尾)
  - 使用时机:枚举类型、状态字段的等值或 IN 查询
  - 存储结构:使用稀疏块索引,包含 INDEX BLOCKS 和 BITMAP BLOCKS
- 统计信息过滤:
  - DataFileMeta.valueStats 存储 min/max/nullCount
  - 在 filterByStats() 方法中使用
  - 可以跳过不满足范围条件的文件
- 格式内置索引:
  - ORC: Row Group Index, Bloom Filter
  - Parquet: Column Statistics, Dictionary Encoding
  - 在 FormatReaderContext 中配置并使用
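文件级索引通过表选项开启。下面是一个假设的建表示意(列名与分桶数仅作示例;选项键即上文提到的 file-index.bloom-filter.columns 与 file-index.bitmap.columns):

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class FileIndexOptionSketch {

    public static Schema schemaWithFileIndex() {
        return Schema.newBuilder()
                .column("order_id", DataTypes.BIGINT())
                .column("user_id", DataTypes.BIGINT())
                .column("status", DataTypes.STRING())
                .primaryKey("order_id")
                .option("bucket", "4")
                // 为 user_id 的等值查询构建 Bloom Filter 文件索引
                .option("file-index.bloom-filter.columns", "user_id")
                // 为低基数的 status 列构建 Bitmap 文件索引
                .option("file-index.bitmap.columns", "status")
                .build();
    }
}
```

将该 Schema 交给 Catalog 建表后,写入时会生成相应的索引文件,查询时按上文所述时机参与文件过滤。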
3.5 数据合并流程 (Merge-on-Read)
Merge-on-Read 关键点:
- Section 划分:
  - IntervalPartition 根据键范围将文件分组
  - 相同 Section 内的文件键范围有重叠,需要合并
  - 不同 Section 间无重叠,可独立读取
- 多路归并:
  - MergeSorter.mergeSort() 使用败者树算法
  - 比较器顺序:UserKey → SequenceNumber (降序) → ValueKind
  - Level 0 文件记录优先级最高 22
- 合并函数类型:
  - deduplicate: 保留最新版本
  - partial-update: 部分字段更新
  - aggregation: 聚合计算
  - first-row: 保留首次插入的行
- 删除处理:
  - DropDeleteReader 在最后过滤删除标记的记录
  - 如果 forceKeepDelete=true,保留删除记录用于 CDC 21
- 重叠区域优化:
  - 重叠 Section 只下推主键过滤条件 (filtersForKeys)
  - 非重叠 Section 可以下推所有过滤条件 (filtersForAll)
  - 避免因过滤导致的记录丢失 26
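这些步骤在 MergeFileSplitRead.createMergeReader 中串联起来。下面按上文引用的源码节选整理其主干(加注释,省略读取器工厂的构建与最终投影;overlappedSectionFactory 等变量均来自该方法):

```java
// 节选整理自 MergeFileSplitRead.createMergeReader
List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
        new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));

// IntervalPartition 按键范围把文件划分为若干 Section(每个 Section 是一组 SortedRun)
for (List<SortedRun> section : new IntervalPartition(files, keyComparator).partition()) {
    sectionReaders.add(
            () ->
                    MergeTreeReaders.readerForSection(
                            section,
                            // 有重叠的 Section 只下推主键过滤(filtersForKeys),避免漏读
                            section.size() > 1
                                    ? overlappedSectionFactory
                                    : nonOverlappedSectionFactory,
                            keyComparator,
                            createUdsComparator(),
                            mergeFuncWrapper,
                            mergeSorter));
}

// 不同 Section 之间键范围不重叠,用 ConcatRecordReader 顺序拼接
RecordReader<KeyValue> reader = ConcatRecordReader.create(sectionReaders);
if (!keepDelete) {
    // 默认过滤删除标记;forceKeepDelete=true(如 CDC 场景)时保留
    reader = new DropDeleteReader(reader);
}
```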
4. 源码关键路径
4.1 查询入口路径
```
Table.newReadBuilder()
└─> ReadBuilderImpl 构造
    ├─> Table.newScan()
    │    └─> AbstractFileStoreTable.newScan()
    │         └─> KeyValueFileStore.newScan()
    │              └─> KeyValueFileStoreScan 创建
    │
    └─> Table.newRead()
         └─> PrimaryKeyFileStoreTable.newRead()
              └─> KeyValueFileStore.newRead()
                   └─> KeyValueTableRead 创建
```
[1](#0-0)

4.2 扫描计划生成路径

```
TableScan.plan()
└─> FileStoreScan.plan()
└─> AbstractFileStoreScan.plan()
├─> scan.withSnapshot() 设置快照
├─> ManifestsReader.read() 读取 Manifest
│ ├─> ManifestFile.read() 解析 Manifest 文件
│ ├─> 应用 Partition Filter
│ ├─> 应用 Bucket Filter
│ └─> 返回 ManifestEntry 列表
│
├─> KeyValueFileStoreScan.filterByStats() 统计信息过滤
│ └─> 检查 min/max/nullCount
│
└─> SnapshotReaderImpl.generateSplits()
└─> SplitGenerator.splitForBatch()
├─> AppendOnlySplitGenerator.splitForBatch()
│ └─> Bin-packing 算法合并文件
│
└─> 返回 DataSplit 列表
```

4.3 Reader 创建路径

```
TableRead.createReader(split)
└─> KeyValueTableRead.reader(split)
├─> 选择 SplitReadProvider
│ ├─> PrimaryKeyTableRawFileSplitReadProvider (Raw 模式)
│ ├─> MergeFileSplitReadProvider (Merge 模式)
│ ├─> IncrementalChangelogReadProvider (增量 Changelog)
│ └─> IncrementalDiffReadProvider (增量 Diff)
│
└─> MergeFileSplitRead.createReader(split)
├─> 判断是否需要 Merge
│
├─> createMergeReader() [需要合并]
│ ├─> IntervalPartition.partition() 文件分组
│ ├─> KeyValueFileReaderFactory.build() 创建工厂
│ ├─> MergeTreeReaders.readerForSection() 每个 Section
│ │ ├─> readerForRun() 每个 SortedRun
│ │ │ └─> KeyValueFileReaderFactory.createRecordReader()
│ │ │ └─> KeyValueDataFileRecordReader 包装
│ │ │
│ │ └─> MergeSorter.mergeSort()
│ │ ├─> SortMergeReader.create()
│ │ ├─> LoserTree 初始化
│ │ └─> 应用 MergeFunctionWrapper
│ │
│ └─> DropDeleteReader 包装 (如果需要)
│
└─> createNoMergeReader() [无需合并]
└─> ConcatRecordReader 连接所有文件
```

4.4 文件读取路径

```
RecordReader.readBatch()
└─> SortMergeReader.readBatch() [Merge 模式]
├─> LoserTree.nextKey()
│ └─> 找到最小 Key 的 Reader
│
├─> LoserTree.popElement()
│ └─> 收集相同 Key 的所有记录
│
├─> MergeFunctionWrapper.reset(records)
│ ├─> LookupMergeFunction.merge()
│ │ ├─> 处理 Level 0 记录 (level <= 0)
│ │ ├─> 处理高层级记录
│ │ └─> 应用 MergeEngine 逻辑
│ │
│ └─> 返回合并后的 KeyValue
│
└─> LoserTree.adjustTree()
└─> 重新调整败者树
```

或

```
RecordReader.readBatch()
└─> ConcatRecordReader.readBatch() [No-Merge 模式]
└─> 顺序读取每个文件的 Reader
└─> KeyValueDataFileRecordReader.readBatch()
└─> DataFileRecordReader.readBatch()
├─> FormatReader.readBatch() (ORC/Parquet/Avro)
├─> 应用 DeletionVector (如果存在)
└─> 转换为 KeyValue 格式
```

4.5 索引应用路径

```
FileStoreScan.plan()
└─> 读取 ManifestEntry 时
└─> KeyValueFileStoreScan.filterByStats()
├─> 检查统计信息 (min/max/nullCount)
└─> 决定是否保留文件
DataFileMeta.extraFiles 包含 .index 文件
└─> KeyValueFileReaderFactory.createRecordReader()
└─> FormatReaderContext 构造
├─> 传递 Predicate 给格式 Reader
│
└─> OrcFormatReaderFactory.createReader()
├─> 应用 ORC SearchArgument
├─> 读取 ORC 索引
├─> Row Group 级别过滤
└─> Bloom Filter 测试
```

文件级索引读取:

```
FileIndexReader.read(indexFile)
└─> BloomFilterFileIndex.Reader.visitEntry()
├─> 读取 Bloom Filter 位集
├─> 测试查询值
└─> 返回 SKIP 或 REMAIN
```

4.6 完整调用示例

```java
// 用户代码
Table table = catalog.getTable(identifier);
ReadBuilder readBuilder = table.newReadBuilder()
.withFilter(predicate)
.withProjection(projection);
TableScan scan = readBuilder.newScan();
TableRead read = readBuilder.newRead();
// 执行扫描
TableScan.Plan plan = scan.plan();
// 读取数据
for (Split split : plan.splits()) {
    RecordReader<InternalRow> reader = read.createReader(split);
    // 逐批读取:readBatch() 返回 null 表示该 Split 读取完毕
    RecordReader.RecordIterator<InternalRow> batch;
    while ((batch = reader.readBatch()) != null) {
        InternalRow row;
        while ((row = batch.next()) != null) {
            // 处理 row ...
        }
        batch.releaseBatch();
    }
    reader.close();
}
```

Citations
File: paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java (L19-50)
package org.apache.paimon.table.source;import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;import java.io.Serializable;
import java.util.List;
import java.util.Map;/*** An interface for building the {@link TableScan} and {@link TableRead}.** <p>Example of distributed reading:** <pre>{@code* // 1. Create a ReadBuilder (Serializable)* Table table = catalog.getTable(...);* ReadBuilder builder = table.newReadBuilder()* .withFilter(...)* .withReadType(...);** // 2. Plan splits in 'Coordinator' (or named 'Driver'):* List<Split> splits = builder.newScan().plan().splits();** // 3. Distribute these splits to different tasks*
File: paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java (L19-100)
package org.apache.paimon.table.source;import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;import javax.annotation.Nullable;import java.util.Map;
import java.util.Objects;import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
import static org.apache.paimon.utils.Preconditions.checkState;/** Implementation for {@link ReadBuilder}. */
public class ReadBuilderImpl implements ReadBuilder {private static final long serialVersionUID = 1L;private final InnerTable table;private final RowType partitionType;private final String defaultPartitionName;private Predicate filter;private Integer limit = null;private TopN topN = null;private Integer shardIndexOfThisSubtask;private Integer shardNumberOfParallelSubtasks;private @Nullable PartitionPredicate partitionFilter;private @Nullable Integer specifiedBucket = null;private Filter<Integer> bucketFilter;private @Nullable RowType readType;private boolean dropStats = false;public ReadBuilderImpl(InnerTable table) {this.table = table;this.partitionType = table.rowType().project(table.partitionKeys());this.defaultPartitionName = new CoreOptions(table.options()).partitionDefaultName();}@Overridepublic String tableName() {return table.name();}@Overridepublic RowType readType() {if (readType != null) {return readType;} else {return table.rowType();}}@Overridepublic ReadBuilder withFilter(Predicate filter) {if (this.filter == null) {this.filter = filter;} else {this.filter = PredicateBuilder.and(this.filter, filter);}return this;}@Overridepublic ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {if (partitionSpec != null) {this.partitionFilter =fromPredicate(partitionType,
File: paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java (L34-62)
/*** An abstraction layer above {@link SplitRead} to provide reading of {@link InternalRow}.** @since 0.4.0*/
@Public
public interface TableRead {/** Set {@link MetricRegistry} to table read. */TableRead withMetricRegistry(MetricRegistry registry);TableRead executeFilter();TableRead withIOManager(IOManager ioManager);RecordReader<InternalRow> createReader(Split split) throws IOException;default RecordReader<InternalRow> createReader(List<Split> splits) throws IOException {List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();for (Split split : splits) {readers.add(() -> createReader(split));}return ConcatRecordReader.create(readers);}default RecordReader<InternalRow> createReader(TableScan.Plan plan) throws IOException {return createReader(plan.splits());}
}
File: paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java (L48-98)
public interface FileStoreScan {FileStoreScan withPartitionFilter(Predicate predicate);FileStoreScan withPartitionFilter(List<BinaryRow> partitions);FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions);FileStoreScan withPartitionFilter(PartitionPredicate predicate);FileStoreScan withBucket(int bucket);FileStoreScan onlyReadRealBuckets();FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);FileStoreScan withSnapshot(long snapshotId);FileStoreScan withSnapshot(Snapshot snapshot);FileStoreScan withKind(ScanMode scanMode);FileStoreScan withLevel(int level);FileStoreScan withLevelFilter(Filter<Integer> levelFilter);FileStoreScan enableValueFilter();FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);FileStoreScan withMetrics(ScanMetrics metrics);FileStoreScan dropStats();FileStoreScan keepStats();@NullableInteger parallelism();ManifestsReader manifestsReader();List<ManifestEntry> readManifest(ManifestFileMeta manifest);/** Produce a {@link Plan}. */Plan plan();
File: paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java (L77-150)
/** Implementation of {@link SnapshotReader}. */
public class SnapshotReaderImpl implements SnapshotReader {private final FileStoreScan scan;private final TableSchema tableSchema;private final CoreOptions options;private final boolean deletionVectors;private final SnapshotManager snapshotManager;private final ChangelogManager changelogManager;private final ConsumerManager consumerManager;private final SplitGenerator splitGenerator;private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;private final FileStorePathFactory pathFactory;private final String tableName;private final IndexFileHandler indexFileHandler;private ScanMode scanMode = ScanMode.ALL;private RecordComparator lazyPartitionComparator;public SnapshotReaderImpl(FileStoreScan scan,TableSchema tableSchema,CoreOptions options,SnapshotManager snapshotManager,ChangelogManager changelogManager,SplitGenerator splitGenerator,BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,FileStorePathFactory pathFactory,String tableName,IndexFileHandler indexFileHandler) {this.scan = scan;this.tableSchema = tableSchema;this.options = options;this.deletionVectors = options.deletionVectorsEnabled();this.snapshotManager = snapshotManager;this.changelogManager = changelogManager;this.consumerManager =new ConsumerManager(snapshotManager.fileIO(),snapshotManager.tablePath(),snapshotManager.branch());this.splitGenerator = splitGenerator;this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;this.pathFactory = pathFactory;this.tableName = tableName;this.indexFileHandler = indexFileHandler;}@Overridepublic Integer parallelism() {return scan.parallelism();}@Overridepublic SnapshotManager snapshotManager() {return snapshotManager;}@Overridepublic ChangelogManager changelogManager() {return changelogManager;}@Overridepublic ManifestsReader manifestsReader() {return scan.manifestsReader();}@Overridepublic List<ManifestEntry> readManifest(ManifestFileMeta manifest) {return scan.readManifest(manifest);}
File: paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java (L47-152)
/*** An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}.*/
public final class KeyValueTableRead extends AbstractDataTableRead {private final List<SplitReadProvider> readProviders;@Nullable private RowType readType = null;private boolean forceKeepDelete = false;private Predicate predicate = null;private IOManager ioManager = null;@Nullable private TopN topN = null;@Nullable private Integer limit = null;public KeyValueTableRead(Supplier<MergeFileSplitRead> mergeReadSupplier,Supplier<RawFileSplitRead> batchRawReadSupplier,TableSchema schema) {super(schema);this.readProviders =Arrays.asList(new PrimaryKeyTableRawFileSplitReadProvider(batchRawReadSupplier, this::config),new MergeFileSplitReadProvider(mergeReadSupplier, this::config),new IncrementalChangelogReadProvider(mergeReadSupplier, this::config),new IncrementalDiffReadProvider(mergeReadSupplier, this::config));}private List<SplitRead<InternalRow>> initialized() {List<SplitRead<InternalRow>> readers = new ArrayList<>();for (SplitReadProvider readProvider : readProviders) {if (readProvider.get().initialized()) {readers.add(readProvider.get().get());}}return readers;}private void config(SplitRead<InternalRow> read) {if (forceKeepDelete) {read = read.forceKeepDelete();}if (readType != null) {read = read.withReadType(readType);}if (topN != null) {read = read.withTopN(topN);}if (limit != null) {read = read.withLimit(limit);}read.withFilter(predicate).withIOManager(ioManager);}@Overridepublic void applyReadType(RowType readType) {initialized().forEach(r -> r.withReadType(readType));this.readType = readType;}@Overridepublic InnerTableRead forceKeepDelete() {initialized().forEach(SplitRead::forceKeepDelete);this.forceKeepDelete = true;return this;}@Overrideprotected InnerTableRead innerWithFilter(Predicate predicate) {initialized().forEach(r -> r.withFilter(predicate));this.predicate = predicate;return this;}@Overridepublic InnerTableRead withTopN(TopN topN) {initialized().forEach(r -> r.withTopN(topN));this.topN = topN;return this;}@Overridepublic InnerTableRead withLimit(int limit) {initialized().forEach(r -> r.withLimit(limit));this.limit = limit;return this;}@Overridepublic TableRead withIOManager(IOManager ioManager) {initialized().forEach(r -> r.withIOManager(ioManager));this.ioManager = ioManager;return this;}@Overridepublic RecordReader<InternalRow> reader(Split split) throws IOException {DataSplit dataSplit = (DataSplit) split;for (SplitReadProvider readProvider : readProviders) {if (readProvider.match(dataSplit, forceKeepDelete)) {return readProvider.get().get().createReader(dataSplit);}}throw new RuntimeException("Should not happen.");}
File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L66-107)
/** A {@link SplitRead} to read row lineage table which need field merge. */
public class MergeFileSplitRead implements SplitRead<KeyValue> {private final TableSchema tableSchema;private final FileIO fileIO;private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;private final Comparator<InternalRow> keyComparator;private final MergeFunctionFactory<KeyValue> mfFactory;private final MergeSorter mergeSorter;private final List<String> sequenceFields;private final boolean sequenceOrder;@Nullable private RowType readKeyType;@Nullable private List<Predicate> filtersForKeys;@Nullable private List<Predicate> filtersForAll;@Nullable private int[][] pushdownProjection;@Nullable private int[][] outerProjection;private boolean forceKeepDelete = false;public MergeFileSplitRead(CoreOptions options,TableSchema schema,RowType keyType,RowType valueType,Comparator<InternalRow> keyComparator,MergeFunctionFactory<KeyValue> mfFactory,KeyValueFileReaderFactory.Builder readerFactoryBuilder) {this.tableSchema = schema;this.readerFactoryBuilder = readerFactoryBuilder;this.fileIO = readerFactoryBuilder.fileIO();this.keyComparator = keyComparator;this.mfFactory = mfFactory;this.mergeSorter =new MergeSorter(CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);this.sequenceFields = options.sequenceField();this.sequenceOrder = options.sequenceFieldSortOrderIsAscending();}
File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L199-235)
public MergeFileSplitRead withFilter(Predicate predicate) {if (predicate == null) {return this;}List<Predicate> allFilters = new ArrayList<>();List<Predicate> pkFilters = null;List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();Set<String> nonPrimaryKeys =tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());for (Predicate sub : splitAnd(predicate)) {allFilters.add(sub);if (!containsFields(sub, nonPrimaryKeys)) {if (pkFilters == null) {pkFilters = new ArrayList<>();}// TODO Actually, the index is wrong, but it is OK.// The orc filter just use name instead of index.pkFilters.add(sub);}}// Consider this case:// Denote (seqNumber, key, value) as a record. We have two overlapping runs in a section:// * First run: (1, k1, 100), (2, k2, 200)// * Second run: (3, k1, 10), (4, k2, 20)// If we push down filter "value >= 100" for this section, only the first run will be read,// and the second run is lost. This will produce incorrect results.//// So for sections with overlapping runs, we only push down key filters.// For sections with only one run, as each key only appears once, it is OK to push down// value filters.filtersForAll = allFilters;filtersForKeys = pkFilters;return this;}
File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L238-258)
public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {if (!split.beforeFiles().isEmpty()) {throw new IllegalArgumentException("This read cannot accept split with before files.");}if (split.isStreaming() || split.bucket() == BucketMode.POSTPONE_BUCKET) {return createNoMergeReader(split.partition(),split.bucket(),split.dataFiles(),split.deletionFiles().orElse(null),split.isStreaming());} else {return createMergeReader(split.partition(),split.bucket(),split.dataFiles(),split.deletionFiles().orElse(null),forceKeepDelete);}}
File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L260-298)
public RecordReader<KeyValue> createMergeReader(BinaryRow partition,int bucket,List<DataFileMeta> files,@Nullable List<DeletionFile> deletionFiles,boolean keepDelete)throws IOException {// Sections are read by SortMergeReader, which sorts and merges records by keys.// So we cannot project keys or else the sorting will be incorrect.DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);KeyValueFileReaderFactory overlappedSectionFactory =readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForKeys);KeyValueFileReaderFactory nonOverlappedSectionFactory =readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForAll);List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();MergeFunctionWrapper<KeyValue> mergeFuncWrapper =new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));for (List<SortedRun> section : new IntervalPartition(files, keyComparator).partition()) {sectionReaders.add(() ->MergeTreeReaders.readerForSection(section,section.size() > 1? overlappedSectionFactory: nonOverlappedSectionFactory,keyComparator,createUdsComparator(),mergeFuncWrapper,mergeSorter));}RecordReader<KeyValue> reader = ConcatRecordReader.create(sectionReaders);if (!keepDelete) {reader = new DropDeleteReader(reader);}return projectOuter(projectKey(reader));}
File: paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java (L57-96)
/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {private final FileIO fileIO;private final SchemaManager schemaManager;private final TableSchema schema;private final RowType keyType;private final RowType valueType;private final FormatReaderMapping.Builder formatReaderMappingBuilder;private final DataFilePathFactory pathFactory;private final long asyncThreshold;private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;private final BinaryRow partition;private final DeletionVector.Factory dvFactory;private KeyValueFileReaderFactory(FileIO fileIO,SchemaManager schemaManager,TableSchema schema,RowType keyType,RowType valueType,FormatReaderMapping.Builder formatReaderMappingBuilder,DataFilePathFactory pathFactory,long asyncThreshold,BinaryRow partition,DeletionVector.Factory dvFactory) {this.fileIO = fileIO;this.schemaManager = schemaManager;this.schema = schema;this.keyType = keyType;this.valueType = valueType;this.formatReaderMappingBuilder = formatReaderMappingBuilder;this.pathFactory = pathFactory;this.asyncThreshold = asyncThreshold;this.partition = partition;this.formatReaderMappings = new HashMap<>();this.dvFactory = dvFactory;}
File: paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java (L98-151)
@Overridepublic RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) {return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2));}return createRecordReader(file, true, null);}private FileRecordReader<KeyValue> createRecordReader(DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize)throws IOException {String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());long schemaId = file.schemaId();Supplier<FormatReaderMapping> formatSupplier =() ->formatReaderMappingBuilder.build(formatIdentifier,schema,schemaId == schema.id() ? schema : schemaManager.schema(schemaId));FormatReaderMapping formatReaderMapping =reuseFormat? formatReaderMappings.computeIfAbsent(new FormatKey(schemaId, formatIdentifier),key -> formatSupplier.get()): formatSupplier.get();Path filePath = pathFactory.toPath(file);long fileSize = file.fileSize();FileRecordReader<InternalRow> fileRecordReader =new DataFileRecordReader(schema.logicalRowType(),formatReaderMapping.getReaderFactory(),orcPoolSize == null? new FormatReaderContext(fileIO, filePath, fileSize): new OrcFormatReaderContext(fileIO, filePath, fileSize, orcPoolSize),formatReaderMapping.getIndexMapping(),formatReaderMapping.getCastMapping(),PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),false,null,-1,Collections.emptyMap());Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {fileRecordReader =new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());}return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());}
File: paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java (L34-53)
/*** The reader that reads the batches of records.** @since 0.4.0*/
@Public
public interface RecordReader<T> extends Closeable {/*** Reads one batch. The method should return null when reaching the end of the input.** <p>The returned iterator object and any contained objects may be held onto by the source for* some time, so it should not be immediately reused by the reader.*/@NullableRecordIterator<T> readBatch() throws IOException;/** Closes the reader and should release all resources. */@Overridevoid close() throws IOException;
File: paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java (L29-75)
/** Inner {@link TableRead} contains filter and projection push down. */
public interface InnerTableRead extends TableRead {default InnerTableRead withFilter(List<Predicate> predicates) {if (predicates == null || predicates.isEmpty()) {return this;}return withFilter(PredicateBuilder.and(predicates));}InnerTableRead withFilter(Predicate predicate);/** Use {@link #withReadType(RowType)} instead. */@Deprecateddefault InnerTableRead withProjection(int[] projection) {if (projection == null) {return this;}throw new UnsupportedOperationException();}default InnerTableRead withReadType(RowType readType) {throw new UnsupportedOperationException();}default InnerTableRead withTopN(TopN topN) {return this;}default InnerTableRead withLimit(int limit) {return this;}default InnerTableRead forceKeepDelete() {return this;}@Overridedefault TableRead executeFilter() {return this;}@Overridedefault InnerTableRead withMetricRegistry(MetricRegistry registry) {return this;}
}
File: paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java (L60-145)
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {private static final long serialVersionUID = 7L;private static final long MAGIC = -2394839472490812314L;private static final int VERSION = 8;private long snapshotId = 0;private BinaryRow partition;private int bucket = -1;private String bucketPath;@Nullable private Integer totalBuckets;private List<DataFileMeta> beforeFiles = new ArrayList<>();@Nullable private List<DeletionFile> beforeDeletionFiles;private List<DataFileMeta> dataFiles;@Nullable private List<DeletionFile> dataDeletionFiles;private boolean isStreaming = false;private boolean rawConvertible;public DataSplit() {}public long snapshotId() {return snapshotId;}public BinaryRow partition() {return partition;}public int bucket() {return bucket;}public String bucketPath() {return bucketPath;}public @Nullable Integer totalBuckets() {return totalBuckets;}public List<DataFileMeta> beforeFiles() {return beforeFiles;}public Optional<List<DeletionFile>> beforeDeletionFiles() {return Optional.ofNullable(beforeDeletionFiles);}public List<DataFileMeta> dataFiles() {return dataFiles;}@Overridepublic Optional<List<DeletionFile>> deletionFiles() {return Optional.ofNullable(dataDeletionFiles);}public boolean isStreaming() {return isStreaming;}public boolean rawConvertible() {return rawConvertible;}public OptionalLong latestFileCreationEpochMillis() {return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();}public OptionalLong earliestFileCreationEpochMillis() {return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();}@Overridepublic long rowCount() {long rowCount = 0;for (DataFileMeta file : dataFiles) {rowCount += file.rowCount();}return rowCount;}
File: paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java (L68-150)
public abstract class AbstractFileStoreScan implements FileStoreScan {private final ManifestsReader manifestsReader;private final SnapshotManager snapshotManager;private final ManifestFile.Factory manifestFileFactory;private final Integer parallelism;private final ConcurrentMap<Long, TableSchema> tableSchemas;private final SchemaManager schemaManager;private final TableSchema schema;private Snapshot specifiedSnapshot = null;private boolean onlyReadRealBuckets = false;private Integer specifiedBucket = null;private Filter<Integer> bucketFilter = null;private BiFilter<Integer, Integer> totalAwareBucketFilter = null;protected ScanMode scanMode = ScanMode.ALL;private Integer specifiedLevel = null;private Filter<Integer> levelFilter = null;private Filter<ManifestEntry> manifestEntryFilter = null;private Filter<String> fileNameFilter = null;private ScanMetrics scanMetrics = null;private boolean dropStats;public AbstractFileStoreScan(ManifestsReader manifestsReader,SnapshotManager snapshotManager,SchemaManager schemaManager,TableSchema schema,ManifestFile.Factory manifestFileFactory,@Nullable Integer parallelism) {this.manifestsReader = manifestsReader;this.snapshotManager = snapshotManager;this.schemaManager = schemaManager;this.schema = schema;this.manifestFileFactory = manifestFileFactory;this.tableSchemas = new ConcurrentHashMap<>();this.parallelism = parallelism;this.dropStats = false;}@Overridepublic FileStoreScan withPartitionFilter(Predicate predicate) {manifestsReader.withPartitionFilter(predicate);return this;}@Overridepublic FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {manifestsReader.withPartitionFilter(partitions);return this;}@Overridepublic FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions) {manifestsReader.withPartitionsFilter(partitions);return this;}@Overridepublic FileStoreScan withPartitionFilter(PartitionPredicate predicate) {manifestsReader.withPartitionFilter(predicate);return this;}@Overridepublic FileStoreScan onlyReadRealBuckets() {manifestsReader.onlyReadRealBuckets();this.onlyReadRealBuckets = true;return this;}@Overridepublic FileStoreScan withBucket(int bucket) {manifestsReader.withBucket(bucket);specifiedBucket = bucket;return this;}@Overridepublic FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {this.bucketFilter = bucketFilter;
File: paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java (L67-92)
public static <T> RecordReader<T> readerForSection(List<SortedRun> section,FileReaderFactory<KeyValue> readerFactory,Comparator<InternalRow> userKeyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionWrapper<T> mergeFunctionWrapper,MergeSorter mergeSorter)throws IOException {List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();for (SortedRun run : section) {readers.add(new SizedReaderSupplier<KeyValue>() {@Overridepublic long estimateSize() {return run.totalSize();}@Overridepublic RecordReader<KeyValue> get() throws IOException {return readerForRun(run, readerFactory);}});}return mergeSorter.mergeSort(readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);}
File: paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java (L38-100)
public class Levels {private final Comparator<InternalRow> keyComparator;private final TreeSet<DataFileMeta> level0;private final List<SortedRun> levels;private final List<DropFileCallback> dropFileCallbacks = new ArrayList<>();public Levels(Comparator<InternalRow> keyComparator, List<DataFileMeta> inputFiles, int numLevels) {this.keyComparator = keyComparator;// in case the num of levels is not specified explicitlyint restoredNumLevels =Math.max(numLevels,inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");this.level0 =new TreeSet<>((a, b) -> {if (a.maxSequenceNumber() != b.maxSequenceNumber()) {// file with larger sequence number should be in frontreturn Long.compare(b.maxSequenceNumber(), a.maxSequenceNumber());} else {// When two or more jobs are writing the same merge tree, it is// possible that multiple files have the same maxSequenceNumber. In// this case we have to compare their file names so that files with// same maxSequenceNumber won't be "de-duplicated" by the tree set.int minSeqCompare =Long.compare(a.minSequenceNumber(), b.minSequenceNumber());if (minSeqCompare != 0) {return minSeqCompare;}// If minSequenceNumber is also the same, use creation timeint timeCompare = a.creationTime().compareTo(b.creationTime());if (timeCompare != 0) {return timeCompare;}// Final fallback: filename (to ensure uniqueness in TreeSet)return a.fileName().compareTo(b.fileName());}});this.levels = new ArrayList<>();for (int i = 1; i < restoredNumLevels; i++) {levels.add(SortedRun.empty());}Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();for (DataFileMeta file : inputFiles) {levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);}levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));Preconditions.checkState(level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()== inputFiles.size(),"Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");}public TreeSet<DataFileMeta> level0() {
File: paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java (L1-50)
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.fileindex.bloomfilter;import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.BloomFilter64.BitSet;
import org.apache.paimon.utils.IOUtils;import org.apache.hadoop.util.bloom.HashFunction;import java.io.IOException;import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;/*** Bloom filter for file index.** <p>Note: This class use {@link BloomFilter64} as a base filter. Store the num hash function (one* integer) and bit set bytes only. Use {@link HashFunction} to hash the objects, which hash bytes* type(like varchar, binary, etc.) using xx hash, hash numeric type by specified number hash(see* http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).*/
public class BloomFilterFileIndex implements FileIndexer {private static final int DEFAULT_ITEMS = 1_000_000;
