Paimon 写入磁盘文件名字生成机制
我们来一步步拆解,看看Paimon最终写入磁盘的目录和文件名到底是怎么产生的。
入口:createRollingChangelogFileWriter
的职责
KeyValueFileWriterFactory
// ... existing code ...public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {WriteFormatKey key = new WriteFormatKey(level, true);return new RollingFileWriter<>(() -> {// 关键步骤 1: 获取路径工厂DataFilePathFactory pathFactory = formatContext.pathFactory(key);// 关键步骤 2: 生成新的 Changelog 文件路径return createDataFileWriter(pathFactory.newChangelogPath(),key,FileSource.APPEND,pathFactory.isExternalPath());},suggestedFileSize);}
// ... existing code ...
这段代码的逻辑 是固定的,它的作用是创建一个 RollingFileWriter
。这个写入器的核心是一个文件创建的 lambda 表达式 () -> { ... }
。每当需要滚动创建一个新文件时,这个 lambda 就会被执行。
这个 lambda 做了两件核心事情:
formatContext.pathFactory(key)
: 从一个叫formatContext
的地方获取一个DataFilePathFactory
。pathFactory.newChangelogPath()
: 使用这个工厂来生成一个全新的、唯一的文件路径。
所以,问题的关键就转移到了 DataFilePathFactory
是什么,以及它是如何生成路径的。
核心执行者:DataFilePathFactory
DataFilePathFactory
是 Paimon 中专门负责在一个具体的分桶(Bucket)目录内生成文件名的类。
它的工作原理如下:
- 初始化: 它在被创建时,会接收一个固定的基础目录,这个目录就是这个 Bucket 的路径,例如
/path/to/table/dt=20231026/bucket-0
。 - 生成文件名: 它提供了
newPath()
(用于数据文件) 和newChangelogPath()
(用于 Changelog 文件) 方法。这些方法会遵循一个固定的命名模式。
我们来看 DataFilePathFactory
的相关代码:
// ... existing code ...public DataFilePathFactory(Path bucketPath,String format,String dataFilePrefix,String changelogFilePrefix,boolean suffixWithCompression,@Nullable String compression,@Nullable String externalPath) {this.bucketPath = bucketPath;this.dataFilePrefix = dataFilePrefix;this.changelogFilePrefix = changelogFilePrefix;// ...}public Path newPath() {return newPath(newFileName(dataFilePrefix));}public Path newChangelogPath() {return newPath(newChangelogFileName());}public String newChangelogFileName() {return newFileName(changelogFilePrefix);}private String newFileName(String prefix) {// 核心:文件名 = 前缀 + UUID + 后缀return prefix + UUID.randomUUID() + suffix;}
// ... existing code ...
从 newFileName
方法可以清晰地看到,文件名是由三部分组成的:
- 前缀 (prefix): 对于数据文件是
data-
,对于 Changelog 文件是changelog-
。 - UUID: 一个随机生成的、全局唯一的值。这是保证每次生成的文件名都不同的关键!
- 后缀 (suffix): 文件格式和压缩格式,例如
.parquet.snappy
。
所以,每次调用 pathFactory.newChangelogPath()
,都会得到一个类似这样的、带有唯一 UUID 的完整路径:/path/to/table/dt=20231026/bucket-0/changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet
。
private String newFileName(String prefix) {String extension;if (fileSuffixIncludeCompression) {extension = "." + fileCompression + "." + formatIdentifier;} else {extension = "." + formatIdentifier;}return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;}
上下文提供者:KeyValueFileWriterFactory
和 FileWriterContextFactory
现在我们知道了 DataFilePathFactory
是如何工作的,但还有一个问题:KeyValueFileWriterFactory
是如何为正确的分区和分桶获取到这个 DataFilePathFactory
的呢?
答案在 KeyValueFileWriterFactory
的创建过程中。它不是一个全局单例,而是针对特定分区和分桶动态创建的。
请看它的 Builder
类:
// ... existing code ...public static class Builder {// ...public KeyValueFileWriterFactory build(BinaryRow partition, int bucket, CoreOptions options) {// 1. 创建一个包含分区和分桶信息的上下文FileWriterContextFactory context =new FileWriterContextFactory(partition,bucket,keyType,valueType,fileFormat,format2PathFactory,options);// 2. 使用这个上下文创建工厂实例return new KeyValueFileWriterFactory(fileIO, schemaId, context, suggestedFileSize, options);}}
// ... existing code ...
MergeTreeWriter
在初始化时,会为它所负责的那个 Bucket 调用这个 build
方法,并传入当前的分区(partition)和桶号(bucket)。
这些信息被保存在内部的 FileWriterContextFactory
中。当 createRollingChangelogFileWriter
需要 DataFilePathFactory
时,FileWriterContextFactory
就会利用它持有的分区和桶信息,结合表的根路径,计算出那个 Bucket 的完整基础目录,然后用这个目录来创建一个 DataFilePathFactory
实例。
流程
merge Tree Writer调用
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
整个流程可以总结为一种控制反转(Inversion of Control, IoC) 的设计模式:
- 调用者 (
createRollingMergeTreeFileWriter
) 不负责创建具体的对象(SingleFileWriter
)和资源(文件名),而是定义了一个如何创建这些东西的“工厂”或“配方”(Supplier
Lambda)。 - 使用者 (
RollingFileWriter
) 持有这个“工厂”,并在自己真正需要一个新对象的时候,才去调用工厂来生产。
文件名的传递路径如下:
KeyValueFileWriterFactory
-> 创建一个包含 DataFilePathFactory.newPath()
逻辑的 Lambda (Supplier) -> RollingFileWriter
构造函数 -> RollingFileWriter.writerFactory
成员变量 -> RollingFileWriter.write()
首次调用 -> openCurrentWriter()
-> writerFactory.get()
执行 Lambda -> DataFilePathFactory.newPath()
生成新路径 -> SingleFileWriter
构造函数 -> SingleFileWriter.path
成员变量。
这种设计的好处是:
- 解耦:
RollingFileWriter
不需要关心文件名的生成策略,它只负责在合适的时机(文件满了)请求一个新的写入器。 - 灵活:文件名的生成策略可以非常复杂(比如包含分区、bucket、level、UUID等信息),但这些复杂性都被封装在了
DataFilePathFactory
和传递的 Lambda 中,对RollingFileWriter
是透明的。 - 高效:只有在真正需要写入文件时,才会去生成文件名和创建文件流,避免了不必要的资源预创建。
FileWriterContextFactory
FileWriterContextFactory
是 KeyValueFileWriterFactory
的一个私有静态内部类。它的核心职责是为文件写入器(FileWriter
)创建和管理上下文信息,其中就包括了文件路径的生成逻辑。
简而言之,它通过一个工厂模式和委托机制来为指定的分区(partition)和桶(bucket)生成对应的文件路径工厂。
FileWriterContextFactory
在其构造函数中接收了 partition
和 bucket
的信息,并将它们保存为成员变量。同时,它还接收了一个名为 parentFactories
的函数。
// ... existing code ...private final Function<String, FileStorePathFactory> parentFactories;private final CoreOptions options;private final boolean thinModeEnabled;private FileWriterContextFactory(BinaryRow partition,int bucket,RowType keyType,RowType valueType,FileFormat defaultFileFormat,Function<String, FileStorePathFactory> parentFactories,CoreOptions options) {this.partition = partition;this.bucket = bucket;this.keyType = keyType;
// ... existing code ...this.parentFactories = parentFactories;this.options = options;
// ... existing code ...
partition
:BinaryRow
类型,代表了当前写入数据所属的分区值。bucket
:int
类型,代表了数据所属的桶编号。parentFactories
: 这是一个函数Function<String, FileStorePathFactory>
,它的作用是根据文件格式(如 "orc", "parquet")的字符串标识,返回一个对应的FileStorePathFactory
实例。这个FileStorePathFactory
是路径生成的顶层工厂。
FileWriterContextFactory
中最关键的方法是 pathFactory(WriteFormatKey key)
。这个方法负责创建并返回一个 DataFilePathFactory
,该工厂专门用于为之前指定的分区和桶生成文件路径。
// ... existing code ...return fileFormat(format).createStatsExtractor(writeRowType, statsFactories);}private DataFilePathFactory pathFactory(WriteFormatKey key) {String format = key2Format.apply(key);return format2PathFactory.computeIfAbsent(format,k ->parentFactories.apply(format).createDataFilePathFactory(partition, bucket));}private FormatWriterFactory writerFactory(WriteFormatKey key) {
// ... existing code ...
这里的核心逻辑在 computeIfAbsent
的 lambda 表达式中: parentFactories.apply(format).createDataFilePathFactory(partition, bucket)
这行代码的执行流程如下:
key2Format.apply(key)
: 首先根据写入的层级(level)和是否为 changelog 文件,确定具体的文件格式(format
),例如 "orc"。parentFactories.apply(format)
: 调用传入的parentFactories
函数,根据文件格式获取一个通用的FileStorePathFactory
实例。.createDataFilePathFactory(partition, bucket)
: 这是最关键的一步。它调用FileStorePathFactory
的createDataFilePathFactory
方法,并将构造函数中保存的partition
和bucket
信息传递进去。这个方法会返回一个DataFilePathFactory
的实例,这个实例内部已经包含了分区和桶的路径信息。
createDataFilePathFactory
方法
会调用 bucketPath(partition, bucket)
的结果,并将其作为新创建的 DataFilePathFactory
实例的根目录。
从代码中可以看到,bucketPath(...)
的返回值是传递给 DataFilePathFactory
构造函数的第一个参数,这个参数定义了该工厂后续创建的所有数据文件的存放位置。
// ... existing code ...public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {return new DataFilePathFactory(bucketPath(partition, bucket),formatIdentifier,
// ... existing code ...
现在我们来递归地追踪 bucketPath(partition, bucket)
是如何构建路径的。
第一层: bucketPath
这个方法很简单,它将表的根目录 root
和一个相对路径 relativeBucketPath
组合成一个绝对路径。
// ... existing code ...public Path bucketPath(BinaryRow partition, int bucket) {return new Path(root, relativeBucketPath(partition, bucket));}public Path relativeBucketPath(BinaryRow partition, int bucket) {
// ... existing code ...
root
: 这是FileStorePathFactory
的成员变量,代表表的根路径,例如/path/to/my_table
。
第二层: relativeBucketPath
这个方法是构建路径的核心,它负责生成分区和桶对应的相对目录结构。
// ... existing code ...public Path relativeBucketPath(BinaryRow partition, int bucket) {String bucketName = String.valueOf(bucket);if (bucket == BucketMode.POSTPONE_BUCKET) {bucketName = "postpone";}Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucketName);String partitionPath = getPartitionString(partition);if (!partitionPath.isEmpty()) {relativeBucketPath = new Path(partitionPath, relativeBucketPath);}if (dataFilePathDirectory != null) {relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);}return relativeBucketPath;}
// ... existing code ...
它的构建步骤如下:
- 根据传入的
bucket
号码创建一个桶目录名,例如bucket-5
(BUCKET_PATH_PREFIX
是常量 "bucket-")。 - 调用
getPartitionString(partition)
方法获取分区的路径字符串。 - 如果分区路径不为空,就将其加在桶目录的前面,形成类似
dt=2024-01-01/hr=12/bucket-5
的结构。 - 如果用户配置了自定义数据目录 (
dataFilePathDirectory
),还会将这个目录加在最前面。
第三层: getPartitionString
这个方法负责将 BinaryRow
格式的分区信息转换成人类可读的、Hive风格的目录字符串。
// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}
// ... existing code ...
它通过内部的 partitionComputer
将 BinaryRow
对象解析成键值对(如 {"dt": "2024-01-01", "hr": "12"}
),然后利用 PartitionPathUtils.generatePartitionPath
生成最终的字符串 dt=2024-01-01/hr=12
。
getPartitionString(BinaryRow partition)
该方法通过一个两步过程,将二进制的 BinaryRow
分区数据转换为了我们熟悉的 Hive 风格的分区目录字符串(如 dt=2024-01-01/hr=12
)。
FileStorePathFactory.getPartitionString
// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}// @TODO, need to be changed
// ... existing code ...
可以看到,它的核心逻辑是调用了另外两个方法:
partitionComputer.generatePartValues(...)
PartitionPathUtils.generatePartitionPath(...)
我们需要依次深入这两个调用。
InternalRowPartitionComputer.generatePartValues
partitionComputer
是 InternalRowPartitionComputer
的实例。它的 generatePartValues
方法负责将原始的、二进制的 BinaryRow
转换成一个 Java 的 LinkedHashMap<String, String>
,也就是一个有序的键值对集合。
// ... (部分代码)public LinkedHashMap<String, String> generatePartValues(InternalRow partition) {LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();for (int i = 0; i < partitionColumns.length; i++) {Object field = getters[i].getFieldOrNull(partition);String partitionValue = field == null ? defaultPartValue : field.toString();if (partitionValue == null) {throw new RuntimeException("Null partition value is not supported! "+ "You can use `partition.default-name` to specify a default partition value.");}partSpec.put(partitionColumns[i], partitionValue);}return partSpec;}
// ... (部分代码)
这个方法的逻辑是:
- 遍历所有的分区列名(例如
["dt", "hr"]
)。 - 对于每一列,从
BinaryRow
中提取出对应位置的值。例如,对于dt
列,它会从BinaryRow
的第 0 个位置读取数据。 - 将提取出的值转换成字符串。如果值为
null
,则使用表配置的默认分区名。 - 将列名作为 key,转换后的字符串值作为 value,存入
LinkedHashMap
。
举例来说,如果分区列是 (dt STRING, hr INT)
,传入的 BinaryRow
代表的值是 ('2024-05-20', 10)
,那么这个方法执行完后,会返回一个内容为 {"dt": "2024-05-20", "hr": "10"}
的 LinkedHashMap
。
PartitionPathUtils.generatePartitionPath
现在,上一步生成的 LinkedHashMap
被传递给了 PartitionPathUtils.generatePartitionPath
方法。这个方法负责将键值对格式化成最终的目录字符串。
// ... (部分代码)public static String generatePartitionPath(Map<String, String> spec) {if (spec.isEmpty()) {return "";}StringBuilder suffixBuf = new StringBuilder();int i = 0;for (Map.Entry<String, String> e : spec.entrySet()) {if (i > 0) {suffixBuf.append(Path.SEPARATOR);}suffixBuf.append(escapePathName(e.getKey()));suffixBuf.append('=');suffixBuf.append(escapePathName(e.getValue()));i++;}return suffixBuf.toString();}
// ... (部分代码)
这个方法的逻辑非常清晰:
- 遍历传入的
Map
(也就是上一步生成的{"dt": "2024-05-20", "hr": "10"}
)。 - 对于每一个键值对,它会拼接成
key=value
的形式,例如dt=2024-05-20
。 - 在多个键值对之间,用路径分隔符
/
连接。 - 最终返回拼接好的完整字符串。
继续上面的例子,当这个方法接收到 {"dt": "2024-05-20", "hr": "10"}
后,它会返回字符串 "dt=2024-05-20/hr=10"
。
通过以上分析,我们可以清晰地看到 getPartitionString
的完整工作流程:
BinaryRow
-> InternalRowPartitionComputer.generatePartValues
-> LinkedHashMap<String, String>
-> PartitionPathUtils.generatePartitionPath
-> String
这个流程精确地实现了将二进制分区数据转换为 Hive 风格分区目录字符串的功能,验证了之前的描述是完全准确的。
最终路径形态
综合以上追踪,bucketPath
构建出的最终路径形态如下:
{table_root_path}/{optional_data_dir}/{partition_path}/bucket-{bucket_number}
举个例子:
假设:
- 表根路径
root
是/tmp/paimon/orders
- 分区
partition
是(dt='2024-05-20', region='cn')
- 桶
bucket
是3
- 未配置自定义数据目录
dataFilePathDirectory
那么 bucketPath
的构建过程是:
getPartitionString
返回"dt=2024-05-20/region=cn"
。relativeBucketPath
返回"dt=2024-05-20/region=cn/bucket-3"
。bucketPath
最终返回的Path
对象指向的路径是:/tmp/paimon/orders/dt=2024-05-20/region=cn/bucket-3
。
这个路径随后被 DataFilePathFactory
用来作为其根目录,所有新生成的数据文件都会被写入这个目录下。
总结
我们来梳理一下整个流程:
目录是怎么产生的?
- 目录是根据 表根路径 + 分区键值 + 桶号 拼接而成的。例如:
s3://my-bucket/my_table/dt=2023-10-26/bucket-0
。 - 这个拼接逻辑发生在
KeyValueFileWriterFactory
的内部上下文FileWriterContextFactory
中,它在被创建时就接收了分区和桶的信息。
- 目录是根据 表根路径 + 分区键值 + 桶号 拼接而成的。例如:
文件名是怎么产生的?
- 文件名由
DataFilePathFactory
负责生成。 - 命名规则是:
前缀-UUID.格式后缀
。例如changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet
。 - UUID 的使用保证了每次生成的文件名都是唯一的,避免了冲突。
- 文件名由
为什么
createRollingChangelogFileWriter
的代码看起来是固定的?- 这段代码是一个高度抽象和复用的模板。它的逻辑是固定的:“向上下文请求一个路径工厂,然后用这个工厂创建一个新的、唯一的 changelog 文件路径”。
- 它的动态性体现在
KeyValueFileWriterFactory
这个实例本身。每个MergeTreeWriter
都会为自己负责的那个唯一的(分区, 桶)
组合创建一个专属的KeyValueFileWriterFactory
实例。所以,当这个“固定”的模板代码被执行时,它所在的上下文环境(即它属于哪个分区和桶)是完全不同的,因此总能找到正确的目录,并在该目录下生成唯一的文件名。