Paimon 写入磁盘文件名字生成机制
我们来一步步拆解,看看Paimon最终写入磁盘的目录和文件名到底是怎么产生的。
入口:createRollingChangelogFileWriter 的职责
KeyValueFileWriterFactory
// ... existing code ...public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {WriteFormatKey key = new WriteFormatKey(level, true);return new RollingFileWriter<>(() -> {// 关键步骤 1: 获取路径工厂DataFilePathFactory pathFactory = formatContext.pathFactory(key);// 关键步骤 2: 生成新的 Changelog 文件路径return createDataFileWriter(pathFactory.newChangelogPath(),key,FileSource.APPEND,pathFactory.isExternalPath());},suggestedFileSize);}
// ... existing code ...
这段代码的逻辑 是固定的,它的作用是创建一个 RollingFileWriter。这个写入器的核心是一个文件创建的 lambda 表达式 () -> { ... }。每当需要滚动创建一个新文件时,这个 lambda 就会被执行。
这个 lambda 做了两件核心事情:
formatContext.pathFactory(key): 从一个叫formatContext的地方获取一个DataFilePathFactory。pathFactory.newChangelogPath(): 使用这个工厂来生成一个全新的、唯一的文件路径。
所以,问题的关键就转移到了 DataFilePathFactory 是什么,以及它是如何生成路径的。
核心执行者:DataFilePathFactory
DataFilePathFactory 是 Paimon 中专门负责在一个具体的分桶(Bucket)目录内生成文件名的类。
它的工作原理如下:
- 初始化: 它在被创建时,会接收一个固定的基础目录,这个目录就是这个 Bucket 的路径,例如
/path/to/table/dt=20231026/bucket-0。 - 生成文件名: 它提供了
newPath()(用于数据文件) 和newChangelogPath()(用于 Changelog 文件) 方法。这些方法会遵循一个固定的命名模式。
我们来看 DataFilePathFactory 的相关代码:
// ... existing code ...public DataFilePathFactory(Path bucketPath,String format,String dataFilePrefix,String changelogFilePrefix,boolean suffixWithCompression,@Nullable String compression,@Nullable String externalPath) {this.bucketPath = bucketPath;this.dataFilePrefix = dataFilePrefix;this.changelogFilePrefix = changelogFilePrefix;// ...}public Path newPath() {return newPath(newFileName(dataFilePrefix));}public Path newChangelogPath() {return newPath(newChangelogFileName());}public String newChangelogFileName() {return newFileName(changelogFilePrefix);}private String newFileName(String prefix) {// 核心:文件名 = 前缀 + UUID + 后缀return prefix + UUID.randomUUID() + suffix;}
// ... existing code ...
从 newFileName 方法可以清晰地看到,文件名是由三部分组成的:
- 前缀 (prefix): 对于数据文件是
data-,对于 Changelog 文件是changelog-。 - UUID: 一个随机生成的、全局唯一的值。这是保证每次生成的文件名都不同的关键!
- 后缀 (suffix): 文件格式和压缩格式,例如
.parquet.snappy。
所以,每次调用 pathFactory.newChangelogPath(),都会得到一个类似这样的、带有唯一 UUID 的完整路径:/path/to/table/dt=20231026/bucket-0/changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet。
private String newFileName(String prefix) {String extension;if (fileSuffixIncludeCompression) {extension = "." + fileCompression + "." + formatIdentifier;} else {extension = "." + formatIdentifier;}return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;}上下文提供者:KeyValueFileWriterFactory 和 FileWriterContextFactory
现在我们知道了 DataFilePathFactory 是如何工作的,但还有一个问题:KeyValueFileWriterFactory 是如何为正确的分区和分桶获取到这个 DataFilePathFactory 的呢?
答案在 KeyValueFileWriterFactory 的创建过程中。它不是一个全局单例,而是针对特定分区和分桶动态创建的。
请看它的 Builder 类:
// ... existing code ...public static class Builder {// ...public KeyValueFileWriterFactory build(BinaryRow partition, int bucket, CoreOptions options) {// 1. 创建一个包含分区和分桶信息的上下文FileWriterContextFactory context =new FileWriterContextFactory(partition,bucket,keyType,valueType,fileFormat,format2PathFactory,options);// 2. 使用这个上下文创建工厂实例return new KeyValueFileWriterFactory(fileIO, schemaId, context, suggestedFileSize, options);}}
// ... existing code ...
MergeTreeWriter 在初始化时,会为它所负责的那个 Bucket 调用这个 build 方法,并传入当前的分区(partition)和桶号(bucket)。
这些信息被保存在内部的 FileWriterContextFactory 中。当 createRollingChangelogFileWriter 需要 DataFilePathFactory 时,FileWriterContextFactory 就会利用它持有的分区和桶信息,结合表的根路径,计算出那个 Bucket 的完整基础目录,然后用这个目录来创建一个 DataFilePathFactory 实例。
流程
merge Tree Writer调用
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
整个流程可以总结为一种控制反转(Inversion of Control, IoC) 的设计模式:
- 调用者 (
createRollingMergeTreeFileWriter) 不负责创建具体的对象(SingleFileWriter)和资源(文件名),而是定义了一个如何创建这些东西的“工厂”或“配方”(SupplierLambda)。 - 使用者 (
RollingFileWriter) 持有这个“工厂”,并在自己真正需要一个新对象的时候,才去调用工厂来生产。
文件名的传递路径如下:
KeyValueFileWriterFactory -> 创建一个包含 DataFilePathFactory.newPath() 逻辑的 Lambda (Supplier) -> RollingFileWriter 构造函数 -> RollingFileWriter.writerFactory 成员变量 -> RollingFileWriter.write() 首次调用 -> openCurrentWriter() -> writerFactory.get() 执行 Lambda -> DataFilePathFactory.newPath() 生成新路径 -> SingleFileWriter 构造函数 -> SingleFileWriter.path 成员变量。
这种设计的好处是:
- 解耦:
RollingFileWriter不需要关心文件名的生成策略,它只负责在合适的时机(文件满了)请求一个新的写入器。 - 灵活:文件名的生成策略可以非常复杂(比如包含分区、bucket、level、UUID等信息),但这些复杂性都被封装在了
DataFilePathFactory和传递的 Lambda 中,对RollingFileWriter是透明的。 - 高效:只有在真正需要写入文件时,才会去生成文件名和创建文件流,避免了不必要的资源预创建。
FileWriterContextFactory
FileWriterContextFactory 是 KeyValueFileWriterFactory 的一个私有静态内部类。它的核心职责是为文件写入器(FileWriter)创建和管理上下文信息,其中就包括了文件路径的生成逻辑。
简而言之,它通过一个工厂模式和委托机制来为指定的分区(partition)和桶(bucket)生成对应的文件路径工厂。
FileWriterContextFactory 在其构造函数中接收了 partition 和 bucket 的信息,并将它们保存为成员变量。同时,它还接收了一个名为 parentFactories 的函数。
// ... existing code ...private final Function<String, FileStorePathFactory> parentFactories;private final CoreOptions options;private final boolean thinModeEnabled;private FileWriterContextFactory(BinaryRow partition,int bucket,RowType keyType,RowType valueType,FileFormat defaultFileFormat,Function<String, FileStorePathFactory> parentFactories,CoreOptions options) {this.partition = partition;this.bucket = bucket;this.keyType = keyType;
// ... existing code ...this.parentFactories = parentFactories;this.options = options;
// ... existing code ...
partition:BinaryRow类型,代表了当前写入数据所属的分区值。bucket:int类型,代表了数据所属的桶编号。parentFactories: 这是一个函数Function<String, FileStorePathFactory>,它的作用是根据文件格式(如 "orc", "parquet")的字符串标识,返回一个对应的FileStorePathFactory实例。这个FileStorePathFactory是路径生成的顶层工厂。
FileWriterContextFactory 中最关键的方法是 pathFactory(WriteFormatKey key)。这个方法负责创建并返回一个 DataFilePathFactory,该工厂专门用于为之前指定的分区和桶生成文件路径。
// ... existing code ...return fileFormat(format).createStatsExtractor(writeRowType, statsFactories);}private DataFilePathFactory pathFactory(WriteFormatKey key) {String format = key2Format.apply(key);return format2PathFactory.computeIfAbsent(format,k ->parentFactories.apply(format).createDataFilePathFactory(partition, bucket));}private FormatWriterFactory writerFactory(WriteFormatKey key) {
// ... existing code ...
这里的核心逻辑在 computeIfAbsent 的 lambda 表达式中: parentFactories.apply(format).createDataFilePathFactory(partition, bucket)
这行代码的执行流程如下:
key2Format.apply(key): 首先根据写入的层级(level)和是否为 changelog 文件,确定具体的文件格式(format),例如 "orc"。parentFactories.apply(format): 调用传入的parentFactories函数,根据文件格式获取一个通用的FileStorePathFactory实例。.createDataFilePathFactory(partition, bucket): 这是最关键的一步。它调用FileStorePathFactory的createDataFilePathFactory方法,并将构造函数中保存的partition和bucket信息传递进去。这个方法会返回一个DataFilePathFactory的实例,这个实例内部已经包含了分区和桶的路径信息。
createDataFilePathFactory 方法
会调用 bucketPath(partition, bucket) 的结果,并将其作为新创建的 DataFilePathFactory 实例的根目录。
从代码中可以看到,bucketPath(...) 的返回值是传递给 DataFilePathFactory 构造函数的第一个参数,这个参数定义了该工厂后续创建的所有数据文件的存放位置。
// ... existing code ...public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {return new DataFilePathFactory(bucketPath(partition, bucket),formatIdentifier,
// ... existing code ...
现在我们来递归地追踪 bucketPath(partition, bucket) 是如何构建路径的。
第一层: bucketPath
这个方法很简单,它将表的根目录 root 和一个相对路径 relativeBucketPath 组合成一个绝对路径。
// ... existing code ...public Path bucketPath(BinaryRow partition, int bucket) {return new Path(root, relativeBucketPath(partition, bucket));}public Path relativeBucketPath(BinaryRow partition, int bucket) {
// ... existing code ...
root: 这是FileStorePathFactory的成员变量,代表表的根路径,例如/path/to/my_table。
第二层: relativeBucketPath
这个方法是构建路径的核心,它负责生成分区和桶对应的相对目录结构。
// ... existing code ...public Path relativeBucketPath(BinaryRow partition, int bucket) {String bucketName = String.valueOf(bucket);if (bucket == BucketMode.POSTPONE_BUCKET) {bucketName = "postpone";}Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucketName);String partitionPath = getPartitionString(partition);if (!partitionPath.isEmpty()) {relativeBucketPath = new Path(partitionPath, relativeBucketPath);}if (dataFilePathDirectory != null) {relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);}return relativeBucketPath;}
// ... existing code ...
它的构建步骤如下:
- 根据传入的
bucket号码创建一个桶目录名,例如bucket-5(BUCKET_PATH_PREFIX是常量 "bucket-")。 - 调用
getPartitionString(partition)方法获取分区的路径字符串。 - 如果分区路径不为空,就将其加在桶目录的前面,形成类似
dt=2024-01-01/hr=12/bucket-5的结构。 - 如果用户配置了自定义数据目录 (
dataFilePathDirectory),还会将这个目录加在最前面。
第三层: getPartitionString
这个方法负责将 BinaryRow 格式的分区信息转换成人类可读的、Hive风格的目录字符串。
// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}
// ... existing code ...
它通过内部的 partitionComputer 将 BinaryRow 对象解析成键值对(如 {"dt": "2024-01-01", "hr": "12"}),然后利用 PartitionPathUtils.generatePartitionPath 生成最终的字符串 dt=2024-01-01/hr=12。
getPartitionString(BinaryRow partition)
该方法通过一个两步过程,将二进制的 BinaryRow 分区数据转换为了我们熟悉的 Hive 风格的分区目录字符串(如 dt=2024-01-01/hr=12)。
FileStorePathFactory.getPartitionString
// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}// @TODO, need to be changed
// ... existing code ...
可以看到,它的核心逻辑是调用了另外两个方法:
partitionComputer.generatePartValues(...)PartitionPathUtils.generatePartitionPath(...)
我们需要依次深入这两个调用。
InternalRowPartitionComputer.generatePartValues
partitionComputer 是 InternalRowPartitionComputer 的实例。它的 generatePartValues 方法负责将原始的、二进制的 BinaryRow 转换成一个 Java 的 LinkedHashMap<String, String>,也就是一个有序的键值对集合。
// ... (部分代码)public LinkedHashMap<String, String> generatePartValues(InternalRow partition) {LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();for (int i = 0; i < partitionColumns.length; i++) {Object field = getters[i].getFieldOrNull(partition);String partitionValue = field == null ? defaultPartValue : field.toString();if (partitionValue == null) {throw new RuntimeException("Null partition value is not supported! "+ "You can use `partition.default-name` to specify a default partition value.");}partSpec.put(partitionColumns[i], partitionValue);}return partSpec;}
// ... (部分代码)
这个方法的逻辑是:
- 遍历所有的分区列名(例如
["dt", "hr"])。 - 对于每一列,从
BinaryRow中提取出对应位置的值。例如,对于dt列,它会从BinaryRow的第 0 个位置读取数据。 - 将提取出的值转换成字符串。如果值为
null,则使用表配置的默认分区名。 - 将列名作为 key,转换后的字符串值作为 value,存入
LinkedHashMap。
举例来说,如果分区列是 (dt STRING, hr INT),传入的 BinaryRow 代表的值是 ('2024-05-20', 10),那么这个方法执行完后,会返回一个内容为 {"dt": "2024-05-20", "hr": "10"} 的 LinkedHashMap。
PartitionPathUtils.generatePartitionPath
现在,上一步生成的 LinkedHashMap 被传递给了 PartitionPathUtils.generatePartitionPath 方法。这个方法负责将键值对格式化成最终的目录字符串。
// ... (部分代码)public static String generatePartitionPath(Map<String, String> spec) {if (spec.isEmpty()) {return "";}StringBuilder suffixBuf = new StringBuilder();int i = 0;for (Map.Entry<String, String> e : spec.entrySet()) {if (i > 0) {suffixBuf.append(Path.SEPARATOR);}suffixBuf.append(escapePathName(e.getKey()));suffixBuf.append('=');suffixBuf.append(escapePathName(e.getValue()));i++;}return suffixBuf.toString();}
// ... (部分代码)
这个方法的逻辑非常清晰:
- 遍历传入的
Map(也就是上一步生成的{"dt": "2024-05-20", "hr": "10"})。 - 对于每一个键值对,它会拼接成
key=value的形式,例如dt=2024-05-20。 - 在多个键值对之间,用路径分隔符
/连接。 - 最终返回拼接好的完整字符串。
继续上面的例子,当这个方法接收到 {"dt": "2024-05-20", "hr": "10"} 后,它会返回字符串 "dt=2024-05-20/hr=10"。
通过以上分析,我们可以清晰地看到 getPartitionString 的完整工作流程:
BinaryRow -> InternalRowPartitionComputer.generatePartValues -> LinkedHashMap<String, String> -> PartitionPathUtils.generatePartitionPath -> String
这个流程精确地实现了将二进制分区数据转换为 Hive 风格分区目录字符串的功能,验证了之前的描述是完全准确的。
最终路径形态
综合以上追踪,bucketPath 构建出的最终路径形态如下:
{table_root_path}/{optional_data_dir}/{partition_path}/bucket-{bucket_number}
举个例子:
假设:
- 表根路径
root是/tmp/paimon/orders - 分区
partition是(dt='2024-05-20', region='cn') - 桶
bucket是3 - 未配置自定义数据目录
dataFilePathDirectory
那么 bucketPath 的构建过程是:
getPartitionString返回"dt=2024-05-20/region=cn"。relativeBucketPath返回"dt=2024-05-20/region=cn/bucket-3"。bucketPath最终返回的Path对象指向的路径是:/tmp/paimon/orders/dt=2024-05-20/region=cn/bucket-3。
这个路径随后被 DataFilePathFactory 用来作为其根目录,所有新生成的数据文件都会被写入这个目录下。
总结
我们来梳理一下整个流程:
目录是怎么产生的?
- 目录是根据 表根路径 + 分区键值 + 桶号 拼接而成的。例如:
s3://my-bucket/my_table/dt=2023-10-26/bucket-0。 - 这个拼接逻辑发生在
KeyValueFileWriterFactory的内部上下文FileWriterContextFactory中,它在被创建时就接收了分区和桶的信息。
- 目录是根据 表根路径 + 分区键值 + 桶号 拼接而成的。例如:
文件名是怎么产生的?
- 文件名由
DataFilePathFactory负责生成。 - 命名规则是:
前缀-UUID.格式后缀。例如changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet。 - UUID 的使用保证了每次生成的文件名都是唯一的,避免了冲突。
- 文件名由
为什么
createRollingChangelogFileWriter的代码看起来是固定的?- 这段代码是一个高度抽象和复用的模板。它的逻辑是固定的:“向上下文请求一个路径工厂,然后用这个工厂创建一个新的、唯一的 changelog 文件路径”。
- 它的动态性体现在
KeyValueFileWriterFactory这个实例本身。每个MergeTreeWriter都会为自己负责的那个唯一的(分区, 桶)组合创建一个专属的KeyValueFileWriterFactory实例。所以,当这个“固定”的模板代码被执行时,它所在的上下文环境(即它属于哪个分区和桶)是完全不同的,因此总能找到正确的目录,并在该目录下生成唯一的文件名。
