当前位置: 首页 > news >正文

用 Doris 托底实时明细与聚合Flink CDC Pipeline 的 Doris Sink 实战

1. 快速开始:最小可用 Pipeline

下面这份 YAML 展示从内置 values 源写入到 Doris 的最小示例:

source:type: valuesname: ValuesSourcesink:type: dorisname: Doris Sinkfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1pipeline:parallelism: 1

说明:fenodes 为 FE 的 HTTP 地址;生产集群请配置多个 FE(逗号分隔)。table.create.properties.* 可在首次写入时为目标表指定 Doris 表属性(副本数等)。

2. 连接器能力与适用场景

可做什么?

  • ✅ 数据同步(CDC/批流写入)
  • ✅ 批量写入、异步 Flush、失败重试
  • ✅ 可选“忽略 update-before”实现简化 Upsert 语义
  • ✅ 可选 FE→BE 自动重定向(绕过 FE 转直写 BE,提高写入吞吐)

常见用法

  • ODS 明细落盘(高并发写入 + 低延迟查询)
  • DWD 轻聚合/拉链表(结合主键/唯一键模型)
  • 指标宽表/即席分析的实时侧视图

3. 常用参数速查与实践解读

只列高频关键项,完整清单见文末表格小结。

3.1 连接 & 路由

  • fenodes(必填):host:8030(FE HTTP);可多节点 a:8030,b:8030
  • benodes(可选):host:8040(BE HTTP);和 auto-redirect 配合减少 FE 压力。
  • jdbc-url(可选):jdbc:mysql://fe:9030/db;用于建表/探活等元数据交互场景。

3.2 写入语义

  • sink.enable.delete(默认 true):支持删除(与上游 CDC delete 对齐)。

  • sink.ignore.update-before(默认 true):忽略更新事件中的“旧值”(即不把 before 作为 delete 传下游),等价简化 Upsert

    • 若上游存在主键变更(极少见),建议将其设为 false,以便把旧主键行删除、再插入新主键行。

3.3 批量与性能

  • sink.enable.batch-mode(默认 true):开启批写。
  • sink.buffer-flush.max-rows(默认 50,000):单批最大条数。
  • sink.buffer-flush.max-bytes(默认 10MB):单批最大字节。
  • sink.buffer-flush.interval(默认 10s):批量超时自动 Flush,避免长时间积压。
  • sink.flush.queue-size(默认 2):批次并发队列,适度增大提升吞吐。
  • sink.max-retries(默认 3):失败重试次数。

3.4 FE→BE 直写

  • auto-redirect(默认 false):打开后先经 FE 下发写入位置,再直连 BE 写入,降低 FE 压力、提升吞吐。

    • 生产建议:开启,并确保网络 ACL 放通至 BE

3.5 Stream Load 细化

  • sink.properties.*:透传 Doris Stream Load 参数,例如:

    sink.properties.strict_mode: true
    sink.properties.max_filter_ratio: 0
    sink.properties.format: json
    sink.properties.read_json_by_line: true
    

    典型用法:严格模式 + 行级 JSON;具体可按 Doris 文档调整。

3.6 自动建表属性

  • table.create.properties.*:在首次写入时指定 Doris 表属性,例如副本数、存储格式等:

    table.create.properties.replication_num: 3
    
  • 自动分区(Doris ≥ 2.1.6):

    • 仅支持 DATE / DATETIME 分区列,且采用 date_trunc 分区函数。
    • 通过 table.create.auto-partition.properties.* 进行控制(支持 include/exclude、默认分区键与单位,以及 DB.TABLE 级覆盖)。
    • 分区列不可为 NULLABLE;若出现 NULL,系统会用默认值回填(DATE → 1970-01-01,DATETIME → 1970-01-01 00:00:00)。

4. 生产级 YAML 示例(含批量、重定向、自动分区)

source:type: mysqlname: MySQL Sourcehostname: 10.0.0.10port: 3306username: cdc_userpassword: ********tables: ecommerce\..*          # 只需注意 '.' 的转义写法server-id: 5401-5404sink:type: dorisname: Doris Sink (Prod)fenodes: fe-1:8030,fe-2:8030benodes: be-1:8040,be-2:8040,be-3:8040username: sink_userpassword: ********# 性能auto-redirect: truesink.enable.batch-mode: truesink.buffer-flush.max-rows: 80000sink.buffer-flush.max-bytes: 33554432     # 32MBsink.buffer-flush.interval: 5ssink.flush.queue-size: 4sink.max-retries: 5# CDC 语义sink.enable.delete: truesink.ignore.update-before: false          # 主键变更场景更安全# Stream Load 透传sink.properties.strict_mode: truesink.properties.format: jsonsink.properties.read_json_by_line: truesink.properties.max_filter_ratio: 0# 建表/分区属性(按需)table.create.properties.replication_num: 3# 自动分区(Doris >= 2.1.6)# include/exclude 支持正则;默认分区键/单位可被 DB.TABLE 覆盖table.create.auto-partition.properties.include: ecommerce.orders.*,ecommerce.logs.*table.create.auto-partition.properties.default-partition-key: dttable.create.auto-partition.properties.default-partition-unit: daytable.create.auto-partition.properties.ecommerce.orders.partition-key: order_timetable.create.auto-partition.properties.ecommerce.orders.partition-unit: daypipeline:parallelism: 4

提示:default-partition-unit/partition-unit 的取值需与 Doris 当下对 date_trunc 的支持保持一致(常见 day/month 等)。上线前先在测试库/表验证自动分区是否按预期创建。

5. 数据类型映射(速查表)

Flink CDC TypeDoris Type备注
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
DECIMALDECIMAL
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATEDATE
TIMESTAMP [§]DATETIME [§]
TIMESTAMP_LTZ [§]DATETIME [§]
CHAR(n)CHAR(n*3)Doris UTF-8 存储:英文 1 字节、中文 3 字节,长度按 3 放大;>255 自动转 VARCHAR
VARCHAR(n)VARCHAR(n*3)>65533 自动转 STRING
BINARY(n)STRING
VARBINARY(n)STRING
TIMESTRINGDoris 不支持 TIME,需转 STRING
STRINGSTRING

6. Doris 表模型与 CDC 的组合建议

  • UNIQUE KEY 表:天然契合 Upsert/删除语义(推荐 CDC 明细/拉链表)。
  • DUPLICATE KEY 表:适合“多版本留痕”,但需要额外列(如 version/is_delete)与下游查询约束。
  • AGGREGATE KEY 表:适合预聚合;CDC 写入需明确聚合列与更新逻辑,避免“反向更新”。

若开启 sink.enable.delete=true 且模型支持删除,请确保表有恰当的主键/唯一键,以免出现“删不掉”的脏数据。

7. 上线前 Checklist

  1. 权限与网络:FE/BE HTTP 开放;Sink 账号仅授必需权限。
  2. 批量阈值:根据流量与延迟目标调优 max-rows/max-bytes/interval,观察 Stream Load QPS 及失败率。
  3. 重定向auto-redirect=true 并放通 BE;压测 FE/BE CPU、内存、磁盘与网卡。
  4. 主键/删除语义:确认是否需要处理 update-before;有主键变更风险时设为 false
  5. 自动分区:验证分区键类型(DATE/DATETIME)、单位、空值回填是否符合预期。
  6. 容错:设置 sink.max-retries,结合任务重启策略与告警(Stream Load 返回码/错误日志)。
  7. 冷热分层:结合 Doris 的存储策略(副本数、压缩)与保留策略控制成本。

8. 常见问题(Troubleshooting)

  • 写入慢/抖动:增大 flush.queue-sizebatch 阈值,开启 auto-redirect;检查磁盘 IO/网络瓶颈。
  • 字段写入失败strict_mode=true 时会严格校验类型与长度;先在测试环境发现并修正脏数据。
  • TIME 类型不兼容:上游 TIME 统一转 STRING,或在源侧/中间层做格式化。
  • 中文被截断:留意 CHAR/VARCHAR 的“长度×3”规则;超过上限会自动升级类型(CHAR→VARCHAR、VARCHAR→STRING)。
  • 删除不生效:检查 sink.enable.delete、表模型是否支持删除、以及主键/唯一键是否正确映射。

9. 选项小抄(完整版节选)

选项必填默认类型说明
typeString固定为 doris
nameString管道名称
fenodesStringFE HTTP 地址,host:8030(可多节点)
benodesStringBE HTTP 地址,host:8040
jdbc-urlStringDoris JDBC 地址(如 jdbc:mysql://fe:9030/db
usernameStringDoris 用户名
passwordStringDoris 密码
auto-redirectfalseString是否 FE 重定向直连 BE
charset-encodingUTF-8BooleanHTTP 客户端字符集
sink.enable.batch-modetrueBoolean批量写入
sink.enable.deletetrueBoolean允许 delete
sink.max-retries3Integer失败重试次数
sink.flush.queue-size2Integer批量并发队列大小
sink.buffer-flush.max-rows50000Integer单批最大行数
sink.buffer-flush.max-bytes10485760Integer单批最大字节(10MB)
sink.buffer-flush.interval10sString刷写间隔
sink.ignore.update-beforetrueBoolean忽略 update-before(Upsert 语义)
sink.properties.*String透传 Stream Load 参数(如 strict_mode 等)
table.create.properties.*StringDoris 表属性(如 replication_num
table.create.auto-partition.properties.*String自动分区配置(含 include/exclude、默认键/单位、DB.TABLE 覆盖)
http://www.dtcms.com/a/605290.html

相关文章:

  • FLINK CDC 的学习
  • AI音乐生成 | 音乐流派分类的原理和python实现
  • WSL下将Ubuntu从C盘迁移到D盘(个人记录、详细图解)
  • LRU缓存淘汰算法详解与C++实现
  • AbMole小讲堂丨Cyclophosphamide(环磷酰胺):应用于肿瘤与免疫研究的热门烷化工具
  • 网站建设费用如何收取什么叫网站开发应用框架
  • 怎么在.Net中做团购网站专门做钻石国外网站
  • 教程上新丨Deepseek-OCR 以极少视觉 token 数在端到端模型中实现 SOTA
  • Mac多功能音视频AI处理工具VideoProc Converter AI
  • 【技术贴】全链路协同!艾为电子开启端侧AI音频“精而优”时代
  • 2025国产ITSM厂商选型指南:从基础流程、智能赋能到全链路协同方案的全面对比
  • 数据结构——四十二、二叉排序树(王道408)
  • VueUse的使用
  • 【LeetCode】111. 二叉树的最小深度
  • 如何将html发布到网站wordpress用户筛选
  • 深度智能体-智能体加强版
  • ZCC75XXH- 40V/150mA 高压线性稳压器替代HT75XX
  • 多媒体语音通话中,信令参数T1/ms, T2/s, T4/s作用
  • Travel uni-app 项目说明
  • 永磁同步电机无速度算法--基于一阶线性状态观测器的反电动势观测器
  • 番禺网站建设怎样网站建设公司怎样做账
  • 网站开发项目总结模板网站开发 证书
  • Python 自定义迭代器 --以斐波那契数列为例
  • AI一键PPT 2.0.3 一键智能生成
  • 232. 用栈实现队列
  • 如何在桌面创建网页快捷图标?(电脑 / 手机通用操作指南)
  • soular实战教程系列(2) - 如何统一管理TikLab帐号体系
  • k8s 发行说明(版本)
  • 批处理病毒原理、防御
  • 网站建设对企业的好处有哪些嘉兴网站制作建设