当前位置: 首页 > news >正文

网站搭建公司哪家好wordpress 评论删除

网站搭建公司哪家好,wordpress 评论删除,怎么制作一个简单的网页,重庆巴南网站制作BucketAssigner这是一个用于在 Paimon 中进行动态 Bucket 分配的工具类。根据其包名 crosspartition (跨分区),我们可以推断它主要服务于需要跨分区协调进行 Bucket 分配的场景。它的核心职责是为进入特定分区的每条记录智能地分配一个 Bucket ID。BucketAssigner 类…

BucketAssigner

这是一个用于在 Paimon 中进行动态 Bucket 分配的工具类。根据其包名 crosspartition (跨分区),我们可以推断它主要服务于需要跨分区协调进行 Bucket 分配的场景。它的核心职责是为进入特定分区的每条记录智能地分配一个 Bucket ID。

BucketAssigner 类是一个状态化的管理器。它内部维护了每个分区(Partition)的 Bucket 分配情况的统计信息。其主要目标是:

  • 动态分配:当有新数据写入时,能够动态地决定数据应该放入哪个 Bucket。
  • 负载均衡:通过限制每个 Bucket 中的记录数量(maxCount),避免单个 Bucket 过大,从而实现负载均衡。
  • 高效复用:优先复用未满的现有 Bucket,当没有可用 Bucket 时再创建新的。

核心数据结构

// ... existing code ...
public class BucketAssigner {private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();// ... existing code ...

这个类只有一个成员变量 stats,但它是整个逻辑的核心:

  • Map<BinaryRow, TreeMap<Integer, Integer>> stats:
    • 外层 Map 的 Key (BinaryRow): 代表一个分区。BinaryRow 是 Paimon 中用于表示一行数据的二进制格式,这里用它来唯一标识一个分区键(Partition Key)。
    • 外层 Map 的 Value (TreeMap<Integer, Integer>): 存储了该分区内所有 Bucket 的统计信息。
      • 内层 TreeMap 的 Key (Integer): 代表 Bucket 的 ID。
      • 内层 TreeMap 的 Value (Integer): 代表当前已分配到该 Bucket 的记录数量。
    • 为什么用 TreeMap?TreeMap 会根据 Key(即 Bucket ID)进行排序。这使得在 assignBucket 方法中遍历现有 Bucket 时,会按照 Bucket ID 从小到大的顺序进行,保证了分配逻辑的确定性。

assignBucket

这是最核心的方法,负责执行 Bucket 分配逻辑。

// ... existing code ...public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {TreeMap<Integer, Integer> bucketMap = bucketMap(part);// 阶段一:尝试复用现有 Bucketfor (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {int bucket = entry.getKey();int count = entry.getValue();if (filter.test(bucket) && count < maxCount) {bucketMap.put(bucket, count + 1);return bucket;}}// 阶段二:创建新 Bucketfor (int i = 0; ; i++) {if (filter.test(i) && !bucketMap.containsKey(i)) {bucketMap.put(i, 1);return i;}}}
// ... existing code ...

它的分配策略分为两个阶段:

  1. 阶段一:尝试复用现有 Bucket

    • 它首先遍历当前分区 part 已有的所有 Bucket(bucketMap.entrySet())。
    • 对于每个 Bucket,它会检查两个条件:
      1. filter.test(bucket): 该 Bucket 是否满足外部传入的过滤条件。这个 filter 非常关键,它通常用于确保当前的 Assigner 任务只分配属于它自己管辖范围的 Bucket。例如,在一个拥有 N 个 Assigner 的集群中,可以约定 assignerId 为 i 的任务只负责处理 bucket % N == i 的 Bucket。
      2. count < maxCount: 该 Bucket 中的记录数是否已经达到上限 maxCount
    • 如果两个条件都满足,说明这个 Bucket 可用,就将该 Bucket 的计数值加一,并返回这个 Bucket ID。
  2. 阶段二:创建新 Bucket

    • 如果遍历完所有现有 Bucket 都没有找到合适的,说明需要创建一个新的 Bucket。
    • 它从 i = 0 开始无限循环,依次尝试新的 Bucket ID。
    • 对于每个尝试的 ID i,它同样会检查 filter.test(i) 是否通过,并且 !bucketMap.containsKey(i) 确保这个 Bucket 是全新的。
    • 一旦找到一个满足条件的、全新的 Bucket ID,就将其加入到 bucketMap 中,初始计数值为 1,并返回这个新的 Bucket ID。

bootstrapBucket

这个方法用于在分配开始前“引导”或“预加载”已有的 Bucket 信息。

// ... existing code ...public void bootstrapBucket(BinaryRow part, int bucket) {TreeMap<Integer, Integer> bucketMap = bucketMap(part);Integer count = bucketMap.get(bucket);if (count == null) {count = 0;}bucketMap.put(bucket, count + 1);}
// ... existing code ...

当系统从之前的状态恢复时,可以通过这个方法将已存在的 Bucket 及其中的大致记录数(这里是简单地+1)加载到 stats 中,以便 assignBucket 能够在其基础上继续分配。

decrement

这个方法用于将指定 Bucket 的计数值减一。

// ... existing code ...public void decrement(BinaryRow part, int bucket) {bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);}
// ... existing code ...

这可能用于一些回滚或者数据迁移的场景,当一条记录被移出某个 Bucket 时,可以通过此方法更新统计信息。使用 compute 方法可以很简洁地处理 Bucket 可能不存在的情况。

bucketMap

这是一个私有的辅助方法,用于获取或创建指定分区的 TreeMap

// ... existing code ...private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {TreeMap<Integer, Integer> map = stats.get(part);if (map == null) {map = new TreeMap<>();stats.put(part.copy(), map);}return map;}
}
  • 它实现了 "get-or-create" 模式。如果 stats 中已经存在该分区的 TreeMap,直接返回。
  • 如果不存在,则创建一个新的 TreeMap,并将其放入 stats 中。
  • 关键点: 在 stats.put 时,它使用了 part.copy()BinaryRow 对象在 Paimon 的处理流程中经常被复用,其底层的字节数组可能会被修改。如果不进行 copy,那么下一次使用同一个 BinaryRow 对象但代表不同分区时,就会污染 stats 中已有的 Key。通过 copy() 创建一个独立的副本,保证了 HashMap 中 Key 的稳定性和正确性。

总结

BucketAssigner 是一个设计精巧的动态 Bucket 分配器。它通过内存中的统计信息,结合外部的过滤逻辑和容量限制,实现了对数据写入的智能分桶。这对于 Paimon 的动态 Bucket 表(Dynamic Bucket Table)功能至关重要,因为它允许系统根据数据量自动扩展 Bucket 数量,而无需用户预先指定一个固定的 Bucket 数,从而提高了灵活性和资源利用率。

GlobalIndexAssigner

GlobalIndexAssigner 是 Paimon 实现跨分区 UPSERT 和动态分桶(Dynamic Bucket)功能的核心组件,它的设计比较复杂。我们来一步步、有条理地详细解析它。

首先,我们要明白它的核心使命是什么。在一个支持 UPSERT 的主键表里,当一条新数据到来时,系统必须回答两个问题:

  1. 这条数据是全新的吗?
  2. 如果不是全新的(即主键已存在),它之前在哪?

对于普通的主键表,数据写入时会根据主键计算一个固定的 bucket,所以新数据和老数据一定在同一个分区、同一个 bucket 下,问题很简单。

但对于跨分区 UPSERTcross-partition-upsert)的场景,一条数据的主键可能不变,但分区键变了(比如,一个用户的归属城市从“北京”变成了“上海”)。这意味着新数据和老数据在不同的分区里。为了正确地执行 UPSERT(即删除老分区的数据,插入新分区),系统必须有一个全局的视角,能够根据主key快速定位到它之前所在的分区和 bucket

GlobalIndexAssigner 就是为了提供这个全局主键索引而生的。它会为每一条到来的数据,分配一个最终的 bucket,并在这个过程中处理跨分区的 UPDATE_BEFORE 消息。

主要成员变量解析

我们先来理解它内部的关键成员变量

// ... existing code ...
public class GlobalIndexAssigner implements Serializable, Closeable {// ...// State & Storageprivate transient RocksDBStateFactory stateFactory;private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;private transient BinaryExternalSortBuffer bootstrapKeys;private transient RowBuffer bootstrapRecords;// Logic Componentsprivate transient BucketAssigner bucketAssigner;private transient ExistingProcessor existingProcessor;private transient IDMapping<BinaryRow> partMapping;// Configuration & Contextprivate final FileStoreTable table;private transient int targetBucketRowNumber;private transient int numAssigners;private transient int assignId;private transient BiConsumer<InternalRow, Integer> collector;// ...
}
  • State & Storage (状态与存储)

    • stateFactory 和 keyIndex: 这是核心中的核心keyIndex 是一个基于 RocksDB 的键值存储。
      • Key: 表的主键 (InternalRow)。
      • Value: 一个 PositiveIntInt 对象,包含了两个信息:分区ID (partId) 和 bucket号
      • 它的作用就是维护一个 主键 -> (分区ID, bucket) 的全局映射。通过它,我们可以用 O(1) 的时间复杂度查到一个主键的全局位置。
    • bootstrapKeys: 一个外部排序缓冲。在引导阶段,用来缓存并排序所有存量数据的 (主键, (分区ID, bucket)) 信息,最后通过 BulkLoader 高效地批量载入 RocksDB。
    • bootstrapRecords: 一个行缓冲。在引导阶段,用来缓存所有流入的原始数据记录。因为在引导阶段结束前,我们还不能处理它们。
  • Logic Components (逻辑组件)

    • bucketAssigner: 就是我们之前详细讨论过的桶分配器。它负责在确定了分区后,为数据分配一个具体的 bucket
    • existingProcessor存量数据处理器。当发现一条新数据的主键已经存在于另一个分区时,由它来决定如何处理。比如,对于 deduplicate 引擎,它会生成一条 UPDATE_BEFORE 消息来删除老数据;对于 first-row 引擎,它可能会直接忽略新数据。
    • partMapping: 一个 分区 -> 整数ID 的映射。因为 RocksDB 中存储分区信息用整数ID比用完整的 BinaryRow 更高效,所以用这个组件来做转换。
  • Configuration & Context (配置与上下文)

    • table: 当前操作的 Paimon 表对象。
    • targetBucketRowNumber: 动态分桶的目标行数,即一个桶里期望存放多少条数据。
    • numAssigners 和 assignId: Flink 算子的并行度和当前子任务的ID。用于实现我们之前讨论过的无锁并发分桶
    • collector: 一个回调函数(消费者),用来将处理完成的 (数据, bucket) 对发送给下游。

GlobalIndexAssigner 的工作流程可以清晰地分为两个阶段:引导 (Bootstrap) 和 处理 (Process)

阶段一:引导 (Bootstrap)

这个阶段的目标是用表的存量数据构建 RocksDB 全局索引

  1. open(...): 初始化所有组件,包括创建 RocksDB 实例、初始化 BucketAssigner 等。此时 bootstrap 标志位为 true

  2. bootstrapKey(InternalRow value):

    • 这个方法被外部调用,逐条喂入从 Paimon 表快照中读出的存量数据
    • 它从 value 中提取 主键分区bucket
    • 调用 bucketAssigner.bootstrapBucket(...) 来累积每个分区的桶内记录数。
    • 将 (序列化的主键, 序列化的(分区ID, bucket)) 写入 bootstrapKeys 这个外部排序缓冲中。
  3. processInput(InternalRow value): 在引导阶段 (inBootstrap() 为 true),所有新流入的数据(不是存量数据)都会被临时存入 bootstrapRecords 这个行缓冲中,等待引导结束后再处理。

  4. endBoostrap(boolean isEndInput):

    • 标志着引导阶段的结束,bootstrap 标志位被设为 false
    • 核心动作:将 bootstrapKeys 中缓存的所有主键索引信息,通过 BulkLoader 批量、高效地载入 RocksDB。这比一条条 put 快得多。
    • 处理 bootstrapRecords 中缓存的数据:将它们一条条拿出,交给 processInput 方法进行正式处理。

阶段二:处理 (Process)

引导结束后,GlobalIndexAssigner 进入常规处理模式。

  1. processInput(InternalRow value):
    • 从 value 中提取分区 (partition) 和主键 (key)。
    • 用 key 去查询 keyIndex (RocksDB)。
    • Case 1: keyIndex.get(key) 返回 null
      • 说明这是一个全新的主键
      • 调用 processNewRecord(...) -> assignBucket(...) 为它在新分区中分配一个新 bucket
      • 将 (key, (新分区ID, 新bucket)) 存入 keyIndex
      • 通过 collector 将 (value, 新bucket) 发往下游。
    • Case 2: keyIndex.get(key) 返回一个已存在的 (分区ID, bucket)
      • Sub-case 2.1: 新老分区相同。说明这是一次普通的分区内更新,直接将 value 和老的 bucket 发往下游即可。
      • Sub-case 2.2: 新老分区不同。这是跨分区更新的关键场景。
        • 调用 existingProcessor.processExists(...)ExistingProcessor 会根据表的 merge-engine(合并引擎)策略来处理。例如,对于 deduplicate,它会生成一条 UPDATE_BEFORE 记录,标记为删除,并使用老的分区和 bucket 发往下游。
        • 如果 processExists 返回 true(意味着新数据需要被处理),则接着调用 processNewRecord 为这条新数据在新的分区中分配 bucket,并更新 keyIndex

并发与分布式处理

GlobalIndexAssigner 本身是可序列化的,它会被分发到 Flink 的多个并行 TaskManager 上执行。

  • 并发控制: 通过 isAssignBucket 方法中的 computeAssignId(bucket) == assignId 逻辑(作为filter),保证了每个并行实例只负责分配一部分 bucket ID,从而实现了无锁的并发分配。
  • 状态独立: 每个并行实例都有自己独立的 RocksDB 实例。这看起来似乎违背了“全局索引”的初衷,但实际上 Paimon 通过 Flink 的 KeyedStream 机制,保证了相同主键的数据总是被路由到同一个并行实例。因此,虽然物理上有多个 RocksDB 实例,但逻辑上,对于任何一个主键,它的索引信息只存在于其中一个固定的实例中,从而保证了全局索引的一致性。

总结

GlobalIndexAssigner 是一个集成了本地状态存储 (RocksDB)外部排序缓存分布式计算思想的复杂但高效的组件。它通过引导+处理的两阶段模式,以及基于主键路由的分布式状态,巧妙地解决了跨分区 UPSERT 场景下的全局索引难题,是 Paimon 动态表功能能够强大、高效运行的基石。

GlobalIndexAssignerOperator

GlobalIndexAssignerOperator 是 GlobalIndexAssigner 在 Flink 计算引擎中的宿主执行器。如果说 GlobalIndexAssigner 是一个功能强大的“引擎”,那么 GlobalIndexAssignerOperator 就是为这个引擎量身打造的“座驾”。它负责将 GlobalIndexAssigner 的逻辑无缝地集成到 Flink 的流处理拓扑中,处理生命周期、数据流转和状态管理。

public class GlobalIndexAssignerOperatorextends AbstractStreamOperator<Tuple2<InternalRow, Integer>>implements OneInputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>,BoundedOneInput {
// ...
}

深入分析:

  • extends AbstractStreamOperator<...>: 表明它是一个标准的 Flink 流处理算子。它的输出类型是 Tuple2<InternalRow, Integer>,即 (数据行, bucket号),这正是 GlobalIndexAssigner 处理后的结果。
  • implements OneInputStreamOperator<...>: 表明这是一个单输入流的算子。它的输入类型是 Tuple2<KeyPartOrRow, InternalRow>。这个 KeyPartOrRow 是一个枚举,用来区分输入的数据是存量数据KEY_PART)还是增量数据ROW),这对于实现两阶段工作模式至关重要。
  • implements BoundedOneInput: 表明这个算子可以处理有界流(批处理作业)。它需要实现 endInput() 方法,当输入流结束时 Flink 会调用该方法。

核心成员变量

// ...private final GlobalIndexAssigner assigner;private transient IOManager ioManager;
// ...

深入分析:

  • assigner: 这是最重要的成员,它持有一个 GlobalIndexAssigner 的实例。Operator 的所有核心逻辑都是通过委托给这个 assigner 对象来完成的。
  • ioManager: 一个 transient 的 IO 管理器。transient 关键字意味着它不会被序列化。它在 initializeState 方法中被创建,用于管理 GlobalIndexAssigner 可能需要的磁盘溢写操作(例如 RocksDB 和外部排序缓冲)。

生命周期与初始化 (initializeState)

这是 Operator 在 Flink TaskManager 上启动时最先执行的方法之一,负责准备好 GlobalIndexAssigner 的运行环境。

// ...@Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =getContainingTask().getEnvironment().getIOManager();ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());assigner.open(computeManagedMemory(this),ioManager,RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),this::collect);}
// ...

深入分析:

  • 获取 Flink 资源: 它从 Flink 的 Task 环境中获取底层的 IOManager,并用其临时目录来创建 Paimon 自己封装的 IOManager
  • 调用 assigner.open(...): 这是关键的桥接步骤。它将 Flink 的运行时信息传递给 GlobalIndexAssigner
    • computeManagedMemory(this): 计算并传入 Flink 为该算子分配的托管内存(Managed Memory),assigner 会用这部分内存来配置 RocksDB 的 Block Cache 等。
    • ioManager: 传入 IO 管理器。
    • getNumberOfParallelSubtasks(...): 传入作业的并行度。
    • getIndexOfThisSubtask(...): 传入当前子任务的 ID。
    • this::collect: 传入一个方法引用作为回调函数。当 assigner 处理完一条数据后,会调用这个回调,Operator 再通过它将结果发送到下游。

通过这一步,GlobalIndexAssigner 就被完全“激活”并准备好处理数据了。

数据处理 (processElement)

这是算子的核心数据处理逻辑,每当一条数据记录到达时,Flink 就会调用这个方法。

// ...@Overridepublic void processElement(StreamRecord<Tuple2<KeyPartOrRow, InternalRow>> streamRecord)throws Exception {Tuple2<KeyPartOrRow, InternalRow> tuple2 = streamRecord.getValue();InternalRow value = tuple2.f1;switch (tuple2.f0) {case KEY_PART:assigner.bootstrapKey(value);break;case ROW:assigner.processInput(value);break;}}
// ...

深入分析:

  • 数据分发processElement 的逻辑非常清晰,它就像一个交通警察。它解析输入 Tuple2 的第一个字段 KeyPartOrRow
    • 如果值是 KEY_PART,说明这是来自上游 IndexBootstrapOperator 的存量数据,应该用于引导。于是它调用 assigner.bootstrapKey(value)
    • 如果值是 ROW,说明这是来自数据源的增量数据(或在引导结束后重新处理的缓存数据),应该进行正常处理。于是它调用 assigner.processInput(value)

这个 switch 语句完美地实现了对 GlobalIndexAssigner 两阶段工作模式的驱动。

Checkpoint 与批处理结束

这两个方法处理了流处理中的 Checkpoint 和批处理中的结束信号,它们都与 GlobalIndexAssigner 的引导阶段结束有关。

// ...@Overridepublic void prepareSnapshotPreBarrier(long checkpointId) throws Exception {endBootstrap(false);}@Overridepublic void endInput() throws Exception {endBootstrap(true);}private void endBootstrap(boolean isEndInput) throws Exception {if (assigner.inBoostrap()) {assigner.endBoostrap(isEndInput);}}
// ...

深入分析:

  • prepareSnapshotPreBarrier: 在 Flink 中,当 Checkpoint Barrier 到达一个算子之前,会先调用这个方法。这是一个结束引导阶段的完美时机。当第一个 Checkpoint Barrier 到达时,意味着所有存量数据(KEY_PART)都已经处理完毕。此时调用 endBootstrap(false)GlobalIndexAssigner 就会执行批量加载 RocksDB、处理缓存数据等操作,并切换到正常处理模式。
  • endInput: 在批处理模式下,当所有输入数据都处理完毕后,Flink 会调用此方法。这也标志着引导阶段(如果存在)必须结束。此时调用 endBootstrap(true),参数 isEndInput=true 会触发 GlobalIndexAssigner 中针对批处理的特殊优化路径。
  • endBootstrap: 这个私有方法封装了公共逻辑,即只有在 assigner 仍处于引导模式时 (inBoostrap() 为 true),才调用 endBoostrap,避免了重复调用。

当 Flink 作业结束或取消时,会调用 close 方法来释放资源。

// ...@Overridepublic void close() throws Exception {this.assigner.close();if (ioManager != null) {ioManager.close();}}
// ...

深入分析:

  • 它会依次关闭 assigner(这会关闭 RocksDB 并删除临时文件)和 ioManager,确保所有本地资源都被正确清理,不会造成泄露。

总结

GlobalIndexAssignerOperator 是一个典型的适配器模式应用。它本身不包含复杂的业务逻辑,而是作为一个轻量级的“外壳”,将 GlobalIndexAssigner 这个通用的、与计算引擎无关的核心组件,适配到 Flink 的流处理模型中。它通过实现 Flink 的算子接口,巧妙地利用 Flink 的生命周期(initializeStateclose)、数据处理(processElement)和事件机制(prepareSnapshotPreBarrierendInput)来驱动 GlobalIndexAssigner 的两阶段工作流程,是 Paimon 动态分桶功能在 Flink 上得以实现的关键连接点。

GlobalDynamicBucketSink 

GlobalDynamicBucketSink 是 Paimon Flink Sink 体系中,专门用于处理启用了全局索引的动态分桶表(即 bucket-mode = 'cross-partition')的总装配车间。它的核心职责不是执行具体的写入逻辑,而是构建和编排 Flink 的 DataStream 作业拓扑。它像一个总工程师,将 IndexBootstrapOperatorGlobalIndexAssignerOperatorDynamicBucketRowWriteOperator 等一系列专用的算子(Operator)按照正确的顺序和数据分发逻辑连接起来,形成一个完整、高效的数据写入流水线。

public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {
// ...
}

深入分析:

  • extends FlinkWriteSink<Tuple2<InternalRow, Integer>>:
    • 它继承自 FlinkWriteSink,这是一个通用的 Paimon Flink Sink 基类,封装了创建 Writer 和 Committer 的通用逻辑。
    • 泛型参数 Tuple2<InternalRow, Integer> 非常关键,它定义了这个 Sink 流水线中,写入算子 (Writer) 的输入数据类型是 (数据行, bucket号)。这与我们之前分析的 GlobalIndexAssignerOperator 的输出类型完全一致,表明了数据流的衔接关系。

核心方法 build

build 方法是这个类的灵魂,它定义了整个 Flink 作业的拓扑结构。让我们分步解析这个方法的实现。

// ...public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
// ...// Topology:// input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->// writer --> committer
// ...}
// ...

代码注释已经清晰地勾勒出了整个数据流拓扑,下面我们逐一解析每个阶段:

阶段 0: 输入 (input: DataStream<InternalRow>)

这是数据流的起点,代表着从各种数据源(如 Kafka, CDC Source)流入的原始数据。

阶段 1: 引导 (bootstrap)
// ...SingleOutputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =input.transform("INDEX_BOOTSTRAP",new InternalTypeInfo<>(new KeyWithRowSerializer<>(bootstrapSerializer, rowSerializer)),new IndexBootstrapOperator.Factory<>(new IndexBootstrap(table), r -> r)).setParallelism(input.getParallelism());
// ...
  • 目的: 为后续的 GlobalIndexAssigner 准备引导数据。
  • 实现:
    • 通过 input.transform 添加了一个名为 INDEX_BOOTSTRAP 的算子,这个算子就是 IndexBootstrapOperator
    • IndexBootstrapOperator 内部持有一个 IndexBootstrap 实例,它会读取 Paimon 表的当前快照,将所有存量数据作为 KeyPartOrRow.KEY_PART 类型输出。
    • 同时,它也会将增量数据(即 input 流中的数据)直接作为 KeyPartOrRow.ROW 类型透传下去。
    • 这样,输出流 bootstraped 中就混合了两种类型的数据,为后续的 GlobalIndexAssignerOperator 的两阶段工作模式做好了准备。
阶段 2: 第一次 Shuffle (shuffle by key hash)
// ...// 1. shuffle by key hashInteger assignerParallelism =MathUtils.max(options.dynamicBucketInitialBuckets(),options.dynamicBucketAssignerParallelism());if (assignerParallelism == null) {assignerParallelism = parallelism;}KeyPartRowChannelComputer channelComputer =new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =partition(bootstraped, channelComputer, assignerParallelism);
// ...
  • 目的: 保证相同主键的数据(无论是存量还是增量)都被发送到同一个 bucket-assigner 并行实例上。这是 GlobalIndexAssigner 能够正确维护全局索引状态的前提。
  • 实现:
    • partition 是一个工具方法,它本质上是调用了 Flink 的 partitionCustom API。
    • KeyPartRowChannelComputer 是一个自定义的分区器。它会从输入数据中提取主键,计算其哈希值,然后根据哈希值决定数据应该被发送到哪个下游分区。
阶段 3: 分配桶 (bucket-assigner)
// ...// 2. bucket-assignerTupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =partitionByKeyHash.transform("cross-partition-bucket-assigner",rowWithBucketType,GlobalIndexAssignerOperator.forRowData(table)).setParallelism(partitionByKeyHash.getParallelism());// declare managed memory for RocksDBdeclareManagedMemory(bucketAssigned, options.toConfiguration().get(SINK_CROSS_PARTITION_MANAGED_MEMORY));
// ...
  • 目的: 为每一条数据分配一个最终的 bucket 号。
  • 实现:
    • 在 partitionByKeyHash 流上再次调用 transform,添加了 GlobalIndexAssignerOperator
    • 这个算子接收上游按主键 shuffle 好的数据,利用其内部的 RocksDB 状态和 BucketAssigner 逻辑,为每条数据计算出 bucket
    • 输出流 bucketAssigned 的类型变成了 Tuple2<InternalRow, Integer>,即 (数据行, bucket号)
    • declareManagedMemory 为这个算子声明了需要的 Flink 托管内存,这部分内存主要会被 RocksDB 用于缓存。
阶段 4: 第二次 Shuffle (shuffle by bucket)
// ...// 3. shuffle by bucketDataStream<Tuple2<InternalRow, Integer>> partitionByBucket =partition(bucketAssigned, new RowWithBucketChannelComputer(schema), parallelism);
// ...
  • 目的: 将数据按照分区进行重新分发,确保同一个桶的数据被发送到同一个 writer 实例。这是 Paimon 文件写入的基本要求,因为一个数据文件只能由一个 writer 写入。
  • 实现:
    • 再次调用 partition 工具方法。
    • RowWithBucketChannelComputer 是另一个自定义分区器。它会从 (数据行, bucket号) 中提取分区键和 bucket 号,然后根据这两者的组合哈希值来决定下游分区。
阶段 5: 写入与提交 (writer --> committer)
// ...// 4. writer and committerreturn sinkFrom(partitionByBucket, createCommitUser(options.toConfiguration()));}
// ...@Overrideprotected OneInputStreamOperatorFactory<Tuple2<InternalRow, Integer>, Committable>createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {return new DynamicBucketRowWriteOperator.Factory(table, writeProvider, commitUser);}
// ...
  • 目的: 将数据写入 Paimon 的数据文件,并在 Checkpoint 完成时生成可提交的元数据(Committable)。
  • 实现:
    • sinkFrom 是 FlinkWriteSink 基类提供的方法,它会组装 Writer 和 Committer 算子。
    • createWriteOperatorFactory 方法被重写,返回一个 DynamicBucketRowWriteOperator.Factory。这告诉 sinkFrom 方法,应该使用 DynamicBucketRowWriteOperator 作为写入算子。
    • DynamicBucketRowWriteOperator 接收按桶 shuffle 好的 (数据行, bucket号) 数据,并调用底层的 StoreSinkWrite 将其写入文件。

什么是存量数据和增量数据?

为了更好地理解,我们可以用一个仓库管理的比喻:

  • 存量数据 (Stock Data):

    • 比喻: 想象一下,你今天第一天接管一个仓库。在开始记录今天新进来的货物之前,你首先需要做的,是把仓库里已经存在的所有货物都清点一遍,登记在册。这个“已经存在的货物”就是存量
    • 在 Paimon 中: 当你启动一个 Flink 作业去写入一张 Paimon 表时,这张表里可能已经通过之前的作业写入了很多数据。这些在 Flink 作业启动那一刻已经存在于 Paimon 表文件里的数据,就是存量数据
    • 代码关联IndexBootstrap 这个类的核心职责,就是去扫描 Paimon 表的最新快照,把这些存量数据读取出来。
  • 增量数据 (Incremental Data):

    • 比喻: 在你清点完库存之后,今天陆陆续续有新的货车开到仓库门口,卸下新的货物。这些“新来的货物”就是增量
    • 在 Paimon 中: 当你的 Flink 作业启动并运行后,从数据源(比如 Kafka、CDC)实时流过来的新数据,就是增量数据
    • 代码关联IndexBootstrapOperator 的 processElement 方法,以及 GlobalIndexAssignerOperator 在引导结束后处理的数据,都属于增量数据。

总结一下:存量是“过去时”,是启动时的快照;增量是“现在进行时”,是启动后源源不断流入的数据流。

为什么 GlobalIndexAssignerOperator 需要处理存量数据?

为了保证主键的全局唯一性,并正确处理跨分区的更新(UPSERT)。

GlobalIndexAssignerOperator 的核心使命是为每一条数据分配一个 bucket。对于启用了全局索引的动态分桶表,它必须知道一个主键(Primary Key)当前到底在哪一个分区、哪一个 bucket 里。如果不知道这个信息,它就无法做出正确的判断。

让我们通过一个具体的例子来理解这个过程:

场景:

  • 一张用户表,以 user_id 为主键,login_date 为分区键。
  • merge-engine 设置为 deduplicate(去重,保留最新)。

初始状态 (存量数据):

  • 表里已经有一条数据:{user_id: 101, name: 'Alice', login_date: '2023-10-01'}。这条数据位于分区 pt='2023-10-01',假设在 bucket-3

现在,Flink 作业启动了,GlobalIndexAssignerOperator 开始工作。

第一步:处理存量数据(引导阶段 Bootstrap)

  1. IndexBootstrapOperator 运行 IndexBootstrap 逻辑,扫描全表,找到了存量数据 {user_id: 101, ...}
  2. 它将这条数据标记为 KEY_PART,发送给 GlobalIndexAssignerOperator
  3. GlobalIndexAssignerOperator 收到后,调用 assigner.bootstrapKey()
  4. GlobalIndexAssigner 将这个信息加载到它的本地 RocksDB 索引中,记录下:Key(101) -> Value(partition_id_of_2023-10-01, bucket_3)

至此,引导阶段完成。GlobalIndexAssigner 已经知道了所有“老”数据的位置。

第二步:处理增量数据

  1. 现在,从数据源来了一条新的增量数据{user_id: 101, name: 'Alice_New', login_date: '2023-10-05'}
  2. 这条数据流经 IndexBootstrapOperator,被标记为 ROW,发送给 GlobalIndexAssignerOperator
  3. GlobalIndexAssignerOperator 调用 assigner.processInput()
  4. GlobalIndexAssigner 拿到主键 101,去查询它的 RocksDB 索引。
  5. 它查到了! 索引告诉它,user_id=101 的数据之前存在于分区 pt='2023-10-01' 的 bucket-3 中。
  6. 因为新数据的分区是 '2023-10-05',与老数据不同,这触发了跨分区更新逻辑。
  7. 根据 deduplicate 引擎的规则,它会执行以下操作:
    • 生成一条 DELETE 记录,指向老的位置:{user_id: 101, ..., RowKind: DELETE},并给它分配 bucket-3,发往下游的 Writer。
    • 为新数据 {user_id: 101, name: 'Alice_New', ...} 分配一个新的 bucket(比如 bucket-8),并更新 RocksDB 索引为 Key(101) -> Value(partition_id_of_2023-10-05, bucket_8),然后将这条 INSERT 记录发往下游。

如果没有第一步对存量数据的处理,GlobalIndexAssigner 的 RocksDB 索引就是空的。当第二步的增量数据到来时,它会认为 user_id=101 是一个全新的主键,直接为其分配一个新的 bucket 并插入。这样做的后果是灾难性的:Paimon 表里会同时存在两条 user_id=101 的数据(一条在 2023-10-01 分区,一条在 2023-10-05 分区),主键约束被破坏了

因此,GlobalIndexAssignerOperator 必须先通过 IndexBootstrap 加载和处理所有“之前的数据”(存量数据),构建起一个完整的、反映历史状态的全局索引。只有在这个坚实的基础上,它才能正确地处理后续的增量数据,实现精准的跨分区 UPSERT

总结

GlobalDynamicBucketSink 通过其 build 方法,以一种声明式的方式,完美地编排了一个复杂的多阶段 Flink 流处理作业。这个作业拓扑通过两次关键的 Shuffle 操作,解决了全局索引和数据写入的核心数据分发问题:

  1. 第一次 Shuffle (by Key): 保证了状态计算的正确性(相同主键到同个 assigner)。
  2. 第二次 Shuffle (by Bucket): 保证了数据写入的正确性(相同桶到同个 writer)。

整个流程清晰、高效,充分利用了 Flink 的数据流和算子模型,是 Paimon 实现跨分区动态分桶这一高级功能的基石。

IndexBootstrap 和 IndexBootstrapOperator 

它们共同完成了为全局索引提供存量数据这个关键任务。

IndexBootstrap 是逻辑执行者,负责从 Paimon 表中读取存量数据。 IndexBootstrapOperator 是物理执行者,它在 Flink 环境中运行 IndexBootstrap 的逻辑,并将读取到的存量数据和流经的增量数据一起发送给下游。

IndexBootstrap 的核心使命是:扫描 Paimon 表的最新快照,读取所有存量数据,并提取出构建全局索引所必需的信息

我们来看它的核心方法 bootstrap(int numAssigners, int assignId)

// ... existing code ...public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {RowType rowType = table.rowType();List<String> fieldNames = rowType.getFieldNames();int[] keyProjection =table.primaryKeys().stream().map(fieldNames::indexOf).mapToInt(Integer::intValue).toArray();// 1. 强制使用 latest 模式扫描ReadBuilder readBuilder =table.copy(Collections.singletonMap(SCAN_MODE.key(), LATEST.toString())).newReadBuilder().withProjection(keyProjection);// 2. 规划并过滤 SplitsDataTableScan tableScan = (DataTableScan) readBuilder.newScan();List<Split> splits =tableScan.withBucketFilter(bucket -> bucket % numAssigners == assignId).plan().splits();// 3. (可选) 根据 TTL 过滤 SplitsCoreOptions options = CoreOptions.fromMap(table.options());Duration indexTtl = options.crossPartitionUpsertIndexTtl();if (indexTtl != null) {// ... filter splits by TTL ...}// 4. 并行读取并拼接数据RowDataToObjectArrayConverter partBucketConverter =new RowDataToObjectArrayConverter(TypeUtils.concat(TypeUtils.project(rowType, table.partitionKeys()),RowType.of(DataTypes.INT())));return parallelExecute(TypeUtils.project(rowType, keyProjection),s -> readBuilder.newRead().createReader(s),splits,options.pageSize(),options.crossPartitionUpsertBootstrapParallelism(),split -> {DataSplit dataSplit = ((DataSplit) split);int bucket = dataSplit.bucket();return partBucketConverter.toGenericRow(new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));},(row, extra) -> new JoinedRow().replace(row, extra));}
// ... existing code ...

深入分析:

  1. 强制 latest 扫描: 它创建了一个新的 ReadBuilder,并强制将扫描模式(scan.mode)设置为 latest。这意味着它总是读取表的最新快照,确保获取到所有最新的存量数据。同时,它只投影(withProjection)了主键字段,因为引导阶段只需要主键信息。

  2. 规划并过滤 Splits:

    • 它调用 tableScan.plan() 来规划出所有需要读取的数据文件(Splits)。
    • 最关键的一步是 .withBucketFilter(bucket -> bucket % numAssigners == assignId)。这行代码实现了引导任务的并行化numAssigners 是下游 GlobalIndexAssignerOperator 的并行度,assignId 是当前实例的 ID。这个过滤器确保了每个 IndexBootstrap 实例只读取一部分 bucket 的数据,避免了重复工作。
  3. TTL 过滤 (可选): 如果用户配置了 cross-partition-upsert.index-ttl,它会进一步过滤 SplitsfilterSplit 方法会检查 Split 中的文件创建时间,如果所有文件都已经超出了 TTL,那么这个 Split 就会被丢弃,因为它的索引数据已经过期,无需加载。

  4. 并行读取与数据拼接:

    • parallelExecute 是一个工具方法,它会启动一个线程池(并行度由 cross-partition-upsert.bootstrap-parallelism 控制)来并行地读取所有规划好的 Splits
    • 对于每个 Split,它会提取出分区信息和 bucket 号
    • 对于从文件中读取的每一行数据(只包含主键),它会和上一步提取的 (分区, bucket) 信息拼接(JoinedRow)起来。
    • 最终,它返回一个 RecordReader,这个 Reader 产出的每一行 InternalRow 都包含了 (主键字段..., 分区字段..., bucket号),这正是下游 GlobalIndexAssigner 进行引导所需要的全部信息。

IndexBootstrapOperator: Flink 中的“调度器”

IndexBootstrapOperator 是一个 Flink 算子,它为 IndexBootstrap 提供了运行环境,并负责将引导数据和增量数据进行“标记”和“转发”。

public class IndexBootstrapOperator<T> extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
// ...@Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);// 1. 启动引导过程bootstrap.bootstrap(RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),this::collect);}@Overridepublic void processElement(StreamRecord<T> streamRecord) throws Exception {// 2. 处理增量数据output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.ROW, streamRecord.getValue())));}private void collect(InternalRow row) {// 3. 处理引导数据output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART, converter.apply(row))));}
// ...
}

深入分析:

  1. 启动引导 (initializeState):

    • 在算子初始化时,它会立即调用 bootstrap.bootstrap(...) 方法。
    • 它将 Flink 的并行度(getNumberOfParallelSubtasks)和当前子任务 ID(getIndexOfThisSubtask)传递给 IndexBootstrap,这样 IndexBootstrap 才知道自己应该读取哪些 bucket 的数据。
    • 它将自己的 collect 方法作为回调函数传进去。
  2. 处理增量数据 (processElement):

    • 这个方法处理从上游流过来的增量数据(比如来自 Kafka 的新消息)。
    • 它将每一条增量数据包装成 Tuple2(KeyPartOrRow.ROW, data) 的形式,然后发送给下游。KeyPartOrRow.ROW 这个标记告诉下游的 GlobalIndexAssignerOperator:“这是一条增量数据,请按正常流程处理”。
  3. 处理引导数据 (collect):

    • 这个方法是 IndexBootstrap 的回调函数。当 IndexBootstrap 在后台读取并拼接好一条存量数据后,就会调用这个 collect 方法。
    • 它将收到的存量数据(row)包装成 Tuple2(KeyPartOrRow.KEY_PART, data) 的形式,然后发送给下游。KeyPartOrRow.KEY_PART 这个标记告诉下游的 GlobalIndexAssignerOperator:“这是一条用于引导的存量数据,请调用 bootstrapKey 方法”。

总结

IndexBootstrap 和 IndexBootstrapOperator 的关系可以总结如下:

  • IndexBootstrap 是一个数据源。它负责**“生产”** 用于构建全局索引的存量数据。它通过扫描 Paimon 表文件,并利用 bucket 过滤来实现并行化读取。
  • IndexBootstrapOperator 是一个转换器和转发器。它在 Flink 作业启动时,触发 IndexBootstrap 开始生产数据。然后,它扮演一个交通枢纽的角色:
    • 对于 IndexBootstrap 生产的存量数据,它打上 KEY_PART 标签。
    • 对于从上游流经的增量数据,它打上 ROW 标签。
    • 最后,它将这两种打好标签的数据流合并在一起,发送给下游的 GlobalIndexAssignerOperator 进行统一处理。

这个设计将存量数据读取增量数据处理巧妙地统一到了一个 Flink 数据流中,为后续的全局索引构建和动态分桶奠定了基础。

http://www.dtcms.com/a/551527.html

相关文章:

  • 关于建网站新闻一键提交收录
  • 云南省网站备案要求近期新闻热点事件及评论
  • 如何做php网站南通网站优化公司
  • 苏南网站建设互联网营销
  • 网站首页静态好还是动态好本科自考是什么意思
  • 建一个商城网站多少钱优秀的个人博客网站
  • 河南双师培训网站深圳制作网站搜行者seo
  • 寿光专业做网站的公司有哪些wordpress模板 户外钓鱼类网站
  • 虹桥街道网站建设做响应式网站最大宽度
  • 泰州网站优化公司做网站骗钱
  • 如何进入网站管理页面做网站一般注意些什么
  • 网络会议系统app网站首页布局seo
  • 千锋教育费用多少seo服务的三种方式
  • 广州网站建设模板网站的百度推广怎么做
  • 个人做免费的网站无锡专业制作外贸网站的公司
  • 想接做网站的单子建设企业网站e路护航
  • 品牌网站开发背景asp网站源码下载
  • openwrt 做视频网站网站备案接入ip
  • 学校网站建设费用服务平台管理系统
  • 先做公众号在做网站nx二次开发
  • 网站建设 猫云seo建设银行辽宁分行招聘网站
  • 微网站首页模板wordpress用户修改邮箱
  • 怎么说服企业做网站怎么恶意点击对手竞价
  • 崇左市住房和城乡建设局网站做网站挂广告赚多少
  • 网站首页为什么不收录wordpress 在线skype
  • 高性能网站建设 pdf产品推广策划案
  • 山东莱钢建设有限公司网站策划书格式模板范文
  • 网站备案查询系统电商网站定制
  • 做家政的在哪些网站推广登陆网站密码不保存怎么做
  • 免费制作logo的网站丽水市住房和城乡建设局网站