当前位置: 首页 > news >正文

flink cdc 配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- Flink CDC demo project: streams MySQL binlog changes (Debezium JSON) into Redis. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>cdc</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <!-- Debezium version used by the embedded engine dependencies below. -->
        <version.debezium>1.9.8.Final</version.debezium>
        <flink.version>1.19.0</flink.version>
    </properties>

    <dependencies>
        <!-- Flink runtime / connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
                <!-- Jedis is used instead of the default Lettuce client. -->
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <!-- Debezium embedded engine (version kept consistent via ${version.debezium}) -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- Monitoring -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>

        <!--
          FIXME(review): the original POM contained a <dependency> with ONLY the
          log4j exclusions below and no groupId/artifactId/version, which makes
          the build fail. The coordinates were lost; restore them and re-enable.
        <dependency>
            <groupId>???</groupId>
            <artifactId>???</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-jul</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        -->

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>

注意:代码中 `serverTimeZone` 与 JVM 默认时区必须和 MySQL 服务端的时区 ID 保持一致(本文统一使用 UTC),否则 CDC 读取的时间字段会出现偏差。

/**
 * Spring Boot entry point for the Flink CDC demo application.
 */
public class Application {

    /**
     * Boots the application. The JVM default time zone is pinned to UTC so it
     * matches the MySQL server time-zone ID used by the CDC source, and Log4j2
     * is switched to its asynchronous logger context before Spring starts.
     */
    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        System.setProperty("log4j2.contextSelector",
                "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
        System.setProperty("com.alibaba.nacos.client.naming.tls.enable", "true");
        SpringApplication.run(Application.class, args);
    }
}
 import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class CustomSink extends RichSinkFunction<String> {private ObjectMapper mapper = new ObjectMapper();@Overridepublic void invoke(String value, Context context) throws Exception {System.out.printf("数据发生变化: %s%n", value);TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {};Map<String, Object> result = mapper.readValue(value, valueType);Map<String, Object> payload = (Map<String, Object>) result.get("payload");String op = (String) payload.get("op") ;// 不对读操作处理if (!"r".equals(op)) {MonitorMySQLCDC.queue.put(result);}}
}

 import DebeziumProperties;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;@Component
public class MonitorMySQLCDC implements InitializingBean {// 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品)public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();private final StringRedisTemplate stringRedisTemplate;// 保存到redis中key的前缀private final String PREFIX = "users:";// 数据发生变化后的sink处理private final CustomSink customSink;@Resourceprivate DebeziumProperties debeziumProperties;public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {this.customSink = customSink;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void afterPropertiesSet() throws Exception {// 启动异步线程,实时处理队列中的数据new Thread(() -> {while (true) {try {Map<String, Object> result = queue.take();this.doAction(result);} catch (Exception e) {e.printStackTrace();}}}).start();DebeziumProperties.Datasource datasource = debeziumProperties.getDatasource();Properties jdbcProperties = new Properties();jdbcProperties.setProperty("useSSL", "false");MySqlSource<String> source = MySqlSource.<String>builder().hostname(datasource.getHostname()).port(datasource.getPort()).serverTimeZone("UTC")// 可配置多个数据库.databaseList("energy_pay_server")// 可配置多个表.tableList("energy_pay_server.tb_request_log").username(datasource.getUser()).password(datasource.getPassword()).jdbcProperties(jdbcProperties)// 包括schema的改变.includeSchemaChanges(true)// 反序列化设置// .deserializer(new StringDebeziumDeserializationSchema()).deserializer(new JsonDebeziumDeserializationSchema(true))// 启动模式;关于启动模式下面详细介绍/*** initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。** earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取** latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。** specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。** timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。*/.startupOptions(StartupOptions.initial()).build();// 环境配置Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) 
;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;// 设置 6s 的 checkpoint 间隔env.enableCheckpointing(6000);// 设置 source 节点的并行度为 4env.setParallelism(4);env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")// 添加Sink.addSink(this.customSink);env.execute();}@SuppressWarnings("unchecked")private void doAction(Map<String, Object> result) throws Exception {Map<String, Object> payload = (Map<String, Object>) result.get("payload");String op = (String) payload.get("op");switch (op) {// 更新和插入操作case "u":case "c":Map<String, Object> after = (Map<String, Object>) payload.get("after");String id = after.get("id").toString();System.out.printf("操作:%s, ID: %s%n", op, id);stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after));break;// 删除操作case "d":Map<String, Object> after1 = (Map<String, Object>) payload.get("before");String id1 = after1.get("id").toString();stringRedisTemplate.delete(PREFIX + id1);}}}

相关文章:

  • 客户案例分享|运营商数智化经典案例 — XX运营商
  • Apache Flink的架构设计与运行流程说明
  • 电子电器架构 --- 人工智能、固态电池和先进自动驾驶功能等新兴技术的影响
  • IntelliJ IDEA 2024.3.1 for Mac 中文 Java开发工具
  • 织梦dedecms发布文章时取消自动生成关键字
  • 数据挖掘专栏介绍:用 Python + 大语言模型 (LLM) 重塑电商数据价值
  • 【Hive入门】Hive高级特性:视图与物化视图
  • C++——调用OpenCV和NVIDIA Video Codec SDK库实现使用GPU硬解码MP4视频文件
  • Go 1.25为什么要废除核心类型
  • 后验概率最大化(MAP)估计算法原理以及相具体的应用实例附C++代码示例
  • 设计模式 | 详解常用设计模式(六大设计原则,单例模式,工厂模式,建造者模式,代理模式)
  • 最新的30个Android Kotlin面试题
  • Python程序开发,麒麟系统模拟电脑打开文件实现
  • <c++>使用detectMultiScale的时候出现opencv.dll冲突
  • EtherCAT 分布式时钟(DC)补偿技术解析
  • 【今日半导体行业分析】2025年4月29日
  • Missashe考研日记-day30
  • 1.3 点云数据获取方式——ToF相机
  • windows如何使用cmd命令翻转屏幕
  • 高可用、高并发、高性能架构设计深度解析
  • 专访|首夺天元头衔创生涯历史,王星昊打算一步一步慢慢来
  • 幸福航空五一前三天航班取消:客服称目前是锁舱状态,无法确认何时恢复
  • 央行副行长:研究建立民营中小企业增信制度,破解民营中小企业信用不足等融资制约
  • 油电同智,安全超充!从上海车展看中国汽车产业先发优势
  • 校长套取学生伙食费设小金库,重庆通报6起违反八项规定典型问题
  • 凝聚多方力量,中国农科院油菜产业专家团部署单产提升新任务