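Notes on compatibility issues when connecting Flink jdbc and mysql-cdc to MySQL 8

Preface

These notes record the small problems I ran into when connecting Flink jdbc and mysql-cdc to MySQL 8.

Versions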

  • Flink 1.15.3
  • mysql-cdc 2.3.0
  • MySQL 8.0.27

cdc_mysql2mysql

MySQL5

I previously worked mainly with MySQL 5. Below is the SQL for MySQL 5; for details see the post "Flink MySQL CDC 使用总结" (Flink MySQL CDC Usage Summary).

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  -- A primary key is required, and the source table must have one;
  -- the Flink primary key may differ from the MySQL primary key.
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

MySQL8

The same SQL fails with:

Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
    at com.mysql.jdbc.ConnectionImpl.buildCollationMapping(ConnectionImpl.java:1024)

At first I suspected that this cdc version did not support MySQL 8, but it turned out that adding the driver option to the jdbc table is enough to fix it:

'driver' = 'com.mysql.cj.jdbc.Driver'

The complete SQL:

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  -- A primary key is required, and the source table must have one;
  -- the Flink primary key may differ from the MySQL primary key.
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

Summary:

  • mysql-cdc: the table definition is the same for MySQL 5 and MySQL 8
  • jdbc: MySQL 8 requires the driver option: 'driver' = 'com.mysql.cj.jdbc.Driver'

jdbc_mysql2mysql

MySQL8

Based on the cdc_mysql2mysql experience above, both the source and the sink in jdbc_mysql2mysql should get the driver option:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

I expected this to be enough, but it fails with:

Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

Investigation showed that the cause is a column type mismatch: ts was created as int in MySQL, so in Flink SQL ts must also be declared as int rather than bigint. The complete, corrected SQL is:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
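Since the jdbc source fails when a declared Flink type does not match the MySQL column type, it can help to look up the actual column types before writing the Flink DDL. The query below is a minimal sketch to run against MySQL itself (not in the Flink SQL client); it assumes the database cdc and the table mysql_cdc_source used in this post, and the output depends on how the table was created.

```sql
-- Run in a MySQL client, not in Flink.
-- Lists the column types of the source table so the Flink DDL can mirror them
-- (for example, a MySQL INT column should be declared as int in Flink SQL).
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'cdc'
  AND TABLE_NAME = 'mysql_cdc_source'
ORDER BY ORDINAL_POSITION;
```

Each DATA_TYPE value can then be compared with the data type mapping table in the official jdbc connector documentation.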

MySQL5

To check whether MySQL 5 has the same problem as MySQL 8, I ran the same test. It does: with MySQL 5, declaring ts as bigint raises the same error. The complete, corrected SQL is:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

sink

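In cdc_mysql2mysql, declaring ts as bigint caused no problem, so I tried changing the type of ts in the sink table to bigint. That works on both MySQL 5 and MySQL 8, which means only the jdbc source table is strict about column types.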
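The sink-side observation can be sketched as follows: the source table keeps ts as int to match the MySQL column, while the sink table declares ts as bigint. The explicit cast in the insert statement is an assumption added here to make the type conversion visible; it is not part of the original test, and this variant has not been run as written. The connection options are the ones used earlier in this post.

```sql
-- Source: ts is declared int, matching the MySQL column type.
create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

-- Sink: ts is declared bigint even though the MySQL column is int.
create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql'
);

-- Assumption: an explicit cast, so the query type matches the sink schema.
insert into test_sink_mysql(id,name,price,ts,dt)
select id, name, price, cast(ts as bigint), dt from mysql_cdc_source;
```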

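The driver option

  • jdbc: with MySQL 5 the driver option also works but is not required, while MySQL 8 requires it. Adding the driver option for both 5 and 8 therefore avoids having to distinguish between MySQL versions.
  • cdc : the driver option is not supported

Data type mapping

Official documentation: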

  • jdbc: https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/docs/connectors/table/jdbc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
  • cdc : https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84

MySQL8 compatibility

As shown above, both Flink jdbc and mysql-cdc work with MySQL 5 and MySQL 8. The corresponding jars are:

  • jdbc: flink-connector-jdbc-1.15.3.jar
  • cdc : flink-sql-connector-mysql-cdc-2.3.0.jar

Only these two jars are needed; no additional mysql-connector-java jar is required.

However, with cdc 3.1 and later, an additional mysql-connector-java jar is required. See the official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/

cdc version support

Official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/overview/
