用 Doris 托底实时明细与聚合Flink CDC Pipeline 的 Doris Sink 实战
1. 快速开始:最小可用 Pipeline
下面这份 YAML 展示从内置 values 源写入到 Doris 的最小示例:
source:type: valuesname: ValuesSourcesink:type: dorisname: Doris Sinkfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1pipeline:parallelism: 1
说明:
fenodes为 FE 的 HTTP 地址;生产集群请配置多个 FE(逗号分隔)。table.create.properties.*可在首次写入时为目标表指定 Doris 表属性(副本数等)。
2. 连接器能力与适用场景
可做什么?
- ✅ 数据同步(CDC/批流写入)
- ✅ 批量写入、异步 Flush、失败重试
- ✅ 可选“忽略 update-before”实现简化 Upsert 语义
- ✅ 可选 FE→BE 自动重定向(绕过 FE 转直写 BE,提高写入吞吐)
常见用法
- ODS 明细落盘(高并发写入 + 低延迟查询)
- DWD 轻聚合/拉链表(结合主键/唯一键模型)
- 指标宽表/即席分析的实时侧视图
3. 常用参数速查与实践解读
只列高频关键项,完整清单见文末表格小结。
3.1 连接 & 路由
fenodes(必填):host:8030(FE HTTP);可多节点a:8030,b:8030。benodes(可选):host:8040(BE HTTP);和auto-redirect配合减少 FE 压力。jdbc-url(可选):jdbc:mysql://fe:9030/db;用于建表/探活等元数据交互场景。
3.2 写入语义
-
sink.enable.delete(默认 true):支持删除(与上游 CDC delete 对齐)。 -
sink.ignore.update-before(默认 true):忽略更新事件中的“旧值”(即不把 before 作为 delete 传下游),等价简化 Upsert。- 若上游存在主键变更(极少见),建议将其设为 false,以便把旧主键行删除、再插入新主键行。
3.3 批量与性能
sink.enable.batch-mode(默认 true):开启批写。sink.buffer-flush.max-rows(默认 50,000):单批最大条数。sink.buffer-flush.max-bytes(默认 10MB):单批最大字节。sink.buffer-flush.interval(默认 10s):批量超时自动 Flush,避免长时间积压。sink.flush.queue-size(默认 2):批次并发队列,适度增大提升吞吐。sink.max-retries(默认 3):失败重试次数。
3.4 FE→BE 直写
-
auto-redirect(默认 false):打开后先经 FE 下发写入位置,再直连 BE 写入,降低 FE 压力、提升吞吐。- 生产建议:开启,并确保网络 ACL 放通至 BE。
3.5 Stream Load 细化
-
sink.properties.*:透传 Doris Stream Load 参数,例如:sink.properties.strict_mode: true sink.properties.max_filter_ratio: 0 sink.properties.format: json sink.properties.read_json_by_line: true典型用法:严格模式 + 行级 JSON;具体可按 Doris 文档调整。
3.6 自动建表属性
-
table.create.properties.*:在首次写入时指定 Doris 表属性,例如副本数、存储格式等:table.create.properties.replication_num: 3 -
自动分区(Doris ≥ 2.1.6):
- 仅支持
DATE/DATETIME分区列,且采用date_trunc分区函数。 - 通过
table.create.auto-partition.properties.*进行控制(支持 include/exclude、默认分区键与单位,以及 DB.TABLE 级覆盖)。 - 分区列不可为 NULLABLE;若出现 NULL,系统会用默认值回填(DATE →
1970-01-01,DATETIME →1970-01-01 00:00:00)。
- 仅支持
4. 生产级 YAML 示例(含批量、重定向、自动分区)
source:type: mysqlname: MySQL Sourcehostname: 10.0.0.10port: 3306username: cdc_userpassword: ********tables: ecommerce\..* # 只需注意 '.' 的转义写法server-id: 5401-5404sink:type: dorisname: Doris Sink (Prod)fenodes: fe-1:8030,fe-2:8030benodes: be-1:8040,be-2:8040,be-3:8040username: sink_userpassword: ********# 性能auto-redirect: truesink.enable.batch-mode: truesink.buffer-flush.max-rows: 80000sink.buffer-flush.max-bytes: 33554432 # 32MBsink.buffer-flush.interval: 5ssink.flush.queue-size: 4sink.max-retries: 5# CDC 语义sink.enable.delete: truesink.ignore.update-before: false # 主键变更场景更安全# Stream Load 透传sink.properties.strict_mode: truesink.properties.format: jsonsink.properties.read_json_by_line: truesink.properties.max_filter_ratio: 0# 建表/分区属性(按需)table.create.properties.replication_num: 3# 自动分区(Doris >= 2.1.6)# include/exclude 支持正则;默认分区键/单位可被 DB.TABLE 覆盖table.create.auto-partition.properties.include: ecommerce.orders.*,ecommerce.logs.*table.create.auto-partition.properties.default-partition-key: dttable.create.auto-partition.properties.default-partition-unit: daytable.create.auto-partition.properties.ecommerce.orders.partition-key: order_timetable.create.auto-partition.properties.ecommerce.orders.partition-unit: daypipeline:parallelism: 4
提示:
default-partition-unit/partition-unit的取值需与 Doris 当下对date_trunc的支持保持一致(常见 day/month 等)。上线前先在测试库/表验证自动分区是否按预期创建。
5. 数据类型映射(速查表)
| Flink CDC Type | Doris Type | 备注 |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| DECIMAL | DECIMAL | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP [§] | DATETIME [§] | |
| TIMESTAMP_LTZ [§] | DATETIME [§] | |
| CHAR(n) | CHAR(n*3) | Doris UTF-8 存储:英文 1 字节、中文 3 字节,长度按 3 放大;>255 自动转 VARCHAR |
| VARCHAR(n) | VARCHAR(n*3) | >65533 自动转 STRING |
| BINARY(n) | STRING | |
| VARBINARY(n) | STRING | |
| TIME | STRING | Doris 不支持 TIME,需转 STRING |
| STRING | STRING |
6. Doris 表模型与 CDC 的组合建议
- UNIQUE KEY 表:天然契合 Upsert/删除语义(推荐 CDC 明细/拉链表)。
- DUPLICATE KEY 表:适合“多版本留痕”,但需要额外列(如
version/is_delete)与下游查询约束。 - AGGREGATE KEY 表:适合预聚合;CDC 写入需明确聚合列与更新逻辑,避免“反向更新”。
若开启
sink.enable.delete=true且模型支持删除,请确保表有恰当的主键/唯一键,以免出现“删不掉”的脏数据。
7. 上线前 Checklist
- 权限与网络:FE/BE HTTP 开放;Sink 账号仅授必需权限。
- 批量阈值:根据流量与延迟目标调优
max-rows/max-bytes/interval,观察 Stream Load QPS 及失败率。 - 重定向:
auto-redirect=true并放通 BE;压测 FE/BE CPU、内存、磁盘与网卡。 - 主键/删除语义:确认是否需要处理 update-before;有主键变更风险时设为
false。 - 自动分区:验证分区键类型(DATE/DATETIME)、单位、空值回填是否符合预期。
- 容错:设置
sink.max-retries,结合任务重启策略与告警(Stream Load 返回码/错误日志)。 - 冷热分层:结合 Doris 的存储策略(副本数、压缩)与保留策略控制成本。
8. 常见问题(Troubleshooting)
- 写入慢/抖动:增大
flush.queue-size、batch阈值,开启auto-redirect;检查磁盘 IO/网络瓶颈。 - 字段写入失败:
strict_mode=true时会严格校验类型与长度;先在测试环境发现并修正脏数据。 - TIME 类型不兼容:上游 TIME 统一转 STRING,或在源侧/中间层做格式化。
- 中文被截断:留意 CHAR/VARCHAR 的“长度×3”规则;超过上限会自动升级类型(CHAR→VARCHAR、VARCHAR→STRING)。
- 删除不生效:检查
sink.enable.delete、表模型是否支持删除、以及主键/唯一键是否正确映射。
9. 选项小抄(完整版节选)
| 选项 | 必填 | 默认 | 类型 | 说明 |
|---|---|---|---|---|
type | ✅ | — | String | 固定为 doris |
name | — | String | 管道名称 | |
fenodes | ✅ | — | String | FE HTTP 地址,host:8030(可多节点) |
benodes | — | String | BE HTTP 地址,host:8040 | |
jdbc-url | — | String | Doris JDBC 地址(如 jdbc:mysql://fe:9030/db) | |
username | ✅ | — | String | Doris 用户名 |
password | — | String | Doris 密码 | |
auto-redirect | false | String | 是否 FE 重定向直连 BE | |
charset-encoding | UTF-8 | Boolean | HTTP 客户端字符集 | |
sink.enable.batch-mode | true | Boolean | 批量写入 | |
sink.enable.delete | true | Boolean | 允许 delete | |
sink.max-retries | 3 | Integer | 失败重试次数 | |
sink.flush.queue-size | 2 | Integer | 批量并发队列大小 | |
sink.buffer-flush.max-rows | 50000 | Integer | 单批最大行数 | |
sink.buffer-flush.max-bytes | 10485760 | Integer | 单批最大字节(10MB) | |
sink.buffer-flush.interval | 10s | String | 刷写间隔 | |
sink.ignore.update-before | true | Boolean | 忽略 update-before(Upsert 语义) | |
sink.properties.* | — | String | 透传 Stream Load 参数(如 strict_mode 等) | |
table.create.properties.* | — | String | Doris 表属性(如 replication_num) | |
table.create.auto-partition.properties.* | — | String | 自动分区配置(含 include/exclude、默认键/单位、DB.TABLE 覆盖) |
