当前位置: 首页 > news >正文

Pig Cloud使用Debezium实时监听数据变更

Debezium 是一个开源的分布式平台,专门用于实时捕获数据库变更(CDC - Change Data Capture),并将这些变更以事件流的形式发送到消息队列(如Kafka),供其他服务消费。它相当于数据库的"显微镜",能毫秒级感知数据变动

 底层原理:读取MySQL的binlog


一、基于Springboot使用流程

1、导入jar包依赖

<!--		Debezium监听数据库--><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>2.3.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>2.3.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>2.3.2.Final</version></dependency><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-runtime</artifactId><version>3.3.2</version> <!-- Match with your Debezium version --></dependency>

2、配置nacos参数

# debezium配置
debezium:connector:dir: /home/hdkg/scheduling_system/jarname: mysql-connectorconnector.class: io.debezium.connector.mysql.MySqlConnector# MySQL 连接配置database:hostname: 100.100.100.211port: 33060user: canalpassword: Wxzxyfs@26824server:id: 2serverTimezone: Asia/Shanghaihistory:timezone: Asia/ShanghaiconnectionTimeZone: Asia/Shanghaiinclude:list: zkecotable:include:list: zkeco.checkinout# 时区配置timezone: Asia/Shanghai# 捕获配置include:schema:changes: falsesnapshot:mode: schema_only# mode: when_needed  # 备用选项# Schema 历史存储schema:history:internal: path: io.debezium.storage.file.history.FileSchemaHistoryfile:filename: /home/hdkg/scheduling_system/jar/schema-history.datonly:monitored:tables:ddl: true# Offset 存储offset:storage:path: org.apache.kafka.connect.storage.FileOffsetBackingStorefile:filename: /home/hdkg/scheduling_system/jar/offsets.dat# 其他配置topic:prefix: zk-mysql-servertime:precision:mode: connect

3、读取参数

/*** ClassName:CanalConfig* Package:com.wxzx.scheduling.zk.config* Description:** @Create: 2025/7/22-16:40* @version:*/
package com.wxzx.scheduling.zk.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Properties;@Data
@Configuration
public class DebeziumConfig {@Value("${debezium.connector.name}")private String name;@Value("${debezium.connector.dir}")private String dir;@Value("${debezium.connector.connector.class}")private String connectorClass;// MySQL 连接配置@Value("${debezium.connector.database.hostname}")private String databaseHostname;@Value("${debezium.connector.database.port}")private String databasePort;@Value("${debezium.connector.database.user}")private String databaseUser;@Value("${debezium.connector.database.password}")private String databasePassword;@Value("${debezium.connector.database.server.id}")private String databaseServerId;@Value("${debezium.connector.database.serverTimezone}")private String databaseServerTimezone;@Value("${debezium.connector.database.history.timezone}")private String databaseHistoryTimezone;@Value("${debezium.connector.database.connectionTimeZone}")private String databaseConnectionTimeZone;@Value("${debezium.connector.timezone}")private String timezone;// 捕获配置@Value("${debezium.connector.include.schema.changes}")private String includeSchemaChanges;@Value("${debezium.connector.snapshot.mode}")private String snapshotMode;// Schema 历史存储@Value("${debezium.connector.schema.history.internal.path}")private String schemaHistoryInternalPath;@Value("${debezium.connector.schema.history.internal.file.filename}")private String schemaHistoryInternalFileFilename;@Value("${debezium.connector.schema.only.monitored.tables.ddl}")private String historyStoreOnlyMonitoredTablesDdl;// Offset 存储@Value("${debezium.connector.offset.storage.path}")private String offsetStorage;@Value("${debezium.connector.offset.storage.file.filename}")private String offsetStorageFileFilename;// 其他配置@Value("${debezium.connector.topic.prefix}")private String topicPrefix;@Value("${debezium.connector.database.include.list}")private String databaseIncludeList;@Value("${debezium.connector.table.include.list}")private String tableIncludeList;@Value("${debezium.connector.time.precision.mode}")private String timePrecisionMode;// 可以添加一个方法将配置转换为Properties对象,便于Debezium使用public Properties toProperties() {Properties config = new Properties();config.setProperty("name", name);config.setProperty("connector.class", connectorClass);config.setProperty("database.hostname", databaseHostname);config.setProperty("database.port", databasePort);config.setProperty("database.user", databaseUser);config.setProperty("database.password", databasePassword);config.setProperty("database.server.id", databaseServerId);config.setProperty("database.serverTimezone", databaseServerTimezone);config.setProperty("database.history.timezone", databaseHistoryTimezone);config.setProperty("database.connectionTimeZone", databaseConnectionTimeZone);config.setProperty("debezium.timezone", timezone);config.setProperty("include.schema.changes", includeSchemaChanges);config.setProperty("snapshot.mode", snapshotMode);config.setProperty("schema.history.internal", schemaHistoryInternalPath);config.setProperty("schema.history.internal.file.filename", schemaHistoryInternalFileFilename);config.setProperty("database.history.store.only.monitored.tables.ddl", historyStoreOnlyMonitoredTablesDdl);config.setProperty("offset.storage", offsetStorage);config.setProperty("offset.storage.file.filename", offsetStorageFileFilename);config.setProperty("topic.prefix", topicPrefix);config.setProperty("database.include.list", databaseIncludeList);config.setProperty("table.include.list", tableIncludeList);config.setProperty("time.precision.mode", timePrecisionMode);return config;}
}

4、编写监听逻辑


@Slf4j
@Component
@AllArgsConstructor
public class DebeziumBinlogListenerV2 implements ApplicationRunner {private final DebeziumConfig debeziumConfig;private final CheckInOutService checkInOutService;//	private final CheckInOutMapper checkInOutMapper;private final CheckInOutZkMapper checkInOutZkMapper;@Overridepublic void run(ApplicationArguments args) throws Exception {// 在bean初始化完成后执行log.info("开始指纹面部监视");Properties config = debeziumConfig.toProperties();// 在配置前添加代码清除偏移量文件log.info("debezium路径:"+debeziumConfig.getOffsetStorageFileFilename() );File offsetFile = new File(debeziumConfig.getOffsetStorageFileFilename());if(offsetFile.exists()) {log.info("offset文件存在:"+debeziumConfig.getOffsetStorageFileFilename() +",进行删除");offsetFile.delete();}File schemaHistoryFile = new File(debeziumConfig.getSchemaHistoryInternalFileFilename());if(schemaHistoryFile.exists()) {log.info("schemaHistory存在:"+debeziumConfig.getSchemaHistoryInternalFileFilename() +",进行删除");schemaHistoryFile.delete();}// 清除旧数据文件File dataDir = new File(debeziumConfig.getDir());if(!dataDir.exists()) {log.info("data文件夹不存在:"+debeziumConfig.getDir() +",进行创建");dataDir.mkdirs();}// 在您的 try 块中修改 notifying 部分try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(config).using((OffsetCommitPolicy) (successCount, timeElapsed) -> true) // 自动提交.notifying(record -> {try {String value = record.value();if (value != null) {ObjectMapper mapper = new ObjectMapper();JsonNode event = mapper.readTree(value);//                            // 调试打印原始事件log.info("原始事件: " + event);
//
//                            // 检查操作类型String op = event.path("payload").path("op").asText();log.info("操作类型: " + op);if ("c".equals(op)) { // 插入操作JsonNode after = event.path("payload").path("after");if (!after.isMissingNode()) {// 业务代码}}}} catch (Exception e) {log.error("记录处理失败,跳过: " + e.getMessage(),e);}}).build()) {ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);log.info("正在监听MySQL变更...");} catch (Exception e) {log.error("Debezium监听考勤记录表异常:"+e.getMessage(),e);}log.info("指纹面部监视结束");}
}

http://www.dtcms.com/a/295092.html

相关文章:

  • pandas库
  • 18-设备虚拟化IRF
  • 题单【函数与结构体】
  • c++--面向对象封装--实践
  • window下c++共享内存,进程互斥锁。
  • 前端跨域请求原理及实践
  • 一二章笔记总结
  • CSP-J系列【2024】P11229 [CSP-J 2024] 小木棍题解
  • 1688官方跨境寻源通API接口调用实战
  • LLM指纹底层技术——混合专家模型
  • CSP-J系列【2023】P9750 [CSP-J 2023] 一元二次方程题解
  • SSH 一键互信配置脚本 V2.0 使用指南
  • 卡尔曼滤波数据融合
  • AI 及开发领域动态与资源汇总(2025年7月23日)
  • 【LeetCode】算法详解#9 ---旋转图像
  • QT开发---基础介绍及环境搭建
  • STM32中SystemCoreClockUpdate函数解读
  • 双写缓冲区 Redo Log
  • 基于GitHub的Terraform自动化管理最佳实践
  • 多服务器批量发布软件
  • Linux编程:9、线程编程-互斥锁与条件变量
  • 扫地机产品的电池CQC认证遵循哪个标准?
  • 1. 一份“从 0 到 1” 的 WSL(Windows Subsystem for Linux)速查手册
  • J2EE模式---视图助手模式
  • ospf多区域
  • git的使用,推送仓库github
  • Hierarchical-Localization 安装与常见问题解决手册
  • MSTP多生成树协议
  • 【西北工业大学公开课】导引系统原理(全61讲)周军 -个人笔记版 5000字
  • 基于多种机器学习的水质污染及安全预测分析系统的设计与实现【随机森林、XGBoost、LightGBM、SMOTE、贝叶斯优化】