
Apache Paimon: An In-Depth Analysis of the Full Query Flow

Based on the Apache Paimon source code, this article walks through the complete flow from the moment a user issues a query to the point where results are returned.

1. Overall Architecture

1.1 Layered Architecture for Data Reading

The read path is organized into four layers (top to bottom):

  • API layer (Table interface): Table, ReadBuilder, TableScan, TableRead
  • Execution layer: FileStoreScan, SnapshotReader, ManifestsReader, InnerTableRead, KeyValueTableRead, AppendTableRead, MergeFileSplitRead, RawFileSplitRead
  • Storage layer: ManifestFile, DataFileMeta, KeyValueFileReaderFactory, RecordReader, FileRecordReader, format readers (ORC/Parquet/Avro), LSM tree files (Level 0 / Level 1+)
  • Index layer: FileIndexReader, BloomFilter, Bitmap Index

1.2 Responsibilities of Key Components

API layer components:

  • Table: the table interface; newReadBuilder() creates a read builder [1]
  • ReadBuilder: builds the scan and read objects, carrying the filter and projection configuration [2]
  • TableScan: scans a table snapshot and produces a plan of Splits
  • TableRead: the read interface; creates a RecordReader from a Split [3] (see the sketch below)
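
Taken together, the API-layer components are used in the distributed pattern sketched in the ReadBuilder javadoc [2]: plan splits on the coordinator, then create readers on the workers. A minimal sketch; `table`, the filter, and the read type are placeholders, not a fixed API recipe:

```java
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;

import java.util.List;

public class DistributedReadSketch {

    // Mirrors the usage sketch in the ReadBuilder javadoc: the builder is Serializable,
    // splits are planned once on the coordinator, and each worker creates a reader for
    // the splits assigned to it.
    static void run(Table table) throws Exception {
        ReadBuilder builder = table.newReadBuilder();
                // .withFilter(predicate)   // optional filter pushdown
                // .withReadType(readType); // optional projection

        // 1. Coordinator: plan the splits.
        List<Split> splits = builder.newScan().plan().splits();

        // 2. Worker: create a reader for its splits and consume it.
        TableRead read = builder.newRead();
        try (RecordReader<InternalRow> reader = read.createReader(splits)) {
            // iterate batches via reader.readBatch() ...
        }
    }
}
```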

Execution layer components:

  • FileStoreScan: the file store scan implementation; reads manifest files [4]
  • SnapshotReader: the snapshot reader; coordinates scanning and Split generation [5]
  • KeyValueTableRead: the read implementation for primary-key tables; supports multiple read modes [6]
  • MergeFileSplitRead: handles reads of files that must be merged [7]

Storage layer components:

  • KeyValueFileReaderFactory: factory that creates readers for KeyValue files [8]
  • RecordReader: the interface for reading records in batches [9]
  • DataFileMeta: data file metadata, including statistics and index references

2. Core Class Diagrams

2.1 The TableRead Interface Hierarchy

```mermaid
classDiagram
    class TableRead {
        <<interface>>
        +withMetricRegistry(MetricRegistry) TableRead
        +executeFilter() TableRead
        +withIOManager(IOManager) TableRead
        +createReader(Split) RecordReader~InternalRow~
        +createReader(List~Split~) RecordReader~InternalRow~
        +createReader(Plan) RecordReader~InternalRow~
    }
    class InnerTableRead {
        <<interface>>
        +withFilter(Predicate) InnerTableRead
        +withFilter(List~Predicate~) InnerTableRead
        +withReadType(RowType) InnerTableRead
        +withTopN(TopN) InnerTableRead
        +withLimit(int) InnerTableRead
        +forceKeepDelete() InnerTableRead
    }
    class KeyValueTableRead {
        -List~SplitReadProvider~ readProviders
        -RowType readType
        -boolean forceKeepDelete
        -Predicate predicate
        -TopN topN
        -Integer limit
        +reader(Split) RecordReader~InternalRow~
        -createMergeReader() RecordReader
        -createNoMergeReader() RecordReader
    }
    class AppendTableRead {
        -SplitRead read
        -RowType readType
        +reader(Split) RecordReader~InternalRow~
    }
    class AbstractDataTableRead {
        <<abstract>>
        -TableSchema schema
        -MetricRegistry metricRegistry
        #applyReadType(RowType) void
        #innerWithFilter(Predicate) InnerTableRead
        +createReader(Split) RecordReader~InternalRow~
    }
    TableRead <|-- InnerTableRead
    InnerTableRead <|-- AbstractDataTableRead
    AbstractDataTableRead <|-- KeyValueTableRead
    AbstractDataTableRead <|-- AppendTableRead
```

[10] [11] [12]

2.2 Split Mechanism Class Diagram

```mermaid
classDiagram
    class Split {
        <<interface>>
        +rowCount() long
        +deletionFiles() Optional~List~DeletionFile~~
        +convertToRawFiles() Optional~List~RawFile~~
        +indexFiles() Optional~List~IndexFile~~
    }
    class DataSplit {
        -long snapshotId
        -BinaryRow partition
        -int bucket
        -String bucketPath
        -Integer totalBuckets
        -List~DataFileMeta~ beforeFiles
        -List~DeletionFile~ beforeDeletionFiles
        -List~DataFileMeta~ dataFiles
        -List~DeletionFile~ dataDeletionFiles
        -boolean isStreaming
        -boolean rawConvertible
        +snapshotId() long
        +partition() BinaryRow
        +bucket() int
        +dataFiles() List~DataFileMeta~
        +beforeFiles() List~DataFileMeta~
        +convertToRawFiles() Optional~List~RawFile~~
        +mergedRowCount() long
    }
    class DataFileMeta {
        -String fileName
        -long fileSize
        -long rowCount
        -InternalRow minKey
        -InternalRow maxKey
        -SimpleStats keyStats
        -SimpleStats valueStats
        -long minSequenceNumber
        -long maxSequenceNumber
        -int level
        -List~String~ extraFiles
    }
    class SplitGenerator {
        <<interface>>
        +splitForBatch(List~DataFileMeta~) List~DataSplit~
        +splitForStreaming(List~DataFileMeta~) List~DataSplit~
    }
    class AppendOnlySplitGenerator {
        -long targetSplitSize
        +splitForBatch(List~DataFileMeta~) List~DataSplit~
    }
    Split <|.. DataSplit
    DataSplit o-- DataFileMeta
    SplitGenerator <|.. AppendOnlySplitGenerator
```

[13]

2.3 The RecordReader Hierarchy

```mermaid
classDiagram
    class RecordReader~T~ {
        <<interface>>
        +readBatch() RecordIterator~T~
        +close() void
        +transform(Function) RecordReader~R~
        +filter(Filter) RecordReader~T~
    }
    class FileRecordReader~T~ {
        <<interface>>
        +readBatch() FileRecordIterator~T~
    }
    class KeyValueFileReaderFactory {
        -FileIO fileIO
        -SchemaManager schemaManager
        -TableSchema schema
        -RowType keyType
        -RowType valueType
        -FormatReaderMapping.Builder formatReaderMappingBuilder
        -long asyncThreshold
        +createRecordReader(DataFileMeta) RecordReader~KeyValue~
    }
    class DataFileRecordReader {
        -RowType tableRowType
        -FormatReaderFactory readerFactory
        -FormatReaderContext context
        -int[] indexMapping
        -CastFieldGetter[] castMapping
        +readBatch() FileRecordIterator~InternalRow~
    }
    class KeyValueDataFileRecordReader {
        -FileRecordReader~InternalRow~ reader
        -RowType keyType
        -RowType valueType
        -int level
        +readBatch() FileRecordIterator~KeyValue~
    }
    class ApplyDeletionVectorReader {
        -FileRecordReader reader
        -DeletionVector deletionVector
        +readBatch() FileRecordIterator
    }
    RecordReader <|-- FileRecordReader
    FileRecordReader <|.. DataFileRecordReader
    FileRecordReader <|.. KeyValueDataFileRecordReader
    FileRecordReader <|.. ApplyDeletionVectorReader
    KeyValueFileReaderFactory ..> KeyValueDataFileRecordReader
    KeyValueDataFileRecordReader o-- DataFileRecordReader
```

[14] [15]

2.4 File Scan Layer

```mermaid
classDiagram
    class FileStoreScan {
        <<interface>>
        +withPartitionFilter(Predicate) FileStoreScan
        +withBucket(int) FileStoreScan
        +withSnapshot(long) FileStoreScan
        +withKind(ScanMode) FileStoreScan
        +enableValueFilter() FileStoreScan
        +plan() Plan
        +readManifest(ManifestFileMeta) List~ManifestEntry~
    }
    class AbstractFileStoreScan {
        <<abstract>>
        -ManifestsReader manifestsReader
        -SnapshotManager snapshotManager
        -Snapshot specifiedSnapshot
        -ScanMode scanMode
        +plan() Plan
        #withPartitionFilter(Predicate) FileStoreScan
    }
    class KeyValueFileStoreScan {
        -RowType keyType
        -RowType valueType
        -Predicate keyFilter
        -Predicate valueFilter
        +withKeyFilter(Predicate) FileStoreScan
        +withValueFilter(Predicate) FileStoreScan
        #filterByStats(ManifestEntry) boolean
    }
    class ManifestsReader {
        -ManifestFile manifestFile
        -Predicate partitionFilter
        +read(List~ManifestFileMeta~) List~ManifestEntry~
    }
    FileStoreScan <|-- AbstractFileStoreScan
    AbstractFileStoreScan <|-- KeyValueFileStoreScan
    AbstractFileStoreScan o-- ManifestsReader
```

[4] [16]

3. Detailed Flow Diagrams

3.1 Query Initialization Flow

```mermaid
sequenceDiagram
    participant User
    participant Table
    participant ReadBuilder
    participant TableScan
    participant FileStoreScan
    participant TableRead
    participant KeyValueTableRead
    participant RecordReader
    User->>Table: newReadBuilder()
    Table->>ReadBuilder: create ReadBuilderImpl
    User->>ReadBuilder: withFilter(predicate)
    User->>ReadBuilder: withProjection(projection)
    User->>ReadBuilder: newScan()
    ReadBuilder->>TableScan: create scan instance
    TableScan->>FileStoreScan: create KeyValueFileStoreScan
    User->>TableScan: plan()
    FileStoreScan->>FileStoreScan: readManifests()
    FileStoreScan->>FileStoreScan: filterByStats()
    FileStoreScan->>TableScan: return Plan with splits
    User->>ReadBuilder: newRead()
    ReadBuilder->>TableRead: create KeyValueTableRead
    TableRead->>KeyValueTableRead: configure filters & projections
    User->>TableRead: createReader(split)
    KeyValueTableRead->>KeyValueTableRead: select SplitReadProvider
    KeyValueTableRead->>RecordReader: create MergeReader or NoMergeReader
    RecordReader-->>User: return RecordReader
```

Key call path:

  1. Table.newReadBuilder() → creates ReadBuilderImpl [1]
  2. ReadBuilder.newScan() → Table.newScan() → KeyValueFileStore.newScan()
  3. FileStoreScan.plan() → AbstractFileStoreScan.plan() → reads the manifests and filters them [17]
  4. ReadBuilder.newRead() → Table.newRead() → creates KeyValueTableRead [18]
  5. TableRead.createReader(split) → KeyValueTableRead.reader(split) [19]
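
The predicate passed into withFilter() above is normally assembled with PredicateBuilder. A minimal sketch under assumed inputs: the two-column schema, field names, and literals are made up for illustration, not taken from the source:

```java
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class FilterSketch {

    // Builds "id = 100 AND name IS NOT NULL" against a made-up two-column schema.
    // Field indexes refer to positions in the row type.
    static Predicate idEquals100AndNameNotNull() {
        RowType rowType =
                RowType.builder()
                        .field("id", DataTypes.INT())
                        .field("name", DataTypes.STRING())
                        .build();
        PredicateBuilder builder = new PredicateBuilder(rowType);
        return PredicateBuilder.and(
                builder.equal(0, 100),      // id = 100
                builder.isNotNull(1));      // name IS NOT NULL
    }
}
```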

3.2 File Scan Flow

(Flowchart) FileStoreScan.plan() → read the Snapshot → read the ManifestList → ManifestsReader.read() reads the manifest files → partition filter (entries whose partition does not match are skipped) → read ManifestEntry → bucket filter (non-matching buckets are skipped) → ManifestEntry list → statistics filter (filterByStats; entries that fail are dropped, entries that pass are kept) → group by partition + bucket → SplitGenerator: in batch mode, small files are merged by bin-packing; otherwise each file becomes its own Split → produce the DataSplit list → return the Plan.

Key call path:

  1. FileStoreScan.plan() → read the Snapshot metadata
  2. ManifestsReader.read() → read the manifest files in parallel
  3. KeyValueFileStoreScan.filterByStats() → filter files based on their statistics
  4. SplitGenerator.splitForBatch() → merge small files using a bin-packing algorithm (a simplified sketch follows this list)
  5. SnapshotReaderImpl.generateSplits() → generate the final list of DataSplits [20]
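
The bin-packing step in item 4 can be pictured with the following simplified sketch. It is only the idea of packing consecutive files into a split until a target size is reached, not Paimon's actual ordered-packing implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Toy bin-packing: pack consecutive file sizes into splits bounded by targetSplitSize.
public final class NaiveBinPacking {

    public static List<List<Long>> pack(List<Long> fileSizes, long targetSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentSize + size > targetSplitSize) {
                splits.add(current);          // close the current split
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }
}
```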

3.3 LSM Tree Read Flow

(Flowchart) MergeFileSplitRead.createReader() first decides whether a merge is needed. If it is, createMergeReader() is used: IntervalPartition groups the files into Sections by key range; for each Section, MergeTreeReaders.readerForSection() creates a reader per SortedRun (Level 0 files are ordered by sequence number, Level 1+ files by key range), with KeyValueFileReaderFactory creating the underlying RecordReaders; MergeSorter.mergeSort() then performs a LoserTree-based multi-way merge and MergeFunctionWrapper applies the merge function to records with the same key; finally, unless forceKeepDelete is set, a DropDeleteReader filters out delete records (otherwise they are kept for CDC), and the merged RecordReader is returned. If no merge is needed, createNoMergeReader() simply concatenates the readers of all files.

Key call path:

  1. MergeFileSplitRead.createMergeReader() → the read path when merging is required [21]
  2. IntervalPartition.partition() → group files by overlapping key ranges
  3. MergeTreeReaders.readerForSection() → create a merging reader for each Section [22]
  4. KeyValueFileReaderFactory.createRecordReader() → create the file readers [23]
  5. MergeSorter.mergeSort() → multi-way merge sort
  6. MergeFunctionWrapper.reset() → apply the merge logic to records with the same key

Special handling of Level 0:
Level 0 files are kept in a TreeSet ordered by maxSequenceNumber in descending order, so files with larger sequence numbers are read first [24].
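
The Levels.java excerpt cited below shows this comparator in full. A condensed sketch of the ordering, where FileMeta stands in for DataFileMeta and the creation-time tiebreaker is omitted:

```java
import java.util.Comparator;
import java.util.TreeSet;

// Level 0 ordering sketch: larger maxSequenceNumber sorts first; ties fall back to
// minSequenceNumber and then file name so the TreeSet never "de-duplicates" distinct files.
public class Level0OrderSketch {

    record FileMeta(String fileName, long minSequenceNumber, long maxSequenceNumber) {}

    static TreeSet<FileMeta> newLevel0() {
        Comparator<FileMeta> order =
                Comparator.comparingLong(FileMeta::maxSequenceNumber).reversed() // newest first
                        .thenComparingLong(FileMeta::minSequenceNumber)
                        .thenComparing(FileMeta::fileName);                       // uniqueness fallback
        return new TreeSet<>(order);
    }
}
```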

3.4 Index Usage Flow

(Flowchart) A query arrives with filter conditions and FileStoreScan.plan() checks whether a file index is configured; if not, files are read directly. If so, DataFileMeta.extraFiles points to the .index file, and FileIndexReader.read() reads the index header to determine the index type. For a Bloom filter, BloomFilterFileIndex.Reader tests the filter condition: if the value cannot exist, the result is SKIP and the whole file is skipped; if it might exist, the result is REMAIN and the file must be read. For a bitmap index, BitmapFileIndex.Reader reads the bitmap blocks and performs the bitmap test. Based on these results, FileIndexEvaluator either marks the file as skipped or keeps it in the scan result. Later, when KeyValueFileReaderFactory creates the reader, format-level filtering is applied as well: ORC/Parquet predicate pushdown can skip data at the row-group / stripe level.

When indexes are used:

  1. Bloom filter index - point-query optimization (see the sketch after this list):

    • Configuration: file-index.bloom-filter.columns
    • When it is used: equality (=) predicates
    • How it works: tells whether a value can possibly exist in a file [25]
  2. Bitmap index - optimization for low-cardinality columns:

    • Configuration: file-index.bitmap.columns
    • When it is used: equality or IN queries on enum-like or status columns
    • Storage layout: a sparse block index consisting of INDEX BLOCKS and BITMAP BLOCKS
  3. Statistics-based filtering:

    • DataFileMeta.valueStats stores min/max/nullCount
    • Applied in filterByStats()
    • Files that cannot satisfy a range condition are skipped
  4. Built-in format indexes:

    • ORC: Row Group Index, Bloom Filter
    • Parquet: Column Statistics, Dictionary Encoding
    • Configured and used through FormatReaderContext
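
To make the SKIP/REMAIN decision in item 1 concrete, here is a toy sketch of a Bloom-filter-style file index test. The real BloomFilterFileIndex uses BloomFilter64 with xx-hash; only the decision rule is the same: "definitely absent" means the whole file is skipped, "maybe present" means it must be read.

```java
import java.util.BitSet;

// Toy Bloom filter index: two hash probes over a BitSet, illustrating SKIP vs REMAIN
// for an equality predicate. A Bloom filter can give false positives, never false negatives.
public final class ToyBloomIndex {

    enum Result { SKIP, REMAIN }

    private final BitSet bits = new BitSet(1 << 16);

    void add(Object value) {
        bits.set(h1(value));
        bits.set(h2(value));
    }

    Result testEquals(Object literal) {
        boolean mayExist = bits.get(h1(literal)) && bits.get(h2(literal));
        return mayExist ? Result.REMAIN : Result.SKIP;
    }

    private static int h1(Object v) {
        return (v.hashCode() & 0x7fffffff) % (1 << 16);
    }

    private static int h2(Object v) {
        int h = v.hashCode() * 0x9E3779B1;
        return (h & 0x7fffffff) % (1 << 16);
    }
}
```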

3.5 Data Merge Flow (Merge-on-Read)

(Sequence diagram; participants: Client, MergeFileSplitRead, IntervalPartition, MergeSorter, LoserTree, MergeFunction, RecordReader.) The client calls createMergeReader(split). MergeFileSplitRead calls IntervalPartition.partition(files), which groups the files into Sections by key-range overlap and returns List<Section>. For each Section, readerForSection() creates a RecordReader per SortedRun, then mergeSort(readers) builds the LoserTree with the readers and the key comparator, and each reader loads its first record. In the read loop, readBatch()/next() locate the smallest key, and MergeFunction merges all records with that key according to the merge engine type (deduplicate / partial-update / aggregation), handling Level 0 records first (records with level <= 0 have higher priority) before merging records from higher levels. After each merged record, adjustTree() re-adjusts the loser tree to prepare the next record, and the merged record is returned to the client as an InternalRow.

Key points of Merge-on-Read:

  1. Section partitioning

    • IntervalPartition groups files by key range
    • Files within the same Section have overlapping key ranges and must be merged
    • Different Sections do not overlap and can be read independently
  2. Multi-way merge

    • MergeSorter.mergeSort() uses a loser-tree algorithm
    • Comparator order: UserKey → SequenceNumber (descending) → ValueKind
    • Records from Level 0 files have the highest priority [22]
  3. Merge function types (see the sketch after this list)

    • deduplicate: keep the latest version
    • partial-update: update a subset of fields
    • aggregation: aggregate values
    • first-row: keep the first inserted row
  4. Delete handling

    • DropDeleteReader filters out delete-marked records at the end
    • If forceKeepDelete=true, delete records are kept for CDC [21]
  5. Optimization for overlapping sections

    • Overlapping Sections only push down primary-key filters (filtersForKeys)
    • Non-overlapping Sections can push down all filters (filtersForAll)
    • This avoids losing records because of filtering [26]
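
As a concrete illustration of points 2-4, the sketch below applies deduplicate semantics to records ordered by (key ascending, sequence number descending) and drops delete markers at the end. KV is a stand-in for Paimon's KeyValue; in Paimon the same logic lives in the merge function applied per key by SortMergeReader, not in a post-sort pass like this:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Deduplicate-style merge sketch: sort by (key asc, sequenceNumber desc), keep only the
// newest record per key, and drop delete markers (what DropDeleteReader does at the end).
public final class DeduplicateSketch {

    record KV(String key, long sequenceNumber, boolean isDelete, String value) {}

    static List<KV> merge(List<KV> records) {
        records.sort(
                Comparator.comparing(KV::key)
                        .thenComparing(Comparator.comparingLong(KV::sequenceNumber).reversed()));
        List<KV> result = new ArrayList<>();
        String lastKey = null;
        for (KV kv : records) {
            if (kv.key().equals(lastKey)) {
                continue;                 // only the record with the largest sequence number survives
            }
            lastKey = kv.key();
            if (!kv.isDelete()) {         // drop delete markers unless forceKeepDelete is set
                result.add(kv);
            }
        }
        return result;
    }
}
```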

4. Key Source Code Paths

4.1 Query Entry Path

Table.newReadBuilder()
└─> ReadBuilderImpl constructor
    ├─> Table.newScan()
    │   └─> AbstractFileStoreTable.newScan()
    │       └─> KeyValueFileStore.newScan()
    │           └─> create KeyValueFileStoreScan
    │
    └─> Table.newRead()
        └─> PrimaryKeyFileStoreTable.newRead()
            └─> KeyValueFileStore.newRead()
                └─> create KeyValueTableRead

[1]

4.2 Scan Plan Generation Path

TableScan.plan()
└─> FileStoreScan.plan()
    └─> AbstractFileStoreScan.plan()
        ├─> scan.withSnapshot() set the snapshot
        ├─> ManifestsReader.read() read the manifests
        │   ├─> ManifestFile.read() parse the manifest files
        │   ├─> apply the partition filter
        │   ├─> apply the bucket filter
        │   └─> return the list of ManifestEntry
        │
        ├─> KeyValueFileStoreScan.filterByStats() filter by statistics
        │   └─> check min/max/nullCount
        │
        └─> SnapshotReaderImpl.generateSplits()
            └─> SplitGenerator.splitForBatch()
                ├─> AppendOnlySplitGenerator.splitForBatch()
                │   └─> merge files with a bin-packing algorithm
                │
                └─> return the list of DataSplits


4.3 Reader Creation Path

TableRead.createReader(split)
└─> KeyValueTableRead.reader(split)
    ├─> select a SplitReadProvider
    │   ├─> PrimaryKeyTableRawFileSplitReadProvider (raw mode)
    │   ├─> MergeFileSplitReadProvider (merge mode)
    │   ├─> IncrementalChangelogReadProvider (incremental changelog)
    │   └─> IncrementalDiffReadProvider (incremental diff)
    │
    └─> MergeFileSplitRead.createReader(split)
        ├─> decide whether a merge is needed
        │
        ├─> createMergeReader() [merge required]
        │   ├─> IntervalPartition.partition() group the files
        │   ├─> KeyValueFileReaderFactory.build() create the factory
        │   ├─> MergeTreeReaders.readerForSection() for each Section
        │   │   ├─> readerForRun() for each SortedRun
        │   │   │   └─> KeyValueFileReaderFactory.createRecordReader()
        │   │   │       └─> wrapped as KeyValueDataFileRecordReader
        │   │   │
        │   │   └─> MergeSorter.mergeSort()
        │   │       ├─> SortMergeReader.create()
        │   │       ├─> initialize the LoserTree
        │   │       └─> apply the MergeFunctionWrapper
        │   │
        │   └─> wrap with DropDeleteReader (if needed)
        │
        └─> createNoMergeReader() [no merge required]
            └─> ConcatRecordReader concatenates all file readers


4.4 File Read Path

RecordReader.readBatch()
└─> SortMergeReader.readBatch() [merge mode]
    ├─> LoserTree.nextKey()
    │   └─> find the reader with the smallest key
    │
    ├─> LoserTree.popElement()
    │   └─> collect all records with the same key
    │
    ├─> MergeFunctionWrapper.reset(records)
    │   ├─> LookupMergeFunction.merge()
    │   │   ├─> handle Level 0 records (level <= 0)
    │   │   ├─> handle higher-level records
    │   │   └─> apply the MergeEngine logic
    │   │
    │   └─> return the merged KeyValue
    │
    └─> LoserTree.adjustTree()
        └─> re-adjust the loser tree

RecordReader.readBatch()
└─> ConcatRecordReader.readBatch() [no-merge mode]
    └─> read each file's reader sequentially
        └─> KeyValueDataFileRecordReader.readBatch()
            └─> DataFileRecordReader.readBatch()
                ├─> FormatReader.readBatch() (ORC/Parquet/Avro)
                ├─> apply the DeletionVector (if present)
                └─> convert to KeyValue format


4.5 Index Application Path

FileStoreScan.plan()
└─> while reading each ManifestEntry
    └─> KeyValueFileStoreScan.filterByStats()
        ├─> check statistics (min/max/nullCount)
        └─> decide whether to keep the file

DataFileMeta.extraFiles contains the .index file
└─> KeyValueFileReaderFactory.createRecordReader()
    └─> construct FormatReaderContext
        ├─> pass the Predicate to the format reader
        │
        └─> OrcFormatReaderFactory.createReader()
            ├─> apply the ORC SearchArgument
            ├─> read the ORC indexes
            ├─> row-group-level filtering
            └─> Bloom filter test

File-level index reading:
FileIndexReader.read(indexFile)
└─> BloomFilterFileIndex.Reader.visitEntry()
    ├─> read the Bloom filter bit set
    ├─> test the query value
    └─> return SKIP or REMAIN


4.6 Complete Invocation Example

// User code
Table table = catalog.getTable(identifier);
ReadBuilder readBuilder = table.newReadBuilder()
        .withFilter(predicate)
        .withProjection(projection);

TableScan scan = readBuilder.newScan();
TableRead read = readBuilder.newRead();

// Execute the scan
TableScan.Plan plan = scan.plan();

// Read the data (the batch loop below follows the RecordReader interface cited in the Citations section)
for (Split split : plan.splits()) {
    RecordReader<InternalRow> reader = read.createReader(split);
    RecordReader.RecordIterator<InternalRow> batch;
    while ((batch = reader.readBatch()) != null) {
        InternalRow row;
        while ((row = batch.next()) != null) {
            // process row
        }
        batch.releaseBatch();
    }
    reader.close();
}

Citations

File: paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java (L19-50)

package org.apache.paimon.table.source;import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;import java.io.Serializable;
import java.util.List;
import java.util.Map;/*** An interface for building the {@link TableScan} and {@link TableRead}.** <p>Example of distributed reading:** <pre>{@code* // 1. Create a ReadBuilder (Serializable)* Table table = catalog.getTable(...);* ReadBuilder builder = table.newReadBuilder()*     .withFilter(...)*     .withReadType(...);** // 2. Plan splits in 'Coordinator' (or named 'Driver'):* List<Split> splits = builder.newScan().plan().splits();** // 3. Distribute these splits to different tasks*

File: paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java (L19-100)

package org.apache.paimon.table.source;import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;import javax.annotation.Nullable;import java.util.Map;
import java.util.Objects;import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
import static org.apache.paimon.utils.Preconditions.checkState;/** Implementation for {@link ReadBuilder}. */
public class ReadBuilderImpl implements ReadBuilder {private static final long serialVersionUID = 1L;private final InnerTable table;private final RowType partitionType;private final String defaultPartitionName;private Predicate filter;private Integer limit = null;private TopN topN = null;private Integer shardIndexOfThisSubtask;private Integer shardNumberOfParallelSubtasks;private @Nullable PartitionPredicate partitionFilter;private @Nullable Integer specifiedBucket = null;private Filter<Integer> bucketFilter;private @Nullable RowType readType;private boolean dropStats = false;public ReadBuilderImpl(InnerTable table) {this.table = table;this.partitionType = table.rowType().project(table.partitionKeys());this.defaultPartitionName = new CoreOptions(table.options()).partitionDefaultName();}@Overridepublic String tableName() {return table.name();}@Overridepublic RowType readType() {if (readType != null) {return readType;} else {return table.rowType();}}@Overridepublic ReadBuilder withFilter(Predicate filter) {if (this.filter == null) {this.filter = filter;} else {this.filter = PredicateBuilder.and(this.filter, filter);}return this;}@Overridepublic ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {if (partitionSpec != null) {this.partitionFilter =fromPredicate(partitionType,

File: paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java (L34-62)

/*** An abstraction layer above {@link SplitRead} to provide reading of {@link InternalRow}.** @since 0.4.0*/
@Public
public interface TableRead {/** Set {@link MetricRegistry} to table read. */TableRead withMetricRegistry(MetricRegistry registry);TableRead executeFilter();TableRead withIOManager(IOManager ioManager);RecordReader<InternalRow> createReader(Split split) throws IOException;default RecordReader<InternalRow> createReader(List<Split> splits) throws IOException {List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();for (Split split : splits) {readers.add(() -> createReader(split));}return ConcatRecordReader.create(readers);}default RecordReader<InternalRow> createReader(TableScan.Plan plan) throws IOException {return createReader(plan.splits());}
}

File: paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java (L48-98)

public interface FileStoreScan {FileStoreScan withPartitionFilter(Predicate predicate);FileStoreScan withPartitionFilter(List<BinaryRow> partitions);FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions);FileStoreScan withPartitionFilter(PartitionPredicate predicate);FileStoreScan withBucket(int bucket);FileStoreScan onlyReadRealBuckets();FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);FileStoreScan withSnapshot(long snapshotId);FileStoreScan withSnapshot(Snapshot snapshot);FileStoreScan withKind(ScanMode scanMode);FileStoreScan withLevel(int level);FileStoreScan withLevelFilter(Filter<Integer> levelFilter);FileStoreScan enableValueFilter();FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);FileStoreScan withMetrics(ScanMetrics metrics);FileStoreScan dropStats();FileStoreScan keepStats();@NullableInteger parallelism();ManifestsReader manifestsReader();List<ManifestEntry> readManifest(ManifestFileMeta manifest);/** Produce a {@link Plan}. */Plan plan();

File: paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java (L77-150)

/** Implementation of {@link SnapshotReader}. */
public class SnapshotReaderImpl implements SnapshotReader {private final FileStoreScan scan;private final TableSchema tableSchema;private final CoreOptions options;private final boolean deletionVectors;private final SnapshotManager snapshotManager;private final ChangelogManager changelogManager;private final ConsumerManager consumerManager;private final SplitGenerator splitGenerator;private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;private final FileStorePathFactory pathFactory;private final String tableName;private final IndexFileHandler indexFileHandler;private ScanMode scanMode = ScanMode.ALL;private RecordComparator lazyPartitionComparator;public SnapshotReaderImpl(FileStoreScan scan,TableSchema tableSchema,CoreOptions options,SnapshotManager snapshotManager,ChangelogManager changelogManager,SplitGenerator splitGenerator,BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,FileStorePathFactory pathFactory,String tableName,IndexFileHandler indexFileHandler) {this.scan = scan;this.tableSchema = tableSchema;this.options = options;this.deletionVectors = options.deletionVectorsEnabled();this.snapshotManager = snapshotManager;this.changelogManager = changelogManager;this.consumerManager =new ConsumerManager(snapshotManager.fileIO(),snapshotManager.tablePath(),snapshotManager.branch());this.splitGenerator = splitGenerator;this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;this.pathFactory = pathFactory;this.tableName = tableName;this.indexFileHandler = indexFileHandler;}@Overridepublic Integer parallelism() {return scan.parallelism();}@Overridepublic SnapshotManager snapshotManager() {return snapshotManager;}@Overridepublic ChangelogManager changelogManager() {return changelogManager;}@Overridepublic ManifestsReader manifestsReader() {return scan.manifestsReader();}@Overridepublic List<ManifestEntry> readManifest(ManifestFileMeta manifest) {return scan.readManifest(manifest);}

File: paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java (L47-152)

/*** An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}.*/
public final class KeyValueTableRead extends AbstractDataTableRead {private final List<SplitReadProvider> readProviders;@Nullable private RowType readType = null;private boolean forceKeepDelete = false;private Predicate predicate = null;private IOManager ioManager = null;@Nullable private TopN topN = null;@Nullable private Integer limit = null;public KeyValueTableRead(Supplier<MergeFileSplitRead> mergeReadSupplier,Supplier<RawFileSplitRead> batchRawReadSupplier,TableSchema schema) {super(schema);this.readProviders =Arrays.asList(new PrimaryKeyTableRawFileSplitReadProvider(batchRawReadSupplier, this::config),new MergeFileSplitReadProvider(mergeReadSupplier, this::config),new IncrementalChangelogReadProvider(mergeReadSupplier, this::config),new IncrementalDiffReadProvider(mergeReadSupplier, this::config));}private List<SplitRead<InternalRow>> initialized() {List<SplitRead<InternalRow>> readers = new ArrayList<>();for (SplitReadProvider readProvider : readProviders) {if (readProvider.get().initialized()) {readers.add(readProvider.get().get());}}return readers;}private void config(SplitRead<InternalRow> read) {if (forceKeepDelete) {read = read.forceKeepDelete();}if (readType != null) {read = read.withReadType(readType);}if (topN != null) {read = read.withTopN(topN);}if (limit != null) {read = read.withLimit(limit);}read.withFilter(predicate).withIOManager(ioManager);}@Overridepublic void applyReadType(RowType readType) {initialized().forEach(r -> r.withReadType(readType));this.readType = readType;}@Overridepublic InnerTableRead forceKeepDelete() {initialized().forEach(SplitRead::forceKeepDelete);this.forceKeepDelete = true;return this;}@Overrideprotected InnerTableRead innerWithFilter(Predicate predicate) {initialized().forEach(r -> r.withFilter(predicate));this.predicate = predicate;return this;}@Overridepublic InnerTableRead withTopN(TopN topN) {initialized().forEach(r -> r.withTopN(topN));this.topN = topN;return this;}@Overridepublic InnerTableRead withLimit(int limit) {initialized().forEach(r -> r.withLimit(limit));this.limit = limit;return this;}@Overridepublic TableRead withIOManager(IOManager ioManager) {initialized().forEach(r -> r.withIOManager(ioManager));this.ioManager = ioManager;return this;}@Overridepublic RecordReader<InternalRow> reader(Split split) throws IOException {DataSplit dataSplit = (DataSplit) split;for (SplitReadProvider readProvider : readProviders) {if (readProvider.match(dataSplit, forceKeepDelete)) {return readProvider.get().get().createReader(dataSplit);}}throw new RuntimeException("Should not happen.");}

File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L66-107)

/** A {@link SplitRead} to read row lineage table which need field merge. */
public class MergeFileSplitRead implements SplitRead<KeyValue> {private final TableSchema tableSchema;private final FileIO fileIO;private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;private final Comparator<InternalRow> keyComparator;private final MergeFunctionFactory<KeyValue> mfFactory;private final MergeSorter mergeSorter;private final List<String> sequenceFields;private final boolean sequenceOrder;@Nullable private RowType readKeyType;@Nullable private List<Predicate> filtersForKeys;@Nullable private List<Predicate> filtersForAll;@Nullable private int[][] pushdownProjection;@Nullable private int[][] outerProjection;private boolean forceKeepDelete = false;public MergeFileSplitRead(CoreOptions options,TableSchema schema,RowType keyType,RowType valueType,Comparator<InternalRow> keyComparator,MergeFunctionFactory<KeyValue> mfFactory,KeyValueFileReaderFactory.Builder readerFactoryBuilder) {this.tableSchema = schema;this.readerFactoryBuilder = readerFactoryBuilder;this.fileIO = readerFactoryBuilder.fileIO();this.keyComparator = keyComparator;this.mfFactory = mfFactory;this.mergeSorter =new MergeSorter(CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);this.sequenceFields = options.sequenceField();this.sequenceOrder = options.sequenceFieldSortOrderIsAscending();}

File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L199-235)

    public MergeFileSplitRead withFilter(Predicate predicate) {if (predicate == null) {return this;}List<Predicate> allFilters = new ArrayList<>();List<Predicate> pkFilters = null;List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();Set<String> nonPrimaryKeys =tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());for (Predicate sub : splitAnd(predicate)) {allFilters.add(sub);if (!containsFields(sub, nonPrimaryKeys)) {if (pkFilters == null) {pkFilters = new ArrayList<>();}// TODO Actually, the index is wrong, but it is OK.//  The orc filter just use name instead of index.pkFilters.add(sub);}}// Consider this case:// Denote (seqNumber, key, value) as a record. We have two overlapping runs in a section://   * First run: (1, k1, 100), (2, k2, 200)//   * Second run: (3, k1, 10), (4, k2, 20)// If we push down filter "value >= 100" for this section, only the first run will be read,// and the second run is lost. This will produce incorrect results.//// So for sections with overlapping runs, we only push down key filters.// For sections with only one run, as each key only appears once, it is OK to push down// value filters.filtersForAll = allFilters;filtersForKeys = pkFilters;return this;}

File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L238-258)

    public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {if (!split.beforeFiles().isEmpty()) {throw new IllegalArgumentException("This read cannot accept split with before files.");}if (split.isStreaming() || split.bucket() == BucketMode.POSTPONE_BUCKET) {return createNoMergeReader(split.partition(),split.bucket(),split.dataFiles(),split.deletionFiles().orElse(null),split.isStreaming());} else {return createMergeReader(split.partition(),split.bucket(),split.dataFiles(),split.deletionFiles().orElse(null),forceKeepDelete);}}

File: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java (L260-298)

    public RecordReader<KeyValue> createMergeReader(BinaryRow partition,int bucket,List<DataFileMeta> files,@Nullable List<DeletionFile> deletionFiles,boolean keepDelete)throws IOException {// Sections are read by SortMergeReader, which sorts and merges records by keys.// So we cannot project keys or else the sorting will be incorrect.DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);KeyValueFileReaderFactory overlappedSectionFactory =readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForKeys);KeyValueFileReaderFactory nonOverlappedSectionFactory =readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForAll);List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();MergeFunctionWrapper<KeyValue> mergeFuncWrapper =new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));for (List<SortedRun> section : new IntervalPartition(files, keyComparator).partition()) {sectionReaders.add(() ->MergeTreeReaders.readerForSection(section,section.size() > 1? overlappedSectionFactory: nonOverlappedSectionFactory,keyComparator,createUdsComparator(),mergeFuncWrapper,mergeSorter));}RecordReader<KeyValue> reader = ConcatRecordReader.create(sectionReaders);if (!keepDelete) {reader = new DropDeleteReader(reader);}return projectOuter(projectKey(reader));}

File: paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java (L57-96)

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {private final FileIO fileIO;private final SchemaManager schemaManager;private final TableSchema schema;private final RowType keyType;private final RowType valueType;private final FormatReaderMapping.Builder formatReaderMappingBuilder;private final DataFilePathFactory pathFactory;private final long asyncThreshold;private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;private final BinaryRow partition;private final DeletionVector.Factory dvFactory;private KeyValueFileReaderFactory(FileIO fileIO,SchemaManager schemaManager,TableSchema schema,RowType keyType,RowType valueType,FormatReaderMapping.Builder formatReaderMappingBuilder,DataFilePathFactory pathFactory,long asyncThreshold,BinaryRow partition,DeletionVector.Factory dvFactory) {this.fileIO = fileIO;this.schemaManager = schemaManager;this.schema = schema;this.keyType = keyType;this.valueType = valueType;this.formatReaderMappingBuilder = formatReaderMappingBuilder;this.pathFactory = pathFactory;this.asyncThreshold = asyncThreshold;this.partition = partition;this.formatReaderMappings = new HashMap<>();this.dvFactory = dvFactory;}

File: paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java (L98-151)

    @Overridepublic RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) {return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2));}return createRecordReader(file, true, null);}private FileRecordReader<KeyValue> createRecordReader(DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize)throws IOException {String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());long schemaId = file.schemaId();Supplier<FormatReaderMapping> formatSupplier =() ->formatReaderMappingBuilder.build(formatIdentifier,schema,schemaId == schema.id() ? schema : schemaManager.schema(schemaId));FormatReaderMapping formatReaderMapping =reuseFormat? formatReaderMappings.computeIfAbsent(new FormatKey(schemaId, formatIdentifier),key -> formatSupplier.get()): formatSupplier.get();Path filePath = pathFactory.toPath(file);long fileSize = file.fileSize();FileRecordReader<InternalRow> fileRecordReader =new DataFileRecordReader(schema.logicalRowType(),formatReaderMapping.getReaderFactory(),orcPoolSize == null? new FormatReaderContext(fileIO, filePath, fileSize): new OrcFormatReaderContext(fileIO, filePath, fileSize, orcPoolSize),formatReaderMapping.getIndexMapping(),formatReaderMapping.getCastMapping(),PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),false,null,-1,Collections.emptyMap());Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {fileRecordReader =new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());}return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());}

File: paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java (L34-53)

/*** The reader that reads the batches of records.** @since 0.4.0*/
@Public
public interface RecordReader<T> extends Closeable {/*** Reads one batch. The method should return null when reaching the end of the input.** <p>The returned iterator object and any contained objects may be held onto by the source for* some time, so it should not be immediately reused by the reader.*/@NullableRecordIterator<T> readBatch() throws IOException;/** Closes the reader and should release all resources. */@Overridevoid close() throws IOException;

File: paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java (L29-75)

/** Inner {@link TableRead} contains filter and projection push down. */
public interface InnerTableRead extends TableRead {default InnerTableRead withFilter(List<Predicate> predicates) {if (predicates == null || predicates.isEmpty()) {return this;}return withFilter(PredicateBuilder.and(predicates));}InnerTableRead withFilter(Predicate predicate);/** Use {@link #withReadType(RowType)} instead. */@Deprecateddefault InnerTableRead withProjection(int[] projection) {if (projection == null) {return this;}throw new UnsupportedOperationException();}default InnerTableRead withReadType(RowType readType) {throw new UnsupportedOperationException();}default InnerTableRead withTopN(TopN topN) {return this;}default InnerTableRead withLimit(int limit) {return this;}default InnerTableRead forceKeepDelete() {return this;}@Overridedefault TableRead executeFilter() {return this;}@Overridedefault InnerTableRead withMetricRegistry(MetricRegistry registry) {return this;}
}

File: paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java (L60-145)

/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {private static final long serialVersionUID = 7L;private static final long MAGIC = -2394839472490812314L;private static final int VERSION = 8;private long snapshotId = 0;private BinaryRow partition;private int bucket = -1;private String bucketPath;@Nullable private Integer totalBuckets;private List<DataFileMeta> beforeFiles = new ArrayList<>();@Nullable private List<DeletionFile> beforeDeletionFiles;private List<DataFileMeta> dataFiles;@Nullable private List<DeletionFile> dataDeletionFiles;private boolean isStreaming = false;private boolean rawConvertible;public DataSplit() {}public long snapshotId() {return snapshotId;}public BinaryRow partition() {return partition;}public int bucket() {return bucket;}public String bucketPath() {return bucketPath;}public @Nullable Integer totalBuckets() {return totalBuckets;}public List<DataFileMeta> beforeFiles() {return beforeFiles;}public Optional<List<DeletionFile>> beforeDeletionFiles() {return Optional.ofNullable(beforeDeletionFiles);}public List<DataFileMeta> dataFiles() {return dataFiles;}@Overridepublic Optional<List<DeletionFile>> deletionFiles() {return Optional.ofNullable(dataDeletionFiles);}public boolean isStreaming() {return isStreaming;}public boolean rawConvertible() {return rawConvertible;}public OptionalLong latestFileCreationEpochMillis() {return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();}public OptionalLong earliestFileCreationEpochMillis() {return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();}@Overridepublic long rowCount() {long rowCount = 0;for (DataFileMeta file : dataFiles) {rowCount += file.rowCount();}return rowCount;}

File: paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java (L68-150)

public abstract class AbstractFileStoreScan implements FileStoreScan {private final ManifestsReader manifestsReader;private final SnapshotManager snapshotManager;private final ManifestFile.Factory manifestFileFactory;private final Integer parallelism;private final ConcurrentMap<Long, TableSchema> tableSchemas;private final SchemaManager schemaManager;private final TableSchema schema;private Snapshot specifiedSnapshot = null;private boolean onlyReadRealBuckets = false;private Integer specifiedBucket = null;private Filter<Integer> bucketFilter = null;private BiFilter<Integer, Integer> totalAwareBucketFilter = null;protected ScanMode scanMode = ScanMode.ALL;private Integer specifiedLevel = null;private Filter<Integer> levelFilter = null;private Filter<ManifestEntry> manifestEntryFilter = null;private Filter<String> fileNameFilter = null;private ScanMetrics scanMetrics = null;private boolean dropStats;public AbstractFileStoreScan(ManifestsReader manifestsReader,SnapshotManager snapshotManager,SchemaManager schemaManager,TableSchema schema,ManifestFile.Factory manifestFileFactory,@Nullable Integer parallelism) {this.manifestsReader = manifestsReader;this.snapshotManager = snapshotManager;this.schemaManager = schemaManager;this.schema = schema;this.manifestFileFactory = manifestFileFactory;this.tableSchemas = new ConcurrentHashMap<>();this.parallelism = parallelism;this.dropStats = false;}@Overridepublic FileStoreScan withPartitionFilter(Predicate predicate) {manifestsReader.withPartitionFilter(predicate);return this;}@Overridepublic FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {manifestsReader.withPartitionFilter(partitions);return this;}@Overridepublic FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions) {manifestsReader.withPartitionsFilter(partitions);return this;}@Overridepublic FileStoreScan withPartitionFilter(PartitionPredicate predicate) {manifestsReader.withPartitionFilter(predicate);return this;}@Overridepublic FileStoreScan onlyReadRealBuckets() {manifestsReader.onlyReadRealBuckets();this.onlyReadRealBuckets = true;return this;}@Overridepublic FileStoreScan withBucket(int bucket) {manifestsReader.withBucket(bucket);specifiedBucket = bucket;return this;}@Overridepublic FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {this.bucketFilter = bucketFilter;

File: paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java (L67-92)

    public static <T> RecordReader<T> readerForSection(List<SortedRun> section,FileReaderFactory<KeyValue> readerFactory,Comparator<InternalRow> userKeyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionWrapper<T> mergeFunctionWrapper,MergeSorter mergeSorter)throws IOException {List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();for (SortedRun run : section) {readers.add(new SizedReaderSupplier<KeyValue>() {@Overridepublic long estimateSize() {return run.totalSize();}@Overridepublic RecordReader<KeyValue> get() throws IOException {return readerForRun(run, readerFactory);}});}return mergeSorter.mergeSort(readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);}

File: paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java (L38-100)

public class Levels {private final Comparator<InternalRow> keyComparator;private final TreeSet<DataFileMeta> level0;private final List<SortedRun> levels;private final List<DropFileCallback> dropFileCallbacks = new ArrayList<>();public Levels(Comparator<InternalRow> keyComparator, List<DataFileMeta> inputFiles, int numLevels) {this.keyComparator = keyComparator;// in case the num of levels is not specified explicitlyint restoredNumLevels =Math.max(numLevels,inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");this.level0 =new TreeSet<>((a, b) -> {if (a.maxSequenceNumber() != b.maxSequenceNumber()) {// file with larger sequence number should be in frontreturn Long.compare(b.maxSequenceNumber(), a.maxSequenceNumber());} else {// When two or more jobs are writing the same merge tree, it is// possible that multiple files have the same maxSequenceNumber. In// this case we have to compare their file names so that files with// same maxSequenceNumber won't be "de-duplicated" by the tree set.int minSeqCompare =Long.compare(a.minSequenceNumber(), b.minSequenceNumber());if (minSeqCompare != 0) {return minSeqCompare;}// If minSequenceNumber is also the same, use creation timeint timeCompare = a.creationTime().compareTo(b.creationTime());if (timeCompare != 0) {return timeCompare;}// Final fallback: filename (to ensure uniqueness in TreeSet)return a.fileName().compareTo(b.fileName());}});this.levels = new ArrayList<>();for (int i = 1; i < restoredNumLevels; i++) {levels.add(SortedRun.empty());}Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();for (DataFileMeta file : inputFiles) {levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);}levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));Preconditions.checkState(level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()== inputFiles.size(),"Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");}public TreeSet<DataFileMeta> level0() {

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java (L1-50)

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.fileindex.bloomfilter;import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.BloomFilter64.BitSet;
import org.apache.paimon.utils.IOUtils;import org.apache.hadoop.util.bloom.HashFunction;import java.io.IOException;import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;/*** Bloom filter for file index.** <p>Note: This class use {@link BloomFilter64} as a base filter. Store the num hash function (one* integer) and bit set bytes only. Use {@link HashFunction} to hash the objects, which hash bytes* type(like varchar, binary, etc.) using xx hash, hash numeric type by specified number hash(see* http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).*/
public class BloomFilterFileIndex implements FileIndexer {private static final int DEFAULT_ITEMS = 1_000_000;