深入解析Paimon MergeFunction
MergeFunction 创建过程
以PrimaryKeyFileStoreTable为例子
@Overridepublic KeyValueFileStore store() {if (lazyStore == null) {RowType rowType = tableSchema.logicalRowType();CoreOptions options = CoreOptions.fromMap(tableSchema.options());KeyValueFieldsExtractor extractor =PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;MergeFunctionFactory<KeyValue> mfFactory =PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);if (options.needLookup()) {mfFactory = LookupMergeFunction.wrap(mfFactory);}
调用的是PrimaryKeyTableUtils的函数,通过用户配置的mergeEngine产生
public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(TableSchema tableSchema, KeyValueFieldsExtractor extractor) {RowType rowType = tableSchema.logicalRowType();Options conf = Options.fromMap(tableSchema.options());CoreOptions options = new CoreOptions(conf);CoreOptions.MergeEngine mergeEngine = options.mergeEngine();switch (mergeEngine) {case DEDUPLICATE:return DeduplicateMergeFunction.factory(conf);case PARTIAL_UPDATE:return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());case AGGREGATE:return AggregateMergeFunction.factory(conf,tableSchema.fieldNames(),rowType.getFieldTypes(),tableSchema.primaryKeys());case FIRST_ROW:return FirstRowMergeFunction.factory(conf);default:throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);}}
KeyValueFileStore会进一步把mf传递给 KeyValueFileStoreWrite
return new KeyValueFileStoreWrite(fileIO,schemaManager,schema,commitUser,partitionType,keyType,valueType,keyComparatorSupplier,() -> UserDefinedSeqComparator.create(valueType, options),logDedupEqualSupplier,mfFactory,pathFactory(),this::pathFactory,snapshotManager(),newScan(),indexFactory,deletionVectorsMaintainerFactory,options,keyValueFieldsExtractor,tableName);
KeyValueFileStoreWrite 会创建MergeTreeWriter
return new MergeTreeWriter(options.writeBufferSpillable(),options.writeBufferSpillDiskSize(),options.localSortMaxNumFileHandles(),options.spillCompressOptions(),ioManager,compactManager,restoredMaxSeqNumber,keyComparator,mfFactory.create(),writerFactory,options.commitForceCompact(),options.changelogProducer(),restoreIncrement,UserDefinedSeqComparator.create(valueType, options));}
也会创建MergeTreeCompactRewriter,Compaction会使用merge function
return new MergeTreeCompactRewriter(readerFactory,writerFactory,keyComparator,userDefinedSeqComparator,mfFactory,mergeSorter);
MergeTreeWriter
为何使用 mergeFunction
?会丢数据吗?
-
为何使用:
MergeTreeWriter
的核心职责之一就是处理主键冲突。mergeFunction
定义了当新旧记录主键相同时应该执行何种合并逻辑。在
flushWriteBuffer
方法中,当数据从内存刷盘时,会调用writeBuffer.forEach(keyComparator, mergeFunction, ...)
。这里的mergeFunction
实际上是在对内存缓冲区内的数据进行合并。虽然SortBufferWriteBuffer
已经利用sequence.field
做了一轮预处理,但mergeFunction
提供了更通用的合并框架。对于 Paimon 的默认行为(deduplicate
或partial-update
),mergeFunction
的实现会比较两条记录的sequenceNumber
,并保留较大的那一个。 -
会丢掉多版本数据吗?
是的,会,而且这是设计意图。
Paimon 的 Merge-Tree 表(尤其是带主键的表)的设计目标是维护每个主键的最新状态 (latest state),而不是存储所有的历史版本。
mergeFunction
的核心职责就是在合并过程中淘汰旧版本数据,只保留最新的版本。例如,对于同一个主键
PK=1
,你先后写入了三条记录:-
{PK=1, value='A', seq=101}
-
{PK=1, value='B', seq=102}
-
{PK=1, value='C', seq=103}
当这三条记录在内存或磁盘上相遇并触发合并时,
mergeFunction
会比较它们的sequenceNumber
,最终只保留seq=103
的记录{PK=1, value='C'}
。记录 A 和 B 就被“丢弃”了,因为它们是过时的版本。这正是 Paimon 作为湖仓存储引擎实现
UPSERT
语义的关键所在。它通过不断合并来用新数据覆盖旧数据,从而让用户在查询时总是能看到每个主键对应的最新值。如果你需要保留所有历史版本,那么应该选择不定义主键的 Append-Only 表。 -
MergeFunction 接口语义和用法分析
MergeFunction 是 Apache Paimon 中用于合并多个 KeyValue 对象的接口。它在合并树(Merge Tree)的紧凑过程中起着核心作用,定义了如何处理具有相同键的多条记录。
public interface MergeFunction<T> {/*** Reset the merge function to its default state, call this before calling {@link* #add(KeyValue)} for the first time or after {@link #getResult}.*/void reset();/** Add the given {@link KeyValue} to the merge function. */void add(KeyValue kv);/** Get current merged value. */T getResult();/** Require copy input kv, this may cache kv in memory. */boolean requireCopy();
}
核心方法说明
-
reset() : 重置合并函数到初始状态,在首次调用 add() 或调用 getResult() 后使用。
-
add(KeyValue kv) : 将给定的 KeyValue 添加到合并函数中进行处理。
-
getResult() : 获取当前合并后的结果。
-
requireCopy() : 指示是否需要复制输入的 KeyValue,这可能会影响内存使用。
主要实现类
-
DeduplicateMergeFunction :
-
用于主键唯一且值为完整记录的场景,只保留最新的记录。
-
当遇到删除记录时,根据配置决定是否忽略删除操作。
-
适用于需要去重的场景,如唯一用户信息表。
-
-
FirstRowMergeFunction :
-
用于主键唯一且值为完整记录的场景,只保留第一条记录。
-
与 DeduplicateMergeFunction 不同,它保留最早插入的记录而不是最新记录。
-
适用于需要保留首次出现记录的场景。
-
-
PartialUpdateMergeFunction :
-
用于部分更新场景,只更新非空字段。
-
支持序列组(sequence group)来处理字段级别的更新顺序。
-
可以配置在接收到删除记录时的行为(忽略、删除整行或部分删除)。
-
适用于需要部分更新记录的场景,如用户信息的部分字段更新。
-
-
AggregateMergeFunction :
-
用于聚合场景,对非空字段进行预聚合。
-
支持多种聚合函数,如 sum、max、min 等。
-
可以配置在接收到删除记录时是否删除整行。
-
适用于需要对数据进行聚合计算的场景,如统计分析表。
-
使用场景
MergeFunction 在 Paimon 的合并树写入器(MergeTreeWriter)和压缩管理器(MergeTreeCompactManager)中被广泛使用,用于处理数据的合并和压缩操作。不同的实现类适用于不同的业务场景,用户可以根据具体需求选择合适的合并策略。
PartialUpdateMergeFunction
见:
双流join 、 Paimon Partial Update 和 动态schema-CSDN博客
这是在 Paimon 中实现 partial-update
(部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue
)合并成最终的一条记录。
PartialUpdateMergeFunction
实现了 MergeFunction<KeyValue>
接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue
数据,然后交由一个 MergeFunction
实例来处理,计算出最终的结果。
PartialUpdateMergeFunction
的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。
// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...
核心成员变量
这些变量定义了 PartialUpdateMergeFunction
的状态和配置,决定了其合并行为。
// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值private final boolean ignoreDelete; // 是否忽略删除记录private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否启用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列private final boolean[] nullables; // 记录每个字段是否可为 nullprivate InternalRow currentKey; // 当前处理的主键private long latestSequenceNumber; // 见过的最新序列号private GenericRow row; // 合并过程中的结果行private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建private boolean currentDeleteRow; // 标记当前行最终是否应被删除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到过 INSERT 类型的记录// ... existing code ...
- 配置类变量 (
ignoreDelete
,fieldSeqComparators
,fieldAggregators
等) 通常在Factory
中被初始化,它们在整个合并过程中保持不变。 - 状态类变量 (
currentKey
,row
,latestSequenceNumber
等) 会在每次reset()
时被重置,用于处理新的一组具有相同主键的记录。
add(KeyValue kv)
:合并逻辑的核心
这是最重要的方法,定义了单条 KeyValue
是如何被合并到当前结果 row
中的。
// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 删除逻辑处理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...
它的逻辑可以分为两大块:
A. 处理 retract
消息 (RowKind 为 DELETE
或 UPDATE_BEFORE
)
partial-update
默认不接受删除记录。如果收到了,行为由配置决定:
ignoreDelete = true
: 直接忽略这条删除记录,返回。removeRecordOnDelete = true
: 当收到DELETE
类型的记录时,将currentDeleteRow
标记为true
,并清空当前row
。这意味着最终这条主键对应的记录将被删除。fieldSequenceEnabled = true
: 启用了sequence-group
。这是最复杂的逻辑,它会调用retractWithSequenceGroup(kv)
。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为null
或调用聚合器的retract
方法)。- 默认行为: 如果以上配置都没有,则直接抛出
IllegalArgumentException
异常,提示用户如何正确配置。
B. 处理 add
消息 (RowKind 为 INSERT
或 UPDATE_AFTER
)
这是主要的更新逻辑:
-
简单更新 (
updateNonNullFields
): 如果没有配置sequence-group
(fieldSeqComparators
为空),则执行最简单的部分列更新。遍历新纪录kv
的所有字段,只要字段值不为null
,就用它来更新row
中对应位置的值。// ... existing code ... private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else { // ... existing code ...
-
带序列号的更新 (
updateWithSequenceGroup
): 如果配置了sequence-group
,逻辑会更复杂。对于每个字段:- 如果该字段不属于任何
sequence-group
,则行为和简单更新类似(但会考虑聚合)。 - 如果该字段属于某个
sequence-group
,则会使用FieldsComparator
比较新记录kv
和当前结果row
的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新row
中由该sequence-group
控制的所有字段。这保证了数据的更新顺序。
- 如果该字段不属于任何
updateWithSequenceGroup
这个方法是 partial-update
合并引擎处理带有 sequence-group
配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2>
这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field
的值来判断是否应该更新 data_field1
和 data_field2
。这解决了多流更新时可能出现的数据乱序覆盖问题。
updateWithSequenceGroup
方法通过引入FieldsComparator
,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimonpartial-update
模式能够处理复杂更新场景的关键所在。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
- 输入:
KeyValue kv
,代表一条新到达的、具有相同主键的记录。 - 目标: 遍历这条新记录
kv
的所有字段,并根据sequence-group
的规则,决定是否用kv
中的字段值来更新当前正在合并的结果行this.row
。
该方法的核心是一个 for
循环,它遍历了表中的每一个字段。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...
在循环内部,对每个字段的处理逻辑可以分为两种情况:
- 该字段不属于任何
sequence-group
。 - 该字段属于某个
sequence-group
。
让我们来详细看这两种情况。
1. 字段不属于任何 sequence-group
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
- 判断条件:
seqComparator == null
。fieldSeqComparators
是一个Map<Integer, FieldsComparator>
,如果在里面找不到当前字段索引i
,就说明这个字段不受任何sequence-group
控制。 - 处理逻辑:
- 带聚合函数: 如果为该字段配置了聚合函数(
aggregator != null
),例如sum
、max
等,则调用aggregator.agg()
方法,将当前累加值accumulator
和新值field
进行聚合,并将结果写回row
。 - 不带聚合函数: 这是最简单的情况。如果新来的字段值
field
不为null
,就直接用它覆盖row
中的旧值。这和updateNonNullFields
的行为是一致的。
- 带聚合函数: 如果为该字段配置了聚合函数(
2. 字段属于某个 sequence-group
这是该方法最核心和复杂的部分。
// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
- 判断条件:
seqComparator != null
。 - 处理逻辑:
- 空序列组检查:
isEmptySequenceGroup(kv, seqComparator)
会检查这条新纪录kv
中,其对应的序列号字段是否都为null
。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。 - 序列号比较:
seqComparator.compare(kv.value(), row) >= 0
是关键。它会比较新记录kv
和当前结果row
中,由seqComparator
定义的序列号字段。- 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录
kv
是“更新”的或者“同样新”的,此时应该用kv
的值去更新row
。- 更新 序列字段 本身: 如果当前字段
i
就是序列字段之一,那么需要把这个sequence-group
定义的所有序列号字段都一次性更新掉,然后用continue
跳出本次循环。这是为了保证序列字段之间的一致性。 - 更新数据字段: 如果当前字段
i
是被序列字段 控制的数据字段,则执行更新。如果有聚合器,则调用aggregator.agg()
;如果没有,则直接用新值field
覆盖。
- 更新 序列字段 本身: 如果当前字段
- 如果新记录的序列字段 < 当前结果的序列字段 : 这意味着
kv
是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如sum
),则会调用aggregator.aggReversed()
。这个方法通常和agg()
的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如max
),aggReversed
可能就是一个空操作。
- 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录
- 空序列组检查:
createFieldAggregators
方法详解
该方法的核心任务是:为表中的每一个字段(列)确定它在合并时应该使用的聚合逻辑,并创建相应的聚合器(FieldAggregator)。
private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(RowType rowType,List<String> primaryKeys,List<Integer> allSequenceFields,CoreOptions options) {List<String> fieldNames = rowType.getFieldNames();List<DataType> fieldTypes = rowType.getFieldTypes();Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<>();for (int i = 0; i < fieldNames.size(); i++) {String fieldName = fieldNames.get(i);DataType fieldType = fieldTypes.get(i);if (allSequenceFields.contains(i)) {// no agg for sequence fieldscontinue;}if (primaryKeys.contains(fieldName)) {// aggregate by primary keys, so they do not aggregatefieldAggregators.put(i,() ->FieldAggregatorFactory.create(fieldType,fieldName,FieldPrimaryKeyAggFactory.NAME,options));continue;}String aggFuncName = getAggFuncName(options, fieldName);if (aggFuncName != null) {// last_non_null_value doesn't require sequence groupcheckArgument(aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)|| fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),"Must use sequence group for aggregation functions but not found for field %s.",fieldName);fieldAggregators.put(i,() ->FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options));}}return fieldAggregators;}
它遍历表中的所有字段,并按以下优先级规则为每个字段决定其命运:
规则一:序列字段不参与聚合
if (allSequenceFields.contains(i)) { // no agg for sequence fields continue;
}
-
说明:如果一个字段被定义为
sequence.field
(例如event_time
),它的唯一作用就是用来比较记录的新旧,其本身的值在合并时永远是直接覆盖,不会进行求和、求最大值等聚合操作。因此直接跳过该字段的聚合逻辑。
规则二:主键不参与聚合
if (primaryKeys.contains(fieldName)) { // ... fieldAggregators.put( i, () -> FieldAggregatorFactory.create(..., FieldPrimaryKeyAggFactory.NAME, ...)); continue;
}
-
说明:主键字段是记录的唯一标识,它的值在合并过程中必须保持不变。这里为它创建了一个
FieldPrimaryKeyAggFactory
,该工厂产生的聚合器逻辑非常简单:直接返回遇到的第一个值(每个merge function处理的总是主键相同的批次),后续的值都忽略。这种方式保证了主键的稳定性。
规则三:用户自定义的聚合函数
String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) { // ...
}
-
说明:Paimon 允许用户为特定字段或所有字段(通过
fields.default.aggregate-function
)指定聚合函数,例如sum
、max
、min
、last_non_null_value
等。 -
逻辑:
getAggFuncName
会读取这些配置。如果找到了配置,就会进入一个重要的检查:
checkArgument( aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME) || fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)), ...);
-
背景:除了
last_non_null_value
,其他所有聚合函数都必须有sequence group
的支持。 -
后续:检查通过后,就使用
FieldAggregatorFactory
创建一个用户指定的聚合器。
规则四:默认行为(无聚合)
-
说明:如果一个字段不满足以上任何一条规则(它不是序列字段、不是主键、也没有配置任何聚合函数),那么
createFieldAggregators
就不会为它创建任何聚合器。 -
合并逻辑:在后续的合并逻辑中(如
updateNonNullFields
或updateWithSequenceGroup
),对于没有聚合器的字段,其默认行为就是 “非空值覆盖” (non-null value overwrite)。
多 Flink 任务写入与 last_non_null_value
场景设定
-
任务A:只写入
col_a
和col_b
字段。 -
任务B:只写入
col_c
和col_d
字段。 -
并发写入:两个任务并发写入,它们的
sequenceNumber
是完全独立的,因此在全局看来是乱序的。 -
聚合配置:所有值字段(
col_a
到col_d
)都配置了last_non_null_value
聚合。
分析结论
您的结论是正确的:只要每个 Flink 任务只写自己的字段,两个任务的字段不互相干扰,即使 sequenceNumber
乱序,last_non_null_value
也能正确工作。
推演过程
我们来推演一下为什么能正确工作:
假设对于主键 pk=1
,发生了以下事件:
-
T1时刻:任务A 写入
{pk:1, col_a:'A1', col_b:null, col_c:null, col_d:null}
。Paimon 分配
sequenceNumber = 101
。 -
T2时刻:任务B 写入
{pk:1, col_a:null, col_b:null, col_c:'C1', col_d:null}
。Paimon 分配
sequenceNumber = 201
。 -
T3时刻:任务A 写入
{pk:1, col_a:'A2', col_b:null, col_c:null, col_d:null}
。Paimon 分配
sequenceNumber = 102
。
现在,这三条记录在 Compaction 时相遇了。假设没有配置 sequence.field
,SortMergeReader
会根据 sequenceNumber
对它们进行排序:
-
记录1:
{..., seq: 101}
-
记录3:
{..., seq: 102}
-
记录2:
{..., seq: 201}
PartialUpdateMergeFunction
会按 1 → 3 → 2 的顺序处理它们:
初始化
row = {col_a:null, col_b:null, col_c:null, col_d:null}
处理记录1 (seq=101)
-
col_a
更新为'A1'
。 -
其他字段都是
null
,last_non_null_value
忽略它们。 -
当前 row 状态:
{col_a:'A1', col_b:null, col_c:null, col_d:null}
处理记录3 (seq=102)
-
col_a
更新为'A2'
。 -
其他字段都是
null
,忽略。 -
当前 row 状态:
{col_a:'A2', col_b:null, col_c:null, col_d:null}
处理记录2 (seq=201)
-
col_a
是null
,忽略。 -
col_c
更新为'C1'
。 -
其他字段是
null
,忽略。 -
当前 row 状态:
{col_a:'A2', col_b:null, col_c:'C1', col_d:null}
最终合并结果
{col_a:'A2', col_b:null, col_c:'C1', col_d:null}
为什么能行?
关键在于以下两点:
-
字段不重叠
任务A 和 任务B 操作的是完全不同的字段集合。任务A 的写入对
col_c
和col_d
来说永远是null
,反之亦然。 -
last_non_null_value
的幂等性该聚合函数的逻辑是“只要新来的不是
null
就覆盖”。由于字段不重叠,任务A 的sequenceNumber
乱序只会影响col_a
和col_b
的合并顺序,但不会干扰到col_c
和col_d
。同理,任务B 的sequenceNumber
也不会影响到col_a
和col_b
。
重要警告
这种模式能够成功,强依赖于“字段不重叠”这个假设。
如果任务A 和 任务B 都可能写入 col_a
,那么 sequenceNumber
的乱序就会导致 col_a
的最终值变得不确定,从而产生数据不一致。
在这种情况下,就必须引入
sequence.field
来提供一个全局统一的业务时钟,以确保无论哪个任务写入,都能根据业务时间来决定胜负。
Sequence Group 机制解析
Sequence Group(简称序列组) 是一种把若干列(fields)组合在一起,以同一套“序列比较逻辑”决定这组列是否被“整体更新”的机制。
核心组成:
-
每个序列组由一个 FieldsComparator(实现了按配置的序列字段比较规则)表示。
-
比较器会用一组“序列字段”(比如时间戳、version id 等)在 incoming record(kv.value()) 和 当前聚合行(row) 之间判断哪条更新“更新”。
序列组的行为逻辑:
-
对于同一组内的多个列,当 comparator 判定 incoming 更新时,会“整组”把所有序列字段从 incoming 复制到当前行。
-
目的是避免当组中只有部分字段出现在某条 partial-update record 时导致不一致。
-
-
同时支持 per-field 的聚合器(FieldAggregator),用于非序列表现或需要累加/撤销等聚合语义的列。
为什么要这么做
1. Partial update 场景的问题:
-
上游可能发送“局部字段更新”,不同列的到达顺序和是否出现值不一致,会导致最终行状态错误。
-
例如一条消息只包含 timestamp,但缺少其它字段,如果仅按字段级别单独比较,会把某些字段错误覆盖成 null/旧值。
-
2. 序列组保证原子性与一致性:
-
对于一组有共同序列语义的列(例如同一条变更的不同列),只有当 incoming 的序列字段整体比当前更新时,才把这些列整体覆盖。
-
从而保证 原子性 / 一致性,避免只更新部分字段造成的状态不一致。
-
3. 支持聚合列:
-
一些列不是简单替换而是需要 聚合语义(如 last_non_null、sum 等)。
-
序列组逻辑要与聚合逻辑协同工作:
-
新值进来时通常调用 agg;
-
当 incoming 比较旧且为撤回时可调用 aggReversed。
-
典型场景举例
场景一:多列属于同一序列组,保证原子更新
-
表有列 {a, b, ts},其中 a、b 属于同一序列组,序列字段是 ts(时间戳)。
-
上游有两条 partial record:
-
{a = 10, ts = 100}
-
{b = 20, ts = 101}
-
-
如果不使用 sequence group:
-
第二条可能只更新 b,导致 a 保持 10(ok)或者被误置为 null(如果第二条有 null)
-
但当同一事务里 a 与 b 应作为一份变更的一部分时需要一致性
-
-
使用序列组后:
-
会根据 ts 决定是否整组覆盖,避免不一致。
-
场景二:空序列场景,避免误判
-
incoming record 只有普通列(不含 ts)
-
isEmptySequenceGroup 会发现 ts 为 null,跳过序列组逻辑
-
从而避免把组内列误判为“新序列”
注意与边界条件
边界情况 | 说明 |
---|---|
序列组配置不当 | 比如对同一字段被多个组重复定义,Factory 在构建时会抛异常进行校验。 |
配置组合冲突 | 若 removeRecordOnDelete / removeRecordOnSequenceGroup 等配置组合不当,Factory 也会校验并抛出冲突错误。 |
聚合器行为依赖实现 | aggregator 的行为(agg / aggReversed)依赖各自实现,可能需要根据业务确保语义正确(尤其是遇到旧序列来临时如何处理)。 |
总结
Sequence Group 是按配置把若干列绑定在一起用同一“序列比较”决定整体更新的机制——通过比较 incoming 与当前的序列字段,决定是否“整组替换”或在聚合器上做合并/撤销,从而在 partial-update 场景下保证组内列的一致性和正确的聚合语义。
getResult()
方法:产出最终结果
当处理完具有相同主键的所有 KeyValue
后,调用此方法来获取最终的合并结果。
// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...
它会根据 currentDeleteRow
和 meetInsert
标志位来决定最终的 RowKind
。如果 currentDeleteRow
为 true
,或者整个合并过程从未见过 INSERT
类型的记录,那么最终结果就是一条 DELETE
记录。否则,就是一条 INSERT
记录。然后将主键、最新的序列号、最终的 RowKind
和合并后的 row
数据打包成一个 KeyValue
返回。
Factory
内部类:配置的入口
PartialUpdateMergeFunction.Factory
是一个非常重要的内部类,它负责解析用户在表上设置的 OPTIONS
,并据此创建出一个配置好的 PartialUpdateMergeFunction
实例。
// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成员变量,用于存储从 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列号字段和被控制的字段,构建 fieldSeqComparators ...}}// ... 解析聚合函数配置,构建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校验,确保冲突的配置不会同时开启 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...
在构造函数中,它会:
- 读取
ignore-delete
,partial-update.remove-record-on-delete
等简单配置。 - 遍历所有
OPTIONS
,查找以fields.
开头、以.sequence-group
结尾的配置项,例如fields.order_time.sequence-group=order_id,price
。它会解析这些配置,构建出fieldSeqComparators
这个 Map,其中 key 是被控制字段的索引,value 是一个能够比较order_time
字段的比较器。 - 调用
createFieldAggregators
方法,解析fields.*.aggregate-function
等配置,构建出fieldAggregators
这个 Map。 - 执行一系列
Preconditions.checkState
,对用户的配置进行合法性校验,防止出现逻辑冲突。
PartialUpdateMergeFunction.Factory 创建过程分析
读取用户在表属性(Options)中配置的关于部分更新(Partial Update)的策略,并据此创建一个 PartialUpdateMergeFunction
实例。
构造函数分解:Factory(Options options, RowType rowType, List<String> primaryKeys)
-
解析基本配置
-
ignoreDelete
:是否忽略 DELETE 类型的消息。 -
removeRecordOnDelete
:收到 DELETE 消息时,删除整行记录。 -
removeRecordOnSequenceGroup
:收到特定 sequence group 的 DELETE 消息时,删除整行。(均为基本的行为开关)
-
-
解析 sequence-group(核心逻辑)
-
遍历所有表属性(
options.toMap().entrySet()
),寻找 key 以fields.
开头、以.sequence-group
结尾的配置项(例如:'fields.event_time.sequence-group' = 'col_a,col_b'
)。-
k (Key):
fields.event_time.sequence-group
→ 解析出序列字段event_time
(用于判断新旧的字段)。 -
v (Value):
col_a,col_b
→ 解析出受该序列字段影响的值字段(col_a
和col_b
)。
-
-
构建
fieldSeqComparators
:-
创建
UserDefinedSeqComparator
(关联event_time
字段)。 -
将该比较器与
col_a
、col_b
及event_time
自身关联,存入fieldSeqComparators
Map。 -
最终结构示例:
{ (index_of_col_a) -> comparator, (index_of_col_b) -> comparator, (index_of_event_time) -> comparator }
-
含义:更新
col_a
或col_b
时,需用event_time
判断是否执行更新。
-
-
-
解析聚合函数(createFieldAggregators)
-
检查
fields.my_col.aggregate-function
或fields.default.aggregate-function
等配置。 -
对每个非主键、非序列字段的列,若配置了聚合函数(如 sum、max、last_non_null_value),则创建对应的
FieldAggregator
并存入fieldAggregators
Map。 -
重要约束:除
last_non_null_value
外,其他聚合函数必须与sequence-group
一起使用(因聚合操作需要顺序信息)。
-
-
冲突检查
-
通过一系列
Preconditions.checkState
确保用户未配置逻辑冲突的选项(例如:不能同时启用ignore-delete
和remove-record-on-delete
)。
-
最终产物:
当调用 create()
方法时,Factory 会利用解析好的 fieldSeqComparators
和 fieldAggregators
,生成一个配置齐全的 PartialUpdateMergeFunction
实例。该实例明确知道:对于每一列的主键冲突情况,应如何合并数据(直接覆盖 / 根据 sequence field 判断后覆盖 / 执行聚合操作)。
总结
PartialUpdateMergeFunction
是 Paimon 实现高性能数据打宽(部分列更新)能力的技术基石。它通过一个设计精巧的合并流程,将简单的非空字段覆盖、基于序列号的有序更新、字段聚合以及多种删除策略融为一体。其 Factory
类则充当了连接用户配置和底层实现的桥梁。理解了这个类的工作原理,就能深刻地理解 Paimon partial-update
模式的强大之处。
AggregateMergeFunction 类详细分析
AggregateMergeFunction
是 Paimon 中用于处理主键表聚合合并逻辑的一个类,实现了 MergeFunction<KeyValue>
接口。
主要用途:在合并多个具有相同主键的记录时,对非空字段进行预聚合操作。
主要成员变量
变量名 | 类型 | 说明 |
---|---|---|
|
| 用于从 |
|
| 包含每个字段对应的聚合器,执行具体聚合操作 |
|
| 标识每个字段是否允许为空 |
|
| 存储最新的键值对 |
|
| 存储聚合后的结果行 |
|
| 复用结果对象,避免频繁创建新对象 |
|
| 标识当前记录是否为删除操作 |
|
| 配置项,控制删除操作时是否移除记录 |
构造函数
public AggregateMergeFunction(InternalRow.FieldGetter[] getters,FieldAggregator[] aggregators,boolean removeRecordOnDelete,boolean[] nullables) {this.getters = getters;this.aggregators = aggregators;this.removeRecordOnDelete = removeRecordOnDelete;this.nullables = nullables;
}
参数说明:
-
getters
:字段获取器 -
aggregators
:聚合器数组 -
removeRecordOnDelete
:删除时是否移除记录的配置 -
nullables
:字段是否可为空的数组
核心方法
reset()
@Override
public void reset() {this.latestKv = null;this.row = new GenericRow(getters.length);Arrays.stream(aggregators).forEach(FieldAggregator::reset);this.currentDeleteRow = false;
}
功能:重置所有状态到初始值,包括:
-
清空最新键值对
-
创建新的结果行
-
重置所有聚合器
-
将删除标记设为 false
add(KeyValue kv)
@Override
public void add(KeyValue kv) {latestKv = kv;currentDeleteRow = removeRecordOnDelete && kv.valueKind() == RowKind.DELETE;if (currentDeleteRow) {row = new GenericRow(getters.length);initRow(row, kv.value());return;}boolean isRetract = kv.valueKind().isRetract();for (int i = 0; i < getters.length; i++) {FieldAggregator fieldAggregator = aggregators[i];Object accumulator = getters[i].getFieldOrNull(row);Object inputField = getters[i].getFieldOrNull(kv.value());Object mergedField =isRetract? fieldAggregator.retract(accumulator, inputField): fieldAggregator.agg(accumulator, inputField);row.setField(i, mergedField);}
}
功能:添加键值对到合并函数。
-
如果是删除操作且配置了移除记录,则初始化空行。将现在的KV的非空行赋值到结果中。
-
否则根据操作类型(插入/回撤)对每个字段执行聚合操作。
initRow(GenericRow row, InternalRow value)
private void initRow(GenericRow row, InternalRow value) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(value);if (!nullables[i]) {if (field != null) {row.setField(i, field);} else {throw new IllegalArgumentException("Field " + i + " can not be null");}}}
}
功能:初始化行数据。
-
对于不允许为空的字段,若输入值为 null 则抛出异常。
getResult()
@Override
public KeyValue getResult() {checkNotNull(latestKv,"Trying to get result from merge function without any input. This is unexpected.");if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;return reused.replace(latestKv.key(), latestKv.sequenceNumber(), rowKind, row);
}
功能:获取合并后的结果。
-
若复用对象为空则创建新对象。
-
根据删除标记设置行类型(INSERT/DELETE)。
-
返回包含合并数据的键值对。
requireCopy()
@Override
public boolean requireCopy() {return false;
}
功能:指示是否需要复制输入键值对,返回 false 表示不需要。因为聚合元素是新创建的元素。
工厂方法
public static MergeFunctionFactory<KeyValue> factory(Options conf,List<String> fieldNames,List<DataType> fieldTypes,List<String> primaryKeys) {return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
}
功能:提供工厂方法,用于创建 MergeFunctionFactory
实例。
内部工厂类 Factory
成员变量
变量名 | 类型 | 说明 |
---|---|---|
|
| 表的配置选项 |
|
| 字段名称列表 |
|
| 字段类型列表 |
|
| 主键字段名称列表 |
|
| 删除时是否移除记录的配置 |
create(@Nullable int[][] projection)
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {List<String> fieldNames = this.fieldNames;List<DataType> fieldTypes = this.fieldTypes;if (projection != null) {Projection project = Projection.of(projection);fieldNames = project.project(fieldNames);fieldTypes = project.project(fieldTypes);}FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];List<String> sequenceFields = options.sequenceField();for (int i = 0; i < fieldNames.size(); i++) {String fieldName = fieldNames.get(i);DataType fieldType = fieldTypes.get(i);String aggFuncName = getAggFuncName(fieldName, sequenceFields);fieldAggregators[i] =FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options);}return new AggregateMergeFunction(createFieldGetters(fieldTypes),fieldAggregators,removeRecordOnDelete,ArrayUtils.toPrimitiveBoolean(fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
}
功能:创建 AggregateMergeFunction
实例。
-
根据投影信息调整字段名称和类型。
-
为每个字段创建对应的聚合器。
-
使用上述信息构造
AggregateMergeFunction
对象。
getAggFuncName(String fieldName, List<String> sequenceFields)
private String getAggFuncName(String fieldName, List<String> sequenceFields) {if (sequenceFields.contains(fieldName)) {return FieldLastValueAggFactory.NAME; // 序列字段使用 last_value}if (primaryKeys.contains(fieldName)) {return FieldPrimaryKeyAggFactory.NAME; // 主键字段不聚合}String aggFuncName = options.fieldAggFunc(fieldName);if (aggFuncName == null) {aggFuncName = options.fieldsDefaultFunc();}if (aggFuncName == null) {aggFuncName = FieldLastNonNullValueAggFactory.NAME; // 默认使用 last_non_null_value}return aggFuncName;
}
功能:确定字段的聚合函数名称。
-
序列字段 →
last_value
-
主键字段 →
primary_key
(不聚合) -
其他字段 → 优先使用配置的聚合函数 → 默认聚合函数 → 最终默认
last_non_null_value
聚合器 (FieldAggregator)
-
定义:抽象类,定义聚合操作基本接口。
-
实现类:如
FieldMaxAgg
、FieldMinAgg
、FieldSumAgg
等。 -
核心方法:
-
agg()
:执行聚合操作。 -
retract()
(可选):支持回撤操作。
-
配置选项
配置项 | 说明 |
---|---|
| 控制删除操作时是否移除记录 |
| 为特定字段指定聚合函数 |
| 为所有字段指定默认聚合函数 |
| 指定序列字段(不聚合,使用 |
总结
AggregateMergeFunction
通过组合不同的 FieldAggregator
,实现对主键表中非空字段的预聚合操作,有效处理数据更新与删除,确保合并逻辑高效可靠。
DeduplicateMergeFunction 类详细分析
DeduplicateMergeFunction
是 Paimon 项目中用于处理数据合并的一种策略实现,其主要功能是基于主键(唯一键)对数据进行去重,只保留最新的记录。
/*** A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep* the latest one.*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue>
从类的注释可以看出,DeduplicateMergeFunction
实现了 MergeFunction
接口,其设计目的是处理主键唯一且值为完整记录的情况,仅保留最新的记录。这种策略在需要去重并保留最新数据的场景中非常有用。
成员变量
private final boolean ignoreDelete;
private KeyValue latestKv;
-
ignoreDelete
: 一个布尔值,表示是否忽略删除记录。当设置为true
时,删除记录将不会被处理。 -
latestKv
: 存储最新的KeyValue
记录,这是去重策略的核心,每次添加新记录时都会更新此变量。
构造函数
private DeduplicateMergeFunction(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;
}
构造函数是私有的,这意味着外部不能直接实例化该类。实例化需要通过工厂方法完成,这有助于控制对象的创建过程并传递配置参数。
核心方法实现
reset() 方法
@Override
public void reset() {latestKv = null;
}
reset()
方法用于重置合并函数的状态,将 latestKv
设置为 null
,以便开始新的合并过程。
add() 方法
@Override
public void add(KeyValue kv) {// In 0.7- versions, the delete records might be written into data file even when// ignore-delete configured, so ignoreDelete still needs to be checkedif (ignoreDelete && kv.valueKind().isRetract()) {return;}latestKv = kv;
}
add()
方法是去重逻辑的核心。它接收一个 KeyValue
对象作为参数,并根据 ignoreDelete
配置决定是否处理删除记录。如果 ignoreDelete
为 true
且记录是删除记录(通过 kv.valueKind().isRetract()
判断),则直接返回,不更新 latestKv
。否则,将 latestKv
更新为当前记录,实现保留最新记录的策略。
注释中提到,在0.7版本中,开发团队发现了ignore-delete配置在某些情况下不生效的问题,为了确保数据正确性,他们在合并函数中添加了显式的配置检查,作为额外的保护层。这种设计既修复了问题,又保持了对历史数据的兼容性。
getResult() 方法
@Override
public KeyValue getResult() {return latestKv;
}
getResult()
方法返回合并后的结果,即 latestKv
,也就是最新的记录。
requireCopy() 方法
@Override
public boolean requireCopy() {return false;
}
- 由于总是使用最新的记录引用,不存在数据被意外修改的风险
- 数据流是单向的,不会出现历史数据被修改的情况
工厂方法
public static MergeFunctionFactory<KeyValue> factory() {return new Factory(false);
}public static MergeFunctionFactory<KeyValue> factory(Options options) {return new Factory(options.get(CoreOptions.IGNORE_DELETE));
}
提供了两个静态工厂方法用于创建 DeduplicateMergeFunction
实例:
-
无参版本默认创建
ignoreDelete
为false
的实例。 -
有参版本根据传入的
Options
对象获取IGNORE_DELETE
配置值来创建实例。
这两个方法都返回一个 MergeFunctionFactory
对象,实际的实例化工作由内部的 Factory
类完成。
内部 Factory 类
private static class Factory implements MergeFunctionFactory<KeyValue> {private static final long serialVersionUID = 1L;private final boolean ignoreDelete;private Factory(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;}@Overridepublic MergeFunction<KeyValue> create(@Nullable int[][] projection) {return new DeduplicateMergeFunction(ignoreDelete);}
}
Factory
是一个内部静态类,实现了 MergeFunctionFactory
接口。它负责根据配置创建 DeduplicateMergeFunction
实例。该类通过序列化 ID serialVersionUID
标识,并存储 ignoreDelete
配置。create()
方法根据传入的投影参数(本实现中未使用)创建并返回 DeduplicateMergeFunction
实例。
总结
DeduplicateMergeFunction
类通过简单的策略实现了基于主键的去重功能,仅保留最新的记录。其设计考虑了删除记录的处理,并通过工厂方法提供了灵活的配置选项。该类适用于需要去重并保留最新数据的场景,如实时数据处理中的状态维护。
FirstRowMergeFunction 类详细分析
FirstRowMergeFunction 是 Apache Paimon 项目中用于合并数据的一种策略实现,它实现了 MergeFunction 接口。该类的主要功能是基于主键(唯一键)保留第一条记录,忽略后续具有相同主键的记录。
/*** A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep* the first one.*/
public class FirstRowMergeFunction implements MergeFunction<KeyValue>
从类的注释可以看出,FirstRowMergeFunction 是一种合并函数,其中键是主键(唯一的),值是完整记录,只保留第一条记录。这意味着当多个具有相同主键的记录需要合并时,只有第一条记录会被保留,其余记录将被忽略。
成员变量
-
private KeyValue first;
用于存储第一条记录。
-
public boolean containsHighLevel;
标记是否包含高层级的数据。
-
private final boolean ignoreDelete;
配置项,用于决定是否忽略删除记录。
protected FirstRowMergeFunction(boolean
ignoreDelete) {this.ignoreDelete = ignoreDelete;
}
构造函数接受一个布尔参数 ignoreDelete
,用于初始化是否忽略删除记录的配置。
方法实现
-
reset()
@Override public void reset() {this.first = null;this.containsHighLevel = false; }
重置方法将
first
设置为null
,并将containsHighLevel
设置为false
,以便重新开始处理新的记录集。 -
add(KeyValue kv)
@Override public void add(KeyValue kv) {if (kv.valueKind().isRetract()) {// In 0.7- versions, the delete records might be written into data file even when// ignore-delete configured, so ignoreDelete still needs to be checkedif (ignoreDelete) {return;} else {throw new IllegalArgumentException("By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"+ "You can config 'first-row.ignore-delete' to ignore the DELETE/UPDATE_BEFORErecords.");}}if (first == null) {this.first = kv;}if (kv.level() > 0) {containsHighLevel = true;} }
-
如果传入的
KeyValue
是删除或更新前的记录(isRetract()
为true
):-
如果配置了忽略删除(
ignoreDelete
为true
),则直接返回,不处理该记录。 -
否则,抛出异常,提示用户默认情况下不接受删除/更新前记录,并建议配置
first-row.ignore-delete
来忽略这些记录。
-
-
如果
first
为null
(即尚未记录第一条数据),则将当前KeyValue
设置为first
。 -
如果
kv.level()
大于0
,则设置containsHighLevel
为true
。
-
-
getResult()
@Override public KeyValue getResult() {return first; }
返回第一条记录
first
。 -
requireCopy()
@Override public boolean requireCopy() {return true; }
表示需要复制数据。
工厂方法
public static
MergeFunctionFactory<KeyValue> factory
(Options options) {return new FirstRowMergeFunction.Factory(options.get(CoreOptions.IGNORE_DELETE));
}
提供了一个静态工厂方法 factory
,用于根据配置选项创建 FirstRowMergeFunction
实例。该方法通过 CoreOptions.IGNORE_DELETE
获取是否忽略删除记录的配置,并传递给内部类 Factory
。
内部类 Factory
private static class Factory implements
MergeFunctionFactory<KeyValue> {private static final long serialVersionUID = 1L;private final boolean ignoreDelete;public Factory(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;}@Overridepublic MergeFunction<KeyValue> create(@Nullable int[][] projection) {return new FirstRowMergeFunction(ignoreDelete);}
}
-
Factory
类实现了MergeFunctionFactory
接口,用于创建FirstRowMergeFunction
实例。 -
构造函数接收
ignoreDelete
参数并保存。 -
create
方法根据传入的投影配置创建FirstRowMergeFunction
实例。