Streaming ELT with Flink CDC · Iceberg Sink
1. Iceberg Pipeline Connector 能做什么?
官方给出的关键能力有三条,可以和你前几篇 Sink 篇统一一下心智模型:
-
自动建表(Automatic Table Creation)
- 当 Iceberg 中还不存在目标表时,Connector 可以自动创建;
- 会根据上游表结构映射出 Iceberg 的字段与类型;
- 结合
catalog.properties.*和table.properties.*,还能定制部分表属性。
-
Schema 同步(Schema Synchronization)
- 当上游字段发生变化(比如新增列),可以将 Schema 变更同步到 Iceberg 表;
- 具体支持的演进能力依赖于 Iceberg 的 Schema 演进规则。
-
数据复制 / 同步(Data Replication)
- 支持批量 & 流式两种同步方式;
- 可以实现从 MySQL 到 Iceberg 的持续增量同步,将业务库的 binlog 落成 Iceberg 表的快照,以及不断追加新的数据文件。
一句话总结:
Iceberg Pipeline Connector = 自动建表 + 自动 Schema 演进 + 批流一体写入 Iceberg 表。
2. MySQL → Iceberg 的 Pipeline 示例
先看一眼官方给的最小示例,感受一下整体结构:
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hadoopcatalog.properties.warehouse: /path/warehousepipeline:name: MySQL to Iceberg Pipelineparallelism: 2
2.1 Source:MySQL 变更捕获
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404
这是一段非常典型的 Flink CDC MySQL Source 配置:
-
tables支持多库 + 正则匹配:adb.\.*:同步adb库全部表;bdb.user_table_[0-9]+:同步bdb库分表user_table_0 / user_table_1 / …;[app|web].order_\. *:同步app、web两个库下所有order_前缀的表。
-
server-id使用一段范围,方便多并行度的 Source subtasks 分配不同 server-id。
你可以直接替换为自己业务的库表即可。
2.2 Sink:Iceberg Pipeline Connector
sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hadoopcatalog.properties.warehouse: /path/warehouse
几个关键配置:
-
type: iceberg
指定使用 Iceberg Pipeline Connector 作为 Sink。 -
catalog.properties.type
指定 Iceberg Catalog 的类型,目前示例里是hadoop,还支持hive等:hadoop:元数据存本地 / HDFS / 对象存储上;hive:元数据存 Hive Metastore。
-
catalog.properties.warehouse
仓库根目录(warehouse)的路径,例如:catalog.properties.warehouse: hdfs:///iceberg_warehouse # 或 catalog.properties.warehouse: s3://mybucket/iceberg/
如果你用的是 hive catalog,还需要配置 catalog.properties.uri 指向 Hive Metastore,例如:
catalog.properties.type: hive
catalog.properties.uri: thrift://hms-host:9083
catalog.properties.warehouse: hdfs:///user/hive/warehouse
2.3 pipeline:任务定义
pipeline:name: MySQL to Iceberg Pipelineparallelism: 2
name:任务名,在 Flink CDC CLI UI 和日志中可见;parallelism:Pipeline 的整体并行度,影响 Source 和 Sink 的吞吐能力。
3. Hadoop Catalog 依赖配置(hadoop 类型时)
文档里有一条很重要的 Note:
如果
catalog.properties.type是hadoop,需要手动配置以下依赖,并在使用 Flink CDC CLI 提交 YAML 时通过--jar传入。
依赖项:
| 依赖项 | 说明 |
|---|---|
org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0 | 提供 Hadoop 相关依赖 |
也就是说,如果你用 Flink CDC CLI:
flink-cdc.sh \pipeline \-f /path/to/pipeline.yaml \--jar /path/to/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
否则在访问 HDFS / 对象存储(依赖 Hadoop FileSystem 实现)时会报类缺失错误。
如果你使用的是 Hive Catalog,也需要保证运行环境里有 Hive / Hadoop 相关依赖,这部分根据你的部署方式略有差异,这里不展开。
4. Iceberg Pipeline Connector 配置项详解
官方“Pipeline Connector Options” 列了一堆参数,我们按功能块来看。
4.1 基础属性
| 参数 | 是否必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
type | required | (none) | String | 固定为 iceberg |
name | optional | (none) | String | Sink 名字,主要是标识用 |
4.2 Catalog 相关属性
| 参数 | 是否必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
catalog.properties.type | required | (none) | String | Iceberg Catalog 类型,支持 hadoop、hive 等 |
catalog.properties.warehouse | optional | (none) | String | Catalog 的 warehouse 根路径 |
catalog.properties.uri | optional | (none) | String | Metastore 服务 URI(Hive Catalog 时常用) |
catalog.properties.* | optional | (none) | String | 其他传给 Iceberg Catalog 的配置项 |
例子:
sink:type: icebergname: Iceberg Sinkcatalog.properties.type: hivecatalog.properties.uri: thrift://hms:9083catalog.properties.warehouse: hdfs:///user/hive/warehousecatalog.properties.client.pool.size: 5catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO
所有 catalog.properties.* 最终都会透传给 Iceberg Catalog,具体支持项可以参考 Iceberg 官方文档中的 “Catalog options”。
4.3 表级属性
| 参数 | 是否必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
table.properties.* | optional | (none) | String | 传递给 Iceberg 表的属性(如分区、写入策略、小文件管理等) |
例如:
table.properties.write.metadata.delete-after-commit.enabled: true
table.properties.write.metadata.previous-versions-max: 10
table.properties.write.parquet.row-group-size-bytes: 134217728
这类属性决定了:
- 元数据保留策略;
- 文件格式与大小;
- 小文件合并策略等。
4.4 分区键设置:partition.key
| 参数 | 是否必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
partition.key | optional | (none) | String | 为每个分区表设置分区键,支持多表配置 |
格式说明:
- 支持为多个表设置分区键;
- 表之间用
;分隔; - 一个表的多个分区键用
,分隔; - 完整格式:
db.table:col1,col2;db2.table2:col3
文档中的例子:
partition.key: testdb.table1:id1,id2;testdb.table2:name
表示:
testdb.table1的分区键是id1, id2;testdb.table2的分区键是name。
注意:
这里的分区键是 Iceberg 表的逻辑分区字段,你后续可以结合table.properties.*进一步控制分区策略(例如 identity、bucket、truncate 等),但基础字段的选择先通过partition.key指定。
5. 使用注意事项(Usage Notes)
文档里有两条非常关键的 Usage Notes:
5.1 源表必须有主键
The source table must have a primary key. Tables with no primary key are not supported.
Iceberg Connector 要求:
- 上游 CDC 源表必须有主键;
- 没有主键的表目前不支持。
这与很多 “upsert sink” 的典型设计一致:
需要借助主键信息来实现 merge upsert、幂等写入等能力。
在实际建模时,建议:
- 保证业务表都有清晰的主键(id / 业务唯一键);
- 对于没有主键的历史日志表,考虑先在上游或中间层补一个“技术主键”。
5.2 at-least-once + 主键幂等写入
Exactly-once semantics are not supported. The connector uses at-least-once + the table’s primary key for idempotent writing.
语义说明:
- Flink CDC → Iceberg 的整体语义是 at-least-once;
- 任务失败 / 重启时可能会重放一部分 binlog 事件;
- 依赖 Iceberg 表的主键来实现 幂等写入,从结果上接近逻辑上的 exactly-once。
从架构的角度看,这一点和 StarRocks 主键表、Doris Unique Key 表、Elasticsearch + _id 写入策略非常类似。
6. 数据类型映射:CDC type → Iceberg type
最后是非常重要的类型映射表(简洁但关键):
| CDC type | Iceberg type |
|---|---|
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMP_LTZ | TIMESTAMP_LTZ |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
可以看到,这里非常“对齐”:
-
标准整数 / 浮点 / DECIMAL / BOOLEAN / DATE 都是一一对应;
-
时间相关:
- 普通
TIMESTAMP→ IcebergTIMESTAMP; - 本地时区
TIMESTAMP_LTZ→ IcebergTIMESTAMP_LTZ;
- 普通
-
字符串:
- CHAR / VARCHAR 都直接映射为对应类型。
对于湖仓场景来说,这样的映射比较自然,也便于后续在 Iceberg 上做 SQL、Spark、Flink 等多引擎查询。
7. 实战建议:如何用好 Iceberg Sink?
结合上面的能力和限制,最后给一些更偏工程实践的建议:
-
主键一定要设计好
- 没有主键的表不支持 Iceberg Sink;
- 主键还会参与幂等写入逻辑,尽量选择真正业务唯一的字段组合;
- 多列主键也没问题,按业务自然键(例如 user_id + date)来设计。
-
合理选择 Catalog 类型
- 小规模、实验环境:
hadoopcatalog 简单直接; - 生产环境、需求统一元数据管理:推荐
hivecatalog 或其他生产级 catalog; - 使用
hadoopcatalog 要记得通过--jar携带对应的 Hadoop 依赖。
- 小规模、实验环境:
-
正确配置分区键(partition.key)
-
对于大明细表,合理的分区是性能与成本的关键;
-
常见方案:
- 交易 /日志数据:按日期(dt)分区;
- 按用户维度聚合:按 user_id hash/bucket 分区;
-
在
partition.key中先确定字段,再用table.properties.*细化分区策略。
-
-
表属性控制写入行为
-
利用
table.properties.*设置:- 写入文件的大小(行数 / 字节);
- 小文件合并策略;
- 元数据历史版本的保留策略;
-
尤其是高频更新场景,一定要防止产生海量小文件。
-
-
结合 Time Travel 做数据校对
- Iceberg 的一个优势是支持快照和时间旅行;
- 你可以在某个时间点做快照,然后用 SQL 回看当时的表数据;
- 非常适合结合 Flink CDC 做“历史版本数据核对”。
-
监控 & 运维
-
Flink CDC 侧:
- checkpoint 时间;
- backpressure 指标;
- 重启次数和原因。
-
Iceberg 侧(或者底层存储):
- 文件数量(单表 / 单分区);
- Warehouse 存储占用;
- 元数据文件大小与数量(manifest、manifest list)。
-
