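Notes on compatibility issues when connecting Flink jdbc and mysql-cdc to MySQL 8
Preface
This post records a few small issues I ran into when connecting the Flink jdbc and mysql-cdc connectors to MySQL 8.
Versions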
- Flink 1.15.3
- mysql-cdc 2.3.0
- MySQL 8.0.27
cdc_mysql2mysql
MySQL5
I used to work mainly with MySQL 5. Below is the SQL for MySQL 5; for details see "Flink MySQL CDC 使用总结" (Flink MySQL CDC usage summary).
set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required and the source table must have one; the Flink primary key may differ from the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
MySQL8
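The same SQL fails against MySQL 8:
Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
    at com.mysql.jdbc.ConnectionImpl.buildCollationMapping(ConnectionImpl.java:1024)
At first I suspected that this cdc version did not support MySQL 8. The failing frame, com.mysql.jdbc.ConnectionImpl, belongs to the legacy 5.x driver classes, which suggests the jdbc sink is the part picking the wrong driver. Adding the driver parameter to the jdbc table is enough to fix it: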
'driver' = 'com.mysql.cj.jdbc.Driver'
The complete SQL:
set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required and the source table must have one; the Flink primary key may differ from the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
Summary:
- mysql-cdc: the table definition is the same for MySQL 5 and MySQL 8
- jdbc: MySQL 8 requires the driver parameter: 'driver' = 'com.mysql.cj.jdbc.Driver'
jdbc_mysql2mysql
MySQL8
Based on the cdc_mysql2mysql experience above, both the source and the sink in jdbc_mysql2mysql should get the driver parameter:
set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
I expected this to work, but it fails with:
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
The cause turned out to be a column type mismatch: ts is defined as int in the MySQL table, so it must also be int in Flink SQL, not bigint. The complete corrected SQL is:
set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
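For reference, a MySQL-side table that the corrected definitions above would match might look like the following sketch. Only ts being int comes from the description above; the other column types and lengths (VARCHAR(255), DOUBLE) are assumptions chosen to line up with the Flink columns.

```sql
-- Hypothetical MySQL DDL; only `ts INT` is stated in the text, the rest is assumed.
CREATE TABLE mysql_cdc_source (
  id    INT PRIMARY KEY,
  name  VARCHAR(255),
  price DOUBLE,
  ts    INT,          -- must be declared as int in the Flink jdbc source as well
  dt    VARCHAR(255)
);
```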
MySQL5
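To check whether MySQL 5 has the same problem as MySQL 8, I ran the same test. It does: against MySQL 5, declaring ts as bigint in Flink SQL raises the same error. The complete corrected SQL is: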
set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
sink
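In cdc_mysql2mysql, ts bigint caused no problem. So I tried changing only the sink table's ts column to bigint, and it worked on both MySQL 5 and MySQL 8. In other words, only the jdbc source table is strict about column types.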
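One way to use this observation, sketched under assumptions: keep ts as int in the jdbc source so that it matches MySQL, declare ts as bigint only in the sink, and widen the value in the query. The explicit cast is an addition for readability and may not be strictly necessary, since the test described above only changed the sink column type.

```sql
-- Assumes mysql_cdc_source is the jdbc source table with `ts int`
-- and test_sink_mysql is the jdbc sink table with `ts bigint`,
-- both otherwise defined as in this section.
insert into test_sink_mysql (id, name, price, ts, dt)
select id, name, price, cast(ts as bigint) as ts, dt
from mysql_cdc_source;
```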
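driver parameter
- jdbc: with MySQL 5, adding the driver parameter also works but is optional; with MySQL 8 it is required. Adding it in both cases avoids having to distinguish between MySQL versions.
- cdc: the driver parameter is not supported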
Column type mapping
Official documentation:
- jdbc: https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/docs/connectors/table/jdbc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
- cdc : https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
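Before writing the Flink DDL, it can help to look up the actual MySQL column types and compare them with the mapping tables linked above. The query below is a minimal sketch to run in MySQL itself; the schema name cdc and the table name mysql_cdc_source are taken from the examples in this post.

```sql
-- Run in MySQL (not Flink SQL): list the declared column types of the source table.
SELECT COLUMN_NAME, COLUMN_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'cdc'
  AND TABLE_NAME = 'mysql_cdc_source'
ORDER BY ORDINAL_POSITION;
```

A column reported as int would then be declared as int in the jdbc source table, and one reported as bigint as bigint.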
MySQL8 compatibility
As shown above, both Flink jdbc and mysql-cdc work with MySQL 5 and MySQL 8. The jars used are:
- jdbc: flink-connector-jdbc-1.15.3.jar
- cdc : flink-sql-connector-mysql-cdc-2.3.0.jar
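Only these two jars are needed; no separate mysql-connector-java jar is required.
For cdc 3.1 and later, however, an additional mysql-connector-java jar is required; see the official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/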
cdc version support
Official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/overview/