用 Flink CDC 将 MySQL 实时同步到 StarRocks
1、前置条件与环境说明
-
一台 Linux 或 macOS 电脑,已安装 Docker / docker-compose。
-
端口占用:
- Flink Web UI 默认
8081 - StarRocks FE:HTTP(8030)、MySQL 协议(9030)(个别 all-in-one 镜像会把 FE HTTP 暴露为 8080,见下方“端口对照”说明)
- MySQL:
3306
- Flink Web UI 默认
端口对照备注(重要)
- StarRocks 官方默认 FE HTTP 端口是 8030。
- 若你使用的镜像将 FE HTTP 暴露为 8080,请将
load-url与浏览器访问端口改为8080。 - 下面示例以 8030 为主,同时在需要处提示如何替换为
8080。
2、启动 Flink Standalone(开启 Checkpoint)
下载 Flink 1.20.1 并解压,进入目录:
cd flink-1.20.1
在 conf/flink-conf.yaml(注意不是 config.yaml)中开启周期性 Checkpoint(每 3 秒一次):
# conf/flink-conf.yaml
execution.checkpointing.interval: 3s
启动集群:
./bin/start-cluster.sh
访问 Flink Web UI:http://localhost:8081/。若需要更多 TaskManager,可重复执行 start-cluster.sh。
3、用 Docker Compose 拉起 MySQL 与 StarRocks
创建 docker-compose.yml(建议服务名全部小写,避免兼容性问题):
version: '2.1'
services:starrocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8030:8030" # 若你的镜像以 8080 暴露 FE HTTP,请改成 "8080:8080"- "9030:9030" # MySQL 协议端口mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw
启动容器:
docker-compose up -d
docker ps
验证 StarRocks:
- 若使用 8030:浏览器打开 http://localhost:8030/
- 若镜像是 8080:打开 http://localhost:8080/
debezium/example-mysql镜像已为 CDC 配置好 Binlog(ROW),适合演示。
4、在 MySQL 造数(orders / shipments / products)
进入容器并创建库表与示例数据:
docker-compose exec mysql mysql -uroot -p123456
执行 SQL:
-- 1) 创建库
CREATE DATABASE app_db;
USE app_db;-- 2) 订单表
CREATE TABLE `orders` (`id` INT NOT NULL,`price` DECIMAL(10,2) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `orders` (`id`, `price`) VALUES(1, 4.00),(2, 100.00);-- 3) 物流表
CREATE TABLE `shipments` (`id` INT NOT NULL,`city` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `shipments` (`id`, `city`) VALUES(1, 'beijing'),(2, 'xian');-- 4) 商品表
CREATE TABLE `products` (`id` INT NOT NULL,`product` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `products` (`id`, `product`) VALUES(1, 'Beer'),(2, 'Cap'),(3, 'Peanut');
5、准备 Flink CDC CLI 与连接器
-
解压
flink-cdc-3.5.0-bin.tar.gz→ 得到flink-cdc-3.5.0/{bin, lib, log, conf}。 -
将以下 CDC 管道连接器 JAR 拷入 Flink CDC 的
lib/(注意:不是 Flink Home 的lib/):- MySQL pipeline connector 3.5.0
- StarRocks pipeline connector 3.5.0
-
MySQL Connector/J(JDBC)不再随 CDC 连接器打包,你需要:
- 把
mysql-connector-j-*.jar放入 Flink(非 CDC) 的lib/目录, - 或在提交作业时通过
--jar传入。
- 把
稳定版本 JAR 可直接下载,SNAPSHOT 需自行从源码构建。
6、编写并提交整库同步 YAML
新建 mysql-to-starrocks.yaml:
################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\* # 正则:整库 app_dbserver-id: 5400-5404 # 与现有复制/采集避免冲突server-time-zone: UTC # 显式时区避免时间字段偏移sink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030 # 若镜像用 8080 暴露 FE HTTP,则改为 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1 # 演示单副本。生产建议 >= 3# 可选:若希望主键模型以支持 upsert,可在建表属性里声明 primary_key# table.create.properties.duplicate_key: false# table.create.properties.primary_key: "id"pipeline:name: Sync MySQL Database to StarRocksparallelism: 2
提交(不同版本 CLI 写法略有差异,任选其一):
bash bin/flink-cdc.sh run -f mysql-to-starrocks.yaml
# 或
bash bin/flink-cdc.sh mysql-to-starrocks.yaml
输出示例:
Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks
在 Flink Web UI 可见名为 “Sync MySQL Database to StarRocks” 的作业运行中。
查看 StarRocks 数据:用数据库客户端(DBeaver / DataGrip / mysql CLI)连接
mysql://127.0.0.1:9030。
7、在线验证:DML + DDL 的实时同步
进入 MySQL 容器:
docker-compose exec mysql mysql -uroot -p123456
逐步执行,观察 StarRocks 的 app_db.orders 变化:
-- 新增
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);-- Schema 演进:新增列
ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;-- 更新(含新列)
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;-- 删除
DELETE FROM app_db.orders WHERE id=2;
刷新客户端(或 SELECT * FROM app_db.orders),可见 StarRocks 实时更新。
同理修改 shipments、products 也会同步变化。
8、路由与“分表并表”示例
Flink CDC 的 route 能把源端库表“改名/迁移”到目标端的其它库表名;也支持正则匹配把多张分表并入一张目标表。
8.1 逐表路由(跨库迁移/改名)
################################################################################
# Description: Sync MySQL all tables to StarRocks with Route
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030 # 若你的镜像是 8080,请改为 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2
8.2 分表并表(正则聚合到单表)
route:- source-table: app_db.order\..* # 如 app_db.order01 / order02 / order03sink-table: ods_db.ods_orders # 并入同一目标表
⚠️ 现阶段 不支持“多个分表里存在相同主键”的数据并表(去重/冲突解决需在上游或 Transform 层处理;后续版本会增强)。
9、清理与回收
停止容器(在 docker-compose.yml 所在目录):
docker-compose down
停止 Flink 集群(在 flink-1.20.1 目录):
./bin/stop-cluster.sh
10、常见坑位与排障清单
- 服务名大小写:Compose 服务名建议全小写(如
mysql、starrocks),避免大小写导致的兼容问题;相应地docker-compose exec mysql ...。 - FE HTTP 端口:默认是 8030;若你的 all-in-one 镜像暴露为 8080,请同步修改
load-url与浏览器访问端口。 - Flink 配置文件名:是
conf/flink-conf.yaml,不是config.yaml。 - server-id 冲突:报错 “server-id in use” 时更换不冲突区间。
- 时区错位:务必在 Source 指定
server-time-zone(如 UTC);否则TIMESTAMP/DATETIME可能偏移。 - StarRocks 表模型:如需 Upsert 语义,建议使用 主键(PRIMARY KEY)模型,并确保有合理的主键;默认 Duplicate Key 也可运行,但与 Upsert 预期不同。
- 连接器放置路径:CDC 连接器放在 Flink CDC Home 的
lib/;mysql-connector-j放在 Flink Home 的lib/(或--jar传入)。 - 整库白名单:
tables: app_db\.\*会抓整库,生产建议白名单规则与命名规范,避免误采。 - Exactly-Once 基线:合理设置 Checkpoint 间隔/超时;下游采用主键表 + 幂等/事务装载;重大变更前先做 Savepoint。
