当前位置: 首页 > news >正文

用 Kafka 打通实时数据总线Flink CDC Pipeline 的 Kafka Sink 实战

1. 五分钟起步:MySQL → Kafka 最小可用 Pipeline

source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: PLAINTEXT://localhost:62510# 可选:固定写入同一个 topic(否则默认按 tableId 路由)# topic: cdc-all# 可选:分区策略(默认 all-to-zero)# partition.strategy: hash-by-key# 可选:键/值编码# key.format: json# value.format: debezium-jsonpipeline:name: MySQL to Kafka Pipelineparallelism: 2

易错点

  • tables 支持正则;. 是库/模式/表的分隔符,匹配任意字符请用 \. 转义。
  • Kafka 地址写在 properties.bootstrap.servers(可带协议前缀,如 PLAINTEXT://SASL_SSL://)。
  • 未显式配置 topic 时,默认主题名为 namespace.schemaName.tableName(即 tableId)。

2. 主题与分区:如何“稳可观测、快可扩展”

2.1 路由方式

  • 默认:按 tableId 写入对应主题:namespace.schema.table

  • 固定主题:设置 topic: your_topic,所有事件汇聚到一个主题(便于统一消费,但需更高吞吐与分区数)。

  • 自定义映射:细粒度把表映射到不同主题:

    sink.tableId-to-topic.mapping: mydb.orders:orders_cdc;mydb.users:users_cdc
    
  • 记录头携带表信息(便于单主题多表消费做路由/监控):

    sink.add-tableId-to-header-enabled: true
    

    将为每条记录添加 namespace/schemaName/tableName 三个 header。

2.2 分区策略

  • partition.strategy: all-to-zero(默认):全部写 0 号分区,利于单消费者串行处理与全序消费,但扩展性差。
  • partition.strategy: hash-by-key:按主键哈希分发到多个分区,同主键有序且可横向扩展(推荐用于高吞吐)。

使用 hash-by-key 时,务必确保上游表定义了主键键序列化key.format 决定(json/csv,默认 json)。

3. 消息格式(Value)与 Schema

3.1 可选值格式

  • value.format: debezium-json(默认)

    • 字段包含 before/after/op/sourcesource 中不含 ts_ms

    • 示例:

      {"before": null,"after": { "col1": "1", "col2": "1" },"op": "c","source": { "db": "default_namespace", "table": "table1" }
      }
      
    • 如需携带 schema(schema/payload 两层),开启:

      debezium-json.include-schema.enabled: true
      
  • value.format: canal-json

    • 字段包含 old/data/type/database/table/pkNames不包含 ts

    • 示例:

      {"old": null,"data": [{ "col1": "1", "col2": "1" }],"type": "INSERT","database": "default_schema","table": "table1","pkNames": ["col1"]
      }
      

事件时间:两种内置格式都不包含时间戳字段,可用 Kafka Record Timestamp(Broker 侧)作为事件时间,或在消费端自行补齐。

3.2 键格式(Key)

  • key.format: json | csv(默认 json)。
  • 仅在 hash-by-key 或你在消费端需要读取 Kafka Message Key 时有意义。

3.3 自定义 Header

  • 固定追加自定义头:

    sink.custom-header: env:prod,team:realtime
    
  • 加表身份头:见 2.1 的 sink.add-tableId-to-header-enabled: true

4. Kafka Producer 常用参数(透传)

通过 properties.* 透传 Kafka Producer 配置,如:

sink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: SASL_SSL://kafka-1:9093,kafka-2:9093properties.acks: allproperties.linger.ms: 20properties.batch.size: 524288properties.max.in.flight.requests.per.connection: 1properties.compression.type: lz4# 安全建议(示例,按你的集群实际为准)properties.security.protocol: SASL_SSLproperties.sasl.mechanism: PLAINproperties.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="yyy";

生产建议:

  • 预建主题并指定分区数/副本因子/保留策略;自动建主题虽方便,但不可控。
  • 开启 acks=all,将 max.in.flight.requests.per.connection 设为 1 以避免乱序(吞吐换一致)。
  • 合理设置 linger.ms/batch.size/compression.type 提升吞吐与成本效率。

5. 数据类型映射(速查)

Literal type:Kafka Connect 的物理存储类型;Semantic type:Debezium 的逻辑语义。

CDC typeJSON typeLiteral typeSemantic type备注
TINYINTTINYINTINT16
SMALLINTSMALLINTINT16
INTINTINT32
BIGINTBIGINTINT64
FLOATFLOATFLOAT
DOUBLEDOUBLEDOUBLE
DECIMAL(p,s)DECIMAL(p,s)BYTESorg.apache.kafka.connect.data.Decimal
BOOLEANBOOLEANBOOLEAN
DATEDATEINT32io.debezium.time.Date
TIMESTAMP§TIMESTAMP§INT64p<=3: io.debezium.time.Timestampp>3: io.debezium.time.MicroTimestamp
TIMESTAMP_LTZTIMESTAMP_LTZSTRINGio.debezium.time.ZonedTimestamp
CHAR(n)CHAR(n)STRING
VARCHAR(n)VARCHAR(n)STRING

6. 消费端落地:Flink SQL 示例(可直接测试)

6.1 读取 Debezium-JSON

CREATE TABLE ods_orders (col1 STRING,col2 STRING,PRIMARY KEY (col1) NOT ENFORCED
) WITH ('connector' = 'kafka','topic' = 'mydb.public.orders','properties.bootstrap.servers' = 'kafka:9092','value.format' = 'debezium-json'
);

6.2 读取 Canal-JSON

CREATE TABLE ods_orders_canal (col1 STRING,col2 STRING,PRIMARY KEY (col1) NOT ENFORCED
) WITH ('connector' = 'kafka','topic' = 'mydb.public.orders','properties.bootstrap.servers' = 'kafka:9092','value.format' = 'canal-json'
);

若你采用单主题多表,可在消费端结合 tableId 头或 value 中的表名做动态路由/过滤

7. 上线 Checklist(强烈建议)

  1. 主题预创建:为单表或汇聚主题规划分区数/副本因子/保留策略;打开 min.insync.replicas
  2. 分区策略:高吞吐业务优先 hash-by-key;如需全序处理选 all-to-zero + 单消费者。
  3. 键/值格式:若做主键幂等消费或按键路由,配置 key.format 并校验主键完整性。
  4. 格式验证:消费端用脚本检查 value 格式与字符集;需要 schema 时开启 debezium-json.include-schema.enabled
  5. 安全与配额:开启 SASL/SSL;为生产者设置 quota 与监控,防止“风暴写入”。
  6. 可观测性:接入 Producer/Consumer 指标与 lag 监控,预设告警阈值。
  7. 回压与重启:演练任务重启与 Broker 降级,确认无数据丢失与可接受的重试延迟。

8. 常见问题(Troubleshooting)

  • 消息全都到 0 号分区:未配置或仍使用默认 all-to-zero;改为 hash-by-key 并确保上游主键存在。
  • 消费端乱序:检查 max.in.flight.requests.per.connection 是否 > 1;需要严格顺序时置为 1。
  • 单主题多表难以拆分:开启 sink.add-tableId-to-header-enabled,消费端按 header 路由。
  • 需要事件时间:两种内置格式不含 ts 字段;使用 Kafka Record Timestamp 或在消费端补齐。
  • 自动建主题分区太少:建议预创建主题并指定分区与副本,避免后续扩分区带来的数据倾斜。

小结

Kafka Sink 作为 实时数据总线 的“出口”稳定可靠:

  • 路由可选(按表、单主题、映射);
  • 分区可控(全序/可扩展);
  • 编码清晰(debezium-json/canal-json + 可选 schema);
  • 生产可用(支持 header、SASL/SSL、Producer 参数透传)。
http://www.dtcms.com/a/605253.html

相关文章:

  • Podman讲解
  • PHP EOF (Heredoc)
  • Spring Boot集成Kafka:最佳实践与详细指南
  • 【运维】Docker 入门
  • 手机网站的尺寸做多大的如何搭建wordpress商城
  • 百度商桥绑定网站正规的关键词优化软件
  • F280049C学习笔记之CMPSS
  • linux系统学习(10.shell基础)
  • 融合之力:金仓数据库“五个一体化“如何重塑国产数据库生态
  • 山西省第十九届职业院校技能大赛 网络建设与运维赛项 1. 系统安装答案解析
  • 做网站哪个服务商便宜菏泽网站设计培训
  • SLAM中的非线性优-3D图优化之轴角在Opencv-PNP中的应用(二)
  • Step-Audio-EditX
  • Notepad++编译C语言 | 如何高效配置和使用Notepad++进行C语言开发
  • Hadoop学习_week1
  • 靠谱的时序数据库哪家技术强
  • VR水污染体验系统——VR 里的碧水守护
  • 重构企业运维智慧:低代码 ITSM 知识管理平台的创新与实践
  • 从C++开始的编程生活(13)——list和浅谈stack、queue
  • m序列原理及在5G的应用
  • 焦作建设网站的公司怎么让百度快速收录网站
  • API创建指定版本k8s集群
  • K8S NFS PVC PV 挂载点路径问题
  • 【CANN】开启AI开发新纪元,释放极致计算效率
  • ui展示 网站百度热门排行榜
  • Java并发编程基石:深入理解JMM(Java内存模型)与Happens-Before规则
  • 一个基于现代 C++23 Modules 的传统文化算法库,使用纯模块化设计实现(包含大六壬、六爻、紫薇斗数、八字、奇门遁甲)
  • 注释网站开发全国大型教育集团网站建设
  • PyQt5 + Qt Designer配置指令
  • setprop debug.hwui.profile visual_bars有什么作用