当前位置: 首页 > news >正文

flink-cdc同步数据到doris中

1 创建数据库和表

1.1 数据库脚本

这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

1

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:
  type: mysql
  hostname: 10.101.10.11
  port: 3306
  username: flink
  password: 123456
  tables: eayc.eayc_user
  server-id: 5400
  # server-time-zone: UTC
#Doris的参数配置
sink:
  type: doris
  fenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030
  username: root
  password: 123456
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
pipeline:
  name: eayc to doris
  parallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:
	tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
  - source-table: eayc.eayc_company
    sink-table: ods_eayc.eayc_company
  - source-table: eayc.eayc_company_user
    sink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
	at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:
	tables: acc.\.*
route:
  - source-table: acc.acc_account_balance_\.*
    sink-table: acc.acc_account_balance
  - source-table: acc.acc_account_subject_\.*
    sink-table: acc.acc_account_subject
  - source-table: acc.acc_initial_balance_\.*
    sink-table: acc.acc_initial_balance
  - source-table: acc.acc_voucher_\.*
    sink-table: acc.acc_voucher
  - source-table: acc.acc_voucher_entry_\.*
    sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

相关文章:

  • 算法的复杂性分析以及时间复杂度的表示方法
  • JavaSE学习笔记25-反射(reflection)
  • 顺序表和STL——vector【 复习笔记】
  • C++ IDE设置 visual studio 2010安装、注册、使用
  • 一周学会Flask3 Python Web开发-flask3模块化blueprint配置
  • 【Go语言快速上手】第二部分:Go语言进阶之工具与框架
  • L2-【英音】地道语音语调
  • 自由学习记录(37)
  • python学智能算法(二)|模拟退火算法:进阶分析
  • PHP 会话(Session)实现用户登陆功能
  • Flutter CupertinoNavigationBar iOS 风格导航栏的组件
  • 10-R数组
  • LeetCode 热题 100_在排序数组中查找元素的第一个和最后一个位置(65_34_中等_C++)(二分查找)(一次二分查找+挨个搜索;两次二分查找)
  • 独立开发者如何寻找产品设计灵感
  • 大规模 RDMA AI 组网技术创新:算法和可编程硬件的深度融合
  • 基于Spring Boot的兴顺物流管理系统设计与实现(LW+源码+讲解)
  • eclips 快捷键
  • java方法学习
  • 探索 Peewee:轻量级 Python ORM 简明指南
  • 更改visual studio 2022 默认NuGet包路径
  • 俄乌释放停火和谈信号,克宫:将组建“相应级别”谈判代表团
  • 乌外长:乌方准备无条件停火至少30天
  • 道指跌逾100点,特斯拉涨近5%
  • 拿出压箱底作品,北京交响乐团让上海观众享受音乐盛宴
  • 上海国际电影节推出三大官方推荐单元,精选十部优秀影片
  • 第32届梅花奖终评启幕,上海京剧院《智取威虎山》满堂彩