
Flink CDC 3.5.0

Two Modes

| Dimension | YAML Pipeline mode | Flink SQL mode |
|---|---|---|
| Development style | Configuration file (YAML) | SQL script |
| Coding required | ❌ No code | ❌ No code (declarative) |
| Dependencies | Full flink-cdc distribution | Only flink-sql-connector-mysql-cdc-*.jar |
| Submission | $FLINK_CDC/bin/flink-cdc.sh conf.yaml | $FLINK_HOME/bin/sql-client.sh -f job.sql |
| Execution mode | Local MiniCluster (default) or Application Mode (3.4+) | Must be submitted to a Flink cluster (Session / Per-Job / Application) |
| Web UI visibility | ❌ Not visible by default; ✅ visible in cluster mode (3.4+) | ✅ Always visible |
| Job lifecycle management | ⚠️ Limited (kill the process / savepoint) | ✅ Full (cancel / savepoint / resume) |
| Multi-table sync | ✅ Supported (tables: app_db.t1,app_db.t2, or a regex) | ✅ Supported (multiple source tables + UNION ALL, or separate INSERTs) |
| Field filtering / transformation | ⚠️ Limited (projection and filtering via transform rules since 3.1; full-column pass-through by default) | ✅ Supported (SELECT col1, UPPER(col2) ...) |
| Join / aggregation | ❌ Not supported | ✅ Supported (windows, dimension-table joins, etc.) |
| Debugging | ✅ Simple and fast | ✅ Interactive debugging in the SQL Client |
| Production suitability | ⚠️ Suited to lightweight jobs and testing | ✅ Strongly recommended for production |
| Version requirements | Flink CDC ≥ 2.3 (cluster mode needs ≥ 3.4) | Flink ≥ 1.13 + the matching CDC JAR |
| Deployment complexity | Medium (maintain the flink-cdc directory) | Low (just drop the JAR into lib/) |
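
As a concrete illustration of the last few rows: the free-form projection and filtering available in SQL mode looks like the sketch below (it reuses the orders_cdc and kafka_orders tables defined later in this post; the derived column and the filter predicate are made up):

    -- SQL mode can project, derive, and filter columns in flight
    INSERT INTO kafka_orders
    SELECT
        order_id,
        CAST(price * 1.10 AS DECIMAL(10, 2)) AS price  -- hypothetical derived value
    FROM orders_cdc
    WHERE price > 0;                                   -- row-level filter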

YAML Pipeline Mode

Flink CDC Deployment

  1. Unpack flink-cdc-3.5.0-bin.tar.gz
    tar -xzvf flink-cdc-3.5.0-bin.tar.gz -C /opt/module

  2. Configure environment variables
    sudo vim /etc/profile.d/myprofile.sh
    # Flink CDC environment variables
    export FLINK_CDC=/opt/module/flink-cdc-3.5.0
    export PATH=$FLINK_CDC/bin:$PATH
    # Apply the changes
    source /etc/profile

  3. Place the pipeline connector JARs in lib
    
    ph@mydoris:/opt/module/flink-cdc-3.5.0/lib$ ls -l  *pip*con*
    -rw-rw-r-- 1 ph ph 40975122 Nov 16 10:05 flink-cdc-pipeline-connector-doris-3.5.0.jar
    -rw-rw-r-- 1 ph ph 33183668 Nov 16 10:05 flink-cdc-pipeline-connector-iceberg-3.5.0.jar
    -rw-rw-r-- 1 ph ph  5975969 Nov 16 10:05 flink-cdc-pipeline-connector-kafka-3.5.0.jar
    -rw-rw-r-- 1 ph ph 21375378 Nov 16 10:05 flink-cdc-pipeline-connector-mysql-3.5.0.jar
    






MySQL Real-Time Sync to Kafka

Database Preparation

  1. Create the database in MySQL
    -- Create the database
    CREATE DATABASE app_db;
    -- The database must contain at least one table before the CDC job can start, or the job fails
    -- Create the orders table
    CREATE TABLE `orders` (
      `order_id` INT NOT NULL,
      `price` DECIMAL(10,2) NOT NULL,
      PRIMARY KEY (`order_id`)
    );
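
Before starting any CDC job, it is also worth confirming that this MySQL instance exposes a row-based binlog, which the mysql-cdc source depends on. A quick sanity check:

    -- The CDC source reads the binlog; these should report ON and ROW respectively
    SHOW VARIABLES LIKE 'log_bin';
    SHOW VARIABLES LIKE 'binlog_format';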
    

Start the Sync Job

Create the whole-database sync job configuration file

vim $FLINK_CDC/conf/mysql-to-kafka.yaml
################################################################################
# Description: Sync MySQL all tables to Kafka (as intermediate layer)
################################################################################
source:
  type: mysql
  hostname: mydoris
  port: 3306
  username: root
  password: Admin1234
  tables: app_db.\.*
  server-id: 5400-5404               # range must cover at least `parallelism` distinct server IDs
  server-time-zone: Asia/Shanghai

sink:
  type: kafka
  topic: mysql-app-db-all            # all tables are written to a single topic
  properties.bootstrap.servers: mydoris:9092
  value.format: debezium-json

pipeline:
  name: MySQL-to-Kafka-CDC-Pipeline
  parallelism: 2

Submit the job to the Flink standalone cluster

$FLINK_CDC/bin/flink-cdc.sh $FLINK_CDC/conf/mysql-to-kafka.yaml
Pipeline has been submitted to cluster.
Job ID: 7297b79ba93e5483f83cc60dca95b883
Job Description: MySQL-to-Kafka-CDC-Pipeline

View the job

Check checkpoints

Verify the Synced Data

Start a test consumer client

$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server mydoris:9092 \
  --topic mysql-app-db-all \
  | jq
Insert / update / delete
INSERT INTO `orders` (`order_id`, `price`) VALUES (27, 4.00);
UPDATE `orders` SET `price` = 5.00 WHERE `order_id` = 1;
DELETE FROM `orders` WHERE `order_id` = 27;
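
With value.format: debezium-json, each change event arrives as a Debezium-style envelope whose op field encodes the operation (c = create, u = update, d = delete) alongside before/after row images. The three statements above should produce records roughly like these (field values illustrative; metadata fields omitted):

    {"before": null, "after": {"order_id": 27, "price": 4.00}, "op": "c"}
    {"before": {"order_id": 1, "price": 4.00}, "after": {"order_id": 1, "price": 5.00}, "op": "u"}
    {"before": {"order_id": 27, "price": 4.00}, "after": null, "op": "d"}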

MySQL Real-Time Sync to Doris

Database Preparation

    1. Create the database in Doris
      create database app_db;

Start the Sync Job

    1. Create the whole-database sync job configuration file:
      vim $FLINK_CDC/conf/mysql-to-doris.yaml
      ################################################################################
      # Description: Sync MySQL all tables to Doris
      ################################################################################
      source:
        type: mysql
        hostname: mydoris
        port: 3306
        username: root
        password: Admin1234
        tables: app_db.\.*
        server-id: 5400-5404
        server-time-zone: Asia/Shanghai

      sink:
        type: doris
        fenodes: mydoris:8030
        username: root
        password: "Admin1234"
        table.create.properties.light_schema_change: true
        table.create.properties.replication_num: 1

      pipeline:
        name: Sync MySQL Database to Doris
        parallelism: 2
      
    2. Submit the job to the Flink standalone cluster
      $FLINK_CDC/bin/flink-cdc.sh $FLINK_CDC/conf/mysql-to-doris.yaml
      SLF4J: Class path contains multiple SLF4J bindings.
      SLF4J: Found binding in [jar:file:/opt/module/flink-1.19.3/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.3.6/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
      SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
      Pipeline has been submitted to cluster.
      Job ID: 2926ca3390225dee172cfed94a818191
      Job Description: Sync MySQL Database to Doris
      
    3. View the job (Flink Web UI)
    4. Check checkpoints

    5. List jobs (CLI)
      $FLINK_HOME/bin/flink list
      ------------------ Running/Restarting Jobs -------------------
      16.11.2025 02:37:23 : 2926ca3390225dee172cfed94a818191 : Sync MySQL Database to Doris (RUNNING)
      --------------------------------------------------------------

    6. Check the logs


      The job has started.

      All MySQL databases are listed and the tables to be synced are filtered out.

      The table schemas are then synced to Doris.
    7. Inspect the table
      The MySQL table schema has been synced to Doris:
      SHOW CREATE TABLE app_db.orders;
      CREATE TABLE `orders` (
        `order_id` int NULL,
        `price` decimal(10,2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`order_id`)
      DISTRIBUTED BY HASH(`order_id`) BUCKETS AUTO
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 1",
      "min_load_replica_num" = "-1",
      "is_being_synced" = "false",
      "storage_medium" = "hdd",
      "storage_format" = "V2",
      "inverted_index_storage_format" = "V2",
      "enable_unique_key_merge_on_write" = "true",
      "light_schema_change" = "true",
      "disable_auto_compaction" = "false",
      "enable_single_replica_compaction" = "false",
      "group_commit_interval_ms" = "10000",
      "group_commit_data_bytes" = "134217728",
      "enable_mow_light_delete" = "false"
      );

Verify the Synced Data

    1. Insert data
      INSERT INTO `orders` (`order_id`, `price`) VALUES (1, 4.00);

    2. Update data
      UPDATE `orders` SET `price` = 5.00 WHERE `order_id` = 1;
    3. Delete data
    4. Check that the table has synced (see the spot-check query below)
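
On the Doris side, a simple spot check after each statement should converge with the MySQL side within the sink's flush interval (a sketch):

    -- Run in Doris after each DML statement above
    SELECT * FROM app_db.orders ORDER BY order_id;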

Flink SQL Mode

MySQL to Kafka

Create the Job

    -- Create the CDC source table
    CREATE TABLE orders_cdc (
        order_id INT,
        price DECIMAL(10,2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mydoris',
        'port' = '3306',
        'username' = 'root',
        'password' = 'Admin1234',
        'database-name' = 'app_db',
        'table-name' = 'orders',
        'server-id' = '5400-5499',
        'server-time-zone' = 'Asia/Shanghai'
    );

    -- Create the Kafka sink table
    CREATE TABLE kafka_orders (
        order_id INT,
        price DECIMAL(10,2)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'app_db.orders',
        'properties.bootstrap.servers' = 'mydoris:9092',
        'format' = 'debezium-json'  -- compatible with Doris Routine Load
    );

    -- Start the sync
    INSERT INTO kafka_orders SELECT * FROM orders_cdc;
    

Submit the Job

    $FLINK_HOME/bin/sql-client.sh -f $FLINK_HOME/conf/mysql-to-kafka.sql
    [INFO] Executing SQL from file.
    Command history file path: /home/ph/.flink-sql-history
    Flink SQL> -- Create the CDC source table
    > CREATE TABLE orders_cdc (
    >     order_id INT,
    >     price DECIMAL(10,2),
    >     PRIMARY KEY (order_id) NOT ENFORCED
    > ) WITH (
    >     'connector' = 'mysql-cdc',
    >     'hostname' = 'mydoris',
    >     'port' = '3306',
    >     'username' = 'root',
    >     'password' = 'Admin1234',
    >     'database-name' = 'app_db',
    >     'table-name' = 'orders',
    >     'server-id' = '5400-5499',
    >     'server-time-zone' = 'Asia/Shanghai'
    > )
    [INFO] Execute statement succeed.

    Flink SQL>
    > -- Create the Kafka sink table
    > CREATE TABLE kafka_orders (
    >     order_id INT,
    >     price DECIMAL(10,2)
    > ) WITH (
    >     'connector' = 'kafka',
    >     'topic' = 'app_db.orders',
    >     'properties.bootstrap.servers' = 'mydoris:9092',
    >     'format' = 'debezium-json'  -- compatible with Doris Routine Load
    > )
    [INFO] Execute statement succeed.

    Flink SQL> [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: e7c7aaffdf46165bb0c7ef403f32cd1d

    Flink SQL>
    Shutting down the session...
    done.

MySQL to Doris

Note: this mode does not yet support automatic table creation, so the target table must be created in Doris in advance (see the sketch below).
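
A minimal pre-created table matching the source schema might look like the following, modeled on the auto-created table shown in the YAML pipeline section above (a sketch; adjust replication_num to your cluster):

    CREATE DATABASE IF NOT EXISTS app_db;
    CREATE TABLE IF NOT EXISTS app_db.orders (
        `order_id` INT,
        `price` DECIMAL(10,2)
    ) ENGINE = OLAP
    UNIQUE KEY(`order_id`)
    DISTRIBUTED BY HASH(`order_id`) BUCKETS AUTO
    PROPERTIES (
        -- merge-on-write makes the UNIQUE KEY model behave like an upsert table
        "replication_num" = "1",
        "enable_unique_key_merge_on_write" = "true"
    );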

Create the Job

    -- Create the CDC source table
    CREATE TABLE orders_cdc (
        order_id INT,
        price DECIMAL(10,2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mydoris',
        'port' = '3306',
        'username' = 'root',
        'password' = 'Admin1234',
        'database-name' = 'app_db',
        'table-name' = 'orders',
        'server-id' = '5400-5499',
        'server-time-zone' = 'Asia/Shanghai'
    );

    -- Create the Doris sink table (Doris 3.0.8 + Connector 25.1.0)
    CREATE TABLE doris_orders (
        order_id INT,
        price DECIMAL(10,2)
    ) WITH (
        'connector' = 'doris',
        'fenodes' = 'mydoris:8030',
        'table.identifier' = 'app_db.orders',
        'username' = 'root',
        'password' = 'Admin1234',
        'sink.label-prefix' = 'flink_app_db_orders_308',
        'sink.enable-delete' = 'true',
        'sink.buffer-flush.interval' = '10s',
        'sink.buffer-flush.max-rows' = '100000',
        'sink.buffer-flush.max-bytes' = '10485760'   -- optional: 10 MB
    );

    -- Start the sync
    INSERT INTO doris_orders SELECT * FROM orders_cdc;
    

Submit the Job

    $FLINK_HOME/bin/sql-client.sh -f $FLINK_HOME/conf/mysql-to-doris.sql
    [INFO] Executing SQL from file.
    Command history file path: /home/ph/.flink-sql-history
    Flink SQL> -- Create the CDC source table
    > CREATE TABLE orders_cdc (
    >     order_id INT,
    >     price DECIMAL(10,2),
    >     PRIMARY KEY (order_id) NOT ENFORCED
    > ) WITH (
    >     'connector' = 'mysql-cdc',
    >     'hostname' = 'mydoris',
    >     'port' = '3306',
    >     'username' = 'root',
    >     'password' = 'Admin1234',
    >     'database-name' = 'app_db',
    >     'table-name' = 'orders',
    >     'server-id' = '5400-5499',
    >     'server-time-zone' = 'Asia/Shanghai'
    > )
    [INFO] Execute statement succeed.

    Flink SQL>
    > -- Create the Doris sink table (Doris 3.0.8 + Connector 25.1.0)
    > CREATE TABLE doris_orders (
    >     order_id INT,
    >     price DECIMAL(10,2)
    > ) WITH (
    >     'connector' = 'doris',
    >     'fenodes' = 'mydoris:8030',
    >     'table.identifier' = 'app_db.orders',
    >     'username' = 'root',
    >     'password' = 'Admin1234',
    >     'sink.label-prefix' = 'flink_app_db_orders_308',
    >     'sink.enable-delete' = 'true',
    >     'sink.buffer-flush.interval' = '10s',
    >     'sink.buffer-flush.max-rows' = '100000',
    >     'sink.buffer-flush.max-bytes' = '10485760'   -- optional: 10 MB
    > )
    [INFO] Execute statement succeed.

    Flink SQL> [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 75c9f897cead849fe501d130d5a1b055

    Flink SQL>
    Shutting down the session...
    done.
    

MySQL to Iceberg

Create the Job

    -- 1. Create the Iceberg catalog (backed by Hive Metastore)
    CREATE CATALOG iceberg_hive WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hive',
        'uri' = 'thrift://mydoris:9083',                          -- Hive Metastore address
        'warehouse' = 'hdfs://mydoris:9000/user/hive/warehouse',  -- Iceberg warehouse root
        'clients' = '5',                                          -- optional: Hive client pool size
        'property-version' = '1'
    );

    -- 2. Use the catalog
    USE CATALOG iceberg_hive;

    -- 3. Create the database if it does not exist
    CREATE DATABASE IF NOT EXISTS app_db;

    -- 4. [Optional] Manually create the Iceberg table (recommended)
    -- If omitted, Flink creates the table automatically (but it may lack a primary key / v2 format)
    CREATE TABLE IF NOT EXISTS app_db.orders (
        order_id INT,
        price DECIMAL(10,2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'format-version' = '2',
        'write.upsert.enabled' = 'true',
        'write.format.default' = 'parquet'
    );

    -- 5. Create the MySQL CDC source table
    USE CATALOG default_catalog;
    USE default_database;

    CREATE TABLE orders_cdc (
        order_id INT,
        price DECIMAL(10,2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mydoris',
        'port' = '3306',
        'username' = 'root',
        'password' = 'Admin1234',
        'database-name' = 'app_db',
        'table-name' = 'orders',
        'server-id' = '5400-5499',
        'server-time-zone' = 'Asia/Shanghai'
    );

    -- 6. Start the sync (write to Iceberg)
    INSERT INTO iceberg_hive.app_db.orders SELECT * FROM orders_cdc;
    

Submit the Job

    $FLINK_HOME/bin/sql-client.sh -f $FLINK_HOME/conf/mysql-to-iceberg.sql
    [INFO] Executing SQL from file.
    Command history file path: /home/ph/.flink-sql-history
    Flink SQL> -- 1. Create the Iceberg catalog (backed by Hive Metastore)
    > CREATE CATALOG iceberg_hive WITH (
    >     'type' = 'iceberg',
    >     'catalog-type' = 'hive',
    >     'uri' = 'thrift://mydoris:9083',               -- Hive Metastore address
    >     'warehouse' = 'hdfs://mydoris:9000/user/hive/warehouse', -- Iceberg warehouse root
    >     'clients' = '5',                                      -- optional: Hive client pool size
    >     'property-version' = '1'
    > )
    [INFO] Execute statement succeed.

    Flink SQL> [INFO] Execute statement succeed.

    Flink SQL> [INFO] Execute statement succeed.

    Flink SQL>
    > -- 4. [Optional] Manually create the Iceberg table (recommended)
    > -- If omitted, Flink creates the table automatically (but it may lack a primary key / v2 format)
    > CREATE TABLE IF NOT EXISTS app_db.orders (
    >     order_id INT,
    >     price DECIMAL(10,2),
    >     PRIMARY KEY (order_id) NOT ENFORCED
    > ) WITH (
    >     'format-version' = '2',
    >     'write.upsert.enabled' = 'true',
    >     'write.format.default' = 'parquet'
    > )
    [INFO] Execute statement succeed.

    Flink SQL> [INFO] Execute statement succeed.

    Flink SQL> [INFO] Execute statement succeed.

    Flink SQL>
    > CREATE TABLE orders_cdc (
    >     order_id INT,
    >     price DECIMAL(10,2),
    >     PRIMARY KEY (order_id) NOT ENFORCED
    > ) WITH (
    >     'connector' = 'mysql-cdc',
    >     'hostname' = 'mydoris',
    >     'port' = '3306',
    >     'username' = 'root',
    >     'password' = 'Admin1234',
    >     'database-name' = 'app_db',
    >     'table-name' = 'orders',
    >     'server-id' = '5400-5499',
    >     'server-time-zone' = 'Asia/Shanghai'
    > )
    [INFO] Execute statement succeed.

    Flink SQL> [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: c09197fa420dae7499def1a6ae386a24
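
Once the job is running, reading the table back through the same catalog is a quick way to confirm that data is landing (a sketch; run as a bounded query from the SQL client):

    -- Spot-check the Iceberg table contents
    USE CATALOG iceberg_hive;
    SELECT * FROM app_db.orders;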
    

Starting and Stopping Jobs

Show current jobs

    # List current jobs
    $FLINK_HOME/bin/flink list
    ------------------ Running/Restarting Jobs -------------------
    16.11.2025 07:41:13 : 0bfd330def2ba1de3a1d4ea873728d4a : insert-into_default_catalog.default_database.doris_orders (RUNNING)
    --------------------------------------------------------------
    No scheduled jobs.

    # Stop the current job (with a savepoint)
    $FLINK_HOME/bin/flink stop 0bfd330def2ba1de3a1d4ea873728d4a
    Suspending job "0bfd330def2ba1de3a1d4ea873728d4a" with a CANONICAL savepoint.
    Triggering stop-with-savepoint for job 0bfd330def2ba1de3a1d4ea873728d4a.
    Waiting for response...
    Savepoint completed. Path: hdfs://mydoris:9000/flink/savepoints/savepoint-0bfd33-49b7d53ecc09

    # Restart the job: edit mysql-to-doris.sql and add this line at the very top
    SET 'execution.savepoint.path' = 'hdfs://mydoris:9000/flink/savepoints/savepoint-0bfd33-49b7d53ecc09';
    # Then resubmit the job
    $FLINK_HOME/bin/sql-client.sh -f $FLINK_HOME/conf/mysql-to-doris.sql

After the job starts, check the restore information:

    Latest Restore    ID: 12
    Restore Time: 2025-11-16 15:54:50.524
    Type: Savepoint
    Path: hdfs://mydoris:9000/flink/savepoints/savepoint-0bfd33-49b7d53ecc09

