当前位置: 首页 > news >正文

基于 Flink CDC 的 MySQL → Kafka Streaming ELT 实战

1. 为什么选 Flink CDC CLI?

  • 更轻:无需 IDE 与代码工程,YAML 定义即上生产。
  • 更稳:基于 Flink Checkpoint 的端到端语义,天然容错。
  • 更灵:内置路由、分区策略、Topic 映射、格式切换,覆盖 80% 以上实时采集/同步场景。

2. 目标与数据流

在这里插入图片描述

3. 环境准备

  • 操作系统:Linux 或 macOS(Windows 建议 WSL2)

  • 已安装:Docker & Docker Compose

  • 需要下载:

    • Flink 1.20.1
    • Flink CDC CLI 3.5.0(解压后包含 bin/ lib/ conf/
    • MySQL、Kafka CDC 连接器(Jar 放到 Flink CDClib/
    • MySQL Connector/J(Jar 放到 Flink CDClib/--jar 指定)

4. 启动 Flink Standalone 集群

# 1) 下载并解压 Flink
tar -zxvf flink-1.20.1-bin-scala_2.12.tgz
export FLINK_HOME="$PWD/flink-1.20.1"   # 注意:export 而不是 exprot(常见笔误)
cd "$FLINK_HOME"# 2) 开启 checkpoint(每 3 秒)
cat >> conf/config.yaml <<'YAML'
execution:checkpointing:interval: 3s
YAML# 3) 启动
./bin/start-cluster.sh
# 访问 UI: http://localhost:8081# 如需更多 TaskManager,可以再次执行 start-cluster.sh

生产部署在云上时,如需公网访问 UI,请按需设置 rest.bind-address / rest.address 并配置安全策略。

5. 用 Docker Compose 启动 Kafka/ZooKeeper/MySQL

推荐:Kafka 使用双监听避免“容器内能连、宿主机连不上”的老问题。

docker-compose.yml

version: "3.8"
services:zookeeper:image: bitnami/zookeeper:3.9container_name: zookeeperenvironment:- ALLOW_ANONYMOUS_LOGIN=yesports:- "2181:2181"kafka:image: bitnami/kafka:3.7container_name: kafkadepends_on:- zookeeperenvironment:- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181# 双监听:容器内用 kafka:9092;宿主机用 localhost:9093- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTports:- "9092:9092"- "9093:9093"mysql:# Debezium 示例镜像已启用 binlog 与所需权限image: debezium/example-mysql:1.1container_name: mysqlenvironment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwports:- "3306:3306"

启动并检查:

docker compose up -d
docker ps

6. 初始化 MySQL 示例数据

docker compose exec mysql mysql -uroot -p123456 -e "
CREATE DATABASE IF NOT EXISTS app_db;
USE app_db;
CREATE TABLE orders   (id INT NOT NULL, price DECIMAL(10,2) NOT NULL, PRIMARY KEY(id));
CREATE TABLE shipments(id INT NOT NULL, city  VARCHAR(255) NOT NULL, PRIMARY KEY(id));
CREATE TABLE products (id INT NOT NULL, product VARCHAR(255) NOT NULL, PRIMARY KEY(id));
INSERT INTO orders    VALUES (1,4.00),(2,100.00);
INSERT INTO shipments VALUES (1,'beijing'),(2,'xian');
INSERT INTO products  VALUES (1,'Beer'),(2,'Cap'),(3,'Peanut');
"

7. 准备 Flink CDC CLI 与连接器

  1. 解压 Flink CDC CLI 3.5.0 到目录(记为 FLINK_CDC_HOME

  2. 将以下 Jar 放入 FLINK_CDC_HOME/lib/

    • MySQL Pipeline Connector 3.5.0
    • Kafka Pipeline Connector 3.5.0
    • MySQL Connector/J(8.x)

切记:CDC 的连接器 Jar 放在 Flink CDC 的 lib 下,不要放到 Flink Home 的 lib(两者不是一个目录)。

8. 编写并提交第一条管道(全库 → 单 Topic)

mysql-to-kafka.yaml

################################################################################
# Sync all MySQL tables in app_db to a single Kafka topic
################################################################################
source:type: mysqlhostname: mysqlport: 3306username: rootpassword: 123456tables: app_db\..*          # 正则,匹配 app_db 下所有表;注意转义点号server-id: 5400-5404        # 多值区间,避免与其他采集端冲突server-time-zone: UTCsink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: kafka:9092topic: yaml-mysql-kafkavalue.format: debezium-json  # 默认;也可 canal-jsonpipeline:name: MySQL to Kafka Pipelineparallelism: 1

提交:

cd <FLINK_CDC_HOME>
bash bin/flink-cdc.sh mysql-to-kafka.yaml
# 控制台会打印 Job ID;Flink UI 可见运行中的作业

9. 验证消费

  • 容器内消费:
docker compose exec kafka kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--topic yaml-mysql-kafka --from-beginning
  • 宿主机消费(如本地装了 Kafka CLI):
kafka-console-consumer --bootstrap-server localhost:9093 \--topic yaml-mysql-kafka --from-beginning

你将看到 debezium-json 结构(before/after/op/source 等)。例如 insert:

{"before": null,"after": { "id": 1, "price": 4.00 },"op": "c","source": { "db": "app_db", "table": "orders" }
}

需要 ts_ms 等更多元数据?可在 MySQL source 开启 metadata.list 暴露(进阶可选)。

10. 实时变更与模式演进演示

在 MySQL 中执行(容器内):

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;UPDATE app_db.orders SET price=100.00, amount='100.00' WHERE id=1;DELETE FROM app_db.orders WHERE id=2;

Kafka 消费端会实时看到 c/u/d 事件;新增列 amount 也会随模式演进传播到下游。

11. 进阶一:按表路由(含分片表合并)

将上游多表,路由为不同 Kafka 主题;或将分片表合并到一个主题

route:- source-table: app_db.orderssink-table: kafka_ods_orders- source-table: app_db.shipmentssink-table: kafka_ods_shipments- source-table: app_db.productssink-table: kafka_ods_products# 分片合并示例:app_db.order01 / order02 / ... → 同一主题- source-table: app_db.order\.*sink-table: kafka_ods_orders

查看已创建 Topic:

docker compose exec kafka kafka-topics.sh \--bootstrap-server kafka:9092 --list

12. 进阶二:多分区写入策略

sink.partition.strategy 支持:

  • all-to-zero(默认):全部写入分区 0
  • hash-by-key:按主键哈希分布到不同分区

示例:

sink:type: kafkaproperties.bootstrap.servers: kafka:9092topic: yaml-mysql-kafka-hash-by-keypartition.strategy: hash-by-key

先创建多分区主题(容器内):

docker compose exec kafka kafka-topics.sh \--create --topic yaml-mysql-kafka-hash-by-key \--bootstrap-server kafka:9092 --partitions 12

13. 进阶三:表到主题映射(不做 Schema 合并)

route 不同,TableId-to-Topic 映射不会合并不同表的 Schema,只是把不同表分派到不同 Topic

sink:type: kafkaproperties.bootstrap.servers: kafka:9092sink.tableId-to-topic.mapping: |app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products

14. 进阶四:输出格式切换

  • value.format: debezium-json(默认):before/after/op/source 等字段
  • value.format: canal-jsonold/data/type/database/table/pkNames 等字段

canal-json insert 示例:

{"old": null,"data": [{ "id": 1, "price": 100, "amount": "100.00" }],"type": "INSERT","database": "app_db","table": "orders","pkNames": ["id"]
}

15. 常见问题(踩坑手册)

  1. Host 连不通 Kafka

    • 使用双监听:容器用 kafka:9092,宿主机用 localhost:9093
    • 千万别把 bootstrap.servers 配成 0.0.0.0
  2. CDC 连接器未生效 / ClassNotFound

    • CDC MySQL/Kafka 连接器 JarFlink CDC 的 lib/
    • 还需要 MySQL Connector/J
  3. 正则匹配不到表

    • 匹配全库正确写法:app_db\..*(点号要转义)。
  4. server-id 冲突

    • 多个采集端连同一 MySQL 时,给不同的 server-id 区间。
  5. 时区/时间戳错乱

    • 指定 server-time-zone(如 UTC),并结合下游消费侧统一时区。
  6. Topic 自动创建与权限

    • 默认会自动创建;生产建议显式创建并配置权限、分区数、副本数、压缩与保留策略。
  7. Checkpoint 间隔

    • 开发环境 3s 没问题;生产需根据延迟、吞吐、Kafka 事务/幂等等综合评估。

16. 生产化清单(建议)

  • Flink 侧

    • Restart 策略、Checkpoint 存储(如 HDFS/S3)、并行度与资源配额
    • Savepoint 升级流程与回滚演练
  • Kafka 侧

    • Topic 副本数、分区数、保留策略(retention.ms/bytes
    • 压缩(compression.type),幂等/事务(Producer 端)
    • ACL、鉴权、网络与磁盘监控
  • MySQL 侧

    • binlog 保留、写入压力、主从延迟、热点表索引优化
    • DDL 规范(列类型兼容、避免频繁 DDL 抖动)
  • 可观测性

    • Flink Metrics、GC、背压(Backpressure)
    • Kafka Lag、Topic QPS、ISR 变化
  • 变更治理

    • 路由/映射规则变更审批与回归测试
    • 下游 Schema 演进策略(兼容列、空值、默认值)

17. 清理环境

# 关闭 Kafka/ZK/MySQL
docker compose down# 停止 Flink
"$FLINK_HOME"/bin/stop-cluster.sh
http://www.dtcms.com/a/581465.html

相关文章:

  • Redis内存回收,缓存问题
  • 一项基于高灵敏度sCMOS相机的光镊成像实验
  • wordpress 调用分类名上海不限关键词优化
  • 运维高级故障排除与恢复-SysRq
  • word插入的图片显示不完全解决方法
  • 北京响应式h5网站开发网站优化排名推广
  • 面向强化学习的状态空间建模:RSSM的介绍和PyTorch实现(3)
  • 网站建设如何上传图片阿里云虚拟主机可以做几个网站吗
  • 文章精读:(CVPR2024)DemosaicFormer:用于HybridEVS相机的粗到细去马赛克网络
  • 红帽Linux-基本管理存储
  • LangGraph基础教程(1)---LangGraph的简介
  • IDEA + Spring Boot 的三种热加载方案
  • 有哪些摄影网站甘肃省建设厅官网
  • presto安装与使用
  • 信息论(三):霍夫曼编码
  • 002-GD32L235KBQ6 Keil工程移植J-Link RTT
  • 手机网站建设 苏州企石镇做网站
  • java的设计模式之桥接模式(Bridge)
  • 【Unity踩坑】Error MSB3774: 找不到 SDK“WindowsMobile, Version=10.0.26100.0
  • 网站二维码可以做长按识别吗深圳专业网站建设制作价格低
  • 图片优化 上传图片压缩 npm包支持vue(react)框架开源插件 支持在线与本地
  • React Native CLI的搭建
  • 世界互联网大会乌镇峰会:共话数字未来新可能
  • TeamCity更新包
  • 第8届 AiDD峰会 深圳 | “AI+领域”线:解锁未来科技新图景
  • 网站搭建服务器需要多少钱网站运营seo招聘
  • 结构自由度
  • Effective Python 第49条:用__init_subclass__记录现有的子类
  • 《PLC编程与MES系统开发学习指南》详细大纲
  • AbMole小课堂丨L-NAME:一氧化氮合酶(NOS)经典抑制剂在心血管及免疫研究中的应用