当前位置: 首页 > news >正文

Paimon系列:IDEA环境读写Paimon表

本文基于 Apache Flink 1.16 和 Apache Paimon 1.0.1,详细介绍如何配置 Paimon 环境、创建和使用追加表(Append-Only Table)与主键表(Primary-Key Table)的特点和使用。


一. IDEA 环境准备

下载官方paimon-1.0.1版本,本地编译选择对应的flink版本

图片

把对应的jar包安装到本地仓库:

mvn install:install-file -DgroupId=org.apache.paimon -DartifactId=paimon-flink-1.16 -Dversion=1.0.1 -Dpackaging=jar -Dfile=paimon-flink-1.16-1.0.1.jar

在工程里引用对应的依赖

<dependency>                 <groupId>org.apache.paimon</groupId>                 <artifactId>paimon-flink-1.16</artifactId>     <version>1.0.1</version>                 <scope>provided</scope>       </dependency>

二. 追加表

1. 追加表(Append Only Table) 特点

追加表是 Apache Paimon 中没有定义主键的表类型,默认情况下,表如果未指定主键,就被视为追加表。这种表主要用于追加数据的场景,类似于日志数据同步或不需要频繁更新的数据湖存储

特点:

      1.1. 无主键:追加表不定义主键,因此无法直接通过主键进行数据更新或删除操作。

      1.2. 分为Scalable 表 和 Queue 表:Scalable 表没有桶的概念,无需考虑数据顺序、无需对数据进行hash partitioning;Queue 表按 bucket-key 分发,每个分桶中数据的消费顺序与数据写入Paimon表的顺序一致;

2. Scalable Table 的创建和写入

创建基本本地文件的 Paimon catalog,无主键表,定义 'bucket' = '-1'  ​​​​​​​

/** * @author BigData共享 */public class AppendTable {        public static void main(String[] args) throws Exception {             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                env.setParallelism(1);                env.enableCheckpointing(10000l);                env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024));                        TableEnvironment tableEnv = StreamTableEnvironment.create(env);                // 创建 paimon catalog                tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" +                           "    'type'='paimon',\n" +                           "    'warehouse'='file:///tmp/paimon'\n" +                           ")");                           tableEnv.executeSql("USE CATALOG paimon_catalog");                        // 创建 scalable table                tableEnv.executeSql("CREATE TABLE IF NOT EXISTS scalable_table (\n" +                           "    word STRING,\n" +                           "    dt STRING\n" +                           ") PARTITIONED BY (dt) with  (\n" +                           " 'bucket' = '-1'\n" +                           ")");                           tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" +                           "    id INT,\n" +                           "    word STRING\n" +                           ") WITH (\n" +                           "    'connector' = 'datagen',\n" +                           "    'fields.id.kind' = 'random',\n" +                           "    'fields.id.min' = '1',\n" +                           "    'fields.id.max' = '100',\n" +                           "    'fields.word.length' = '1'\n" +                           ")");                           // 插入数据                tableEnv.executeSql("INSERT INTO scalable_table SELECT word, '20250816' FROM word_table");                env.execute();       }}

创建对应的元数据文件,mainfest,schema, snapshot 和对应的分区数据 dt=20250816

图片

3. Queue 的创建和写入

创建基本本地文件的 Paimon catalog,无主键表,定义 'bucket' = '3', 'bucket-key' = 'id'​​​​​​​

/** * @author BigData共享 */public class AppendTable {        public static void main(String[] args) throws Exception {                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                env.setParallelism(1);                env.enableCheckpointing(10000l);                env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024));                TableEnvironment tableEnv = StreamTableEnvironment.create(env);                        // 创建 paimon catalog                tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" +                           "    'type'='paimon',\n" +                           "    'warehouse'='file:///tmp/paimon'\n" +                           ")");        
        tableEnv.executeSql("USE CATALOG paimon_catalog");                        // 创建 queue table                tableEnv.executeSql("CREATE TABLE IF NOT EXISTS queue_table (\n" +                           "    id INT,\n" +                           "    word STRING,\n" +                           "    dt STRING\n" +                           ") PARTITIONED BY (dt) with  (\n" +                           " 'bucket' = '3',\n" +                           " 'bucket-key' = 'id'\n" +                           ")");                           tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" +                           "    id INT,\n" +                           "    word STRING\n" +                           ") WITH (\n" +                           "    'connector' = 'datagen',\n" +                           "    'fields.id.kind' = 'random',\n" +                           "    'fields.id.min' = '1',\n" +                           "    'fields.id.max' = '100',\n" +                           "    'fields.word.length' = '1'\n" +                           ")");                           // 插入数据                tableEnv.executeSql("INSERT INTO queue_table SELECT id, word, '20250816' FROM word_table");                env.execute();       }}

创建对应的元数据文件,mainfest,schema, snapshot 和对应的分区数据 dt=20250816,分桶数据:bucket-0, bucket-1, bucket-2

图片


三. Primary-Key 表

1. Primary-Key Table(主键表)特点

主键表是 Apache Paimon 中定义了主键的表类型,支持高效的 upsert 操作和变更日志生成,适合需要实时更新和复杂数据处理的应用场景

特点:

     1.1. 主键约束:主键由一组列组成,保证每条记录的唯一性。Paimon 在每个 bucket 内按主键排序,优化基于主键的查询性能。

     1.2. 支持 upsert 语义:支持插入、更新和删除操作,适合处理动态变化的数据(如数据库的 binlog)。

     1.3. 合并引擎:主键表通过合并引擎(Merge Engine)处理具有相同主键的记录,支持以下类型:Deduplicate/Partial Update/Aggregation/First Row。

     1.4. 分桶模式:

  • 动态分桶(bucket = -1):自动调整分桶数量,不支持多 Flink 作业并发写入。
  • 固定分桶(bucket = <num>):指定固定分桶数量,支持并发写入,但需确保主键包含所有分区键以避免跨分区更新。

   1.5. Changelog 生成:支持生成完整的变更日志(INSERT、UPDATE、DELETE),通过 changelog-producer 配置:none/input/lookup/full-compaction。

2. Primary-Key 的创建和写入

创建基本本地文件的 Paimon catalog,定义分区键 PRIMARY KEY (dt, id) NOT ENFORCED;​​​​​​​

/** * @author BigData共享 */public class PrimaryKeyTable {       public static void main(String[] args) throws Exception {              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();              env.setParallelism(1);              env.enableCheckpointing(10000l);              env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024));              TableEnvironment tableEnv = StreamTableEnvironment.create(env);              tableEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");                    // 创建 paimon catalog              tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" +                         "    'type'='paimon',\n" +                         "    'warehouse'='file:///tmp/paimon'\n" +                         ")");              tableEnv.executeSql("USE CATALOG paimon_catalog");                    // 创建 primary-key table              tableEnv.executeSql("CREATE TABLE IF NOT EXISTS primary_key_table (\n" +                         "    id INT,\n" +                         "    word STRING,\n" +                         "    dt STRING,\n" +                         "    PRIMARY KEY (dt, id) NOT ENFORCED\n" +                         ")\n" +                         "PARTITIONED BY (dt) with  (\n" +                         "   'bucket' = '-1'\n" +                         ")");                       tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" +                         "    id INT,\n" +                         "    word STRING\n" +                         ") WITH (\n" +                         "    'connector' = 'datagen',\n" +                         "    'fields.id.kind' = 'random',\n" +                         "    'fields.id.min' = '1',\n" +                         "    'fields.id.max' = '100',\n" +                         "    'fields.word.length' = '1'\n" +                         ")");                       // 插入数据              tableEnv.executeSql("INSERT INTO primary_key_table (SELECT id, word, '20250816' FROM word_table UNION SELECT id, word, '20250817' FROM word_table)");              env.execute();       }}

创建对应的元数据文件,mainfest,schema, snapshot,需要注意的是,下面元数据文件中多了个index文件; 对应的分区数据 dt=20250816,dt=20250817

图片

3. 使用注意事项

      多个insert into table_xx 语句写入同一张表会有问题,可以先把多条select 语句 union 后再插入;或者表新增参数:'write-only'='true',然后再单独启动一个Dedicated Compaction Job。

后面的文章会对主键表的特性做详细分析。

http://www.dtcms.com/a/393188.html

相关文章:

  • Java进阶教程,全面剖析Java多线程编程,插入线程,笔记10
  • 成品油加油站综合服务智慧监管平台! “智控油站,计量秒核查” + “数治加油,科技防猫腻”
  • C++编程学习(第34天)
  • 【数据结构与算法Trip第5站】动态规划
  • 防抖那些事儿
  • 【办公类-115-01】20250920信息员每周通讯上传之文字稿整理
  • 深入解析HotSpot解释器方法调用机制:从invokevirtual到方法入口
  • 用AI修复失语者的声音:大模型如何帮助渐冻人重新“说话”?
  • 【ICCV 2023】通过学习采样来学习上采样
  • 有效解决舍入误差的方法
  • count down 90 days
  • GEO完全指南 AI时代的内容优化新范式
  • Npass gate transistor是什么器件?
  • TensorRT-LLM中的in-flight batching(IFB)
  • kaggle-NeurIPS - Open Polymer Prediction 2025-0.069
  • Linux基础网络命令
  • CTFHub 密码口令通关笔记:默认密码
  • IPv6 网络连接与 “v6ns“ 测试失败解决方案
  • C++11之move移动语义
  • 【计算机三级备考】真题总结
  • Linux基础指令(上)
  • Protocol Buffers .NET 运行时从核心 API 到工程实战
  • DFT教程 part1 VASP安装与学习推荐
  • 泛函 Φ(u) 驻点所满足的偏微分方程与自然边界条件
  • 基于springboot的健康饮食营养管理系统
  • C语言入门指南:联合体与枚举
  • JS逆向 -去哪儿滑块
  • C++包装器(Wrapper)概述
  • java后端工程师进修ing(研一版‖day47)
  • 小谈:物联网(IoT)与工业传感器技术