MYSQL CDC 同步到 PAIMON
参考:Mysql CDC | Apache Paimon
- 下载 flink-sql-connector-mysql-cdc-*.jar
- 将其放入 Flink 的 lib/ 目录
- 启动 Flink SQL Client
- 在 SQL CLI 中直接写 Flink SQL 创建 MySQL CDC 源表和 Paimon 结果表,然后 INSERT INTO 同步
步骤如下:
- 下载 connector jar
  - 官方地址:https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/
  - 选择与你 Flink 版本兼容的版本(例如 Flink 1.18+ 推荐用 flink-sql-connector-mysql-cdc-3.0.1.jar)
- 将 jar 放入 Flink lib 目录
  cp flink-sql-connector-mysql-cdc-3.0.1.jar $FLINK_HOME/lib/
- 同样放入 Paimon 的 jar
  - 从 https://paimon.apache.org/docs/master/engines/flink/ 下载对应版本的 paimon-flink-<version>.jar
  - 也放入 $FLINK_HOME/lib/
- 启动 Flink 集群

mysql中已有表:

-- Show the MySQL server's current binlog_format setting.
-- NOTE(review): presumably this should report ROW for the mysql-cdc connector used by the Flink job — confirm.
SHOW VARIABLES LIKE 'binlog_format';
---------------------------------------------------------------------
MySQL → CDC → Paimon(MinIO)的实时 upsert 同步
在 StreamPark 中配置 Flink SQL 任务
-- Flink SQL job: continuously copy the MySQL table emp.users into a Paimon
-- table stored under the s3://warehouse/wh warehouse (MinIO endpoint).
-- NOTE(review): Paimon's streaming sink presumably commits data only on
-- checkpoints — confirm that checkpointing is enabled for this job.
-- NOTE(review): credentials below are inline plaintext; fine for a lab setup,
-- but consider externalizing them before sharing this script.

-- Step 1: MySQL CDC source table (registered in default_catalog)
CREATE TABLE mysql_users (
    user_id BIGINT,
    user_name STRING,
    email STRING,
    registration_date DATE,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.247',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'emp',
    'table-name' = 'users'
);

-- Step 2: create the Paimon catalog backed by the MinIO (S3-compatible) warehouse
CREATE CATALOG paimon_minio WITH (
    'type' = 'paimon',
    'warehouse' = 's3://warehouse/wh',
    's3.endpoint' = 'http://192.168.1.243:9000',
    's3.access-key' = 'minio',
    's3.secret-key' = 'minio@123',
    's3.region' = 'us-east-1',
    's3.path.style.access' = 'true'
);

-- Switch to the Paimon catalog (important: the sink table below must be created in it)
USE CATALOG paimon_minio;

-- Step 3: create the sink table in the Paimon catalog's current database
CREATE TABLE IF NOT EXISTS users (
    user_id BIGINT,
    user_name STRING,
    email STRING,
    registration_date DATE,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'paimon',
    'bucket' = '4'
);

-- Step 4: start the sync. The source table still lives in default_catalog, so
-- it is referenced by its fully qualified name. Columns are listed explicitly
-- on both sides so the mapping does not depend on column order and a schema
-- change fails loudly instead of silently shifting values.
INSERT INTO users (user_id, user_name, email, registration_date)
SELECT
    user_id,
    user_name,
    email,
    registration_date
FROM default_catalog.default_database.mysql_users;
执行任务:

查看日志:

查看数据:

--------------------------------------------------
在 StarRocks 中进行相关查询:
创建 database emp
然后创建 paimon_minio_catalog
-- StarRocks: register the Paimon warehouse (s3://warehouse/wh on the MinIO
-- endpoint) as an external catalog, then run a quick smoke-test query.

-- Drop any previous definition first so the script can be re-run.
DROP CATALOG IF EXISTS paimon_minio_catalog;

CREATE EXTERNAL CATALOG paimon_minio_catalog
PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "s3://warehouse/wh",
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "http://192.168.1.243:9000",
    "aws.s3.access_key" = "minio",
    "aws.s3.secret_key" = "minio@123"
);

-- Make the new catalog the current one for this session.
SET catalog = paimon_minio_catalog;
SHOW DATABASES; -- should list `default`

-- `default` is backquoted because DEFAULT is a reserved keyword in StarRocks;
-- the unquoted form default.users is rejected by the parser.
-- Columns are listed explicitly instead of SELECT * so the check does not
-- silently change shape when the table schema evolves.
SELECT
    user_id,
    user_name,
    email,
    registration_date
FROM `default`.users
LIMIT 1;

SHOW CATALOGS;
SET catalog = paimon_minio_catalog;



