当前位置: 首页 > news >正文

Paimon 写入磁盘文件名字生成机制

我们来一步步拆解,看看Paimon最终写入磁盘的目录和文件名到底是怎么产生的。

入口:createRollingChangelogFileWriter 的职责

KeyValueFileWriterFactory

// ... existing code ...public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {WriteFormatKey key = new WriteFormatKey(level, true);return new RollingFileWriter<>(() -> {// 关键步骤 1: 获取路径工厂DataFilePathFactory pathFactory = formatContext.pathFactory(key);// 关键步骤 2: 生成新的 Changelog 文件路径return createDataFileWriter(pathFactory.newChangelogPath(),key,FileSource.APPEND,pathFactory.isExternalPath());},suggestedFileSize);}
// ... existing code ...

这段代码的逻辑 是固定的,它的作用是创建一个 RollingFileWriter。这个写入器的核心是一个文件创建的 lambda 表达式 () -> { ... }。每当需要滚动创建一个新文件时,这个 lambda 就会被执行。

这个 lambda 做了两件核心事情:

  1. formatContext.pathFactory(key): 从一个叫 formatContext 的地方获取一个 DataFilePathFactory
  2. pathFactory.newChangelogPath(): 使用这个工厂来生成一个全新的、唯一的文件路径

所以,问题的关键就转移到了 DataFilePathFactory 是什么,以及它是如何生成路径的。

核心执行者:DataFilePathFactory

DataFilePathFactory 是 Paimon 中专门负责在一个具体的分桶(Bucket)目录内生成文件名的类。

它的工作原理如下:

  • 初始化: 它在被创建时,会接收一个固定的基础目录,这个目录就是这个 Bucket 的路径,例如 /path/to/table/dt=20231026/bucket-0
  • 生成文件名: 它提供了 newPath() (用于数据文件) 和 newChangelogPath() (用于 Changelog 文件) 方法。这些方法会遵循一个固定的命名模式。

我们来看 DataFilePathFactory 的相关代码:

// ... existing code ...public DataFilePathFactory(Path bucketPath,String format,String dataFilePrefix,String changelogFilePrefix,boolean suffixWithCompression,@Nullable String compression,@Nullable String externalPath) {this.bucketPath = bucketPath;this.dataFilePrefix = dataFilePrefix;this.changelogFilePrefix = changelogFilePrefix;// ...}public Path newPath() {return newPath(newFileName(dataFilePrefix));}public Path newChangelogPath() {return newPath(newChangelogFileName());}public String newChangelogFileName() {return newFileName(changelogFilePrefix);}private String newFileName(String prefix) {// 核心:文件名 = 前缀 + UUID + 后缀return prefix + UUID.randomUUID() + suffix;}
// ... existing code ...

从 newFileName 方法可以清晰地看到,文件名是由三部分组成的:

  1. 前缀 (prefix): 对于数据文件是 data-,对于 Changelog 文件是 changelog-
  2. UUID: 一个随机生成的、全局唯一的值。这是保证每次生成的文件名都不同的关键!
  3. 后缀 (suffix): 文件格式和压缩格式,例如 .parquet.snappy

所以,每次调用 pathFactory.newChangelogPath(),都会得到一个类似这样的、带有唯一 UUID 的完整路径:/path/to/table/dt=20231026/bucket-0/changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet

private String newFileName(String prefix) {String extension;if (fileSuffixIncludeCompression) {extension = "." + fileCompression + "." + formatIdentifier;} else {extension = "." + formatIdentifier;}return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;}

上下文提供者:KeyValueFileWriterFactory 和 FileWriterContextFactory

现在我们知道了 DataFilePathFactory 是如何工作的,但还有一个问题:KeyValueFileWriterFactory 是如何为正确的分区和分桶获取到这个 DataFilePathFactory 的呢?

答案在 KeyValueFileWriterFactory 的创建过程中。它不是一个全局单例,而是针对特定分区和分桶动态创建的

请看它的 Builder 类:

// ... existing code ...public static class Builder {// ...public KeyValueFileWriterFactory build(BinaryRow partition, int bucket, CoreOptions options) {// 1. 创建一个包含分区和分桶信息的上下文FileWriterContextFactory context =new FileWriterContextFactory(partition,bucket,keyType,valueType,fileFormat,format2PathFactory,options);// 2. 使用这个上下文创建工厂实例return new KeyValueFileWriterFactory(fileIO, schemaId, context, suggestedFileSize, options);}}
// ... existing code ...

MergeTreeWriter 在初始化时,会为它所负责的那个 Bucket 调用这个 build 方法,并传入当前的分区(partition)和桶号(bucket)

这些信息被保存在内部的 FileWriterContextFactory 中。当 createRollingChangelogFileWriter 需要 DataFilePathFactory 时,FileWriterContextFactory 就会利用它持有的分区和桶信息,结合表的根路径,计算出那个 Bucket 的完整基础目录,然后用这个目录来创建一个 DataFilePathFactory 实例。

流程

merge Tree Writer调用

final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);

整个流程可以总结为一种控制反转(Inversion of Control, IoC) 的设计模式:

  • 调用者 (createRollingMergeTreeFileWriter) 不负责创建具体的对象(SingleFileWriter)和资源(文件名),而是定义了一个如何创建这些东西的“工厂”或“配方”(Supplier Lambda)。
  • 使用者 (RollingFileWriter) 持有这个“工厂”,并在自己真正需要一个新对象的时候,才去调用工厂来生产。

文件名的传递路径如下:

KeyValueFileWriterFactory -> 创建一个包含 DataFilePathFactory.newPath() 逻辑的 Lambda (Supplier) -> RollingFileWriter 构造函数 -> RollingFileWriter.writerFactory 成员变量 -> RollingFileWriter.write() 首次调用 -> openCurrentWriter() -> writerFactory.get() 执行 Lambda -> DataFilePathFactory.newPath() 生成新路径 -> SingleFileWriter 构造函数 -> SingleFileWriter.path 成员变量。

这种设计的好处是:

  1. 解耦RollingFileWriter 不需要关心文件名的生成策略,它只负责在合适的时机(文件满了)请求一个新的写入器。
  2. 灵活:文件名的生成策略可以非常复杂(比如包含分区、bucket、level、UUID等信息),但这些复杂性都被封装在了 DataFilePathFactory 和传递的 Lambda 中,对 RollingFileWriter 是透明的。
  3. 高效:只有在真正需要写入文件时,才会去生成文件名和创建文件流,避免了不必要的资源预创建。

FileWriterContextFactory

FileWriterContextFactory 是 KeyValueFileWriterFactory 的一个私有静态内部类。它的核心职责是为文件写入器(FileWriter)创建和管理上下文信息,其中就包括了文件路径的生成逻辑。

简而言之,它通过一个工厂模式和委托机制来为指定的分区(partition)和桶(bucket)生成对应的文件路径工厂。

FileWriterContextFactory 在其构造函数中接收了 partition 和 bucket 的信息,并将它们保存为成员变量。同时,它还接收了一个名为 parentFactories 的函数。

// ... existing code ...private final Function<String, FileStorePathFactory> parentFactories;private final CoreOptions options;private final boolean thinModeEnabled;private FileWriterContextFactory(BinaryRow partition,int bucket,RowType keyType,RowType valueType,FileFormat defaultFileFormat,Function<String, FileStorePathFactory> parentFactories,CoreOptions options) {this.partition = partition;this.bucket = bucket;this.keyType = keyType;
// ... existing code ...this.parentFactories = parentFactories;this.options = options;
// ... existing code ...
  • partitionBinaryRow 类型,代表了当前写入数据所属的分区值。
  • bucketint 类型,代表了数据所属的桶编号。
  • parentFactories: 这是一个函数 Function<String, FileStorePathFactory>,它的作用是根据文件格式(如 "orc", "parquet")的字符串标识,返回一个对应的 FileStorePathFactory 实例。这个 FileStorePathFactory 是路径生成的顶层工厂。

FileWriterContextFactory 中最关键的方法是 pathFactory(WriteFormatKey key)。这个方法负责创建并返回一个 DataFilePathFactory,该工厂专门用于为之前指定的分区和桶生成文件路径。

// ... existing code ...return fileFormat(format).createStatsExtractor(writeRowType, statsFactories);}private DataFilePathFactory pathFactory(WriteFormatKey key) {String format = key2Format.apply(key);return format2PathFactory.computeIfAbsent(format,k ->parentFactories.apply(format).createDataFilePathFactory(partition, bucket));}private FormatWriterFactory writerFactory(WriteFormatKey key) {
// ... existing code ...

这里的核心逻辑在 computeIfAbsent 的 lambda 表达式中: parentFactories.apply(format).createDataFilePathFactory(partition, bucket)

这行代码的执行流程如下:

  1. key2Format.apply(key): 首先根据写入的层级(level)和是否为 changelog 文件,确定具体的文件格式(format),例如 "orc"。
  2. parentFactories.apply(format): 调用传入的 parentFactories 函数,根据文件格式获取一个通用的 FileStorePathFactory 实例。
  3. .createDataFilePathFactory(partition, bucket): 这是最关键的一步。它调用 FileStorePathFactory 的 createDataFilePathFactory 方法,并将构造函数中保存的 partition 和 bucket 信息传递进去。这个方法会返回一个 DataFilePathFactory 的实例,这个实例内部已经包含了分区和桶的路径信息

createDataFilePathFactory 方法

会调用 bucketPath(partition, bucket) 的结果,并将其作为新创建的 DataFilePathFactory 实例的根目录

从代码中可以看到,bucketPath(...) 的返回值是传递给 DataFilePathFactory 构造函数的第一个参数,这个参数定义了该工厂后续创建的所有数据文件的存放位置。

// ... existing code ...public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {return new DataFilePathFactory(bucketPath(partition, bucket),formatIdentifier,
// ... existing code ...

现在我们来递归地追踪 bucketPath(partition, bucket) 是如何构建路径的。

第一层: bucketPath

这个方法很简单,它将表的根目录 root 和一个相对路径 relativeBucketPath 组合成一个绝对路径。

// ... existing code ...public Path bucketPath(BinaryRow partition, int bucket) {return new Path(root, relativeBucketPath(partition, bucket));}public Path relativeBucketPath(BinaryRow partition, int bucket) {
// ... existing code ...
  • root: 这是 FileStorePathFactory 的成员变量,代表表的根路径,例如 /path/to/my_table

第二层: relativeBucketPath

这个方法是构建路径的核心,它负责生成分区和桶对应的相对目录结构。

// ... existing code ...public Path relativeBucketPath(BinaryRow partition, int bucket) {String bucketName = String.valueOf(bucket);if (bucket == BucketMode.POSTPONE_BUCKET) {bucketName = "postpone";}Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucketName);String partitionPath = getPartitionString(partition);if (!partitionPath.isEmpty()) {relativeBucketPath = new Path(partitionPath, relativeBucketPath);}if (dataFilePathDirectory != null) {relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);}return relativeBucketPath;}
// ... existing code ...

它的构建步骤如下:

  1. 根据传入的 bucket 号码创建一个桶目录名,例如 bucket-5 (BUCKET_PATH_PREFIX 是常量 "bucket-")。
  2. 调用 getPartitionString(partition) 方法获取分区的路径字符串。
  3. 如果分区路径不为空,就将其加在桶目录的前面,形成类似 dt=2024-01-01/hr=12/bucket-5 的结构。
  4. 如果用户配置了自定义数据目录 (dataFilePathDirectory),还会将这个目录加在最前面。

第三层: getPartitionString

这个方法负责将 BinaryRow 格式的分区信息转换成人类可读的、Hive风格的目录字符串。

// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}
// ... existing code ...

它通过内部的 partitionComputer 将 BinaryRow 对象解析成键值对(如 {"dt": "2024-01-01", "hr": "12"}),然后利用 PartitionPathUtils.generatePartitionPath 生成最终的字符串 dt=2024-01-01/hr=12

getPartitionString(BinaryRow partition) 

该方法通过一个两步过程,将二进制的 BinaryRow 分区数据转换为了我们熟悉的 Hive 风格的分区目录字符串(如 dt=2024-01-01/hr=12)。

 FileStorePathFactory.getPartitionString

// ... existing code .../** IMPORTANT: This method is NOT THREAD SAFE. */public String getPartitionString(BinaryRow partition) {return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(Preconditions.checkNotNull(partition, "Partition row data is null. This is unexpected.")));}// @TODO, need to be changed
// ... existing code ...

可以看到,它的核心逻辑是调用了另外两个方法:

  1. partitionComputer.generatePartValues(...)
  2. PartitionPathUtils.generatePartitionPath(...)

我们需要依次深入这两个调用。

InternalRowPartitionComputer.generatePartValues

partitionComputer 是 InternalRowPartitionComputer 的实例。它的 generatePartValues 方法负责将原始的、二进制的 BinaryRow 转换成一个 Java 的 LinkedHashMap<String, String>,也就是一个有序的键值对集合。

// ... (部分代码)public LinkedHashMap<String, String> generatePartValues(InternalRow partition) {LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();for (int i = 0; i < partitionColumns.length; i++) {Object field = getters[i].getFieldOrNull(partition);String partitionValue = field == null ? defaultPartValue : field.toString();if (partitionValue == null) {throw new RuntimeException("Null partition value is not supported! "+ "You can use `partition.default-name` to specify a default partition value.");}partSpec.put(partitionColumns[i], partitionValue);}return partSpec;}
// ... (部分代码)

这个方法的逻辑是:

  1. 遍历所有的分区列名(例如 ["dt", "hr"])。
  2. 对于每一列,从 BinaryRow 中提取出对应位置的值。例如,对于 dt 列,它会从 BinaryRow 的第 0 个位置读取数据。
  3. 将提取出的值转换成字符串。如果值为 null,则使用表配置的默认分区名。
  4. 将列名作为 key,转换后的字符串值作为 value,存入 LinkedHashMap

举例来说,如果分区列是 (dt STRING, hr INT),传入的 BinaryRow 代表的值是 ('2024-05-20', 10),那么这个方法执行完后,会返回一个内容为 {"dt": "2024-05-20", "hr": "10"} 的 LinkedHashMap

PartitionPathUtils.generatePartitionPath

现在,上一步生成的 LinkedHashMap 被传递给了 PartitionPathUtils.generatePartitionPath 方法。这个方法负责将键值对格式化成最终的目录字符串

// ... (部分代码)public static String generatePartitionPath(Map<String, String> spec) {if (spec.isEmpty()) {return "";}StringBuilder suffixBuf = new StringBuilder();int i = 0;for (Map.Entry<String, String> e : spec.entrySet()) {if (i > 0) {suffixBuf.append(Path.SEPARATOR);}suffixBuf.append(escapePathName(e.getKey()));suffixBuf.append('=');suffixBuf.append(escapePathName(e.getValue()));i++;}return suffixBuf.toString();}
// ... (部分代码)

这个方法的逻辑非常清晰:

  1. 遍历传入的 Map(也就是上一步生成的 {"dt": "2024-05-20", "hr": "10"})。
  2. 对于每一个键值对,它会拼接成 key=value 的形式,例如 dt=2024-05-20
  3. 在多个键值对之间,用路径分隔符 / 连接。
  4. 最终返回拼接好的完整字符串。

继续上面的例子,当这个方法接收到 {"dt": "2024-05-20", "hr": "10"} 后,它会返回字符串 "dt=2024-05-20/hr=10"

通过以上分析,我们可以清晰地看到 getPartitionString 的完整工作流程:

BinaryRow -> InternalRowPartitionComputer.generatePartValues -> LinkedHashMap<String, String> -> PartitionPathUtils.generatePartitionPath -> String

这个流程精确地实现了将二进制分区数据转换为 Hive 风格分区目录字符串的功能,验证了之前的描述是完全准确的。

最终路径形态

综合以上追踪,bucketPath 构建出的最终路径形态如下:

{table_root_path}/{optional_data_dir}/{partition_path}/bucket-{bucket_number}

举个例子:

假设:

  • 表根路径 root 是 /tmp/paimon/orders
  • 分区 partition 是 (dt='2024-05-20', region='cn')
  • 桶 bucket 是 3
  • 未配置自定义数据目录 dataFilePathDirectory

那么 bucketPath 的构建过程是:

  1. getPartitionString 返回 "dt=2024-05-20/region=cn"
  2. relativeBucketPath 返回 "dt=2024-05-20/region=cn/bucket-3"
  3. bucketPath 最终返回的 Path 对象指向的路径是:/tmp/paimon/orders/dt=2024-05-20/region=cn/bucket-3

这个路径随后被 DataFilePathFactory 用来作为其根目录,所有新生成的数据文件都会被写入这个目录下。

总结

我们来梳理一下整个流程:

  1. 目录是怎么产生的?

    • 目录是根据 表根路径 + 分区键值 + 桶号 拼接而成的。例如:s3://my-bucket/my_table/dt=2023-10-26/bucket-0
    • 这个拼接逻辑发生在 KeyValueFileWriterFactory 的内部上下文 FileWriterContextFactory 中,它在被创建时就接收了分区和桶的信息。
  2. 文件名是怎么产生的?

    • 文件名由 DataFilePathFactory 负责生成。
    • 命名规则是:前缀-UUID.格式后缀。例如 changelog-a9f8b8e8-c8d1-4a2e-9a1b-7b8c7d6e5f4g.parquet
    • UUID 的使用保证了每次生成的文件名都是唯一的,避免了冲突。
  3. 为什么 createRollingChangelogFileWriter 的代码看起来是固定的?

    • 这段代码是一个高度抽象和复用的模板。它的逻辑是固定的:“向上下文请求一个路径工厂,然后用这个工厂创建一个新的、唯一的 changelog 文件路径”。
    • 它的动态性体现在 KeyValueFileWriterFactory 这个实例本身。每个 MergeTreeWriter 都会为自己负责的那个唯一的 (分区, 桶) 组合创建一个专属的 KeyValueFileWriterFactory 实例。所以,当这个“固定”的模板代码被执行时,它所在的上下文环境(即它属于哪个分区和桶)是完全不同的,因此总能找到正确的目录,并在该目录下生成唯一的文件名。
http://www.dtcms.com/a/274803.html

相关文章:

  • 2025年NSSCTF-青海民族大学 2025 新生赛WP
  • 学习C++、QT---20(C++的常用的4种信号与槽、自定义信号与槽的讲解)
  • 动力系统模拟与推导-AI云计算数值分析和代码验证
  • BLE低功耗设计:从广播模式到连接参数优化的全链路分析与真题解析
  • Django母婴商城项目实践(一)
  • 【JMeter】接口加密
  • 蜗轮丝杆升降机拆装图
  • 在多个DHCP服务器的网络环境中选择指定的DHCP服务
  • Windows GNU Radio避坑
  • 深入探究编程拷贝
  • mysql的性能优化:组提交、数据页复用、全表扫描优化、刷脏页
  • Vue 表单开发避坑指南:从响应式数据到动态规则的实践总结
  • Go 编译报错排查:vendor/golang.org/x/crypto/cryptobyte/asn1 no Go source files
  • Java外包怎么选?这几点不注意,项目可能血亏!
  • day21——特殊文件:XML、Properties、以及日志框架
  • Linux中geoserver中文乱码
  • 离线环境二进制安装docker
  • uniapp获取状态栏高度,胶囊按钮的高度,底部安全区域的高度,自定义导航栏
  • [实战]调频三角波和锯齿波信号生成(完整C代码)
  • hbuilderx打包的应用上传苹果应用商店最简方法
  • 字节豆包又一个新功能,超级实用,4 种玩法,你肯定用得上!(建议收藏)
  • Uniapp视频聊天软件内容监控插件开发指南
  • OA系统中的搜索功能方案:简单搜索vs高级搜索
  • 2-Git提交本地项目到远程仓库
  • 问有几条病狗?
  • 【linux网络】深入理解 TCP/UDP:从基础端口号到可靠传输机制全解析
  • 机器学习-06(Optimization-自动调整学习率)
  • consul 的安装与服务发现
  • MOSS-TTSD V2版 - 文本到语音对话生成 支持零样本多人语音克隆 一键整合包下载
  • 一文速览DeepSeek-R1的本地部署——可联网、可实现本地知识库问答(附教程)