Flink CDC 从 Definition 到可落地 YAML
1. Data Pipeline
1.1 Definition
在 Flink CDC 中,事件以管道方式从上游到下游,整条 ETL 作业称为 Data Pipeline;其在 Flink 内部对应一条算子链。
必选段:source、sink、pipeline
可选段:route、transform
1.2 Pipeline Configurations(管道级配置)
规则:这些参数各自可选,但
pipeline段必须存在且不可为空(至少填一个)。
| 参数 | 含义 | 说明 |
|---|---|---|
name | 管道/作业名(Flink UI 可见) | 可选 |
parallelism | 管道全局并行度(默认 1) | 可选 |
local-time-zone | 会话时区 ID | 可选 |
execution.runtime-mode | STREAMING / BATCH(默认 STREAMING) | 可选 |
schema.change.behavior | 上游模式变更处理:exception / evolve / try_evolve / lenient(默认) / ignore | 可选 |
schema.operator.uid | 已不推荐;用 operator.uid.prefix | 可选 |
schema-operator.rpc-timeout | 等下游应用 schema 变更的超时(默认 3 分钟) | 可选 |
operator.uid.prefix | 给全部算子 UID 加统一前缀,便于升级与排障 | 可选 |
2. Data Source
2.1 Definition
Data Source 用于连接外部系统,访问元数据并读取变更数据;一个 Source 可同时订阅多张表。
2.2 Parameters
type:来源类型(如mysql),必填name:来源名称(用户自定义,有默认值),可选- 数据源配置:连接信息与表属性(如主机、端口、账号、表匹配等),可选
3. MySQL → Doris 示例
3.1 Only Required(最小必要字段)
A. 忠实版(沿用你的原始写法)
pipeline:name: Sync MySQL Database to Dorisparallelism: 2source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\..* # 忠实使用“库.表”正则sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""
注:你最初的片段里
tables曾写成app_db.\.*;为保证正则含义正确,这里统一写为app_db\..*(转义点号表示“库.表”分隔,.*表示任意表名)。
B. 修正版(更稳妥的可运行写法)
pipeline:name: Sync MySQL Database to Dorisparallelism: 2source:type: mysqlhostname: localhost # MySQL 主机字段推荐使用 hostnameport: 3306username: rootpassword: 123456tables: "app_db\..*" # 用引号避免 YAML 解析 * 与 \ 的歧义sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""
实用补充(可选且安全):
server-time-zone: UTC(或与你的 Doris/消费侧一致的时区)scan.startup.mode: initial(首启做快照,后续增量)
3.2 With Optional(route / transform / UDF)
A. 忠实版(按你提供的结构与语义组织)
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\..*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: "*, format('%S', product_name) as product_name"filter: "addone(id) > 10 AND order_id > 100"description: project fields and filter- source-table: adb.web_order02projection: "*, format('%S', product_name) as product_name"filter: "addone(id) > 20 AND order_id > 200"description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass
B. 修正版(健壮性细节)
- 给
projection与filter统一加引号,避免*/%S被 YAML 特殊解析。 - 若
adb.web_order01/02不在同一个库/前缀,可把source-table改为更精确的db.table或正则。 - Doris 目标表(如
ods_db.ods_orders)建议提前建好模型(主键/明细/聚合)与分区/分桶策略,保证导入性能与回放幂等。
4. 正则与 YAML 书写要点(强烈建议收藏)
-
库.表分隔符:在正则里写成
\.,例如app_db\..*。 -
多模式并列:可用逗号分隔多段正则,建议整体加引号:
tables: "adb\..*, bdb\.user_table_[0-9]+, (app|web)_order\..*" -
星号与反斜杠:
projection: "*, col as alias"用引号包裹,避免 YAML 锚点与转义误判。 -
最小“必填”以外的增强:
operator.uid.prefix:保障算子 UID 稳定,方便后续有状态升级与定位问题;schema.change.behavior: lenient/try_evolve:减少 DDL 抖动的不可用窗口;parallelism:根据 Doris 写入速率与上游变更量调整。
5. 上线前检查清单
pipeline段非空(至少name或parallelism之一)- Source 连通性(账号权限、binlog 打开、时区一致)
- Doris 目标库表存在且模型/分区/分桶正确
- UDF
classpath可加载、版本匹配 - schema 变更策略与数据质量告警(DDL、空值、默认值)
- 资源与并行度(避免下游被写爆或上游背压)
6. 结语
这篇文章严格围绕 Definition / Parameters / Example / Pipeline Configurations 展开,并用 MySQL → Doris 的 YAML 作为贯穿示例:
- A. 忠实版确保和你现有文档一一对应;
- B. 修正版保证细节可直接落地运行、减少踩坑。
