当前位置: 首页 > news >正文

用 Flink CDC 将 MySQL 实时同步到 StarRocks

1、前置条件与环境说明

  • 一台 Linux 或 macOS 电脑,已安装 Docker / docker-compose

  • 端口占用:

    • Flink Web UI 默认 8081
    • StarRocks FE:HTTP(8030)、MySQL 协议(9030)(个别 all-in-one 镜像会把 FE HTTP 暴露为 8080,见下方“端口对照”说明)
    • MySQL:3306

端口对照备注(重要)

  • StarRocks 官方默认 FE HTTP 端口是 8030
  • 若你使用的镜像将 FE HTTP 暴露为 8080,请将 load-url 与浏览器访问端口改为 8080
  • 下面示例以 8030 为主,同时在需要处提示如何替换为 8080

2、启动 Flink Standalone(开启 Checkpoint)

下载 Flink 1.20.1 并解压,进入目录:

cd flink-1.20.1

conf/flink-conf.yaml(注意不是 config.yaml)中开启周期性 Checkpoint(每 3 秒一次):

# conf/flink-conf.yaml
execution.checkpointing.interval: 3s

启动集群:

./bin/start-cluster.sh

访问 Flink Web UI:http://localhost:8081/。若需要更多 TaskManager,可重复执行 start-cluster.sh

3、用 Docker Compose 拉起 MySQL 与 StarRocks

创建 docker-compose.yml(建议服务名全部小写,避免兼容性问题):

version: '2.1'
services:starrocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8030:8030"   # 若你的镜像以 8080 暴露 FE HTTP,请改成 "8080:8080"- "9030:9030"   # MySQL 协议端口mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

启动容器:

docker-compose up -d
docker ps

验证 StarRocks:

  • 若使用 8030:浏览器打开 http://localhost:8030/
  • 若镜像是 8080:打开 http://localhost:8080/

debezium/example-mysql 镜像已为 CDC 配置好 Binlog(ROW),适合演示。

4、在 MySQL 造数(orders / shipments / products)

进入容器并创建库表与示例数据:

docker-compose exec mysql mysql -uroot -p123456

执行 SQL:

-- 1) 创建库
CREATE DATABASE app_db;
USE app_db;-- 2) 订单表
CREATE TABLE `orders` (`id` INT NOT NULL,`price` DECIMAL(10,2) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `orders` (`id`, `price`) VALUES(1, 4.00),(2, 100.00);-- 3) 物流表
CREATE TABLE `shipments` (`id` INT NOT NULL,`city` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `shipments` (`id`, `city`) VALUES(1, 'beijing'),(2, 'xian');-- 4) 商品表
CREATE TABLE `products` (`id` INT NOT NULL,`product` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `products` (`id`, `product`) VALUES(1, 'Beer'),(2, 'Cap'),(3, 'Peanut');

5、准备 Flink CDC CLI 与连接器

  1. 解压 flink-cdc-3.5.0-bin.tar.gz → 得到 flink-cdc-3.5.0/{bin, lib, log, conf}

  2. 将以下 CDC 管道连接器 JAR 拷入 Flink CDC 的 lib/(注意:不是 Flink Home 的 lib/):

    • MySQL pipeline connector 3.5.0
    • StarRocks pipeline connector 3.5.0
  3. MySQL Connector/J(JDBC)不再随 CDC 连接器打包,你需要:

    • mysql-connector-j-*.jar 放入 Flink(非 CDC)lib/ 目录,
    • 或在提交作业时通过 --jar 传入。

稳定版本 JAR 可直接下载,SNAPSHOT 需自行从源码构建。

6、编写并提交整库同步 YAML

新建 mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\*              # 正则:整库 app_dbserver-id: 5400-5404            # 与现有复制/采集避免冲突server-time-zone: UTC           # 显式时区避免时间字段偏移sink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030        # 若镜像用 8080 暴露 FE HTTP,则改为 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1  # 演示单副本。生产建议 >= 3# 可选:若希望主键模型以支持 upsert,可在建表属性里声明 primary_key# table.create.properties.duplicate_key: false# table.create.properties.primary_key: "id"pipeline:name: Sync MySQL Database to StarRocksparallelism: 2

提交(不同版本 CLI 写法略有差异,任选其一):

bash bin/flink-cdc.sh run -f mysql-to-starrocks.yaml
# 或
bash bin/flink-cdc.sh mysql-to-starrocks.yaml

输出示例:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

在 Flink Web UI 可见名为 “Sync MySQL Database to StarRocks” 的作业运行中。

查看 StarRocks 数据:用数据库客户端(DBeaver / DataGrip / mysql CLI)连接 mysql://127.0.0.1:9030

7、在线验证:DML + DDL 的实时同步

进入 MySQL 容器:

docker-compose exec mysql mysql -uroot -p123456

逐步执行,观察 StarRocks 的 app_db.orders 变化:

-- 新增
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);-- Schema 演进:新增列
ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;-- 更新(含新列)
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;-- 删除
DELETE FROM app_db.orders WHERE id=2;

刷新客户端(或 SELECT * FROM app_db.orders),可见 StarRocks 实时更新。
同理修改 shipmentsproducts 也会同步变化。

8、路由与“分表并表”示例

Flink CDC 的 route 能把源端库表“改名/迁移”到目标端的其它库表名;也支持正则匹配把多张分表并入一张目标表。

8.1 逐表路由(跨库迁移/改名)

################################################################################
# Description: Sync MySQL all tables to StarRocks with Route
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030    # 若你的镜像是 8080,请改为 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2

8.2 分表并表(正则聚合到单表)

route:- source-table: app_db.order\..*   # 如 app_db.order01 / order02 / order03sink-table: ods_db.ods_orders    # 并入同一目标表

⚠️ 现阶段 不支持“多个分表里存在相同主键”的数据并表(去重/冲突解决需在上游或 Transform 层处理;后续版本会增强)。

9、清理与回收

停止容器(在 docker-compose.yml 所在目录):

docker-compose down

停止 Flink 集群(在 flink-1.20.1 目录):

./bin/stop-cluster.sh

10、常见坑位与排障清单

  • 服务名大小写:Compose 服务名建议全小写(如 mysqlstarrocks),避免大小写导致的兼容问题;相应地 docker-compose exec mysql ...
  • FE HTTP 端口:默认是 8030;若你的 all-in-one 镜像暴露为 8080,请同步修改 load-url 与浏览器访问端口。
  • Flink 配置文件名:是 conf/flink-conf.yaml,不是 config.yaml
  • server-id 冲突:报错 “server-id in use” 时更换不冲突区间。
  • 时区错位:务必在 Source 指定 server-time-zone(如 UTC);否则 TIMESTAMP/DATETIME 可能偏移。
  • StarRocks 表模型:如需 Upsert 语义,建议使用 主键(PRIMARY KEY)模型,并确保有合理的主键;默认 Duplicate Key 也可运行,但与 Upsert 预期不同。
  • 连接器放置路径:CDC 连接器放在 Flink CDC Homelib/mysql-connector-j 放在 Flink Homelib/(或 --jar 传入)。
  • 整库白名单tables: app_db\.\* 会抓整库,生产建议白名单规则与命名规范,避免误采。
  • Exactly-Once 基线:合理设置 Checkpoint 间隔/超时;下游采用主键表 + 幂等/事务装载;重大变更前先做 Savepoint
http://www.dtcms.com/a/560632.html

相关文章:

  • 基础开发工具---软件包装管理器及vim
  • 邮箱登陆嵌入网站义乌网站建设方案详细
  • 榨干 CPU 性能:通过绑核将 Redis 尾延迟减半!
  • 数据结构之栈和队列-队列
  • 十九、STM32的TIM(十)(编码器)
  • FSDP(Fully Sharded Data Parallel)全分片数据并行详解
  • Transformer 模型详解:从输入到输出的全流程剖析
  • 网站开发工单营销型网站建设设定包括哪些方面
  • EF Core 数据库迁移
  • 【攻防实战】通达OA文件上传联动Cobalt Strike打穿三层内网(下)
  • 网站备案 教程广州花都区网站建设
  • FPC-40P-05转接板-嘉立创EDA设计
  • Java核心概念深度解析:从包装类到泛型的全面指南
  • 灵途科技亮相NEPCON ASIA 2025 以光电感知点亮具身智能未来
  • flash-attn安装过程中编译错误
  • 世界最受欢迎的免费架站平台经营网站 备案信息
  • 7.1 阴影贴图
  • Elastic AI agent builder 介绍(三)
  • React18中在有路由的情况下父组件如何给子组件传递数据?
  • 边缘计算和云计算有什么区别?
  • 做哪种网站流量大嵌入式软件开发工程师工作内容
  • 【第二十周】机器学习笔记09
  • Linux定时任务:crontab使用教程(附案例)
  • 网站建设配色方案wordpress对接静态网页
  • Instant Mail临时邮箱v7.3.0 最新解锁版
  • MPK(Mirage Persistent Kernel)源码笔记(5)--- 执行引擎
  • 微网站菜单网站开发用哪个框架
  • 【Cache缓存】基本概念
  • 数据结构:单链表(1)
  • 4.2 【2018统考真题】