当前位置: 首页 > news >正文

flink部署使用(flink-connector-jdbc)连接达梦数据库并写入读取数据

flink介绍

1)Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

2)在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。

3)flink版本下载:https://archive.apache.org/dist/flink/

flink单机搭建

## 1. 下载并解压flink
[root@localhost flink_soft]# mkdir /data/flink_soft
[root@localhost flink_soft]# tar -zxvf flink-1.16.1-bin-scala_2.12.tgz
## 2. 修改配置文件,把下面的三行全部去掉
[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# vim /data/flink_soft/flink-1.16.1/conf/flink-conf.yaml
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
## 3. 启动flink
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh
## 4. 查询进程是否存在
[root@localhost flink-1.16.1]# ps aux | grep flink
## 5. 访问http://192.168.112.162:8081/ 即可。

将已经适配dameng的jar包放到lib目录下

1)下载已经适配好的包https://github.com/gaoyuan98/flink-connector-jdbc-dameng/releases

提供了两个版本的dameng适配驱动包,一个是实现JdbcFactory接口,还有一个是实现JdbcDialectFactory接口。

2)截止发文v3.3版本官方还未正式发版,所以大概率是用这个版本:flink-connector-jdbc-dameng_20250331_(适用于v3.2及以下版本)

3)将下载好的适配包放到flink的lib目录下

DmJdbcDriver8.jar 达梦数据库jdbc驱动,可以更换为与数据库版本相同的驱动。

flink-connector-jdbc-3.1.jar flink使用jdbc方式连接数据库时的桥接包,如果项目本身已经有flink-connector-jdbc包可忽略该包。

flink-connector-jdbc-dameng-1.0.jar flink使用jdbc方式连接达梦数据库的适配包,源码基于flink-connector-jdbc.jar包进行调整,所以该包必须存在。

如项目中已经有flink-connector-jdbc的包,那么只需要使用DmJdbcDriver8.jar跟flink-connector-jdbc-dameng-1.0.jar的驱动包即可。

如项目中没有flink-connector-jdbc的包,就把这三个包全部放到lib下。

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/lib
[root@localhost lib]# ll
total 204020
-rw-r--r--. 1 root root   1615303 Jan 17 00:30 DmJdbcDriver8.jar
-rwxrwxrwx. 1 root root    198857 Jan 19  2023 flink-cep-1.16.1.jar
-rwxrwxrwx. 1 root root    516144 Jan 19  2023 flink-connector-files-1.16.1.jar
-rw-r--r--. 1 root root    277945 Mar 28 23:46 flink-connector-jdbc-3.1-SNAPSHOT.jar
-rw-r--r--. 1 root root     13458 Mar 29 00:13 flink-connector-jdbc-dameng-1.0-SNAPSHOT.jar
-rwxrwxrwx. 1 root root    102470 Jan 19  2023 flink-csv-1.16.1.jar
-rwxrwxrwx. 1 root root 117107159 Jan 19  2023 flink-dist-1.16.1.jar
-rwxrwxrwx. 1 root root    180248 Jan 19  2023 flink-json-1.16.1.jar
-rwxrwxrwx. 1 root root  21052640 Jan 19  2023 flink-scala_2.12-1.16.1.jar
-rwxrwxrwx. 1 root root  10737871 Jan 13  2023 flink-shaded-zookeeper-3.5.9.jar
-rwxrwxrwx. 1 root root  15367504 Jan 19  2023 flink-table-api-java-uber-1.16.1.jar
-rwxrwxrwx. 1 root root  36249667 Jan 19  2023 flink-table-planner-loader-1.16.1.jar
-rwxrwxrwx. 1 root root   3133690 Jan 19  2023 flink-table-runtime-1.16.1.jar
-rwxrwxrwx. 1 root root    208006 Jan 13  2023 log4j-1.2-api-2.17.1.jar
-rwxrwxrwx. 1 root root    301872 Jan 13  2023 log4j-api-2.17.1.jar
-rwxrwxrwx. 1 root root   1790452 Jan 13  2023 log4j-core-2.17.1.jar
-rwxrwxrwx. 1 root root     24279 Jan 13  2023 log4j-slf4j-impl-2.17.1.jar

重启flink

[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# ./bin/stop-cluster.sh
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh

## 如果报错的话查看这个日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log

flink驱动验证

在达梦数据库上创建表数据

CREATE TABLE source_table (
    id INT PRIMARY KEY,
    name VARCHAR(50),
    age INT
);
INSERT INTO source_table (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO source_table (id, name, age) VALUES (2, 'Bob', 25);
INSERT INTO source_table (id, name, age) VALUES (3, 'Charlie', 40);
COMMIT;

在 Flink SQL CLI 中定义达梦表

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/
[root@localhost flink-1.16.1]#  ./bin/sql-client.sh embedded

CREATE TABLE source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:dm://192.168.127.2:5236/SYSDBA',
    'table-name' = 'source_table',
    'driver' = 'dm.jdbc.driver.DmDriver',
    'username' = 'SYSDBA',
    'password' = 'SYSDBA123'
);


## 在 Flink SQL CLI 中查询数据
SELECT * FROM source;
## 筛选数据,比如 查询年龄大于 30 的用户:
SELECT id, name FROM source WHERE age > 30;
## 插入数据
INSERT INTO source (id, name, age) VALUES (3, '33', 33);

CREATE TABLE source1 (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'dameng',
    'url' = 'jdbc:dm://81.70.105.201:5236/SYSDBA',
    'table-name' = 'source_table',
    'driver' = 'dm.jdbc.driver.DmDriver',
    'username' = 'SYSDBA',
    'password' = '123456'
);
SELECT * FROM source1;

flink-jdbc-dameng选错会怎么?

目前flink-connector-jdbc中,v3.0 - v3.2 都是同一个实现思路,也就是只需要集成实现JdbcDialectFactory接口的方法即可,main分支的话是实现JdbcFactory接口函数,也就是需要适配两个版本。

因使用的是v3.3的dameng包,但flink-connector-jdbc是v3.2及以下版本,驱动包接口实现不对所以会报这个错。

相关文章:

  • NO.85十六届蓝桥杯备战|动态规划-经典线性DP|最长上升子序列|合唱队形|最长公共子序列|编辑距离(C++)
  • FreeRTOS入门与工程实践-基于STM32F103(一)(单片机程序设计模式,FreeRTOS源码概述,内存管理,任务管理,同步互斥与通信,队列,信号量)
  • BGP分解实验·23——BGP选路原则之路由器标识
  • 最新版IDEA超详细图文安装教程(适用Mac系统)附安装包及补丁2025最新教程
  • 首批 | 云轴科技ZStack通过电子标准院云上部署DeepSeek验证测试
  • Tkinter高级布局与窗口管理
  • Node.js中util模块详解
  • 【golang/jsonrpc】go-ethereum中json rpc初步使用(websocket版本)
  • vue3使用keep-alive缓存组件与踩坑日记
  • [实战] 二分查找与哈希表查找:原理、对比与C语言实现(附完整C代码)
  • PostgreSQL 实例运行状态全面检查
  • 考研数据结构精讲:数组与特殊矩阵的压缩存储技巧(包含真题及解析)
  • 大数据面试问答-Hadoop/Hive/HDFS/Yarn
  • 基于SpringBoot汽车零件商城系统设计和实现(源码+文档+部署讲解)
  • vue3+nodeJs+webSocket实现聊天功能
  • stack overflow国内无法访问原因
  • 中文编码,GB系列,UTF
  • 正则表达式使用知识(日常翻阅)
  • 基于频率约束条件的最小惯量需求评估,包括频率变化率ROCOF约束和频率最低点约束matlab/simulink
  • 探索 Rust 语言:高效、安全与并发的完美融合
  • 《瞭望》周刊社原总编辑、党委书记姬斌逝世,享年67岁
  • 刘元春在《光明日报》撰文:以法治护航民营经济高质量发展
  • 广西钦州:坚决拥护自治区党委对钟恒钦进行审查调查的决定
  • 金融监管局:已设立74支私募股权投资基金,支持投资科技创新企业
  • 吴清稳市场稳预期发布会十要点:谈平准基金、股市稳定、公募改革和巴菲特
  • 何立峰将访问瑞士、法国并举行中美经贸高层会谈、第十次中法高级别经济财金对话