Pig Cloud使用Debezium实时监听数据变更
Debezium 是一个开源的分布式平台,专门用于实时捕获数据库变更(CDC - Change Data Capture),并将这些变更以事件流的形式发送到消息队列(如Kafka),供其他服务消费。它相当于数据库的"显微镜",能毫秒级感知数据变动。
底层原理:读取MySQL的binlog
一、基于Springboot使用流程
1、导入jar包依赖
<!-- Debezium监听数据库-->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>2.3.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>2.3.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>2.3.2.Final</version>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-runtime</artifactId>
    <version>3.3.2</version> <!-- Match with your Debezium version -->
</dependency>
2、配置nacos参数
# debezium配置
debezium:
  connector:
    dir: /home/hdkg/scheduling_system/jar
    name: mysql-connector
    connector.class: io.debezium.connector.mysql.MySqlConnector
    # MySQL 连接配置
    database:
      hostname: 100.100.100.211
      port: 33060
      user: canal
      password: Wxzxyfs@26824
      server:
        id: 2
      serverTimezone: Asia/Shanghai
      history:
        timezone: Asia/Shanghai
      connectionTimeZone: Asia/Shanghai
      include:
        list: zkeco
    table:
      include:
        list: zkeco.checkinout
    # 时区配置
    timezone: Asia/Shanghai
    # 捕获配置
    include:
      schema:
        changes: false
    snapshot:
      mode: schema_only
      # mode: when_needed # 备用选项
    # Schema 历史存储
    schema:
      history:
        internal:
          path: io.debezium.storage.file.history.FileSchemaHistory
          file:
            filename: /home/hdkg/scheduling_system/jar/schema-history.dat
      only:
        monitored:
          tables:
            ddl: true
    # Offset 存储
    offset:
      storage:
        path: org.apache.kafka.connect.storage.FileOffsetBackingStore
        file:
          filename: /home/hdkg/scheduling_system/jar/offsets.dat
    # 其他配置
    topic:
      prefix: zk-mysql-server
    time:
      precision:
        mode: connect
3、读取参数
/**
 * ClassName:DebeziumConfig
 * Package:com.wxzx.scheduling.zk.config
 * Description:
 *
 * @Create: 2025/7/22-16:40
 * @version:
 */
package com.wxzx.scheduling.zk.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Properties;@Data
@Configuration
public class DebeziumConfig {@Value("${debezium.connector.name}")private String name;@Value("${debezium.connector.dir}")private String dir;@Value("${debezium.connector.connector.class}")private String connectorClass;// MySQL 连接配置@Value("${debezium.connector.database.hostname}")private String databaseHostname;@Value("${debezium.connector.database.port}")private String databasePort;@Value("${debezium.connector.database.user}")private String databaseUser;@Value("${debezium.connector.database.password}")private String databasePassword;@Value("${debezium.connector.database.server.id}")private String databaseServerId;@Value("${debezium.connector.database.serverTimezone}")private String databaseServerTimezone;@Value("${debezium.connector.database.history.timezone}")private String databaseHistoryTimezone;@Value("${debezium.connector.database.connectionTimeZone}")private String databaseConnectionTimeZone;@Value("${debezium.connector.timezone}")private String timezone;// 捕获配置@Value("${debezium.connector.include.schema.changes}")private String includeSchemaChanges;@Value("${debezium.connector.snapshot.mode}")private String snapshotMode;// Schema 历史存储@Value("${debezium.connector.schema.history.internal.path}")private String schemaHistoryInternalPath;@Value("${debezium.connector.schema.history.internal.file.filename}")private String schemaHistoryInternalFileFilename;@Value("${debezium.connector.schema.only.monitored.tables.ddl}")private String historyStoreOnlyMonitoredTablesDdl;// Offset 存储@Value("${debezium.connector.offset.storage.path}")private String offsetStorage;@Value("${debezium.connector.offset.storage.file.filename}")private String offsetStorageFileFilename;// 其他配置@Value("${debezium.connector.topic.prefix}")private String topicPrefix;@Value("${debezium.connector.database.include.list}")private String databaseIncludeList;@Value("${debezium.connector.table.include.list}")private String tableIncludeList;@Value("${debezium.connector.time.precision.mode}")private String 
timePrecisionMode;// 可以添加一个方法将配置转换为Properties对象,便于Debezium使用public Properties toProperties() {Properties config = new Properties();config.setProperty("name", name);config.setProperty("connector.class", connectorClass);config.setProperty("database.hostname", databaseHostname);config.setProperty("database.port", databasePort);config.setProperty("database.user", databaseUser);config.setProperty("database.password", databasePassword);config.setProperty("database.server.id", databaseServerId);config.setProperty("database.serverTimezone", databaseServerTimezone);config.setProperty("database.history.timezone", databaseHistoryTimezone);config.setProperty("database.connectionTimeZone", databaseConnectionTimeZone);config.setProperty("debezium.timezone", timezone);config.setProperty("include.schema.changes", includeSchemaChanges);config.setProperty("snapshot.mode", snapshotMode);config.setProperty("schema.history.internal", schemaHistoryInternalPath);config.setProperty("schema.history.internal.file.filename", schemaHistoryInternalFileFilename);config.setProperty("database.history.store.only.monitored.tables.ddl", historyStoreOnlyMonitoredTablesDdl);config.setProperty("offset.storage", offsetStorage);config.setProperty("offset.storage.file.filename", offsetStorageFileFilename);config.setProperty("topic.prefix", topicPrefix);config.setProperty("database.include.list", databaseIncludeList);config.setProperty("table.include.list", tableIncludeList);config.setProperty("time.precision.mode", timePrecisionMode);return config;}
}
4、编写监听逻辑
@Slf4j
@Component
@AllArgsConstructor
public class DebeziumBinlogListenerV2 implements ApplicationRunner {private final DebeziumConfig debeziumConfig;private final CheckInOutService checkInOutService;// private final CheckInOutMapper checkInOutMapper;private final CheckInOutZkMapper checkInOutZkMapper;@Overridepublic void run(ApplicationArguments args) throws Exception {// 在bean初始化完成后执行log.info("开始指纹面部监视");Properties config = debeziumConfig.toProperties();// 在配置前添加代码清除偏移量文件log.info("debezium路径:"+debeziumConfig.getOffsetStorageFileFilename() );File offsetFile = new File(debeziumConfig.getOffsetStorageFileFilename());if(offsetFile.exists()) {log.info("offset文件存在:"+debeziumConfig.getOffsetStorageFileFilename() +",进行删除");offsetFile.delete();}File schemaHistoryFile = new File(debeziumConfig.getSchemaHistoryInternalFileFilename());if(schemaHistoryFile.exists()) {log.info("schemaHistory存在:"+debeziumConfig.getSchemaHistoryInternalFileFilename() +",进行删除");schemaHistoryFile.delete();}// 清除旧数据文件File dataDir = new File(debeziumConfig.getDir());if(!dataDir.exists()) {log.info("data文件夹不存在:"+debeziumConfig.getDir() +",进行创建");dataDir.mkdirs();}// 在您的 try 块中修改 notifying 部分try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(config).using((OffsetCommitPolicy) (successCount, timeElapsed) -> true) // 自动提交.notifying(record -> {try {String value = record.value();if (value != null) {ObjectMapper mapper = new ObjectMapper();JsonNode event = mapper.readTree(value);// // 调试打印原始事件log.info("原始事件: " + event);
//
// // 检查操作类型String op = event.path("payload").path("op").asText();log.info("操作类型: " + op);if ("c".equals(op)) { // 插入操作JsonNode after = event.path("payload").path("after");if (!after.isMissingNode()) {// 业务代码}}}} catch (Exception e) {log.error("记录处理失败,跳过: " + e.getMessage(),e);}}).build()) {ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);log.info("正在监听MySQL变更...");} catch (Exception e) {log.error("Debezium监听考勤记录表异常:"+e.getMessage(),e);}log.info("指纹面部监视结束");}
}