当前位置: 首页 > news >正文

Java应用Flink CDC监听MySQL数据变动内容输出到控制台

文章目录

  • maven 依赖
  • 自定义数据变化处理器
  • flink cdc监听
  • 验证

maven 依赖

<properties><flink.version>1.14.0</flink.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><!-- Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink.version}</version></dependency></dependencies>

自定义数据变化处理器

package org.example;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class CustomSink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void invoke(String value, Context context) throws Exception {//0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。// 对于U操作,其数据部分同时包含了Before和After.System.out.println(">>>" + value);}
}

flink cdc监听

package org.example;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MysqlSourceExample {public static void main(String[] args) throws Exception {DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();MySqlSource<String> source = MySqlSource.builder().hostname("127.0.0.1").port(3306).databaseList("canal_manager")// set captured database.tableList("canal_manager.canal_user")// set captured table.startupOptions(StartupOptions.latest()) // 设置从最新的修改记录开始读取.username("root").password("123456").deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string.includeSchemaChanges(true).build();//启动一个webuI。Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);//检者点间隔时间env .enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new CustomSink());env.execute();}
}

验证

启动后web页面地址访问http://localhost:8081/,MySQL数据库canal_manager中的表canal_user数据发生修改,控制台有输出json:
在这里插入图片描述

相关文章:

  • 【Go核心编程】第十三章:接口与多态——灵活性的艺术
  • 计算机考研408真题解析(2024-15 整数乘法运算的四种实现方式)
  • Java 反射机制详解及示例
  • Java 中 synchronized 和 ReentrantLock 的全面对比解析
  • LeetCode hot100---152.乘机最大子数组
  • Protobuf 中的类型查找规则
  • MS358A 低功耗运算放大器 车规
  • 在 Windows 11 或 10 上将 Git 升级到最新版本的方法
  • Linux【4】------RK3568启动和引导顺序
  • JAVA理论第五章-JVM
  • ubuntu服务器件如何配置python环境并运行多个python脚本
  • Ubuntu20.04基础配置安装——系统安装(一)
  • 应急响应思路
  • 【超详细】英伟达Jetson Orin NX-YOLOv8配置与TensorRT测试
  • 深入理解 Vue.observable:轻量级响应式状态管理利器
  • Vue 项目实战:三种方式实现列表→详情页表单数据保留与恢复
  • UOS 20 Pro为国际版WPS设置中文菜单
  • iOS、Android、鸿蒙、Web、桌面 多端开发框架Kotlin Multiplatform
  • Redis主从复制的原理一 之 概述
  • 数字通信复习
  • 网站的下载链接怎么做/怎样做网站的优化、排名
  • 用ps怎么做网站导航条/网站排名查询站长之家
  • 什么网站权重快/百度下载链接
  • 泰安高端网站建设/会计培训班一般多少钱
  • 义乌小商品市场进货渠道/如何进行关键词优化工作
  • wordpress搭建购物网站/站长之家ip查询