当前位置: 首页 > news >正文

Streaming ELT with Flink CDC · Iceberg Sink

1. Iceberg Pipeline Connector 能做什么?

官方给出的关键能力有三条,可以和你前几篇 Sink 篇统一一下心智模型:

  1. 自动建表(Automatic Table Creation)

    • 当 Iceberg 中还不存在目标表时,Connector 可以自动创建;
    • 会根据上游表结构映射出 Iceberg 的字段与类型;
    • 结合 catalog.properties.*table.properties.*,还能定制部分表属性。
  2. Schema 同步(Schema Synchronization)

    • 当上游字段发生变化(比如新增列),可以将 Schema 变更同步到 Iceberg 表;
    • 具体支持的演进能力依赖于 Iceberg 的 Schema 演进规则。
  3. 数据复制 / 同步(Data Replication)

    • 支持批量 & 流式两种同步方式;
    • 可以实现从 MySQL 到 Iceberg 的持续增量同步,将业务库的 binlog 落成 Iceberg 表的快照,以及不断追加新的数据文件。

一句话总结:

Iceberg Pipeline Connector = 自动建表 + 自动 Schema 演进 + 批流一体写入 Iceberg 表。

2. MySQL → Iceberg 的 Pipeline 示例

先看一眼官方给的最小示例,感受一下整体结构:

source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hadoopcatalog.properties.warehouse: /path/warehousepipeline:name: MySQL to Iceberg Pipelineparallelism: 2

2.1 Source:MySQL 变更捕获

source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404

这是一段非常典型的 Flink CDC MySQL Source 配置:

  • tables 支持多库 + 正则匹配:

    • adb.\.*:同步 adb 库全部表;
    • bdb.user_table_[0-9]+:同步 bdb 库分表 user_table_0 / user_table_1 / …
    • [app|web].order_\. *:同步 appweb 两个库下所有 order_ 前缀的表。
  • server-id 使用一段范围,方便多并行度的 Source subtasks 分配不同 server-id。

你可以直接替换为自己业务的库表即可。

2.2 Sink:Iceberg Pipeline Connector

sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hadoopcatalog.properties.warehouse: /path/warehouse

几个关键配置:

  • type: iceberg
    指定使用 Iceberg Pipeline Connector 作为 Sink。

  • catalog.properties.type
    指定 Iceberg Catalog 的类型,目前示例里是 hadoop,还支持 hive 等:

    • hadoop:元数据存本地 / HDFS / 对象存储上;
    • hive:元数据存 Hive Metastore。
  • catalog.properties.warehouse
    仓库根目录(warehouse)的路径,例如:

    catalog.properties.warehouse: hdfs:///iceberg_warehouse
    # 或
    catalog.properties.warehouse: s3://mybucket/iceberg/
    

如果你用的是 hive catalog,还需要配置 catalog.properties.uri 指向 Hive Metastore,例如:

catalog.properties.type: hive
catalog.properties.uri: thrift://hms-host:9083
catalog.properties.warehouse: hdfs:///user/hive/warehouse

2.3 pipeline:任务定义

pipeline:name: MySQL to Iceberg Pipelineparallelism: 2
  • name:任务名,在 Flink CDC CLI UI 和日志中可见;
  • parallelism:Pipeline 的整体并行度,影响 Source 和 Sink 的吞吐能力。

3. Hadoop Catalog 依赖配置(hadoop 类型时)

文档里有一条很重要的 Note:

如果 catalog.properties.typehadoop,需要手动配置以下依赖,并在使用 Flink CDC CLI 提交 YAML 时通过 --jar 传入。

依赖项:

依赖项说明
org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0提供 Hadoop 相关依赖

也就是说,如果你用 Flink CDC CLI:

flink-cdc.sh \pipeline \-f /path/to/pipeline.yaml \--jar /path/to/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

否则在访问 HDFS / 对象存储(依赖 Hadoop FileSystem 实现)时会报类缺失错误。

如果你使用的是 Hive Catalog,也需要保证运行环境里有 Hive / Hadoop 相关依赖,这部分根据你的部署方式略有差异,这里不展开。

4. Iceberg Pipeline Connector 配置项详解

官方“Pipeline Connector Options” 列了一堆参数,我们按功能块来看。

4.1 基础属性

参数是否必填默认值类型说明
typerequired(none)String固定为 iceberg
nameoptional(none)StringSink 名字,主要是标识用

4.2 Catalog 相关属性

参数是否必填默认值类型说明
catalog.properties.typerequired(none)StringIceberg Catalog 类型,支持 hadoophive
catalog.properties.warehouseoptional(none)StringCatalog 的 warehouse 根路径
catalog.properties.urioptional(none)StringMetastore 服务 URI(Hive Catalog 时常用)
catalog.properties.*optional(none)String其他传给 Iceberg Catalog 的配置项

例子:

sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hivecatalog.properties.uri: thrift://hms:9083catalog.properties.warehouse: hdfs:///user/hive/warehousecatalog.properties.client.pool.size: 5catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO

所有 catalog.properties.* 最终都会透传给 Iceberg Catalog,具体支持项可以参考 Iceberg 官方文档中的 “Catalog options”。

4.3 表级属性

参数是否必填默认值类型说明
table.properties.*optional(none)String传递给 Iceberg 表的属性(如分区、写入策略、小文件管理等)

例如:

table.properties.write.metadata.delete-after-commit.enabled: true
table.properties.write.metadata.previous-versions-max: 10
table.properties.write.parquet.row-group-size-bytes: 134217728

这类属性决定了:

  • 元数据保留策略;
  • 文件格式与大小;
  • 小文件合并策略等。

4.4 分区键设置:partition.key

参数是否必填默认值类型说明
partition.keyoptional(none)String为每个分区表设置分区键,支持多表配置

格式说明:

  • 支持为多个表设置分区键;
  • 表之间用 ; 分隔;
  • 一个表的多个分区键用 , 分隔;
  • 完整格式:db.table:col1,col2;db2.table2:col3

文档中的例子:

partition.key: testdb.table1:id1,id2;testdb.table2:name

表示:

  • testdb.table1 的分区键是 id1, id2
  • testdb.table2 的分区键是 name

注意:
这里的分区键是 Iceberg 表的逻辑分区字段,你后续可以结合 table.properties.* 进一步控制分区策略(例如 identity、bucket、truncate 等),但基础字段的选择先通过 partition.key 指定。

5. 使用注意事项(Usage Notes)

文档里有两条非常关键的 Usage Notes:

5.1 源表必须有主键

The source table must have a primary key. Tables with no primary key are not supported.

Iceberg Connector 要求:

  • 上游 CDC 源表必须有主键;
  • 没有主键的表目前不支持。

这与很多 “upsert sink” 的典型设计一致:
需要借助主键信息来实现 merge upsert、幂等写入等能力。

在实际建模时,建议:

  • 保证业务表都有清晰的主键(id / 业务唯一键);
  • 对于没有主键的历史日志表,考虑先在上游或中间层补一个“技术主键”。

5.2 at-least-once + 主键幂等写入

Exactly-once semantics are not supported. The connector uses at-least-once + the table’s primary key for idempotent writing.

语义说明:

  • Flink CDC → Iceberg 的整体语义是 at-least-once
  • 任务失败 / 重启时可能会重放一部分 binlog 事件;
  • 依赖 Iceberg 表的主键来实现 幂等写入,从结果上接近逻辑上的 exactly-once。

从架构的角度看,这一点和 StarRocks 主键表、Doris Unique Key 表、Elasticsearch + _id 写入策略非常类似。

6. 数据类型映射:CDC type → Iceberg type

最后是非常重要的类型映射表(简洁但关键):

CDC typeIceberg type
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(p, s)DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIMESTAMPTIMESTAMP
TIMESTAMP_LTZTIMESTAMP_LTZ
CHAR(n)CHAR(n)
VARCHAR(n)VARCHAR(n)

可以看到,这里非常“对齐”:

  • 标准整数 / 浮点 / DECIMAL / BOOLEAN / DATE 都是一一对应;

  • 时间相关:

    • 普通 TIMESTAMP → Iceberg TIMESTAMP
    • 本地时区 TIMESTAMP_LTZ → Iceberg TIMESTAMP_LTZ
  • 字符串:

    • CHAR / VARCHAR 都直接映射为对应类型。

对于湖仓场景来说,这样的映射比较自然,也便于后续在 Iceberg 上做 SQL、Spark、Flink 等多引擎查询。

7. 实战建议:如何用好 Iceberg Sink?

结合上面的能力和限制,最后给一些更偏工程实践的建议:

  1. 主键一定要设计好

    • 没有主键的表不支持 Iceberg Sink;
    • 主键还会参与幂等写入逻辑,尽量选择真正业务唯一的字段组合;
    • 多列主键也没问题,按业务自然键(例如 user_id + date)来设计。
  2. 合理选择 Catalog 类型

    • 小规模、实验环境:hadoop catalog 简单直接;
    • 生产环境、需求统一元数据管理:推荐 hive catalog 或其他生产级 catalog;
    • 使用 hadoop catalog 要记得通过 --jar 携带对应的 Hadoop 依赖。
  3. 正确配置分区键(partition.key)

    • 对于大明细表,合理的分区是性能与成本的关键;

    • 常见方案:

      • 交易 /日志数据:按日期(dt)分区;
      • 按用户维度聚合:按 user_id hash/bucket 分区;
    • partition.key 中先确定字段,再用 table.properties.* 细化分区策略。

  4. 表属性控制写入行为

    • 利用 table.properties.* 设置:

      • 写入文件的大小(行数 / 字节);
      • 小文件合并策略;
      • 元数据历史版本的保留策略;
    • 尤其是高频更新场景,一定要防止产生海量小文件。

  5. 结合 Time Travel 做数据校对

    • Iceberg 的一个优势是支持快照和时间旅行;
    • 你可以在某个时间点做快照,然后用 SQL 回看当时的表数据;
    • 非常适合结合 Flink CDC 做“历史版本数据核对”。
  6. 监控 & 运维

    • Flink CDC 侧:

      • checkpoint 时间;
      • backpressure 指标;
      • 重启次数和原因。
    • Iceberg 侧(或者底层存储):

      • 文件数量(单表 / 单分区);
      • Warehouse 存储占用;
      • 元数据文件大小与数量(manifest、manifest list)。
http://www.dtcms.com/a/617892.html

相关文章:

  • AI(新手)
  • 海南城乡建设厅网站百度竞价关键词查询
  • QT开发——常用控件(2)
  • 【Java架构师体系课 | MySQL篇】⑥ 索引优化实战二
  • Spring Boot、Redis、RabbitMQ 在项目中的核心作用详解
  • 做完整的网站设计需要的技术长治建立公司网站的步骤
  • 南宁京象建站公司网站建设留言板实验心得
  • AI、LLM全景图
  • pip升级已安装包方法详解
  • 【Linux日新月异(六)】CentOS 7网络命令深度解析:从传统到现代网络管理
  • LangChain 构建 AI 代理(Agent)
  • 人工智能训练师备考——3.1.1题解
  • 【RL】ORPO: Monolithic Preference Optimization without Reference Model
  • 公益平台网站怎么做网站跳出
  • QT的5种标准对话框
  • 用Rust构建一个OCR命令行工具
  • 网站代码大全国内网站设计作品欣赏
  • LeetCode 热题 100——子串——滑动窗口最大值
  • CPP(容器)STL:
  • 【Java常用API】----- Math
  • RAG 系统 “检索 - 筛选 - 生成” 完整流程
  • 时间复杂度 和 嵌入式时钟概念 有关系。 我的理由是:时钟经常需要计算频率,而频率往往需要和时间进行计数次数i 。 时间复杂度就像是计数次数i
  • 公司做普通网站建立网站地图
  • Java 大视界 -- Java 大数据在智能农业病虫害精准识别与绿色防控中的创新应用
  • 【高并发架构】从 0 到亿,从单机部署到 K8s 编排:高并发架构的 8 级演进之路
  • 基于Streamlit的交互式3D手指运动学仿真
  • 甘肃做网站找谁金种子酒业网站建设
  • 使用 Flink CDC Elasticsearch Pipeline Connector 打通 MySQL 与 Elasticsearch 的实时链路
  • 基于视频识别的大模型项目实战心得
  • Firefly-Modeler 体积雕刻:AI 概念到 3D 基模