当前位置: 首页 > news >正文

深入解析Paimon MergeFunction

MergeFunction 创建过程

以PrimaryKeyFileStoreTable为例子

@Overridepublic KeyValueFileStore store() {if (lazyStore == null) {RowType rowType = tableSchema.logicalRowType();CoreOptions options = CoreOptions.fromMap(tableSchema.options());KeyValueFieldsExtractor extractor =PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;MergeFunctionFactory<KeyValue> mfFactory =PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);if (options.needLookup()) {mfFactory = LookupMergeFunction.wrap(mfFactory);}

调用的是PrimaryKeyTableUtils的函数,通过用户配置的mergeEngine产生

    public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(TableSchema tableSchema, KeyValueFieldsExtractor extractor) {RowType rowType = tableSchema.logicalRowType();Options conf = Options.fromMap(tableSchema.options());CoreOptions options = new CoreOptions(conf);CoreOptions.MergeEngine mergeEngine = options.mergeEngine();switch (mergeEngine) {case DEDUPLICATE:return DeduplicateMergeFunction.factory(conf);case PARTIAL_UPDATE:return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());case AGGREGATE:return AggregateMergeFunction.factory(conf,tableSchema.fieldNames(),rowType.getFieldTypes(),tableSchema.primaryKeys());case FIRST_ROW:return FirstRowMergeFunction.factory(conf);default:throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);}}

KeyValueFileStore会进一步把mf传递给 KeyValueFileStoreWrite

        return new KeyValueFileStoreWrite(fileIO,schemaManager,schema,commitUser,partitionType,keyType,valueType,keyComparatorSupplier,() -> UserDefinedSeqComparator.create(valueType, options),logDedupEqualSupplier,mfFactory,pathFactory(),this::pathFactory,snapshotManager(),newScan(),indexFactory,deletionVectorsMaintainerFactory,options,keyValueFieldsExtractor,tableName);

KeyValueFileStoreWrite 会创建MergeTreeWriter

        return new MergeTreeWriter(options.writeBufferSpillable(),options.writeBufferSpillDiskSize(),options.localSortMaxNumFileHandles(),options.spillCompressOptions(),ioManager,compactManager,restoredMaxSeqNumber,keyComparator,mfFactory.create(),writerFactory,options.commitForceCompact(),options.changelogProducer(),restoreIncrement,UserDefinedSeqComparator.create(valueType, options));}

也会创建MergeTreeCompactRewriter,Compaction会使用merge function

return new MergeTreeCompactRewriter(readerFactory,writerFactory,keyComparator,userDefinedSeqComparator,mfFactory,mergeSorter);


MergeTreeWriter为何使用 mergeFunction?会丢数据吗?

  • ​为何使用​​:

    MergeTreeWriter的核心职责之一就是处理主键冲突。mergeFunction定义了当新旧记录主键相同时应该执行何种合并逻辑。

    flushWriteBuffer方法中,当数据从内存刷盘时,会调用 writeBuffer.forEach(keyComparator, mergeFunction, ...)。这里的 mergeFunction实际上是在对​​内存缓冲区内​​的数据进行合并。虽然 SortBufferWriteBuffer已经利用 sequence.field做了一轮预处理,但 mergeFunction提供了更通用的合并框架。对于 Paimon 的默认行为(deduplicatepartial-update),mergeFunction的实现会比较两条记录的 sequenceNumber,并保留较大的那一个。

  • ​会丢掉多版本数据吗?​

    ​是的,会,而且这是设计意图。​

    Paimon 的 Merge-Tree 表(尤其是带主键的表)的设计目标是维护每个主键的​​最新状态 (latest state)​​,而不是存储所有的历史版本。mergeFunction的核心职责就是在合并过程中​​淘汰旧版本数据​​,只保留最新的版本。

    例如,对于同一个主键 PK=1,你先后写入了三条记录:

    1. {PK=1, value='A', seq=101}

    2. {PK=1, value='B', seq=102}

    3. {PK=1, value='C', seq=103}

    当这三条记录在内存或磁盘上相遇并触发合并时,mergeFunction会比较它们的 sequenceNumber,最终只保留 seq=103的记录 {PK=1, value='C'}。记录 A 和 B 就被“丢弃”了,因为它们是过时的版本。

    这正是 Paimon 作为湖仓存储引擎实现 UPSERT语义的关键所在。它通过不断合并来用新数据覆盖旧数据,从而让用户在查询时总是能看到每个主键对应的最新值。如果你需要保留所有历史版本,那么应该选择不定义主键的 Append-Only 表。


MergeFunction 接口语义和用法分析

MergeFunction 是 Apache Paimon 中用于合并多个 KeyValue 对象的接口。它在合并树(Merge Tree)的紧凑过程中起着核心作用,定义了如何处理具有相同键的多条记录。

public interface MergeFunction<T> {/*** Reset the merge function to its default state, call this before calling {@link* #add(KeyValue)} for the first time or after {@link #getResult}.*/void reset();/** Add the given {@link KeyValue} to the merge function. */void add(KeyValue kv);/** Get current merged value. */T getResult();/** Require copy input kv, this may cache kv in memory. */boolean requireCopy();
}

核心方法说明

  1. ​reset()​​ : 重置合并函数到初始状态,在首次调用 add() 或调用 getResult() 后使用。

  2. ​add(KeyValue kv)​​ : 将给定的 KeyValue 添加到合并函数中进行处理。

  3. ​getResult()​​ : 获取当前合并后的结果。

  4. ​requireCopy()​​ : 指示是否需要复制输入的 KeyValue,这可能会影响内存使用。

主要实现类

  1. ​DeduplicateMergeFunction​​ :

    • 用于主键唯一且值为完整记录的场景,只保留最新的记录。

    • 当遇到删除记录时,根据配置决定是否忽略删除操作。

    • 适用于需要去重的场景,如唯一用户信息表。

  2. ​FirstRowMergeFunction​​ :

    • 用于主键唯一且值为完整记录的场景,只保留第一条记录。

    • 与 DeduplicateMergeFunction 不同,它保留最早插入的记录而不是最新记录。

    • 适用于需要保留首次出现记录的场景。

  3. ​PartialUpdateMergeFunction​​ :

    • 用于部分更新场景,只更新非空字段。

    • 支持序列组(sequence group)来处理字段级别的更新顺序。

    • 可以配置在接收到删除记录时的行为(忽略、删除整行或部分删除)。

    • 适用于需要部分更新记录的场景,如用户信息的部分字段更新。

  4. ​AggregateMergeFunction​​ :

    • 用于聚合场景,对非空字段进行预聚合。

    • 支持多种聚合函数,如 sum、max、min 等。

    • 可以配置在接收到删除记录时是否删除整行。

    • 适用于需要对数据进行聚合计算的场景,如统计分析表。

使用场景

MergeFunction 在 Paimon 的合并树写入器(MergeTreeWriter)和压缩管理器(MergeTreeCompactManager)中被广泛使用,用于处理数据的合并和压缩操作。不同的实现类适用于不同的业务场景,用户可以根据具体需求选择合适的合并策略。

PartialUpdateMergeFunction

见:双流join 、 Paimon Partial Update 和 动态schema-CSDN博客

这是在 Paimon 中实现 partial-update (部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue)合并成最终的一条记录。

PartialUpdateMergeFunction 实现了 MergeFunction<KeyValue> 接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue 数据,然后交由一个 MergeFunction 实例来处理,计算出最终的结果。

PartialUpdateMergeFunction 的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。

// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...

核心成员变量

这些变量定义了 PartialUpdateMergeFunction 的状态和配置,决定了其合并行为。

// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值private final boolean ignoreDelete; // 是否忽略删除记录private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否启用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列private final boolean[] nullables; // 记录每个字段是否可为 nullprivate InternalRow currentKey; // 当前处理的主键private long latestSequenceNumber; // 见过的最新序列号private GenericRow row; // 合并过程中的结果行private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建private boolean currentDeleteRow; // 标记当前行最终是否应被删除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到过 INSERT 类型的记录// ... existing code ...
  • 配置类变量 (ignoreDeletefieldSeqComparatorsfieldAggregators 等) 通常在 Factory 中被初始化,它们在整个合并过程中保持不变。
  • 状态类变量 (currentKeyrowlatestSequenceNumber 等) 会在每次 reset() 时被重置,用于处理新的一组具有相同主键的记录。

add(KeyValue kv) :合并逻辑的核心

这是最重要的方法,定义了单条 KeyValue 是如何被合并到当前结果 row 中的。

// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 删除逻辑处理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...

它的逻辑可以分为两大块:

A. 处理 retract 消息 (RowKind 为 DELETE 或 UPDATE_BEFORE)

partial-update 默认不接受删除记录。如果收到了,行为由配置决定:

  1. ignoreDelete = true: 直接忽略这条删除记录,返回。
  2. removeRecordOnDelete = true: 当收到 DELETE 类型的记录时,将 currentDeleteRow 标记为 true,并清空当前 row。这意味着最终这条主键对应的记录将被删除。
  3. fieldSequenceEnabled = true: 启用了 sequence-group。这是最复杂的逻辑,它会调用 retractWithSequenceGroup(kv)。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为 null 或调用聚合器的 retract 方法)。
  4. 默认行为: 如果以上配置都没有,则直接抛出 IllegalArgumentException 异常,提示用户如何正确配置。

B. 处理 add 消息 (RowKind 为 INSERT 或 UPDATE_AFTER)

这是主要的更新逻辑:

  1. 简单更新 (updateNonNullFields): 如果没有配置 sequence-group (fieldSeqComparators 为空),则执行最简单的部分列更新。遍历新纪录 kv 的所有字段,只要字段值不为 null,就用它来更新 row 中对应位置的值。

    // ... existing code ...
    private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else {
    // ... existing code ...
    
  2. 带序列号的更新 (updateWithSequenceGroup): 如果配置了 sequence-group,逻辑会更复杂。对于每个字段:

    • 如果该字段不属于任何 sequence-group,则行为和简单更新类似(但会考虑聚合)。
    • 如果该字段属于某个 sequence-group,则会使用 FieldsComparator 比较新记录 kv 和当前结果 row 的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新 row 中由该 sequence-group 控制的所有字段。这保证了数据的更新顺序。

updateWithSequenceGroup

这个方法是 partial-update 合并引擎处理带有 sequence-group 配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2> 这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field 的值来判断是否应该更新 data_field1 和 data_field2。这解决了多流更新时可能出现的数据乱序覆盖问题。

updateWithSequenceGroup 方法通过引入 FieldsComparator,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimon partial-update 模式能够处理复杂更新场景的关键所在。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
  • 输入KeyValue kv,代表一条新到达的、具有相同主键的记录。
  • 目标: 遍历这条新记录 kv 的所有字段,并根据 sequence-group 的规则,决定是否用 kv 中的字段值来更新当前正在合并的结果行 this.row

该方法的核心是一个 for 循环,它遍历了表中的每一个字段。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...

在循环内部,对每个字段的处理逻辑可以分为两种情况:

  1. 该字段不属于任何 sequence-group
  2. 该字段属于某个 sequence-group

让我们来详细看这两种情况。

1. 字段不属于任何 sequence-group

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
  • 判断条件seqComparator == nullfieldSeqComparators 是一个 Map<Integer, FieldsComparator>,如果在里面找不到当前字段索引 i,就说明这个字段不受任何 sequence-group 控制。
  • 处理逻辑:
    • 带聚合函数: 如果为该字段配置了聚合函数(aggregator != null),例如 summax 等,则调用 aggregator.agg() 方法,将当前累加值 accumulator 和新值 field 进行聚合,并将结果写回 row
    • 不带聚合函数: 这是最简单的情况。如果新来的字段值 field 不为 null,就直接用它覆盖 row 中的旧值。这和 updateNonNullFields 的行为是一致的。

2. 字段属于某个 sequence-group

这是该方法最核心和复杂的部分。

// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
  • 判断条件seqComparator != null
  • 处理逻辑:
    1. 空序列组检查isEmptySequenceGroup(kv, seqComparator) 会检查这条新纪录 kv 中,其对应的序列号字段是否都为 null。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。
    2. 序列号比较seqComparator.compare(kv.value(), row) >= 0 是关键。它会比较新记录 kv 和当前结果 row 中,由 seqComparator 定义的序列号字段。
      • 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录 kv 是“更新”的或者“同样新”的,此时应该用 kv 的值去更新 row
        • 更新 序列字段 本身: 如果当前字段 i 就是序列字段之一,那么需要把这个 sequence-group 定义的所有序列号字段都一次性更新掉,然后用 continue 跳出本次循环。这是为了保证序列字段之间的一致性。
        • 更新数据字段: 如果当前字段 i 是被序列字段 控制的数据字段,则执行更新。如果有聚合器,则调用 aggregator.agg();如果没有,则直接用新值 field 覆盖。
      • 如果新记录的序列字段 < 当前结果的序列字段 : 这意味着 kv 是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如 sum),则会调用 aggregator.aggReversed()。这个方法通常和 agg() 的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如 max),aggReversed 可能就是一个空操作。

createFieldAggregators方法详解

该方法的核心任务是:​​为表中的每一个字段(列)确定它在合并时应该使用的聚合逻辑,并创建相应的聚合器(FieldAggregator)​​。

private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(RowType rowType,List<String> primaryKeys,List<Integer> allSequenceFields,CoreOptions options) {List<String> fieldNames = rowType.getFieldNames();List<DataType> fieldTypes = rowType.getFieldTypes();Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<>();for (int i = 0; i < fieldNames.size(); i++) {String fieldName = fieldNames.get(i);DataType fieldType = fieldTypes.get(i);if (allSequenceFields.contains(i)) {// no agg for sequence fieldscontinue;}if (primaryKeys.contains(fieldName)) {// aggregate by primary keys, so they do not aggregatefieldAggregators.put(i,() ->FieldAggregatorFactory.create(fieldType,fieldName,FieldPrimaryKeyAggFactory.NAME,options));continue;}String aggFuncName = getAggFuncName(options, fieldName);if (aggFuncName != null) {// last_non_null_value doesn't require sequence groupcheckArgument(aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)|| fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),"Must use sequence group for aggregation functions but not found for field %s.",fieldName);fieldAggregators.put(i,() ->FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options));}}return fieldAggregators;}

它遍历表中的所有字段,并按以下优先级规则为每个字段决定其命运:

规则一:序列字段不参与聚合

if (allSequenceFields.contains(i)) {    // no agg for sequence fields    continue;
}
  • ​说明​​:如果一个字段被定义为 sequence.field(例如 event_time),它的唯一作用就是用来比较记录的新旧,其本身的值在合并时永远是直接覆盖,不会进行求和、求最大值等聚合操作。因此直接跳过该字段的聚合逻辑。

规则二:主键不参与聚合

if (primaryKeys.contains(fieldName)) {    // ...    fieldAggregators.put(        i,        () -> FieldAggregatorFactory.create(..., FieldPrimaryKeyAggFactory.NAME, ...));    continue;
}
  • ​说明​​:主键字段是记录的唯一标识,它的值在合并过程中必须保持不变。这里为它创建了一个 FieldPrimaryKeyAggFactory,该工厂产生的聚合器逻辑非常简单:直接返回遇到的第一个值(每个merge function处理的总是主键相同的批次),后续的值都忽略。这种方式保证了主键的稳定性。

规则三:用户自定义的聚合函数

String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {    // ...
}
  • ​说明​​:Paimon 允许用户为特定字段或所有字段(通过 fields.default.aggregate-function)指定聚合函数,例如 summaxminlast_non_null_value等。

  • ​逻辑​​:getAggFuncName会读取这些配置。如果找到了配置,就会进入一个重要的检查:

checkArgument(    aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)         || fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),    ...);
  • ​背景​​:除了 last_non_null_value,其他所有聚合函数都必须有 sequence group的支持。

  • ​后续​​:检查通过后,就使用 FieldAggregatorFactory创建一个用户指定的聚合器。

规则四:默认行为(无聚合)

  • ​说明​​:如果一个字段不满足以上任何一条规则(它不是序列字段、不是主键、也没有配置任何聚合函数),那么 createFieldAggregators就不会为它创建任何聚合器。

  • ​合并逻辑​​:在后续的合并逻辑中(如 updateNonNullFieldsupdateWithSequenceGroup),对于没有聚合器的字段,其默认行为就是 ​​“非空值覆盖” (non-null value overwrite)​​。


多 Flink 任务写入与 last_non_null_value

场景设定

  • ​任务A​​:只写入 col_acol_b字段。

  • ​任务B​​:只写入 col_ccol_d字段。

  • ​并发写入​​:两个任务并发写入,它们的 sequenceNumber是完全独立的,因此在全局看来是乱序的。

  • ​聚合配置​​:所有值字段(col_acol_d)都配置了 last_non_null_value聚合。

分析结论

您的结论是正确的:​​只要每个 Flink 任务只写自己的字段,两个任务的字段不互相干扰,即使 sequenceNumber乱序,last_non_null_value也能正确工作。​


推演过程

我们来推演一下为什么能正确工作:

假设对于主键 pk=1,发生了以下事件:

  1. ​T1时刻​​:任务A 写入

    {pk:1, col_a:'A1', col_b:null, col_c:null, col_d:null}

    Paimon 分配 sequenceNumber = 101

  2. ​T2时刻​​:任务B 写入

    {pk:1, col_a:null, col_b:null, col_c:'C1', col_d:null}

    Paimon 分配 sequenceNumber = 201

  3. ​T3时刻​​:任务A 写入

    {pk:1, col_a:'A2', col_b:null, col_c:null, col_d:null}

    Paimon 分配 sequenceNumber = 102

现在,这三条记录在 Compaction 时相遇了。假设没有配置 sequence.fieldSortMergeReader会根据 sequenceNumber对它们进行排序:

  • ​记录1​​:{..., seq: 101}

  • ​记录3​​:{..., seq: 102}

  • ​记录2​​:{..., seq: 201}

PartialUpdateMergeFunction会按 ​​1 → 3 → 2​​ 的顺序处理它们:

初始化

row = {col_a:null, col_b:null, col_c:null, col_d:null}

处理记录1 (seq=101)

  • col_a更新为 'A1'

  • 其他字段都是 nulllast_non_null_value忽略它们。

  • ​当前 row 状态​​: {col_a:'A1', col_b:null, col_c:null, col_d:null}

处理记录3 (seq=102)

  • col_a更新为 'A2'

  • 其他字段都是 null,忽略。

  • ​当前 row 状态​​: {col_a:'A2', col_b:null, col_c:null, col_d:null}

处理记录2 (seq=201)

  • col_anull,忽略。

  • col_c更新为 'C1'

  • 其他字段是 null,忽略。

  • ​当前 row 状态​​: {col_a:'A2', col_b:null, col_c:'C1', col_d:null}

最终合并结果

{col_a:'A2', col_b:null, col_c:'C1', col_d:null}


为什么能行?

关键在于以下两点:

  1. ​字段不重叠​

    任务A 和 任务B 操作的是完全不同的字段集合。任务A 的写入对 col_ccol_d来说永远是 null,反之亦然。

  2. last_non_null_value的幂等性​

    该聚合函数的逻辑是“只要新来的不是 null就覆盖”。由于字段不重叠,任务A 的 sequenceNumber乱序只会影响 col_acol_b的合并顺序,但不会干扰到 col_ccol_d。同理,任务B 的 sequenceNumber也不会影响到 col_acol_b


重要警告

这种模式能够成功,​​强依赖于“字段不重叠”这个假设​​。

如果任务A 和 任务B 都可能写入 col_a,那么 sequenceNumber的乱序就会导致 col_a的最终值变得不确定,从而产生数据不一致。

​在这种情况下,就必须引入 sequence.field来提供一个全局统一的业务时钟,以确保无论哪个任务写入,都能根据业务时间来决定胜负。​


Sequence Group 机制解析

​Sequence Group(简称序列组)​​ 是一种把若干列(fields)组合在一起,以同一套“序列比较逻辑”决定这组列是否被“整体更新”的机制。

核心组成:

  • 每个序列组由一个 ​​FieldsComparator​​(实现了按配置的序列字段比较规则)表示。

  • 比较器会用一组“序列字段”(比如时间戳、version id 等)在 ​​incoming record(kv.value())​​ 和 ​​当前聚合行(row)​​ 之间判断哪条更新“更新”。

序列组的行为逻辑:

  • 对于同一组内的多个列,当 ​​comparator 判定 incoming 更新时​​,会“整组”把所有序列字段从 incoming 复制到当前行。

    • 目的是避免当组中只有部分字段出现在某条 ​​partial-update record​​ 时导致不一致。

  • 同时支持 ​​per-field 的聚合器(FieldAggregator)​​,用于非序列表现或需要累加/撤销等聚合语义的列。


为什么要这么做

1. Partial update 场景的问题:
  • 上游可能发送“​​局部字段更新​​”,不同列的到达顺序和是否出现值不一致,会导致最终行状态错误。

    • 例如一条消息只包含 ​​timestamp​​,但缺少其它字段,如果仅按字段级别单独比较,会把某些字段错误覆盖成 ​​null/旧值​​。

2. 序列组保证原子性与一致性:
  • 对于一组有共同序列语义的列(例如同一条变更的不同列),​​只有当 incoming 的序列字段整体比当前更新时​​,才把这些列整体覆盖。

    • 从而保证 ​​原子性 / 一致性​​,避免只更新部分字段造成的状态不一致。

3. 支持聚合列:
  • 一些列不是简单替换而是需要 ​​聚合语义​​(如 last_non_null、sum 等)。

  • 序列组逻辑要与聚合逻辑协同工作:

    • 新值进来时通常调用 ​​agg​​;

    • 当 incoming 比较旧且为撤回时可调用 ​​aggReversed​​。


典型场景举例

场景一:多列属于同一序列组,保证原子更新
  • 表有列 ​​{a, b, ts}​​,其中 ​​a、b 属于同一序列组​​,序列字段是 ​​ts(时间戳)​​。

  • 上游有两条 partial record:

    • {a = 10, ts = 100}

    • {b = 20, ts = 101}

  • 如果不使用 sequence group:

    • 第二条可能只更新 ​​b​​,导致 ​​a 保持 10(ok)或者被误置为 null(如果第二条有 null)​

    • 但当同一事务里 ​​a 与 b 应作为一份变更的一部分时需要一致性​

  • 使用序列组后:

    • 会根据 ​​ts​​ 决定是否整组覆盖,避免不一致。

场景二:空序列场景,避免误判
  • incoming record 只有普通列(不含 ts)

  • ​isEmptySequenceGroup​​ 会发现 ts 为 null,跳过序列组逻辑

  • 从而避免把组内列误判为“新序列”


注意与边界条件

边界情况

说明

​序列组配置不当​

比如对同一字段被多个组重复定义,Factory 在构建时会抛异常进行校验。

​配置组合冲突​

若 ​​removeRecordOnDelete / removeRecordOnSequenceGroup​​ 等配置组合不当,Factory 也会校验并抛出冲突错误。

​聚合器行为依赖实现​

aggregator 的行为(agg / aggReversed)依赖各自实现,可能需要根据业务确保语义正确(尤其是遇到旧序列来临时如何处理)。


总结

​Sequence Group 是按配置把若干列绑定在一起用同一“序列比较”决定整体更新的机制——通过比较 incoming 与当前的序列字段,决定是否“整组替换”或在聚合器上做合并/撤销,从而在 partial-update 场景下保证组内列的一致性和正确的聚合语义。​

getResult() 方法:产出最终结果

当处理完具有相同主键的所有 KeyValue 后,调用此方法来获取最终的合并结果。

// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...

它会根据 currentDeleteRow 和 meetInsert 标志位来决定最终的 RowKind。如果 currentDeleteRow 为 true,或者整个合并过程从未见过 INSERT 类型的记录,那么最终结果就是一条 DELETE 记录。否则,就是一条 INSERT 记录。然后将主键、最新的序列号、最终的 RowKind 和合并后的 row 数据打包成一个 KeyValue 返回。

Factory 内部类:配置的入口

PartialUpdateMergeFunction.Factory 是一个非常重要的内部类,它负责解析用户在表上设置的 OPTIONS,并据此创建出一个配置好的 PartialUpdateMergeFunction 实例。

// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成员变量,用于存储从 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列号字段和被控制的字段,构建 fieldSeqComparators ...}}// ... 解析聚合函数配置,构建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校验,确保冲突的配置不会同时开启 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...

在构造函数中,它会:

  1. 读取 ignore-deletepartial-update.remove-record-on-delete 等简单配置。
  2. 遍历所有 OPTIONS,查找以 fields. 开头、以 .sequence-group 结尾的配置项,例如 fields.order_time.sequence-group=order_id,price。它会解析这些配置,构建出 fieldSeqComparators 这个 Map,其中 key 是被控制字段的索引,value 是一个能够比较 order_time 字段的比较器。
  3. 调用 createFieldAggregators 方法,解析 fields.*.aggregate-function 等配置,构建出 fieldAggregators 这个 Map。
  4. 执行一系列 Preconditions.checkState,对用户的配置进行合法性校验,防止出现逻辑冲突。


PartialUpdateMergeFunction.Factory 创建过程分析

读取用户在表属性(Options)中配置的关于部分更新(Partial Update)的策略,并据此创建一个 PartialUpdateMergeFunction实例。

构造函数分解:Factory(Options options, RowType rowType, List<String> primaryKeys)

  1. ​解析基本配置​

    • ignoreDelete:是否忽略 DELETE 类型的消息。

    • removeRecordOnDelete:收到 DELETE 消息时,删除整行记录。

    • removeRecordOnSequenceGroup:收到特定 sequence group 的 DELETE 消息时,删除整行。

      (均为基本的行为开关)

  2. ​解析 sequence-group(核心逻辑)​

    • 遍历所有表属性(options.toMap().entrySet()),寻找 key 以 fields.开头、以 .sequence-group结尾的配置项(例如:'fields.event_time.sequence-group' = 'col_a,col_b')。

      • ​k (Key)​​:fields.event_time.sequence-group→ 解析出序列字段 event_time(用于判断新旧的字段)。

      • ​v (Value)​​:col_a,col_b→ 解析出受该序列字段影响的值字段(col_acol_b)。

    • 构建 fieldSeqComparators

      • 创建 UserDefinedSeqComparator(关联 event_time字段)。

      • 将该比较器与 col_acol_bevent_time自身关联,存入 fieldSeqComparatorsMap。

      • 最终结构示例:

        { (index_of_col_a) -> comparator, (index_of_col_b) -> comparator, (index_of_event_time) -> comparator 
        }
      • 含义:更新 col_acol_b时,需用 event_time判断是否执行更新。

  3. ​解析聚合函数(createFieldAggregators)​

    • 检查 fields.my_col.aggregate-functionfields.default.aggregate-function等配置。

    • 对每个非主键、非序列字段的列,若配置了聚合函数(如 sum、max、last_non_null_value),则创建对应的 FieldAggregator并存入 fieldAggregatorsMap。

    • ​重要约束​​:除 last_non_null_value外,其他聚合函数必须与 sequence-group一起使用(因聚合操作需要顺序信息)。

  4. ​冲突检查​

    • 通过一系列 Preconditions.checkState确保用户未配置逻辑冲突的选项(例如:不能同时启用 ignore-deleteremove-record-on-delete)。

​最终产物​​:

当调用 create()方法时,Factory 会利用解析好的 fieldSeqComparatorsfieldAggregators,生成一个配置齐全的 PartialUpdateMergeFunction实例。该实例明确知道:对于每一列的主键冲突情况,应如何合并数据(直接覆盖 / 根据 sequence field 判断后覆盖 / 执行聚合操作)。

总结

PartialUpdateMergeFunction 是 Paimon 实现高性能数据打宽(部分列更新)能力的技术基石。它通过一个设计精巧的合并流程,将简单的非空字段覆盖、基于序列号的有序更新、字段聚合以及多种删除策略融为一体。其 Factory 类则充当了连接用户配置和底层实现的桥梁。理解了这个类的工作原理,就能深刻地理解 Paimon partial-update 模式的强大之处。


AggregateMergeFunction 类详细分析

AggregateMergeFunction是 Paimon 中用于处理主键表聚合合并逻辑的一个类,实现了 MergeFunction<KeyValue>接口。

​主要用途​​:在合并多个具有相同主键的记录时,对非空字段进行预聚合操作。


主要成员变量

变量名

类型

说明

getters

InternalRow.FieldGetter[]

用于从 InternalRow中获取字段值

aggregators

FieldAggregator[]

包含每个字段对应的聚合器,执行具体聚合操作

nullables

boolean[]

标识每个字段是否允许为空

latestKv

KeyValue

存储最新的键值对

row

GenericRow

存储聚合后的结果行

reused

KeyValue

复用结果对象,避免频繁创建新对象

currentDeleteRow

boolean

标识当前记录是否为删除操作

removeRecordOnDelete

boolean

配置项,控制删除操作时是否移除记录


构造函数

public AggregateMergeFunction(InternalRow.FieldGetter[] getters,FieldAggregator[] aggregators,boolean removeRecordOnDelete,boolean[] nullables) {this.getters = getters;this.aggregators = aggregators;this.removeRecordOnDelete = removeRecordOnDelete;this.nullables = nullables;
}

​参数说明​​:

  • getters:字段获取器

  • aggregators:聚合器数组

  • removeRecordOnDelete:删除时是否移除记录的配置

  • nullables:字段是否可为空的数组


核心方法

reset()

@Override
public void reset() {this.latestKv = null;this.row = new GenericRow(getters.length);Arrays.stream(aggregators).forEach(FieldAggregator::reset);this.currentDeleteRow = false;
}

​功能​​:重置所有状态到初始值,包括:

  • 清空最新键值对

  • 创建新的结果行

  • 重置所有聚合器

  • 将删除标记设为 false


add(KeyValue kv)

@Override
public void add(KeyValue kv) {latestKv = kv;currentDeleteRow = removeRecordOnDelete && kv.valueKind() == RowKind.DELETE;if (currentDeleteRow) {row = new GenericRow(getters.length);initRow(row, kv.value());return;}boolean isRetract = kv.valueKind().isRetract();for (int i = 0; i < getters.length; i++) {FieldAggregator fieldAggregator = aggregators[i];Object accumulator = getters[i].getFieldOrNull(row);Object inputField = getters[i].getFieldOrNull(kv.value());Object mergedField =isRetract? fieldAggregator.retract(accumulator, inputField): fieldAggregator.agg(accumulator, inputField);row.setField(i, mergedField);}
}

​功能​​:添加键值对到合并函数。

  • 如果是删除操作且配置了移除记录,则初始化空行。将现在的KV的非空行赋值到结果中。

  • 否则根据操作类型(插入/回撤)对每个字段执行聚合操作。


initRow(GenericRow row, InternalRow value)

private void initRow(GenericRow row, InternalRow value) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(value);if (!nullables[i]) {if (field != null) {row.setField(i, field);} else {throw new IllegalArgumentException("Field " + i + " can not be null");}}}
}

​功能​​:初始化行数据。

  • 对于不允许为空的字段,若输入值为 null 则抛出异常。


getResult()

@Override
public KeyValue getResult() {checkNotNull(latestKv,"Trying to get result from merge function without any input. This is unexpected.");if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;return reused.replace(latestKv.key(), latestKv.sequenceNumber(), rowKind, row);
}

​功能​​:获取合并后的结果。

  • 若复用对象为空则创建新对象。

  • 根据删除标记设置行类型(INSERT/DELETE)。

  • 返回包含合并数据的键值对。


requireCopy()

@Override
public boolean requireCopy() {return false;
}

​功能​​:指示是否需要复制输入键值对,返回 false 表示不需要。因为聚合元素是新创建的元素。


工厂方法

public static MergeFunctionFactory<KeyValue> factory(Options conf,List<String> fieldNames,List<DataType> fieldTypes,List<String> primaryKeys) {return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
}

​功能​​:提供工厂方法,用于创建 MergeFunctionFactory实例。


内部工厂类 Factory

成员变量

变量名

类型

说明

options

CoreOptions

表的配置选项

fieldNames

List<String>

字段名称列表

fieldTypes

List<DataType>

字段类型列表

primaryKeys

List<String>

主键字段名称列表

removeRecordOnDelete

boolean

删除时是否移除记录的配置


create(@Nullable int[][] projection)

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {List<String> fieldNames = this.fieldNames;List<DataType> fieldTypes = this.fieldTypes;if (projection != null) {Projection project = Projection.of(projection);fieldNames = project.project(fieldNames);fieldTypes = project.project(fieldTypes);}FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];List<String> sequenceFields = options.sequenceField();for (int i = 0; i < fieldNames.size(); i++) {String fieldName = fieldNames.get(i);DataType fieldType = fieldTypes.get(i);String aggFuncName = getAggFuncName(fieldName, sequenceFields);fieldAggregators[i] =FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options);}return new AggregateMergeFunction(createFieldGetters(fieldTypes),fieldAggregators,removeRecordOnDelete,ArrayUtils.toPrimitiveBoolean(fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
}

​功能​​:创建 AggregateMergeFunction实例。

  1. 根据投影信息调整字段名称和类型。

  2. 为每个字段创建对应的聚合器。

  3. 使用上述信息构造 AggregateMergeFunction对象。


getAggFuncName(String fieldName, List<String> sequenceFields)

private String getAggFuncName(String fieldName, List<String> sequenceFields) {if (sequenceFields.contains(fieldName)) {return FieldLastValueAggFactory.NAME; // 序列字段使用 last_value}if (primaryKeys.contains(fieldName)) {return FieldPrimaryKeyAggFactory.NAME; // 主键字段不聚合}String aggFuncName = options.fieldAggFunc(fieldName);if (aggFuncName == null) {aggFuncName = options.fieldsDefaultFunc();}if (aggFuncName == null) {aggFuncName = FieldLastNonNullValueAggFactory.NAME; // 默认使用 last_non_null_value}return aggFuncName;
}

​功能​​:确定字段的聚合函数名称。

  • 序列字段 → last_value

  • 主键字段 → primary_key(不聚合)

  • 其他字段 → 优先使用配置的聚合函数 → 默认聚合函数 → 最终默认 last_non_null_value


聚合器 (FieldAggregator)

  • ​定义​​:抽象类,定义聚合操作基本接口。

  • ​实现类​​:如 FieldMaxAggFieldMinAggFieldSumAgg等。

  • ​核心方法​​:

    • agg():执行聚合操作。

    • retract()(可选):支持回撤操作。


配置选项

配置项

说明

removeRecordOnDelete

控制删除操作时是否移除记录

fieldAggFunc

为特定字段指定聚合函数

fieldsDefaultFunc

为所有字段指定默认聚合函数

sequenceField

指定序列字段(不聚合,使用 last_value覆盖)


总结

AggregateMergeFunction通过组合不同的 FieldAggregator,实现对主键表中非空字段的预聚合操作,有效处理数据更新与删除,确保合并逻辑高效可靠。


DeduplicateMergeFunction 类详细分析

DeduplicateMergeFunction是 Paimon 项目中用于处理数据合并的一种策略实现,其主要功能是基于主键(唯一键)对数据进行去重,只保留最新的记录。

/*** A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep* the latest one.*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue>

从类的注释可以看出,DeduplicateMergeFunction实现了 MergeFunction接口,其设计目的是处理主键唯一且值为完整记录的情况,仅保留最新的记录。这种策略在需要去重并保留最新数据的场景中非常有用。

成员变量

private final boolean ignoreDelete;
private KeyValue latestKv;
  • ignoreDelete: 一个布尔值,表示是否忽略删除记录。当设置为 true时,删除记录将不会被处理。

  • latestKv: 存储最新的 KeyValue记录,这是去重策略的核心,每次添加新记录时都会更新此变量。

构造函数

private DeduplicateMergeFunction(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;
}

构造函数是私有的,这意味着外部不能直接实例化该类。实例化需要通过工厂方法完成,这有助于控制对象的创建过程并传递配置参数。

核心方法实现

reset() 方法

@Override
public void reset() {latestKv = null;
}

reset()方法用于重置合并函数的状态,将 latestKv设置为 null,以便开始新的合并过程。

add() 方法

@Override
public void add(KeyValue kv) {// In 0.7- versions, the delete records might be written into data file even when// ignore-delete configured, so ignoreDelete still needs to be checkedif (ignoreDelete && kv.valueKind().isRetract()) {return;}latestKv = kv;
}

add()方法是去重逻辑的核心。它接收一个 KeyValue对象作为参数,并根据 ignoreDelete配置决定是否处理删除记录。如果 ignoreDeletetrue且记录是删除记录(通过 kv.valueKind().isRetract()判断),则直接返回,不更新 latestKv。否则,将 latestKv更新为当前记录,实现保留最新记录的策略。

注释中提到,在0.7版本中,开发团队发现了ignore-delete配置在某些情况下不生效的问题,为了确保数据正确性,他们在合并函数中添加了显式的配置检查,作为额外的保护层。这种设计既修复了问题,又保持了对历史数据的兼容性。

getResult() 方法

@Override
public KeyValue getResult() {return latestKv;
}

getResult()方法返回合并后的结果,即 latestKv,也就是最新的记录。

requireCopy() 方法

@Override
public boolean requireCopy() {return false;
}

- 由于总是使用最新的记录引用,不存在数据被意外修改的风险
- 数据流是单向的,不会出现历史数据被修改的情况

 工厂方法

public static MergeFunctionFactory<KeyValue> factory() {return new Factory(false);
}public static MergeFunctionFactory<KeyValue> factory(Options options) {return new Factory(options.get(CoreOptions.IGNORE_DELETE));
}

提供了两个静态工厂方法用于创建 DeduplicateMergeFunction实例:

  1. 无参版本默认创建 ignoreDeletefalse的实例。

  2. 有参版本根据传入的 Options对象获取 IGNORE_DELETE配置值来创建实例。

这两个方法都返回一个 MergeFunctionFactory对象,实际的实例化工作由内部的 Factory类完成。

内部 Factory 类

private static class Factory implements MergeFunctionFactory<KeyValue> {private static final long serialVersionUID = 1L;private final boolean ignoreDelete;private Factory(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;}@Overridepublic MergeFunction<KeyValue> create(@Nullable int[][] projection) {return new DeduplicateMergeFunction(ignoreDelete);}
}

Factory是一个内部静态类,实现了 MergeFunctionFactory接口。它负责根据配置创建 DeduplicateMergeFunction实例。该类通过序列化 ID serialVersionUID标识,并存储 ignoreDelete配置。create()方法根据传入的投影参数(本实现中未使用)创建并返回 DeduplicateMergeFunction实例。

总结

DeduplicateMergeFunction类通过简单的策略实现了基于主键的去重功能,仅保留最新的记录。其设计考虑了删除记录的处理,并通过工厂方法提供了灵活的配置选项。该类适用于需要去重并保留最新数据的场景,如实时数据处理中的状态维护。


FirstRowMergeFunction 类详细分析

FirstRowMergeFunction 是 Apache Paimon 项目中用于合并数据的一种策略实现,它实现了 MergeFunction 接口。该类的主要功能是基于主键(唯一键)保留第一条记录,忽略后续具有相同主键的记录。

/*** A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep* the first one.*/
public class FirstRowMergeFunction implements MergeFunction<KeyValue>

从类的注释可以看出,FirstRowMergeFunction 是一种合并函数,其中键是主键(唯一的),值是完整记录,只保留第一条记录。这意味着当多个具有相同主键的记录需要合并时,只有第一条记录会被保留,其余记录将被忽略。

成员变量

  • private KeyValue first;

    用于存储第一条记录。

  • public boolean containsHighLevel;

    标记是否包含高层级的数据。

  • private final boolean ignoreDelete;

    配置项,用于决定是否忽略删除记录。

protected FirstRowMergeFunction(boolean 
ignoreDelete) {this.ignoreDelete = ignoreDelete;
}

构造函数接受一个布尔参数 ignoreDelete,用于初始化是否忽略删除记录的配置。

方法实现

  1. ​reset()​

    @Override
    public void reset() {this.first = null;this.containsHighLevel = false;
    }

    重置方法将 first设置为 null,并将 containsHighLevel设置为 false,以便重新开始处理新的记录集。

  2. ​add(KeyValue kv)​

    @Override
    public void add(KeyValue kv) {if (kv.valueKind().isRetract()) {// In 0.7- versions, the delete records might be written into data file even when// ignore-delete configured, so  ignoreDelete still needs to be checkedif (ignoreDelete) {return;} else {throw new IllegalArgumentException("By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"+ "You can config 'first-row.ignore-delete' to ignore the DELETE/UPDATE_BEFORErecords.");}}if (first == null) {this.first = kv;}if (kv.level() > 0) {containsHighLevel = true;}
    }
    • 如果传入的 KeyValue是删除或更新前的记录(isRetract()true):

      • 如果配置了忽略删除(ignoreDeletetrue),则直接返回,不处理该记录。

      • 否则,抛出异常,提示用户默认情况下不接受删除/更新前记录,并建议配置 first-row.ignore-delete来忽略这些记录。

    • 如果 firstnull(即尚未记录第一条数据),则将当前 KeyValue设置为 first

    • 如果 kv.level()大于 0,则设置 containsHighLeveltrue

  3. ​getResult()​

    @Override
    public KeyValue getResult() {return first;
    }

    返回第一条记录 first

  4. ​requireCopy()​

    @Override
    public boolean requireCopy() {return true;
    }

    表示需要复制数据。

工厂方法

public static 
MergeFunctionFactory<KeyValue> factory
(Options options) {return new FirstRowMergeFunction.Factory(options.get(CoreOptions.IGNORE_DELETE));
}

提供了一个静态工厂方法 factory,用于根据配置选项创建 FirstRowMergeFunction实例。该方法通过 CoreOptions.IGNORE_DELETE获取是否忽略删除记录的配置,并传递给内部类 Factory

内部类 Factory

private static class Factory implements 
MergeFunctionFactory<KeyValue> {private static final long serialVersionUID = 1L;private final boolean ignoreDelete;public Factory(boolean ignoreDelete) {this.ignoreDelete = ignoreDelete;}@Overridepublic MergeFunction<KeyValue> create(@Nullable int[][] projection) {return new FirstRowMergeFunction(ignoreDelete);}
}
  • Factory类实现了 MergeFunctionFactory接口,用于创建 FirstRowMergeFunction实例。

  • 构造函数接收 ignoreDelete参数并保存。

  • create方法根据传入的投影配置创建 FirstRowMergeFunction实例。

http://www.dtcms.com/a/357818.html

相关文章:

  • 图解帕累托前沿(pareto frontier)
  • 嵌入式Linux驱动开发:i.MX6ULL按键中断驱动(非阻塞IO)
  • stm32单片机使用tb6612驱动编码器电机并测速的驱动代码详解—详细参考开发手册(可移植+开发手册)
  • 文本嵌入模型的本质
  • 《ArkUI 记账本开发:状态管理与数据持久化实现》
  • 分布式锁在支付关闭订单场景下的思考
  • Product Hunt 每日热榜 | 2025-08-29
  • 逻辑漏洞 跨站脚本漏洞(xss)
  • 早期人类奴役AI实录:用Comate Zulu 10min做一款Chrome插件
  • nacos登录认证
  • 【算法】15. 三数之和
  • 学习做动画7.跳跃
  • UCIE Specification详解(十)
  • 快速深入理解zookeeper特性及核心基本原理
  • 【拍摄学习记录】06-构图、取景
  • Docker03-知识点整理
  • TypeScript:map和set函数
  • 2025 DDC系统选型白皮书:构建高效低碳智慧楼宇的核心指南
  • 【python开发123】三维地球应用开发方案
  • python 解码 视频解码
  • 打工人日报#20250829
  • 人工智能-python-深度学习-批量标准化与模型保存加载详解
  • OpenTenBase 技术解读与实战体验:从架构到行业落地
  • 2024年06月 Python(四级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • c++标准模板库
  • 轨道交通场景下设备状态监测与智能润滑预测性维护探索
  • 动态环境下的人员感知具身导航!HA-VLN:具备动态多人互动的视觉语言导航基准与排行榜
  • Free Subtitles-免费AI在线字幕生成工具,支持111种语言
  • 【ChatMemory聊天记忆】
  • STM32F4系列单片机如何修改主频