Paimon INSERT OVERWRITE
在 Paimon 中,INSERT OVERWRITE
是一种原子性的数据替换操作。它不像传统数据库的 DELETE
+ INSERT
,而是一个完整的、不可分割的事务。
- 原子性 (Atomicity):整个覆盖操作要么完全成功,要么完全失败。不会出现数据被部分删除或部分写入的中间状态。即使作业失败,表数据也会回滚到操作之前的状态。
- 事务性 (Transactional):每一次
OVERWRITE
都会生成一个新的、完整的表快照 (Snapshot)。这使得数据具备了版本管理能力,支持时间旅行(Time Travel)。 - 高效性 (Efficiency):对于分区表,它只重写涉及到的分区,未触及的分区数据保持不变,开销可控。
INSERT OVERWRITE
主要分为两种模式:静态分区覆盖 (Static Partition Overwrite) 和 动态分区覆盖 (Dynamic Partition Overwrite)。
静态分区覆盖
定义:在 INSERT
语句中明确指定要覆盖哪些分区。Paimon 会清空这些指定分区内的所有数据,然后将新数据写入。
适用场景:
- 清理并重写某个或某几个历史分区的数据。
- 对整个表进行数据重写(不指定分区时)。
语法示例 (Spark SQL):
-- 假设表 T 有两个分区键: pt1, pt2
CREATE TABLE T (id INT, name STRING, pt1 STRING, pt2 INT) PARTITIONED BY (pt1, pt2);-- 覆盖单个分区 pt1='a', pt2=1
INSERT OVERWRITE T PARTITION (pt1 = 'a', pt2 = 1)
SELECT id, name FROM source_table WHERE ...;-- 覆盖所有 pt1='a' 的分区 (pt2 的值不限)
-- 注意:Spark 默认是静态覆盖模式,所以这会覆盖整个表,除非配置为动态模式
-- 更安全的静态覆盖是明确指定所有分区键
INSERT OVERWRITE T PARTITION (pt1 = 'a') -- 这种语法在不同引擎下行为可能不同,需谨慎
SELECT id, name, pt2 FROM source_table WHERE ...;-- 覆盖整个表 (清空所有分区)
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;
动态分区覆盖
定义:不直接在 INSERT
语句中指定分区,而是根据 SELECT
查询结果中的数据来动态决定需要覆盖哪些分区。如果查询结果中包含了 pt1='a'
和 pt1='b'
的数据,那么只有这两个分区会被覆盖。
适用场景:
- 日常的 ETL 任务,根据上游数据重跑当天的分区。
- 数据修正,输入一个包含多个分区修正数据的源,一次性覆盖所有相关分区。
语法示例 (Spark SQL):
-- 必须先开启 Spark 的动态分区覆盖模式
SET spark.sql.sources.partitionOverwriteMode=dynamic;-- 假设 source_table 中包含 pt1='a' 和 pt1='b' 的数据
-- 那么 T 表中只有 pt1='a' 和 pt1='b' 这两个分区会被覆盖
-- T 表中原有的 pt1='c', pt1='d' 等分区数据不受影响
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;
在 Flink 中,默认就是动态分区覆盖。如果想改为静态,需要加 Hint:
-- Flink SQL: 静态覆盖整个表
INSERT OVERWRITE T /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT ...;
如何工作:内部实现机制(重点)
INSERT OVERWRITE
的原子性和事务性完全依赖于 Paimon 的元数据和文件管理机制。
我们以一个动态分区覆盖为例,追踪其内部流程:
Step 1: 发起写入
- 当执行
INSERT OVERWRITE
命令时,计算引擎(如 Spark/Flink)的 Sink 任务开始向 Paimon 写入数据。 - Paimon 的
TableWrite
对象被创建,并且通过withOverwrite(partition)
方法被标记为覆盖模式。对于动态覆盖,partition
参数为空,表示由数据动态决定。
Step 2: 数据写入与生成新文件
- Sink 任务读取源数据,根据分区键将数据分组,然后写入到对应分区的 新数据文件(Data File)中。
- 例如,要覆盖
pt='a'
和pt='b'
两个分区,Paimon 会在path/to/table/pt=a/
和path/to/table/pt=b/
目录下创建新的.orc
或.parquet
文件。 - 关键点:此时,旧的数据文件仍然存在,线上读取任务看到的是旧数据。
Step 3: 准备提交 (Prepare Commit)
- 所有 Sink 任务完成数据写入后,会将它们生成的新文件列表作为
CommitMessage
发送给 Driver/JobManager 端的Commit
算子。 CommitMessage
中包含了每个新数据文件的元信息,如文件名、大小、行数、分区值等。
Step 4: 执行提交 (Commit)
这是 OVERWRITE
的核心所在。TableCommit
对象会整理快照和提交。
Step 5: 清理旧文件 (Cleanup)
- 旧的数据文件(被
DELETE
标记的)不会立即被物理删除。它们会保留一段时间(由snapshot.time-retained
配置决定)。 - Paimon 的后台清理线程或下一次
commit
时会检查过期的快照,并将这些快照引用的、且后续快照不再引用的文件进行物理删除。这为时间旅行和故障恢复提供了保障。
总结
INSERT OVERWRITE
在 Paimon 中是一个精心设计的、高度可靠的操作。
- 对用户而言,它提供了灵活的静态和动态数据覆盖能力。
- 在内部,它通过 Copy-on-Write 的思想,结合快照(Snapshot) -> 清单列表(Manifest List) -> 清单(Manifest) 的三级元数据结构,将一个复杂的数据替换操作,最终简化为一次原子性的文件重命名,从而保证了整个过程的原子性、一致性和事务性。
TableCommitImpl
类中的 withOverwrite
withOverwrite
方法是用来配置“覆盖写”操作的。在数据库和数据仓库中,覆盖写(Overwrite)通常指的是用新的数据集完全替换掉表中已有的数据,或者替换掉特定分区中的数据。这对应了 SQL 中的 INSERT OVERWRITE
语义。
// ... existing code ...@Overridepublic TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {this.overwritePartition = overwritePartitions;return this;}
// ... existing code ...
overwritePartition
是一个 Map<String, String>
,它定义了覆盖写的范围:
- 如果
overwritePartition
是一个空 Map (例如Collections.emptyMap()
),这表示要覆盖整个表。这通常用于非分区表,或者需要覆盖所有分区的分区表。 - 如果
overwritePartition
包含了分区键值对 (例如{"dt": "2024-01-01", "hr": "09"}
), 这表示只覆盖符合该分区规范的特定分区。
独特的处理逻辑
当 withOverwrite
被调用后,overwritePartition
字段不再为 null
,这会触发 TableCommitImpl
中一系列独特的处理逻辑,主要集中在 commitMultiple
方法中。
让我们看一下 commitMultiple
方法的关键部分:
// ... existing code ...public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {if (overwritePartition == null) {// 这是正常的追加(append)或合并(merge)提交逻辑for (ManifestCommittable committable : committables) {commit.commit(committable, new HashMap<>(), checkAppendFiles);}
// ... existing code ...} else {// **** 这是 overwrite 模式下的特殊处理逻辑 ****ManifestCommittable committable;if (committables.size() > 1) {// 1. 检查:覆盖写操作期望只有一个提交单元(committable)throw new RuntimeException("Multiple committables appear in overwrite mode, this may be a bug, please report it: "+ committables);} else if (committables.size() == 1) {committable = committables.get(0);} else {// 2. 如果没有新的数据要写入(例如,用一个空数据集覆盖分区),// 会创建一个空的 committable 来触发底层的覆盖逻辑。// create an empty committable// identifier is Long.MAX_VALUE, come from batch job// TODO maybe it can be produced by CommitterOperatorcommittable = new ManifestCommittable(Long.MAX_VALUE);}// 3. 核心调用:调用底层的 FileStoreCommit 的 overwrite 方法,// 将分区信息和新的文件列表传递下去。commit.overwrite(overwritePartition, committable, Collections.emptyMap());expire(committable.identifier(), expireMainExecutor);}}
// ... existing code ...
总结一下其独特处理:
逻辑分支:
commitMultiple
方法通过检查overwritePartition
是否为null
来区分是常规提交还是覆盖写提交,并进入不同的逻辑分支。单一提交限制:在覆盖写模式下,系统预期一次
commit
操作只对应一个ManifestCommittable
。这是因为覆盖写是一个原子性的替换操作,将多个独立的提交单元合并到一个覆盖写操作中没有明确的语义,且容易出错。空提交处理:即使没有新的数据写入(
committables
为空),只要指定了withOverwrite
,Paimon 依然会生成一个快照。这用于处理“清空分区”或“清空表”的场景(INSERT OVERWRITE ... SELECT ... WHERE false
)。调用底层实现:它最终不会调用常规的
commit.commit()
,而是调用commit.overwrite()
。这个overwrite
方法(在FileStoreCommitImpl
中实现)会负责:- 标记指定分区(或全表)下的所有旧数据文件为已删除。
- 添加本次提交带来的新数据文件。
- 将这些变更原子性地提交为一个新的快照(Snapshot)。
强制生成快照:覆盖写操作总是会生成一个新的快照,即使没有写入新数据。这体现在
forceCreatingSnapshot
方法中:// ... existing code ... public boolean forceCreatingSnapshot() {if (this.forceCreatingSnapshot) {return true;}if (overwritePartition != null) {return true;}return tagAutoManager != null
如何使用
在实践中,withOverwrite
通常由 BatchWriteBuilder
来调用,如下面的代码所示:
// ... existing code ...@Overridepublic BatchTableCommit newCommit() {InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);commit.ignoreEmptyCommit(true);return commit;}
// ... existing code ...
当用户执行 INSERT OVERWRITE
SQL 语句时,Paimon 的 Flink 或 Spark Connector 会构建一个 BatchWriteBuilder
,并通过 withOverwrite
方法设置要覆盖的分区,最终在提交时触发上述的特殊逻辑。
总而言之,withOverwrite
是 Paimon 中实现原子性覆盖写功能的关键入口。它通过设置一个内部状态(overwritePartition
),在提交时触发一套完全不同于常规追加/合并的逻辑,以确保数据替换的原子性和正确性。
FileStoreCommitImpl
中的 overwrite
这个方法是 Paimon 执行 INSERT OVERWRITE
语义的底层核心实现。当上层的 TableCommitImpl
确定这是一个覆盖写操作后,就会调用这个方法。它的主要职责是计算出需要删除的旧文件和需要添加的新文件,并将这些变更原子地提交为一个新的快照(Snapshot)。
我们来分解一下这个方法的具体处理流程:
// ... existing code ...@Overridepublic void overwrite(Map<String, String> partition,ManifestCommittable committable,Map<String, String> properties) {if (LOG.isDebugEnabled()) {
// ... existing code ...}long started = System.nanoTime();int generatedSnapshot = 0;int attempts = 0;// 1. 收集本次提交带来的所有文件变更List<ManifestEntry> appendTableFiles = new ArrayList<>();List<ManifestEntry> appendChangelog = new ArrayList<>();List<ManifestEntry> compactTableFiles = new ArrayList<>();List<ManifestEntry> compactChangelog = new ArrayList<>();List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();collectChanges(committable.fileCommittables(),appendTableFiles,appendChangelog,compactTableFiles,compactChangelog,appendHashIndexFiles,compactDvIndexFiles);// 2. 覆盖写模式下不处理 changelog 文件,并打印警告// 因为覆盖写是破坏性操作,会破坏流读的连续性。if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {StringBuilder warnMessage =new StringBuilder("Overwrite mode currently does not commit any changelog.\n"+ "Please make sure that the partition you're overwriting "+ "is not being consumed by a streaming reader.\n"+ "Ignored changelog files are:\n");
// ... existing code ...LOG.warn(warnMessage.toString());}try {boolean skipOverwrite = false;// 3. 根据静态/动态分区模式,创建分区过滤器,用于确定要删除哪些分区的数据PartitionPredicate partitionFilter = null;if (dynamicPartitionOverwrite) {// 动态分区覆盖模式if (appendTableFiles.isEmpty()) {// 如果没有新数据写入,则跳过覆盖操作,不会删除任何数据skipOverwrite = true;} else {// 根据新写入数据涉及的分区,来确定要覆盖哪些分区Set<BinaryRow> partitions =appendTableFiles.stream().map(ManifestEntry::partition).collect(Collectors.toSet());partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);}} else {// 静态分区覆盖模式 (或者非分区表)Predicate partitionPredicate =createPartitionPredicate(partition, partitionType, partitionDefaultName);partitionFilter =PartitionPredicate.fromPredicate(partitionType, partitionPredicate);// 安全检查:确保所有新写入的文件都属于要覆盖的分区if (partitionFilter != null) {for (ManifestEntry entry : appendTableFiles) {if (!partitionFilter.test(entry.partition())) {throw new IllegalArgumentException(
// ... existing code ...+ " does not belong to this partition");}}}}// 4. 调用 tryOverwrite 执行核心的覆盖写逻辑if (!skipOverwrite) {attempts +=tryOverwrite(partitionFilter,appendTableFiles,appendHashIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets());generatedSnapshot += 1;}// 5. 如果本次提交还包含数据压缩(compaction)产生的文件,则单独提交一次 COMPACT 类型的快照if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {attempts +=tryCommit(compactTableFiles,emptyList(),compactDvIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets(),Snapshot.CommitKind.COMPACT,mustConflictCheck(),null);generatedSnapshot += 1;}} finally {// 6. 报告提交指标long commitDuration = (System.nanoTime() - started) / 1_000_000;if (this.commitMetrics != null) {reportCommit(appendTableFiles,emptyList(),compactTableFiles,emptyList(),commitDuration,generatedSnapshot,attempts);}}}
// ... existing code ...private int tryOverwrite(@Nullable PartitionPredicate partitionFilter,List<ManifestEntry> changes,List<IndexManifestEntry> indexFiles,long identifier,@Nullable Long watermark,Map<Integer, Long> logOffsets) {// 7. tryOverwrite 的核心逻辑Snapshot latestSnapshot = snapshotManager.latestSnapshot();List<ManifestEntry> changesWithOverwrite = new ArrayList<>();List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();if (latestSnapshot != null) {// 读取最新快照中所有的文件列表List<ManifestEntry> currentEntries = readAllEntries(latestSnapshot);for (ManifestEntry entry : currentEntries) {// 如果文件属于被覆盖的分区 (partitionFilter.test),则标记为 DELETE// 否则,保留原样if (partitionFilter == null || !partitionFilter.test(entry.partition())) {changesWithOverwrite.add(entry);} else {changesWithOverwrite.add(new ManifestEntry(FileKind.DELETE,entry.partition(),entry.bucket(),entry.totalBuckets(),entry.file()));}}
// ... existing code ...}// 8. 将本次要新增的文件(来自参数 changes)加入列表changesWithOverwrite.addAll(changes);indexChangesWithOverwrite.addAll(indexFiles);// 9. 使用包含“删除旧文件”和“添加新文件”的完整列表,进行一次 OVERWRITE 类型的提交return tryCommit(changesWithOverwrite,emptyList(),indexChangesWithOverwrite,identifier,watermark,logOffsets,Snapshot.CommitKind.OVERWRITE,mustConflictCheck(),null);}
// ... existing code ...
总结 overwrite
方法的处理流程:
- 收集变更:首先,它将传入的
ManifestCommittable
中的文件变更信息进行分类,比如哪些是新增的数据文件,哪些是compaction产生的文件等。 - 忽略Changelog:
overwrite
操作会明确地忽略所有changelog
文件。这是因为overwrite
是一个全量替换操作,会破坏数据的增量变更历史,与流式读取的语义不兼容。因此系统会打印警告,提醒用户不要在流读任务正在消费的分区上执行覆盖操作。 - 确定覆盖范围:这是最关键的步骤之一。它通过
dynamicPartitionOverwrite
配置项来决定如何确定要覆盖的分区范围。- 动态覆盖 (
true
):覆盖的范围由本次写入的新数据所在的分区决定。如果写入了p1
和p2
两个分区的数据,那么只有p1
和p2
的旧数据会被删除。如果本次写入没有产生任何新数据,则不会删除任何分区。 - 静态覆盖 (
false
):覆盖的范围由传入的partition
参数决定,这通常对应 SQL 中的PARTITION (...)
子句。所有新写入的数据必须属于这个静态指定的分区。
- 动态覆盖 (
- 执行原子替换:它调用私有的
tryOverwrite
方法。这个方法: a. 读取当前最新的快照,获取全部分区的所有数据文件列表。 b. 遍历这些文件,如果一个文件属于上一步确定的覆盖范围,就将其标记为DELETE
。不属于覆盖范围的文件则保持不变。 c. 将本次提交要新增的数据文件标记为ADD
。 d. 将所有这些变更(DELETE
旧文件 +ADD
新文件)作为一个整体,通过tryCommit
方法以Snapshot.CommitKind.OVERWRITE
的类型原子性地提交,生成一个新的快照。 - 处理其他变更:如果
committable
中还包含了数据压缩(compaction)的结果,会在overwrite
操作之后,再进行一次独立的COMPACT
类型的提交。 - 特殊用例:
truncateTable
(清空表)和dropPartitions
(删除分区)方法内部也复用了tryOverwrite
逻辑。它们可以看作是特殊的overwrite
,即用一个空的数据集去覆盖整个表或指定分区。
FileStoreCommitImpl.overwrite
是一个精心设计的底层方法,它通过精确地计算文件变更集(哪些删除、哪些新增),并利用 Paimon 的快照机制,实现了分区或整表的原子性覆盖写。