当前位置: 首页 > news >正文

paimon---同步mysql数据到paimon表中

1.1、mysql源表

CREATE TABLE `mysql_orders` (
  `order_id` varchar(100) NOT NULL,
  `user_id` varchar(100) DEFAULT NULL,
  `amount` decimal(10,2) DEFAULT NULL,
  `update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`order_id`)
)

mysql 开启bin_log, 设置ROW

1.2、flink cdc同步mysql数据

参考: https://blog.csdn.net/wuxintdrh/article/details/146165736

CREATE TABLE mysql_cdc_source (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
    dt STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'chb1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'paimon_test',
    'table-name' = 'mysql_orders',
    'server-time-zone' = 'Asia/Shanghai'  -- 时区配置(避免时间偏差)
);
select * from mysql_cdc_source;


1.3、同步到paimon

创建paimon表

CREATE TABLE orders (
    order_id STRING PRIMARY KEY NOT ENFORCED,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
	dt STRING
) WITH (
    'merge-engine' = 'deduplicate',       -- 默认去重引擎,保留最新记录
    'changelog-producer' = 'input',       -- 直接存储 CDC 的原始变更日志
    'bucket' = '4',                       -- 分桶优化写入性能
    'snapshot.time-retained' = '7d'       -- 保留 7 天快照
);

同步数据

INSERT INTO paimon_catalog.`default`.orders
SELECT 
    order_id, 
    user_id, 
    amount, 
    update_time, 
    DATE_FORMAT(update_time, 'yyyy-MM-dd') AS dt  -- 动态分区
FROM default_catalog.default_database.mysql_cdc_source;

查询paimon表:

select * from paimon_catalog.`default`.orders;

报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

End of exception on server side

排查发现jobManager资源充足,taskManager slot还有可用,taskManager memory资源偏小,调大资源后运行正常。

二、通过paimon-flink-action同步数据

参考:https://paimon.apache.org/docs/1.0/cdc-ingestion/mysql-cdc/
报错:ClassNotFoundException: org.apache.kafka.connect.errors.ConnectException,引入connect-api-3.2.1.jar

又报错: java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;

相关文章:

  • 基于Spring Boot的国产动漫网站的设计与实现(LW+源码+讲解)
  • TDengine 数据对接 EXCEL
  • 记一次Spring Boot应用中数据库连接阻塞问题排查过程
  • 重生之我在学Vue--第8天 Vue 3 UI 框架(Element Plus)
  • 深度学习——Diffusion Model学习,扩散模型
  • deepseek使用记录21——游击战略问题
  • python 中用到的文件操作
  • 从运营出发:打造更适配当下营商环境的一对一直播系统源码
  • MySQL(第3周)-database命令
  • Python自动点击器开发教程 - 支持键盘连按和鼠标连点
  • 多线程(二)
  • 蓝桥杯真题0团建dfs+哈希表/邻接表
  • 统计登录系统10秒内连续登录失败超过3次的用户
  • 看 MySQL InnoDB 和 BoltDB 如何写磁盘
  • Vivado IP核之定点数累加Accumulator使用说明
  • vscode接入DeepSeek 免费送2000 万 Tokens 解决DeepSeek无法充值问题
  • 向量数据库的选择与应用:AI工程实践
  • Android Retrofit 框架注解定义与解析模块深度剖析(一)
  • HarmonyOS NEXT开发实战:DevEco AI辅助编程工具(CodeGenie)的使用
  • requests中post中data=None, json=None两个参数区别
  • 网站模板中文/网络营销pdf
  • 免费建微网站平台/宁波网站推广联系方式
  • 大学生做家教网站/东莞网站建设优化推广
  • 电子商务网站建设报价表/百度指数如何提升
  • 网站运营seo招聘/整合营销方案怎么写
  • 网站建设 目标/晚上国网app