当前位置: 首页 > news >正文

Paimon INSERT OVERWRITE

在 Paimon 中,INSERT OVERWRITE 是一种原子性的数据替换操作。它不像传统数据库的 DELETE + INSERT,而是一个完整的、不可分割的事务。

  • 原子性 (Atomicity):整个覆盖操作要么完全成功,要么完全失败。不会出现数据被部分删除或部分写入的中间状态。即使作业失败,表数据也会回滚到操作之前的状态。
  • 事务性 (Transactional):每一次 OVERWRITE 都会生成一个新的、完整的表快照 (Snapshot)。这使得数据具备了版本管理能力,支持时间旅行(Time Travel)。
  • 高效性 (Efficiency):对于分区表,它只重写涉及到的分区,未触及的分区数据保持不变,开销可控。

INSERT OVERWRITE 主要分为两种模式:静态分区覆盖 (Static Partition Overwrite) 和 动态分区覆盖 (Dynamic Partition Overwrite)

静态分区覆盖

定义:在 INSERT 语句中明确指定要覆盖哪些分区。Paimon 会清空这些指定分区内的所有数据,然后将新数据写入。

适用场景

  • 清理并重写某个或某几个历史分区的数据。
  • 对整个表进行数据重写(不指定分区时)。

语法示例 (Spark SQL):

-- 假设表 T 有两个分区键: pt1, pt2
CREATE TABLE T (id INT, name STRING, pt1 STRING, pt2 INT) PARTITIONED BY (pt1, pt2);-- 覆盖单个分区 pt1='a', pt2=1
INSERT OVERWRITE T PARTITION (pt1 = 'a', pt2 = 1)
SELECT id, name FROM source_table WHERE ...;-- 覆盖所有 pt1='a' 的分区 (pt2 的值不限)
-- 注意:Spark 默认是静态覆盖模式,所以这会覆盖整个表,除非配置为动态模式
-- 更安全的静态覆盖是明确指定所有分区键
INSERT OVERWRITE T PARTITION (pt1 = 'a') -- 这种语法在不同引擎下行为可能不同,需谨慎
SELECT id, name, pt2 FROM source_table WHERE ...;-- 覆盖整个表 (清空所有分区)
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;
动态分区覆盖

定义:不直接在 INSERT 语句中指定分区,而是根据 SELECT 查询结果中的数据来动态决定需要覆盖哪些分区。如果查询结果中包含了 pt1='a' 和 pt1='b' 的数据,那么只有这两个分区会被覆盖。

适用场景

  • 日常的 ETL 任务,根据上游数据重跑当天的分区。
  • 数据修正,输入一个包含多个分区修正数据的源,一次性覆盖所有相关分区。

语法示例 (Spark SQL):

-- 必须先开启 Spark 的动态分区覆盖模式
SET spark.sql.sources.partitionOverwriteMode=dynamic;-- 假设 source_table 中包含 pt1='a' 和 pt1='b' 的数据
-- 那么 T 表中只有 pt1='a' 和 pt1='b' 这两个分区会被覆盖
-- T 表中原有的 pt1='c', pt1='d' 等分区数据不受影响
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;

在 Flink 中,默认就是动态分区覆盖。如果想改为静态,需要加 Hint:

-- Flink SQL: 静态覆盖整个表
INSERT OVERWRITE T /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT ...;

如何工作:内部实现机制(重点)

INSERT OVERWRITE 的原子性和事务性完全依赖于 Paimon 的元数据和文件管理机制

我们以一个动态分区覆盖为例,追踪其内部流程:

Step 1: 发起写入
  • 当执行 INSERT OVERWRITE 命令时,计算引擎(如 Spark/Flink)的 Sink 任务开始向 Paimon 写入数据。
  • Paimon 的 TableWrite 对象被创建,并且通过 withOverwrite(partition) 方法被标记为覆盖模式。对于动态覆盖,partition 参数为空,表示由数据动态决定。
Step 2: 数据写入与生成新文件
  • Sink 任务读取源数据,根据分区键将数据分组,然后写入到对应分区的 新数据文件(Data File)中。
  • 例如,要覆盖 pt='a' 和 pt='b' 两个分区,Paimon 会在 path/to/table/pt=a/ 和 path/to/table/pt=b/ 目录下创建新的 .orc 或 .parquet 文件。
  • 关键点:此时,旧的数据文件仍然存在,线上读取任务看到的是旧数据。
Step 3: 准备提交 (Prepare Commit)
  • 所有 Sink 任务完成数据写入后,会将它们生成的新文件列表作为 CommitMessage 发送给 Driver/JobManager 端的 Commit 算子。
  • CommitMessage 中包含了每个新数据文件的元信息,如文件名、大小、行数、分区值等。
Step 4: 执行提交 (Commit)

这是 OVERWRITE 的核心所在。TableCommit 对象会整理快照和提交。

Step 5: 清理旧文件 (Cleanup)
  • 旧的数据文件(被 DELETE 标记的)不会立即被物理删除。它们会保留一段时间(由 snapshot.time-retained 配置决定)。
  • Paimon 的后台清理线程或下一次 commit 时会检查过期的快照,并将这些快照引用的、且后续快照不再引用的文件进行物理删除。这为时间旅行和故障恢复提供了保障。

总结

INSERT OVERWRITE 在 Paimon 中是一个精心设计的、高度可靠的操作。

  • 对用户而言,它提供了灵活的静态和动态数据覆盖能力。
  • 在内部,它通过 Copy-on-Write 的思想,结合快照(Snapshot) -> 清单列表(Manifest List) -> 清单(Manifest) 的三级元数据结构,将一个复杂的数据替换操作,最终简化为一次原子性的文件重命名,从而保证了整个过程的原子性、一致性和事务性。

TableCommitImpl 类中的 withOverwrite 

withOverwrite 方法是用来配置“覆盖写”操作的。在数据库和数据仓库中,覆盖写(Overwrite)通常指的是用新的数据集完全替换掉表中已有的数据,或者替换掉特定分区中的数据。这对应了 SQL 中的 INSERT OVERWRITE 语义。

// ... existing code ...@Overridepublic TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {this.overwritePartition = overwritePartitions;return this;}
// ... existing code ...

overwritePartition 是一个 Map<String, String>,它定义了覆盖写的范围:

  • 如果 overwritePartition 是一个空 Map (例如 Collections.emptyMap()),这表示要覆盖整个表。这通常用于非分区表,或者需要覆盖所有分区的分区表。
  • 如果 overwritePartition 包含了分区键值对 (例如 {"dt": "2024-01-01", "hr": "09"}), 这表示只覆盖符合该分区规范的特定分区。

独特的处理逻辑

当 withOverwrite 被调用后,overwritePartition 字段不再为 null,这会触发 TableCommitImpl 中一系列独特的处理逻辑,主要集中在 commitMultiple 方法中。

让我们看一下 commitMultiple 方法的关键部分:

// ... existing code ...public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {if (overwritePartition == null) {// 这是正常的追加(append)或合并(merge)提交逻辑for (ManifestCommittable committable : committables) {commit.commit(committable, new HashMap<>(), checkAppendFiles);}
// ... existing code ...} else {// **** 这是 overwrite 模式下的特殊处理逻辑 ****ManifestCommittable committable;if (committables.size() > 1) {// 1. 检查:覆盖写操作期望只有一个提交单元(committable)throw new RuntimeException("Multiple committables appear in overwrite mode, this may be a bug, please report it: "+ committables);} else if (committables.size() == 1) {committable = committables.get(0);} else {// 2. 如果没有新的数据要写入(例如,用一个空数据集覆盖分区),//    会创建一个空的 committable 来触发底层的覆盖逻辑。// create an empty committable// identifier is Long.MAX_VALUE, come from batch job// TODO maybe it can be produced by CommitterOperatorcommittable = new ManifestCommittable(Long.MAX_VALUE);}// 3. 核心调用:调用底层的 FileStoreCommit 的 overwrite 方法,//    将分区信息和新的文件列表传递下去。commit.overwrite(overwritePartition, committable, Collections.emptyMap());expire(committable.identifier(), expireMainExecutor);}}
// ... existing code ...

总结一下其独特处理:

  1. 逻辑分支commitMultiple 方法通过检查 overwritePartition 是否为 null 来区分是常规提交还是覆盖写提交,并进入不同的逻辑分支。

  2. 单一提交限制:在覆盖写模式下,系统预期一次 commit 操作只对应一个 ManifestCommittable。这是因为覆盖写是一个原子性的替换操作,将多个独立的提交单元合并到一个覆盖写操作中没有明确的语义,且容易出错。

  3. 空提交处理:即使没有新的数据写入(committables 为空),只要指定了 withOverwrite,Paimon 依然会生成一个快照。这用于处理“清空分区”或“清空表”的场景(INSERT OVERWRITE ... SELECT ... WHERE false)。

  4. 调用底层实现:它最终不会调用常规的 commit.commit(),而是调用 commit.overwrite()。这个 overwrite 方法(在 FileStoreCommitImpl 中实现)会负责:

    • 标记指定分区(或全表)下的所有旧数据文件为已删除。
    • 添加本次提交带来的新数据文件。
    • 将这些变更原子性地提交为一个新的快照(Snapshot)。
  5. 强制生成快照:覆盖写操作总是会生成一个新的快照,即使没有写入新数据。这体现在 forceCreatingSnapshot 方法中:

    // ... existing code ...
    public boolean forceCreatingSnapshot() {if (this.forceCreatingSnapshot) {return true;}if (overwritePartition != null) {return true;}return tagAutoManager != null
    

如何使用

在实践中,withOverwrite 通常由 BatchWriteBuilder 来调用,如下面的代码所示:

// ... existing code ...@Overridepublic BatchTableCommit newCommit() {InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);commit.ignoreEmptyCommit(true);return commit;}
// ... existing code ...

当用户执行 INSERT OVERWRITE SQL 语句时,Paimon 的 Flink 或 Spark Connector 会构建一个 BatchWriteBuilder,并通过 withOverwrite 方法设置要覆盖的分区,最终在提交时触发上述的特殊逻辑。

总而言之,withOverwrite 是 Paimon 中实现原子性覆盖写功能的关键入口。它通过设置一个内部状态(overwritePartition),在提交时触发一套完全不同于常规追加/合并的逻辑,以确保数据替换的原子性和正确性。

FileStoreCommitImpl 中的 overwrite 

这个方法是 Paimon 执行 INSERT OVERWRITE 语义的底层核心实现。当上层的 TableCommitImpl 确定这是一个覆盖写操作后,就会调用这个方法。它的主要职责是计算出需要删除的旧文件和需要添加的新文件,并将这些变更原子地提交为一个新的快照(Snapshot)。

我们来分解一下这个方法的具体处理流程:

// ... existing code ...@Overridepublic void overwrite(Map<String, String> partition,ManifestCommittable committable,Map<String, String> properties) {if (LOG.isDebugEnabled()) {
// ... existing code ...}long started = System.nanoTime();int generatedSnapshot = 0;int attempts = 0;// 1. 收集本次提交带来的所有文件变更List<ManifestEntry> appendTableFiles = new ArrayList<>();List<ManifestEntry> appendChangelog = new ArrayList<>();List<ManifestEntry> compactTableFiles = new ArrayList<>();List<ManifestEntry> compactChangelog = new ArrayList<>();List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();collectChanges(committable.fileCommittables(),appendTableFiles,appendChangelog,compactTableFiles,compactChangelog,appendHashIndexFiles,compactDvIndexFiles);// 2. 覆盖写模式下不处理 changelog 文件,并打印警告//    因为覆盖写是破坏性操作,会破坏流读的连续性。if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {StringBuilder warnMessage =new StringBuilder("Overwrite mode currently does not commit any changelog.\n"+ "Please make sure that the partition you're overwriting "+ "is not being consumed by a streaming reader.\n"+ "Ignored changelog files are:\n");
// ... existing code ...LOG.warn(warnMessage.toString());}try {boolean skipOverwrite = false;// 3. 根据静态/动态分区模式,创建分区过滤器,用于确定要删除哪些分区的数据PartitionPredicate partitionFilter = null;if (dynamicPartitionOverwrite) {// 动态分区覆盖模式if (appendTableFiles.isEmpty()) {// 如果没有新数据写入,则跳过覆盖操作,不会删除任何数据skipOverwrite = true;} else {// 根据新写入数据涉及的分区,来确定要覆盖哪些分区Set<BinaryRow> partitions =appendTableFiles.stream().map(ManifestEntry::partition).collect(Collectors.toSet());partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);}} else {// 静态分区覆盖模式 (或者非分区表)Predicate partitionPredicate =createPartitionPredicate(partition, partitionType, partitionDefaultName);partitionFilter =PartitionPredicate.fromPredicate(partitionType, partitionPredicate);// 安全检查:确保所有新写入的文件都属于要覆盖的分区if (partitionFilter != null) {for (ManifestEntry entry : appendTableFiles) {if (!partitionFilter.test(entry.partition())) {throw new IllegalArgumentException(
// ... existing code ...+ " does not belong to this partition");}}}}// 4. 调用 tryOverwrite 执行核心的覆盖写逻辑if (!skipOverwrite) {attempts +=tryOverwrite(partitionFilter,appendTableFiles,appendHashIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets());generatedSnapshot += 1;}// 5. 如果本次提交还包含数据压缩(compaction)产生的文件,则单独提交一次 COMPACT 类型的快照if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {attempts +=tryCommit(compactTableFiles,emptyList(),compactDvIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets(),Snapshot.CommitKind.COMPACT,mustConflictCheck(),null);generatedSnapshot += 1;}} finally {// 6. 报告提交指标long commitDuration = (System.nanoTime() - started) / 1_000_000;if (this.commitMetrics != null) {reportCommit(appendTableFiles,emptyList(),compactTableFiles,emptyList(),commitDuration,generatedSnapshot,attempts);}}}
// ... existing code ...private int tryOverwrite(@Nullable PartitionPredicate partitionFilter,List<ManifestEntry> changes,List<IndexManifestEntry> indexFiles,long identifier,@Nullable Long watermark,Map<Integer, Long> logOffsets) {// 7. tryOverwrite 的核心逻辑Snapshot latestSnapshot = snapshotManager.latestSnapshot();List<ManifestEntry> changesWithOverwrite = new ArrayList<>();List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();if (latestSnapshot != null) {// 读取最新快照中所有的文件列表List<ManifestEntry> currentEntries = readAllEntries(latestSnapshot);for (ManifestEntry entry : currentEntries) {// 如果文件属于被覆盖的分区 (partitionFilter.test),则标记为 DELETE// 否则,保留原样if (partitionFilter == null || !partitionFilter.test(entry.partition())) {changesWithOverwrite.add(entry);} else {changesWithOverwrite.add(new ManifestEntry(FileKind.DELETE,entry.partition(),entry.bucket(),entry.totalBuckets(),entry.file()));}}
// ... existing code ...}// 8. 将本次要新增的文件(来自参数 changes)加入列表changesWithOverwrite.addAll(changes);indexChangesWithOverwrite.addAll(indexFiles);// 9. 使用包含“删除旧文件”和“添加新文件”的完整列表,进行一次 OVERWRITE 类型的提交return tryCommit(changesWithOverwrite,emptyList(),indexChangesWithOverwrite,identifier,watermark,logOffsets,Snapshot.CommitKind.OVERWRITE,mustConflictCheck(),null);}
// ... existing code ...

总结 overwrite 方法的处理流程:

  1. 收集变更:首先,它将传入的 ManifestCommittable 中的文件变更信息进行分类,比如哪些是新增的数据文件,哪些是compaction产生的文件等。
  2. 忽略Changelogoverwrite 操作会明确地忽略所有 changelog 文件。这是因为 overwrite 是一个全量替换操作,会破坏数据的增量变更历史,与流式读取的语义不兼容。因此系统会打印警告,提醒用户不要在流读任务正在消费的分区上执行覆盖操作。
  3. 确定覆盖范围:这是最关键的步骤之一。它通过 dynamicPartitionOverwrite 配置项来决定如何确定要覆盖的分区范围。
    • 动态覆盖 (true):覆盖的范围由本次写入的新数据所在的分区决定。如果写入了 p1 和 p2 两个分区的数据,那么只有 p1 和 p2 的旧数据会被删除。如果本次写入没有产生任何新数据,则不会删除任何分区。
    • 静态覆盖 (false):覆盖的范围由传入的 partition 参数决定,这通常对应 SQL 中的 PARTITION (...) 子句。所有新写入的数据必须属于这个静态指定的分区。
  4. 执行原子替换:它调用私有的 tryOverwrite 方法。这个方法: a. 读取当前最新的快照,获取全部分区的所有数据文件列表。 b. 遍历这些文件,如果一个文件属于上一步确定的覆盖范围,就将其标记为 DELETE。不属于覆盖范围的文件则保持不变。 c. 将本次提交要新增的数据文件标记为 ADD。 d. 将所有这些变更(DELETE旧文件 + ADD新文件)作为一个整体,通过 tryCommit 方法以 Snapshot.CommitKind.OVERWRITE 的类型原子性地提交,生成一个新的快照。
  5. 处理其他变更:如果 committable 中还包含了数据压缩(compaction)的结果,会在 overwrite 操作之后,再进行一次独立的 COMPACT 类型的提交。
  6. 特殊用例truncateTable(清空表)和 dropPartitions(删除分区)方法内部也复用了 tryOverwrite 逻辑。它们可以看作是特殊的 overwrite,即用一个空的数据集去覆盖整个表或指定分区。

FileStoreCommitImpl.overwrite 是一个精心设计的底层方法,它通过精确地计算文件变更集(哪些删除、哪些新增),并利用 Paimon 的快照机制,实现了分区或整表的原子性覆盖写。

http://www.dtcms.com/a/289313.html

相关文章:

  • 一维数组练题习~
  • PyTorch的基础概念和复杂模型的基本使用
  • 【软件测试】从软件测试到Bug评审:生命周期与管理技巧
  • ESXi6.7硬件传感器红色警示信息
  • ICT模拟零件测试方法--测量参数详解
  • ThinkPHP8极简上手指南:开启高效开发之旅
  • 基于机器视觉的迈克耳孙干涉环自动计数系统设计与实现
  • STM32CubeMX的一些操作步骤的作用
  • 拼写纠错模型Noisy Channel(下)
  • 机器学习理论基础 - 核心概念篇
  • 复杂度优先:基于推理链复杂性的提示工程新范式
  • Linux操作系统之线程(四):线程控制
  • 20250720-1-Kubernetes 调度-白话理解创建一个Pod的内部工作流_笔记
  • Qt的安装和环境配置
  • Ubuntu挂载和取消挂载
  • 【vue-7】Vue3 响应式数据声明:深入理解 reactive()
  • Matlab自学笔记六十四:求解自变量带有约束条件的方程
  • 相同问题的有奇点模型和无奇点模型有什么区别
  • 服务器上的文件复制到本地 Windows 系统
  • [学习] 深入理解傅里叶变换:从时域到频域的桥梁
  • 04训练windows电脑低算力显卡如何部署pytorch实现GPU加速
  • LINUX720 SWAP扩容;新增逻辑卷;逻辑卷扩容;数据库迁移;gdisk
  • 【超越VGGT】π3-利用置换等变方法去除3r系列的归纳偏置
  • 机器视觉---深度图像存储格式
  • 监督学习应用
  • 零基础学习性能测试第三章:执行性能测试
  • Spring Boot 订单超时自动取消的 3 种主流实现方案
  • 将SAC强化学习算法部署到ROS2的完整指南
  • 基于卷积傅里叶分析网络 (CFAN)的心电图分类的统一时频方法
  • 复杂度+包装类型+泛型