当前位置: 首页 > news >正文

用 Flink CDC 将 MySQL 实时同步到 Doris

1. 前置条件与环境准备

  • 一台 Linux 或 macOS 电脑,并已安装 Dockerdocker-compose
  • 端口占用:Flink Web UI(默认 8081)、Doris FE(Web 8030 / MySQL 协议 9030)、MySQL(3306)。

备注:生产环境网络与鉴权策略不同,本文仅做本地演示。示例口令请勿直接用于生产。

2. 启动 Flink Standalone(含开启 Checkpoint)

下载 Flink 1.20.1 并解压,进入安装目录(例如 flink-1.20.1):

cd flink-1.20.1

开启 Checkpoint(每 3 秒一次)。常见配置文件为 conf/flink-conf.yaml(有的文档写作 conf/config.yaml,以你本地发行包为准):

# 在 conf/flink-conf.yaml 追加:
execution.checkpointing.interval: 3s

启动 Flink 集群:

./bin/start-cluster.sh

浏览器访问 http://localhost:8081/ 查看 Flink Web UI。多次执行 start-cluster.sh 可以启动多个 TaskManager。

3. Docker Compose 一键拉起 MySQL 与 Doris

宿主机设置(Doris 需要较大的内存映射)

Linux:

sudo sysctl -w vm.max_map_count=2000000

macOS(容器实现差异,常见做法是在特权容器里修改宿主机命名空间):

docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
# 进入容器后执行:
sysctl -w vm.max_map_count=2000000
# 退出容器
exit

创建 docker-compose.yml(与本文同目录):

version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"   # Doris FE Web- "8040:8040"   # Doris BE Web(镜像提供)- "9030:9030"   # Doris MySQL 协议端口mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

启动容器:

docker-compose up -d
docker ps      # 检查容器是否正常

访问 http://localhost:8030/ 验证 Doris FE Web UI。默认用户 root,默认密码为空。

4. 在 MySQL 造数(orders/products/shipments)

进入 MySQL 容器并创建库表与示例数据:

docker-compose exec mysql mysql -uroot -p123456

执行 SQL:

-- 创建数据库
CREATE DATABASE app_db;
USE app_db;-- 订单表
CREATE TABLE `orders` (`id` INT NOT NULL,`price` DECIMAL(10,2) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00), (2, 100.00);-- 物流表
CREATE TABLE `shipments` (`id` INT NOT NULL,`city` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'), (2, 'xian');-- 商品表
CREATE TABLE `products` (`id` INT NOT NULL,`product` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`)
);
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'), (2, 'Cap'), (3, 'Peanut');

5. 在 Doris 创建数据库(app_db)

目前多数 Doris 连接器不支持自动创建数据库,需先手工创建。

进入 Doris FE Web(http://localhost:8030/)或使用 MySQL 客户端(mysql -h 127.0.0.1 -P9030 -uroot)执行:

CREATE DATABASE app_db;

6. 准备 Flink CDC CLI 与连接器

解压 flink-cdc-3.5.0-bin.tar.gz,得到 flink-cdc-3.5.0(包含 bin/ lib/ log/ conf/)。

将以下 CDC 管道连接器 JAR 放入 Flink CDC 的 lib/ 目录(注意:不是 Flink Home 的 lib/):

  • MySQL pipeline connector 3.5.0
  • Apache Doris pipeline connector 3.5.0

另外,MySQL Connector/J 不再随 CDC 连接器打包,你需要:

  • mysql-connector-j-*.jar 放入 Flink(非 CDC)的 lib/ 目录,
  • 或提交作业时使用 --jar 传入。

以上 JAR 获取方式与版本以你环境为准(稳定版可直接下载;SNAPSHOT 需从源码构建)。

7. 编写并提交整库同步的 YAML

新建 mysql-to-doris.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\*              # 正则匹配整库server-id: 5400-5404            # 确保不与其他复制进程冲突server-time-zone: UTC           # 避免时间字段偏移sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1  # 演示镜像常见为单副本pipeline:name: Sync MySQL Database to Dorisparallelism: 2

提交作业(两种常见写法,取决于 CLI 版本):

# 写法 A(常见)
bash bin/flink-cdc.sh run -f mysql-to-doris.yaml# 写法 B(部分版本)
bash bin/flink-cdc.sh mysql-to-doris.yaml

成功后返回类似:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

在 Flink Web UI 可看到作业运行中。

8. 在线验证:增删改与 DDL 变更的实时同步

进入 MySQL 容器:

docker-compose exec mysql mysql -uroot -p123456

执行以下步骤,每次在 Doris Web UI 刷新查看 orders 表变化:

-- 新增一行
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);-- 新增列(Schema 演进)
ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;-- 更新(同时改 price 与新增列 amount)
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;-- 删除
DELETE FROM app_db.orders WHERE id=2;

同理,修改 shipmentsproducts 也会实时反映到 Doris。

9. 路由与分表并表:将多张表/分表写入统一目标表

路由(Route) 可以把源端库表结构/数据映射到不同的目标库表名,实现库表重命名跨库迁移、以及分表并表

9.1 指定映射(逐表改名/迁移)

################################################################################
# Description: Sync MySQL all tables to Doris with Route
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db\.\*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2

上述配置可实现把 app_db.orders 迁移到 ods_db.ods_orders 等。

9.2 分表并表(正则匹配分表名)

route:- source-table: app_db.order\..*   # 例如 app_db.order01 / order02 / order03sink-table: ods_db.ods_orders   # 汇总到统一目标表

注意:当前不支持“多个分表里存在相同主键数据”的并表场景(会在未来版本支持)。如你有这种情况,需要在上游或 Transform 层先做去重/裁剪策略。

10. 收尾清理

停止所有容器(位于 docker-compose.yml 同目录):

docker-compose down

停止 Flink 集群(位于 flink-1.20.1 目录):

./bin/stop-cluster.sh

11. 常见问题与注意事项

  • Checkpoint 文件名:一般为 conf/flink-conf.yaml;不同发行包文档可能写作 config.yaml,以你本地为准。
  • Server-ID 冲突:若报 “server-id in use”,请更换不冲突的 server-id 区间。
  • 时区偏移:务必在 Source 端指定 server-time-zone(例如 UTC),避免时间字段错位。
  • Doris 副本数:演示镜像常见单副本;生产建议 replication_num >= 3
  • 连接器位置:CDC 连接器放 CDC Home 的 lib/mysql-connector-jFlink Home 的 lib/(或 --jar 指定)。
  • 整库白名单tables: app_db\.\* 会捕获整库,生产需建立白名单/命名规范,避免误采/误删。
  • Exactly-Once 基线:按需调整 Checkpoint 间隔/超时;在下游启用主键/唯一键的 Upsert 或事务写入策略。
  • macOS vm.max_map_count:请按文中“特权容器 + nsenter”方式设置;不同系统或 Docker 后端实现可能略有差异。
http://www.dtcms.com/a/561491.html

相关文章:

  • win7家用电脑做网站服务器网站开发实习报告
  • 鲸鱼算法详细原理,公式,应用案例-鲸鱼算法优化扩展卡尔曼滤波EKF
  • 「PPG信号处理——(4)基于PPG-ECG 多模态生理信号预处理与特征提取方法研究」2025年11月2日
  • 【Kotlin内联函数】
  • (论文速读)YOLA:学习照明不变特征的低光目标检测
  • 太原网站建设方案开发开源之家
  • 系统思考修炼之旅
  • 网站 默认首页网站域名ip地址查询
  • 矩阵系统哪个好?2025年全方位选型指南与品牌深度解析
  • 合肥瑶海区网站建设方案陇南网站建设
  • 怎么做vip视频网站dedecms 网站还原
  • 网站关键词标签php 网站源代码
  • 本地仓库如何同时绑定多个远程仓库
  • 网站基础建设巴巴商友圈wordpress最好用php
  • Maven 详解(上)
  • 25年05月架构甄选范文“论AI软件测试”,软考高级,系统架构设计师论文
  • Maven 详解(下)
  • 网站建设和托管商业网站建设与维护
  • 国内优秀的企业网站seo排名优化服务
  • 网站开发毕设文档上海软件外包公司名单
  • 4.2 IPv4【2009统考真题】
  • Photoshop - Photoshop 工具栏(21)吸管工具
  • 网站建设专公众号平台有哪些
  • wordpress 全站过滤河南省建设厅官网查询
  • AEB对碰撞安全的作用
  • [论文阅读] AI + 教育 | AI赋能“三个课堂”的破局之道——具身认知与技术路径深度解读
  • 男的和女的做那种短视频网站网站标签的作用
  • 响应式网站开发的特点万网域名管理平台
  • 车载软件需求开发与管理 --- 需求分析与分解
  • 点击劫持攻击完整防护指南