当前位置: 首页 > news >正文

Paimon Consumer机制解析

Consumer

Consumer 在 Paimon 中扮演着至关重要的角色,它是实现流式读取、断点续传和安全消费(防止快照过早过期)的核心机制。

首先,我们来看Consumer.java 文件。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.consumer;import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.JsonSerdeUtil;import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException;import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;/** Consumer which contains next snapshot. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Consumer {private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";private final long nextSnapshot;@JsonCreatorpublic Consumer(@JsonProperty(FIELD_NEXT_SNAPSHOT) long nextSnapshot) {this.nextSnapshot = nextSnapshot;}@JsonGetter(FIELD_NEXT_SNAPSHOT)public long nextSnapshot() {return nextSnapshot;}public String toJson() {return JsonSerdeUtil.toJson(this);}public static Consumer fromJson(String json) {return JsonSerdeUtil.fromJson(json, Consumer.class);}public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {int retryNumber = 0;MismatchedInputException exception = null;while (retryNumber++ < 10) {try {return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);} catch (MismatchedInputException e) {// retryexception = e;try {Thread.sleep(1_000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException(ie);}} catch (IOException e) {throw new UncheckedIOException(e);}}throw new UncheckedIOException(exception);}
}

从代码结构来看,Consumer 是一个非常简单的 POJO (Plain Old Java Object),其核心职责可以概括为:

  • 状态载体:它只包含一个核心字段 nextSnapshot,用于记录一个消费者(由 consumer-id 标识)下一次应该从哪个快照(Snapshot)开始消费。
  • 序列化/反序列化:它提供了 toJson() 和 fromJson() 方法,利用 Jackson 库将自身对象与 JSON 字符串进行转换。这表明消费者的状态是被持久化为 JSON 文件的。
  • 原子化读写fromPath() 方法是关键。它负责从文件系统中读取并解析一个 Consumer 对象。
    • 它使用了 fileIO.readOverwrittenFileUtf8(path),这暗示了消费者状态文件是可被原子性覆盖更新的。
    • 它包含了一个重试机制。当捕获到 MismatchedInputException 时,会进行最多10次、每次间隔1秒的重试。这个异常通常发生在读写并发的场景下(例如,一个 Flink 作业正在更新 consumer 文件,而另一个进程(如 expire 任务)恰好在读取这个文件),文件可能不完整。这个重试机制保证了在分布式环境下的读取鲁棒性。

Consumer 的管理与使用:ConsumerManager

Consumer 类本身只是一个数据模型,其生命周期的管理由 ConsumerManager 类负责。

// ... existing code ...
public class ConsumerManager implements Serializable {// ... existing code ...private final FileIO fileIO;private final Path tablePath;private final String branch;private static final String CONSUMER_PREFIX = "consumer-";// ... existing code ...public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) {this.fileIO = fileIO;this.tablePath = tablePath;this.branch =StringUtils.isNullOrWhitespaceOnly(branchName) ? DEFAULT_MAIN_BRANCH : branchName;}public Optional<Consumer> consumer(String consumerId) {return Consumer.fromPath(fileIO, consumerPath(consumerId));}public void resetConsumer(String consumerId, Consumer consumer) {try {fileIO.overwriteFileUtf8(consumerPath(consumerId), consumer.toJson());} catch (IOException e) {throw new UncheckedIOException(e);}}public void deleteConsumer(String consumerId) {fileIO.deleteQuietly(consumerPath(consumerId));}// ... existing code ...public Map<String, Long> consumers() throws IOException {Map<String, Long> consumers = new HashMap<>();listOriginalVersionedFiles(fileIO, consumerDirectory(), CONSUMER_PREFIX).forEach(id -> {Optional<Consumer> consumer = this.consumer(id);consumer.ifPresent(value -> consumers.put(id, value.nextSnapshot()));});return consumers;}
// ... existing code ...private Path consumerPath(String consumerId) {return new Path(branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX + consumerId);}
}

ConsumerManager 提供了对 Consumer 的完整 CRUD 操作:

  • 创建/更新 (resetConsumer): 将一个 Consumer 对象序列化为 JSON,并写入到指定的文件中。文件名格式为 consumer-<consumerId>,存储在表路径下的 consumer 目录中。
  • 读取 (consumer): 根据 consumerId 读取并解析对应的 Consumer 对象。
  • 删除 (deleteConsumer): 删除指定的 consumer 文件。
  • 列举 (consumers): 扫描 consumer 目录,列出所有存在的消费者及其 nextSnapshot ID。

Consumer 的核心价值与应用场景

结合文档和测试用例,我们可以看到 Consumer 机制的两个核心价值:

a. 安全消费 (Safe Consumption)

Paimon 会定期清理旧的、不再需要的快照以节省存储空间。但如果一个流作业正在消费某个旧快照的数据,这个快照就不能被删除。Consumer 机制解决了这个问题。

文档摘录 (docs/content/flink/consumer-id.md):

  1. Safe consumption: When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.

Paimon 的快照清理逻辑会通过 ConsumerManager 获取所有 Consumer 的 nextSnapshot,并找到其中的最小值。任何比这个最小值还旧的快照都可以被安全地删除,而大于等于这个最小值的快照则会被保留,从而保证了没有任何消费者会因为快照被清理而丢失数据。

b. 断点续传 (Resume from Breakpoint)

这是流式处理中非常重要的容错能力。当一个流作业失败重启后,它需要知道上次消费到了哪里。

文档摘录 (docs/content/flink/consumer-id.md): 2. Resume from breakpoint: When previous job is stopped, the newly started job can continue to consume from the previous progress without resuming from the state.

当你在 Flink 或 Spark 中使用 Paimon a作为 Source 并指定 consumer-id 时:

  1. 作业运行时,会定期将消费进度(即下一个要消费的快照ID)通过 ConsumerManager 更新到对应的 consumer-<consumer-id> 文件中。
  2. 当作业重启时,它会首先读取这个文件,获取 nextSnapshot,然后从这个快照开始继续消费,实现了无缝的断点续传。

例如,在 Flink SQL 中这样使用:

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */;

用户交互与可见性

Paimon 提供了多种方式让用户可以查看和管理 Consumer

a. 系统表 $consumers

你可以像查询普通表一样查询一个表的消费者信息。

文档摘录 (docs/content/concepts/system-tables.md):

SELECT * FROM my_table$consumers;/*
+-------------+------------------+
| consumer_id | next_snapshot_id |
+-------------+------------------+
|         id1 |                1 |
|         id2 |                3 |
+-------------+------------------+
2 rows in set
*/

其底层实现 ConsumersTable.java 正是调用了 ConsumerManager.consumers() 方法来获取数据。

ConsumersTable.java

// ... existing code ...public RecordReader<InternalRow> createReader(Split split) throws IOException {
// ... existing code ...Path location = ((ConsumersTable.ConsumersSplit) split).location;Map<String, Long> consumers = new ConsumerManager(fileIO, location, branch).consumers();Iterator<InternalRow> rows =Iterators.transform(consumers.entrySet().iterator(), this::toRow);
// ... existing code ...}
// ... existing code ...
b. Action 与 Procedure

当需要手动干预消费位点时(比如修复数据问题、重跑任务),可以使用 Paimon 提供的 Action 或 Procedure。

  • reset_consumer: 重置或删除一个指定的 consumer。
  • clear_consumers: 批量清理符合条件的 consumer。

文档摘录 (docs/content/spark/procedures.md):

-- reset the new next snapshot id in the consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)-- delete consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')-- clear all consumers in the table
CALL sys.clear_consumers(table => 'default.T')

这些操作的底层最终都会调用 ConsumerManager 的 resetConsumer 或 deleteConsumer 等方法。

总结

Consumer 类虽然代码简单,但它是 Paimon 流处理生态的基石之一。我们可以将其理解为一个分布式、持久化的消费位点标记

  • 物理表现:一个存储在表目录下的、名为 consumer-<id> 的 JSON 文件。
  • 核心内容:记录了 nextSnapshot,即下一次要消费的快照 ID。
  • 管理接口:通过 ConsumerManager 进行统一的增删改查。
  • 核心功能:支撑了流式消费的断点续传和 Paimon 表的快照安全保留两大关键特性。
  • 用户可见性:通过 $consumers 系统表和 reset/clear 等 Procedure/Action 暴露给用户,提供了强大的可观测性和可管理性。

http://www.dtcms.com/a/299867.html

相关文章:

  • 守护汽车“空中升级“:基于HSM/KMS的安全OTA固件签名与验证方案
  • 通过redis_exporter监控redis cluster
  • 1. Qt多线程开发
  • JavaEE初阶第十一期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(九)
  • 第10篇:实战验收篇
  • 无需云服务器的内网穿透方案 -- cloudflare tunnel
  • 特产|基于SSM+vue的南阳特产销售平台(源码+数据库+文档)
  • 如何实现打印功能
  • 大话数据结构之 < 栈>(C语言)
  • Java中mybatis 无参构造器?你会吗
  • Spring AI 项目实战(二十):基于Spring Boot + AI + DeepSeek的智能环境监测与分析平台(附完整源码)
  • 修改site-packages位置与pip配置
  • Kubernetes 与 Docker的爱恨情仇
  • 面试实战,问题十三,Redis在Java项目中的作用及使用场景详解,怎么回答
  • 面试问题总结——关于OpenCV(二)
  • 【电赛学习笔记】MaxiCAM 的OCR图片文字识别
  • 力扣404.左叶子之和
  • jxORM--查询数据
  • ART配对软件使用
  • Macast配置
  • ThreadLocal--ThreadLocal介绍
  • 7.26 cpu
  • 单片机ADC机理层面详细分析(一)
  • SSE (Server-Sent Events) 服务出现连接卡在 pending 状态的原因
  • 嵌入式软硬件开发入门工具推荐
  • `read`系统调用示例
  • java每日精进 7.26【流程设计5.0(中间事件+结束事件)】
  • 检索召回率优化探究一:基于 LangChain 0.3集成 Milvus 2.5向量数据库构建的智能问答系统
  • 全球化2.0 | 云轴科技ZStack亮相阿里云印尼国有企业CXO专家活动
  • FreeMarker模板引擎