Flink CDC「Data Pipeline」定义与参数速查
一、核心概念
- Data Pipeline:Flink CDC 里,事件从上游流向下游,整条 ETL 作业即称为一个“数据管道(Data Pipeline)”。管道在 Flink 中对应一串有序的 operators(算子链)。必备部分:source / sink / pipeline;可选部分:route / transform。(Apache Nightlies)
二、最小可用示例(Only required)
目标:把 MySQL app_db 库下的所有表,实时同步到 Doris。
✅ 正则要写成
app_db\..*(点号要转义;YAML 中避免使用\.*这样的 Markdown 残留写法)。
pipeline:name: Sync MySQL Database to Dorisparallelism: 2source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\..*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""
pipeline.parallelism为全局并行度,默认 1。(Apache Nightlies)- Doris 作为 sink 需要 FE 地址
fenodes、用户名等基础配置。(Apache Nightlies)
三、进阶示例(With optional)
目标:同步 app_db 全库到 Doris,同时:
- 做字段投影/过滤(
transform) - 做表路由(
route):把上游表写到指定库表(ods_db.ods_*) - 注册UDF 供
transform使用
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\..*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01# 用双引号包住整个表达式,避免 YAML 把 * 解析为锚点projection: "*, format('%S', product_name) as product_name"filter: "addone(id) > 10 AND order_id > 100"description: project fields and filter- source-table: adb.web_order02projection: "*, format('%S', product_name) as product_name"filter: "addone(id) > 20 AND order_id > 200"description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass
上述结构、字段名与含义,均出自 Flink CDC 官方 “Data Pipeline” 文档与 “MySQL→Doris Quickstart”。(Apache Nightlies)
Doris sink 关键参数(type: doris、fenodes等)见 Doris 连接器页面与对应 3.5 API。(Apache Nightlies)
四、Pipeline 级参数一览(至少设置一个参数;pipeline 段不可为空)
| 参数 | 作用 | 备注 |
|---|---|---|
name | 提交到 Flink 集群时的作业名 | 可选 |
parallelism | 管道全局并行度(默认 1) | 可选 |
local-time-zone | 会话时区 ID | 可选 |
execution.runtime-mode | 运行模式:STREAMING / BATCH(默认 STREAMING) | 可选 |
schema.change.behavior | 上游模式变更处理策略:exception / evolve / try_evolve / lenient(默认) / ignore | 可选 |
schema.operator.uid | 已弃用(用 operator.uid.prefix) | 可选 |
schema-operator.rpc-timeout | 等下游应用 Schema 变更的超时,默认 3 分钟 | 可选 |
operator.uid.prefix | 给所有算子 UID 统一加前缀,便于有状态升级/排障/定位 | 可选 |
官方强调:上述参数各自可选,但
pipeline段必须存在且不能为空。(Apache Nightlies)
五、常见踩坑与修正
-
正则写法
- 正确:
app_db\..*(转义点号,匹配该库全部表) - 错误:
app_db.\.*(多见于网页转义后的拷贝)
原因:Markdown/网页中常把*、\做转义展示,直接复制会“多出反斜杠”。(按上文示例已修正)(Apache Nightlies)
- 正确:
-
YAML 里使用
*projection里建议把整句用引号包起来:"*, col as alias",避免 YAML 把*解析成锚点语法。
-
Doris 连接参数
- 至少提供
type: doris与fenodes(FE 的 HTTP 地址),其他如username/password/jdbc-url视集群而定。(Apache Nightlies)
- 至少提供
-
Schema 演进策略
- 生产环境常用
lenient或try_evolve;严格治理可用evolve/exception。(Apache Nightlies)
- 生产环境常用
六、参考与延伸阅读
- Flink CDC —— Data Pipeline:定义、必选/可选段落、示例、Pipeline 配置项。(Apache Nightlies)
- MySQL → Doris 快速上手:端到端 YAML + CLI。(Apache Nightlies)
- Doris 管道连接器:必需/可选参数与示例。(Apache Nightlies)
