基于 Flink CDC 的 MySQL → Kafka Streaming ELT 实战
1. 为什么选 Flink CDC CLI?
- 更轻:无需 IDE 与代码工程,YAML 定义即上生产。
- 更稳:基于 Flink Checkpoint 的端到端语义,天然容错。
- 更灵:内置路由、分区策略、Topic 映射、格式切换,覆盖 80% 以上实时采集/同步场景。
2. 目标与数据流

3. 环境准备
-
操作系统:Linux 或 macOS(Windows 建议 WSL2)
-
已安装:Docker & Docker Compose
-
需要下载:
- Flink 1.20.1
- Flink CDC CLI 3.5.0(解压后包含
bin/ lib/ conf/) - MySQL、Kafka CDC 连接器(Jar 放到 Flink CDC 的
lib/) - MySQL Connector/J(Jar 放到 Flink CDC 的
lib/或--jar指定)
4. 启动 Flink Standalone 集群
# 1) 下载并解压 Flink
tar -zxvf flink-1.20.1-bin-scala_2.12.tgz
export FLINK_HOME="$PWD/flink-1.20.1" # 注意:export 而不是 exprot(常见笔误)
cd "$FLINK_HOME"# 2) 开启 checkpoint(每 3 秒)
cat >> conf/config.yaml <<'YAML'
execution:checkpointing:interval: 3s
YAML# 3) 启动
./bin/start-cluster.sh
# 访问 UI: http://localhost:8081# 如需更多 TaskManager,可以再次执行 start-cluster.sh
生产部署在云上时,如需公网访问 UI,请按需设置
rest.bind-address/rest.address并配置安全策略。
5. 用 Docker Compose 启动 Kafka/ZooKeeper/MySQL
推荐:Kafka 使用双监听避免“容器内能连、宿主机连不上”的老问题。
docker-compose.yml
version: "3.8"
services:zookeeper:image: bitnami/zookeeper:3.9container_name: zookeeperenvironment:- ALLOW_ANONYMOUS_LOGIN=yesports:- "2181:2181"kafka:image: bitnami/kafka:3.7container_name: kafkadepends_on:- zookeeperenvironment:- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181# 双监听:容器内用 kafka:9092;宿主机用 localhost:9093- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTports:- "9092:9092"- "9093:9093"mysql:# Debezium 示例镜像已启用 binlog 与所需权限image: debezium/example-mysql:1.1container_name: mysqlenvironment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwports:- "3306:3306"
启动并检查:
docker compose up -d
docker ps
6. 初始化 MySQL 示例数据
docker compose exec mysql mysql -uroot -p123456 -e "
CREATE DATABASE IF NOT EXISTS app_db;
USE app_db;
CREATE TABLE orders (id INT NOT NULL, price DECIMAL(10,2) NOT NULL, PRIMARY KEY(id));
CREATE TABLE shipments(id INT NOT NULL, city VARCHAR(255) NOT NULL, PRIMARY KEY(id));
CREATE TABLE products (id INT NOT NULL, product VARCHAR(255) NOT NULL, PRIMARY KEY(id));
INSERT INTO orders VALUES (1,4.00),(2,100.00);
INSERT INTO shipments VALUES (1,'beijing'),(2,'xian');
INSERT INTO products VALUES (1,'Beer'),(2,'Cap'),(3,'Peanut');
"
7. 准备 Flink CDC CLI 与连接器
-
解压 Flink CDC CLI 3.5.0 到目录(记为 FLINK_CDC_HOME)
-
将以下 Jar 放入 FLINK_CDC_HOME/lib/:
- MySQL Pipeline Connector 3.5.0
- Kafka Pipeline Connector 3.5.0
- MySQL Connector/J(8.x)
切记:CDC 的连接器 Jar 放在 Flink CDC 的 lib 下,不要放到 Flink Home 的 lib(两者不是一个目录)。
8. 编写并提交第一条管道(全库 → 单 Topic)
mysql-to-kafka.yaml
################################################################################
# Sync all MySQL tables in app_db to a single Kafka topic
################################################################################
source:type: mysqlhostname: mysqlport: 3306username: rootpassword: 123456tables: app_db\..* # 正则,匹配 app_db 下所有表;注意转义点号server-id: 5400-5404 # 多值区间,避免与其他采集端冲突server-time-zone: UTCsink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: kafka:9092topic: yaml-mysql-kafkavalue.format: debezium-json # 默认;也可 canal-jsonpipeline:name: MySQL to Kafka Pipelineparallelism: 1
提交:
cd <FLINK_CDC_HOME>
bash bin/flink-cdc.sh mysql-to-kafka.yaml
# 控制台会打印 Job ID;Flink UI 可见运行中的作业
9. 验证消费
- 容器内消费:
docker compose exec kafka kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--topic yaml-mysql-kafka --from-beginning
- 宿主机消费(如本地装了 Kafka CLI):
kafka-console-consumer --bootstrap-server localhost:9093 \--topic yaml-mysql-kafka --from-beginning
你将看到 debezium-json 结构(before/after/op/source 等)。例如 insert:
{"before": null,"after": { "id": 1, "price": 4.00 },"op": "c","source": { "db": "app_db", "table": "orders" }
}
需要
ts_ms等更多元数据?可在 MySQL source 开启metadata.list暴露(进阶可选)。
10. 实时变更与模式演进演示
在 MySQL 中执行(容器内):
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;UPDATE app_db.orders SET price=100.00, amount='100.00' WHERE id=1;DELETE FROM app_db.orders WHERE id=2;
Kafka 消费端会实时看到 c/u/d 事件;新增列 amount 也会随模式演进传播到下游。
11. 进阶一:按表路由(含分片表合并)
将上游多表,路由为不同 Kafka 主题;或将分片表合并到一个主题:
route:- source-table: app_db.orderssink-table: kafka_ods_orders- source-table: app_db.shipmentssink-table: kafka_ods_shipments- source-table: app_db.productssink-table: kafka_ods_products# 分片合并示例:app_db.order01 / order02 / ... → 同一主题- source-table: app_db.order\.*sink-table: kafka_ods_orders
查看已创建 Topic:
docker compose exec kafka kafka-topics.sh \--bootstrap-server kafka:9092 --list
12. 进阶二:多分区写入策略
sink.partition.strategy 支持:
all-to-zero(默认):全部写入分区 0hash-by-key:按主键哈希分布到不同分区
示例:
sink:type: kafkaproperties.bootstrap.servers: kafka:9092topic: yaml-mysql-kafka-hash-by-keypartition.strategy: hash-by-key
先创建多分区主题(容器内):
docker compose exec kafka kafka-topics.sh \--create --topic yaml-mysql-kafka-hash-by-key \--bootstrap-server kafka:9092 --partitions 12
13. 进阶三:表到主题映射(不做 Schema 合并)
与 route 不同,TableId-to-Topic 映射不会合并不同表的 Schema,只是把不同表分派到不同 Topic:
sink:type: kafkaproperties.bootstrap.servers: kafka:9092sink.tableId-to-topic.mapping: |app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products
14. 进阶四:输出格式切换
value.format: debezium-json(默认):before/after/op/source等字段value.format: canal-json:old/data/type/database/table/pkNames等字段
canal-json insert 示例:
{"old": null,"data": [{ "id": 1, "price": 100, "amount": "100.00" }],"type": "INSERT","database": "app_db","table": "orders","pkNames": ["id"]
}
15. 常见问题(踩坑手册)
-
Host 连不通 Kafka
- 使用双监听:容器用
kafka:9092,宿主机用localhost:9093。 - 千万别把
bootstrap.servers配成0.0.0.0。
- 使用双监听:容器用
-
CDC 连接器未生效 / ClassNotFound
- CDC MySQL/Kafka 连接器 Jar 放 Flink CDC 的
lib/。 - 还需要 MySQL Connector/J。
- CDC MySQL/Kafka 连接器 Jar 放 Flink CDC 的
-
正则匹配不到表
- 匹配全库正确写法:
app_db\..*(点号要转义)。
- 匹配全库正确写法:
-
server-id 冲突
- 多个采集端连同一 MySQL 时,给不同的
server-id区间。
- 多个采集端连同一 MySQL 时,给不同的
-
时区/时间戳错乱
- 指定
server-time-zone(如UTC),并结合下游消费侧统一时区。
- 指定
-
Topic 自动创建与权限
- 默认会自动创建;生产建议显式创建并配置权限、分区数、副本数、压缩与保留策略。
-
Checkpoint 间隔
- 开发环境 3s 没问题;生产需根据延迟、吞吐、Kafka 事务/幂等等综合评估。
16. 生产化清单(建议)
-
Flink 侧
- Restart 策略、Checkpoint 存储(如 HDFS/S3)、并行度与资源配额
- Savepoint 升级流程与回滚演练
-
Kafka 侧
- Topic 副本数、分区数、保留策略(
retention.ms/bytes) - 压缩(
compression.type),幂等/事务(Producer 端) - ACL、鉴权、网络与磁盘监控
- Topic 副本数、分区数、保留策略(
-
MySQL 侧
- binlog 保留、写入压力、主从延迟、热点表索引优化
- DDL 规范(列类型兼容、避免频繁 DDL 抖动)
-
可观测性
- Flink Metrics、GC、背压(Backpressure)
- Kafka Lag、Topic QPS、ISR 变化
-
变更治理
- 路由/映射规则变更审批与回归测试
- 下游 Schema 演进策略(兼容列、空值、默认值)
17. 清理环境
# 关闭 Kafka/ZK/MySQL
docker compose down# 停止 Flink
"$FLINK_HOME"/bin/stop-cluster.sh
