
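No More Data Inconsistency! Precise MySQL Change Listening with Spring Boot + Canal + RocketMQ

Below is a complete Spring Boot project that integrates Canal and RocketMQ to listen to the MySQL binlog and process the changes asynchronously.

Project structure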

src/
├── main/
│   ├── java/
│   │   └── com/
│   │       └── example/
│   │           └── canalrocketmqdemo/
│   │               ├── CanalClientRunner.java        // Canal client
│   │               ├── BinlogEventProducer.java      // RocketMQ producer
│   │               ├── BinlogEventConsumer.java      // RocketMQ consumer
│   │               ├── BinlogEvent.java              // Event model
│   │               ├── config/
│   │               │   └── RocketMQConfig.java       // RocketMQ configuration
│   │               └── Application.java              // Main application
│   └── resources/
│       ├── application.yml                           // Configuration file
│       └── rocketmq.properties                       // RocketMQ properties file

Complete code implementation

1. Add dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-rocketmq-demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>17</java.version>
        <!-- rocketmq-spring-boot-starter uses its own 2.x version line, separate from the RocketMQ 5.x server -->
        <rocketmq-spring.version>2.3.0</rocketmq-spring.version>
        <canal.version>1.1.7</canal.version>
        <lombok.version>1.18.30</lombok.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring.version}</version>
        </dependency>

        <!-- Canal -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>${canal.version}</version>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Connection pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. Configuration file (application.yml)

server:
  port: 8080

spring:
  application:
    name: canal-rocketmq-demo

# Canal configuration
canal:
  server: 127.0.0.1:11111
  destination: example
  batch-size: 1000
  username: ""
  password: ""
  retry:
    max-attempts: 5
    backoff: 5000

# RocketMQ configuration
rocketmq:
  name-server: 127.0.0.1:9876
  topic:
    binlog: binlog-events   # read via ${rocketmq.topic.binlog}; must match the topic created on the broker
  producer:
    group: canal-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
  consumer:
    group: canal-consumer-group
    consume-timeout: 30000
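
The canal.server entry packs host and port into one string, which CanalClientRunner later splits on ':'. A slightly more defensive parse might look like the sketch below. CanalAddress is a hypothetical helper introduced here for illustration; it is not part of Canal or Spring.

```java
import java.net.InetSocketAddress;

// Hypothetical helper (an assumption for illustration, not a Canal or Spring class).
final class CanalAddress {
    private CanalAddress() {
    }

    /** Parses "host:port" into an unresolved socket address and rejects malformed input. */
    static InetSocketAddress parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("address must not be null");
        }
        int idx = value.lastIndexOf(':');
        if (idx <= 0 || idx == value.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + value);
        }
        String host = value.substring(0, idx).trim();
        int port;
        try {
            port = Integer.parseInt(value.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in: " + value, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + value);
        }
        // createUnresolved avoids a DNS lookup while parsing
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parse("127.0.0.1:11111");
        System.out.println(address.getHostString() + " / " + address.getPort());
    }
}
```

Failing fast on a malformed value gives a clearer startup error than an ArrayIndexOutOfBoundsException from a bare split. If a resolved address is needed, build one with new InetSocketAddress(address.getHostString(), address.getPort()).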

3. RocketMQ configuration class (RocketMQConfig.java)

package com.example.canalrocketmqdemo.config;

import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import java.util.List;

@Configuration
public class RocketMQConfig {

    // RocketMQTemplate is auto-configured by rocketmq-spring-boot-starter from
    // rocketmq.name-server and rocketmq.producer.group, so no template bean is declared here
    // (RocketMQTemplate has no setNameServer method to configure one by hand).

    @Bean
    public RocketMQMessageConverter rocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeConverter =
                (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> converterList = compositeConverter.getConverters();
        for (MessageConverter messageConverter : converterList) {
            if (messageConverter instanceof MappingJackson2MessageConverter) {
                // Serialize JSON payloads as String
                ((MappingJackson2MessageConverter) messageConverter)
                        .setSerializedPayloadClass(String.class);
            }
        }
        return converter;
    }
}

4. Binlog event model (BinlogEvent.java)

package com.example.canalrocketmqdemo;

import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class BinlogEvent {

    public enum EventType { INSERT, UPDATE, DELETE }

    private String database;
    private String table;
    private EventType eventType;
    private Long executeTime;
    private List<Map<String, String>> before;   // data before a DELETE/UPDATE
    private List<Map<String, String>> after;    // data after an INSERT/UPDATE

    // Identifier used for idempotent handling.
    // Note: it is only as fine-grained as executeTime, so several row changes on the
    // same table with the same timestamp share one id.
    public String getUniqueId() {
        return database + ":" + table + ":" + executeTime;
    }
}
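
Because getUniqueId() is built from the database, table, and execute time only, two row changes on the same table with the same timestamp collide. One possible refinement, sketched below, is to key on the binlog position instead. It assumes the event model is extended with the binlog file name and offset (Canal exposes these through Header.getLogfileName() and Header.getLogfileOffset()) plus the row's index within the entry. BinlogEventKey and its parameter names are hypothetical.

```java
// Hypothetical helper: builds an idempotency key from the binlog position.
// Assumes the caller copies logfileName/logfileOffset from the Canal entry header
// and passes the index of the row inside that entry.
final class BinlogEventKey {
    private BinlogEventKey() {
    }

    static String of(String database, String table,
                     String logfileName, long logfileOffset, int rowIndex) {
        return database + ":" + table + ":" + logfileName + ":" + logfileOffset + ":" + rowIndex;
    }

    public static void main(String[] args) {
        // Two rows changed by the same statement get distinct keys
        System.out.println(of("test_db", "test_table", "mysql-bin.000003", 4711L, 0));
        System.out.println(of("test_db", "test_table", "mysql-bin.000003", 4711L, 1));
    }
}
```

A position-based key stays stable across redeliveries of the same change, which is the property an idempotent consumer needs.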

5. RocketMQ producer (BinlogEventProducer.java)

package com.example.canalrocketmqdemo;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class BinlogEventProducer {

    private final RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.topic.binlog}")
    private String binlogTopic;

    @Autowired
    public BinlogEventProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendEvent(BinlogEvent event) {
        try {
            rocketMQTemplate.syncSend(
                    binlogTopic,
                    MessageBuilder.withPayload(event)
                            .setHeader("EVENT_TYPE", event.getEventType().name())
                            .build());
            // Optionally log the send result here
        } catch (Exception e) {
            // Add retry or error handling logic here
            throw new RuntimeException("Failed to send message to RocketMQ", e);
        }
    }
}

6. Canal client (CanalClientRunner.java)

package com.example.canalrocketmqdemo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Component
public class CanalClientRunner implements CommandLineRunner {

    @Value("${canal.server}")
    private String canalServer;

    @Value("${canal.destination}")
    private String destination;

    @Value("${canal.batch-size}")
    private int batchSize;

    @Value("${canal.username}")
    private String username;

    @Value("${canal.password}")
    private String password;

    @Value("${canal.retry.max-attempts}")
    private int maxRetryAttempts;

    @Value("${canal.retry.backoff}")
    private long retryBackoff;

    @Autowired
    private BinlogEventProducer eventProducer;

    private CanalConnector connector;
    private volatile boolean running = true;

    @Override
    public void run(String... args) {
        // Initialize the connection
        initConnector();
        // Start the processing thread
        new Thread(this::process).start();
        // Register a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }

    private void initConnector() {
        String[] serverParts = canalServer.split(":");
        String host = serverParts[0];
        int port = Integer.parseInt(serverParts[1]);
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(host, port),
                destination,
                username,
                password);
        connectWithRetry();
    }

    private void connectWithRetry() {
        int attempts = 0;
        while (attempts < maxRetryAttempts) {
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                return;
            } catch (Exception e) {
                attempts++;
                if (attempts >= maxRetryAttempts) {
                    throw new RuntimeException("Failed to connect to Canal: maximum retry attempts reached", e);
                }
                try {
                    Thread.sleep(retryBackoff);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void process() {
        while (running) {
            try {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                    continue;
                }
                processEntries(message.getEntries());
                connector.ack(batchId);
            } catch (InterruptedException e) {
                // Stop the loop once interrupted instead of spinning on a set interrupt flag
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Handle the failure and try to reconnect
                handleProcessingException(e);
            }
        }
    }

    private void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                BinlogEvent.EventType eventType = mapEventType(rowChange.getEventType());
                if (eventType == null) {
                    // Not an INSERT/UPDATE/DELETE (for example DDL): nothing to publish
                    continue;
                }
                Header header = entry.getHeader();
                BinlogEvent event = new BinlogEvent();
                event.setDatabase(header.getSchemaName());
                event.setTable(header.getTableName());
                event.setExecuteTime(header.getExecuteTime());
                event.setEventType(eventType);
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == BinlogEvent.EventType.UPDATE
                            || eventType == BinlogEvent.EventType.DELETE) {
                        event.setBefore(convertColumns(rowData.getBeforeColumnsList()));
                    }
                    if (eventType == BinlogEvent.EventType.INSERT
                            || eventType == BinlogEvent.EventType.UPDATE) {
                        event.setAfter(convertColumns(rowData.getAfterColumnsList()));
                    }
                    // Send one event per changed row to RocketMQ
                    eventProducer.sendEvent(event);
                }
            } catch (Exception e) {
                // Log the error and keep going. Note that the batch is still acked afterwards,
                // so an entry that fails here is not redelivered by Canal.
                System.err.println("Failed to process binlog entry: " + e.getMessage());
            }
        }
    }

    private List<Map<String, String>> convertColumns(List<Column> columns) {
        return columns.stream()
                .map(col -> Map.of(
                        "name", col.getName(),
                        "value", col.getValue(),
                        "type", col.getMysqlType(),
                        "updated", String.valueOf(col.getUpdated())))
                .collect(Collectors.toList());
    }

    private BinlogEvent.EventType mapEventType(EventType eventType) {
        switch (eventType) {
            case INSERT: return BinlogEvent.EventType.INSERT;
            case UPDATE: return BinlogEvent.EventType.UPDATE;
            case DELETE: return BinlogEvent.EventType.DELETE;
            default: return null;
        }
    }

    private void handleProcessingException(Exception e) {
        System.err.println("Error while processing Canal messages: " + e.getMessage());
        try {
            Thread.sleep(retryBackoff);
            connectWithRetry(); // try to reconnect
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    private void shutdown() {
        running = false;
        if (connector != null) {
            connector.disconnect();
        }
    }
}
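
The connectWithRetry method above retries with a fixed pause between attempts. As a variation (a swapped-in technique, not what the listing does), the sketch below doubles the delay after each failure and caps it, which backs off faster when the Canal Server stays down. Retry is a hypothetical helper; the parameter names are assumptions.

```java
import java.util.concurrent.Callable;

// Hypothetical generic retry helper with capped exponential backoff.
final class Retry {
    private Retry() {
    }

    static <T> T call(int maxAttempts, long baseDelayMillis, long maxDelayMillis, Callable<T> action)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop retrying
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                // Delay doubles per failed attempt: base, 2x, 4x, ... capped at maxDelayMillis
                long factor = 1L << Math.min(attempt - 1, 20);
                Thread.sleep(Math.min(maxDelayMillis, baseDelayMillis * factor));
            }
        }
        // All attempts failed: surface the last failure to the caller
        throw last;
    }
}
```

With such a helper, the connection step could be written as Retry.call(maxRetryAttempts, retryBackoff, 60_000L, () -> { connector.connect(); return null; }), where the 60-second cap is an arbitrary example value.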

7. RocketMQ consumer (BinlogEventConsumer.java)

package com.example.canalrocketmqdemo;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = "${rocketmq.topic.binlog}",
        consumerGroup = "${rocketmq.consumer.group}"
)
public class BinlogEventConsumer implements RocketMQListener<BinlogEvent> {

    private static final Logger logger = LoggerFactory.getLogger(BinlogEventConsumer.class);

    @Override
    public void onMessage(BinlogEvent event) {
        try {
            switch (event.getEventType()) {
                case INSERT:
                    handleInsert(event);
                    break;
                case UPDATE:
                    handleUpdate(event);
                    break;
                case DELETE:
                    handleDelete(event);
                    break;
            }
            logger.info("Processed: {}.{} {}", event.getDatabase(), event.getTable(), event.getEventType());
        } catch (Exception e) {
            logger.error("Failed to process event: {}", event, e);
            // Swallowing the exception marks the message as consumed. Rethrow it here if
            // RocketMQ should redeliver the message and eventually dead-letter it.
        }
    }

    private void handleInsert(BinlogEvent event) {
        // Real business logic goes here: update a cache, sync to Elasticsearch, a data warehouse, etc.
        logger.info("INSERT event: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("Inserted data: {}", event.getAfter());
    }

    private void handleUpdate(BinlogEvent event) {
        logger.info("UPDATE event: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("Before: {}", event.getBefore());
        logger.debug("After: {}", event.getAfter());
    }

    private void handleDelete(BinlogEvent event) {
        logger.info("DELETE event: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("Deleted data: {}", event.getBefore());
    }
}
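
The before and after fields arrive as a list of column descriptors, each a map with the keys name, value, type, and updated (as produced by convertColumns in the Canal client). Business handlers usually want a plain column-to-value view instead. The sketch below shows one way to derive it. BinlogRows is a hypothetical helper, not part of the project above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for working with the column descriptors carried by BinlogEvent.
final class BinlogRows {
    private BinlogRows() {
    }

    /** Collapses column descriptors into a column-name -> value map, keeping column order. */
    static Map<String, String> toRow(List<Map<String, String>> columns) {
        Map<String, String> row = new LinkedHashMap<>();
        if (columns == null) {
            return row;
        }
        for (Map<String, String> column : columns) {
            row.put(column.get("name"), column.get("value"));
        }
        return row;
    }

    /** Returns the names of the columns whose "updated" flag is "true". */
    static List<String> updatedColumns(List<Map<String, String>> columns) {
        List<String> names = new ArrayList<>();
        if (columns == null) {
            return names;
        }
        for (Map<String, String> column : columns) {
            if (Boolean.parseBoolean(column.get("updated"))) {
                names.add(column.get("name"));
            }
        }
        return names;
    }
}
```

In handleUpdate, for example, BinlogRows.updatedColumns(event.getAfter()) would tell a cache updater which fields actually changed.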

8. Main application class (Application.java)

package com.example.canalrocketmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Environment setup and deployment steps

1. MySQL configuration

  1. Edit the MySQL configuration file (my.cnf or my.ini):

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    expire_logs_days=3
  2. Restart the MySQL service

  3. Create the Canal user:

    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

2. Canal Server deployment

  1. Download Canal: https://github.com/alibaba/canal/releases

  2. Extract it and edit the configuration:

    cd canal
    vi conf/example/instance.properties

    Set the following properties:

    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal_password
    canal.instance.filter.regex=.*\\..*
  3. Start Canal:

    sh bin/startup.sh  # Linux
    bin/startup.bat    # Windows

3. RocketMQ deployment

  1. Download RocketMQ: https://rocketmq.apache.org/download

  2. Start the NameServer:

    nohup sh bin/mqnamesrv &
  3. Start the Broker:

    nohup sh bin/mqbroker -n localhost:9876 &
  4. Create the topic:

    sh bin/mqadmin updateTopic -n localhost:9876 -t binlog-events -b localhost:10911

4. Start the Spring Boot application

mvn spring-boot:run

5. Test and verify

  1. Run some statements in MySQL:

    -- Create a test table
    CREATE TABLE test_table (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(50),
        email VARCHAR(100)
    );

    -- Run DML statements
    INSERT INTO test_table (name, email) VALUES ('John', 'john@example.com');
    UPDATE test_table SET email = 'john.doe@example.com' WHERE id = 1;
    DELETE FROM test_table WHERE id = 1;
  2. Check the application log:

    INSERT event: test_db.test_table
    UPDATE event: test_db.test_table
    DELETE event: test_db.test_table
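  3. Inspect the messages in the RocketMQ Dashboard (a separately deployed web console; port 9876 is the NameServer port and does not serve a web page), or check the topic from the command line:

    sh bin/mqadmin topicStatus -n localhost:9876 -t binlog-events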

Production optimization recommendations

1. Performance optimization

// In CanalClientRunner: collect the events and send them as one batch
// (requires: import java.util.ArrayList;)
private void processEntries(List<Entry> entries) {
    List<BinlogEvent> events = new ArrayList<>();
    for (Entry entry : entries) {
        // ... parse the event ...
        events.add(event);
    }
    // Send as a batch
    if (!events.isEmpty()) {
        eventProducer.sendBatchEvents(events);
    }
}

// In BinlogEventProducer: add a batch send method
// (requires: import org.springframework.messaging.Message;
//            import java.util.List; import java.util.stream.Collectors;)
public void sendBatchEvents(List<BinlogEvent> events) {
    List<Message<BinlogEvent>> messages = events.stream()
            .map(event -> MessageBuilder.withPayload(event).build())
            .collect(Collectors.toList());
    rocketMQTemplate.syncSend(binlogTopic, messages);
}
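
A Canal batch can hold up to batch-size entries, and a single RocketMQ send is subject to the producer's message size limit, so sending everything in one call may fail for large batches. A common workaround is to split the list into fixed-size chunks first. The sketch below shows the splitting step only. Batches is a hypothetical helper, and the chunk size is an assumption to tune against real payload sizes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list into consecutive chunks of at most chunkSize items.
final class Batches {
    private Batches() {
    }

    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

The producer could then loop over Batches.partition(events, 100) and call sendBatchEvents once per chunk; 100 is only an illustrative value.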

2. Message filtering

Add to application.yml:

canal:
  filter:
    tables: '.*\.user,.*\.product,.*\.order.*'  # only listen to these tables; each entry is a regex over schema.table

In CanalClientRunner:

@Value("${canal.filter.tables:}")
private String tableFilter;

private void connectWithRetry() {
    // ...
    connector.subscribe(tableFilter.isEmpty() ? ".*\\..*" : tableFilter);
    // ...
}
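
Canal evaluates the subscription filter on the server as a comma-separated list of regular expressions over schema.table names. The sketch below approximates that matching with java.util.regex so a filter string can be checked before deployment. It is a simplified model under that assumption, not Canal's own filter implementation, and TableFilter is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical approximation of a Canal-style table filter, for local checks only.
final class TableFilter {
    private final List<Pattern> patterns = new ArrayList<>();

    TableFilter(String commaSeparatedRegex) {
        for (String part : commaSeparatedRegex.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
    }

    /** Returns true when any pattern matches the full "schema.table" name. */
    boolean accepts(String schema, String table) {
        String fullName = schema + "." + table;
        for (Pattern pattern : patterns) {
            if (pattern.matcher(fullName).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TableFilter filter = new TableFilter(".*\\.user,.*\\.order.*");
        System.out.println(filter.accepts("test_db", "user"));
        System.out.println(filter.accepts("test_db", "product"));
    }
}
```

Under this model a bare table name such as user does not match test_db.user, which is why each entry should include the schema part.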

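3. Message tracing and monitoring

// Send orderly, keyed by table, so that changes to one table land on the same queue.
// Message trace itself is switched on with rocketmq.producer.enable-msg-trace: true in application.yml.
public void sendEvent(BinlogEvent event) {
    String orderKey = event.getDatabase() + ":" + event.getTable();
    rocketMQTemplate.syncSendOrderly(binlogTopic, MessageBuilder.withPayload(event).build(), orderKey);
    // Record a metric (io.micrometer.core.instrument.Metrics; needs Micrometer on the classpath)
    Metrics.counter("rocketmq.sent.messages",
            "table", event.getTable(),
            "type", event.getEventType().name()).increment();
}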
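
syncSendOrderly picks a queue from the hash key, so messages that share a key share a queue. The sketch below is a simplified model of that selection (not RocketMQ's own selector code). It illustrates why a per-table key keeps one table's changes on a single queue, while a key that embeds a timestamp can spread them across queues. QueueSelector is hypothetical.

```java
// Hypothetical, simplified model of hash-based queue selection.
final class QueueSelector {
    private QueueSelector() {
    }

    /** Maps a hash key to a queue index in [0, queueCount). */
    static int select(String hashKey, int queueCount) {
        if (queueCount < 1) {
            throw new IllegalArgumentException("queueCount must be at least 1");
        }
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        // The same table key always maps to the same queue
        System.out.println(select("test_db:test_table", 4));
        System.out.println(select("test_db:test_table", 4));
    }
}
```

Ordering is only preserved end to end if the consumer also reads each queue in order, so the producer-side key is one half of the setup.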

4. Dead-letter queue handling

// rocketmq-spring configures retries on the listener annotation rather than through a
// listener container factory bean. After maxReconsumeTimes failed redeliveries the broker
// moves the message to the dead-letter topic %DLQ%<consumerGroup>
// (here: %DLQ%canal-consumer-group).
@Service
@RocketMQMessageListener(
        topic = "${rocketmq.topic.binlog}",
        consumerGroup = "${rocketmq.consumer.group}",
        maxReconsumeTimes = 3
)
public class BinlogEventConsumer implements RocketMQListener<BinlogEvent> {

    @Override
    public void onMessage(BinlogEvent event) {
        // ... handle the event; let exceptions propagate so the message is retried ...
    }
}
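
The retry-then-dead-letter flow can be modelled in plain Java to make the counting explicit: one initial delivery, up to maxReconsumeTimes redeliveries, then the message ends up on the consumer group's dead-letter topic. The sketch below is a simplified simulation under that assumption. It ignores retry delays and broker internals, and RedeliverySimulator is hypothetical.

```java
import java.util.function.Consumer;

// Hypothetical simulation of consumer retries followed by dead-lettering.
final class RedeliverySimulator {
    static final String DLQ_PREFIX = "%DLQ%";

    private RedeliverySimulator() {
    }

    /**
     * Delivers the body to the handler until it succeeds or retries are exhausted.
     * Returns the topic the message ends on: the original topic when consumed,
     * otherwise the dead-letter topic of the consumer group.
     */
    static String deliver(String topic, String consumerGroup, int maxReconsumeTimes,
                          Consumer<String> handler, String body) {
        // attempt 0 is the first delivery; attempts 1..maxReconsumeTimes are redeliveries
        for (int attempt = 0; attempt <= maxReconsumeTimes; attempt++) {
            try {
                handler.accept(body);
                return topic;
            } catch (RuntimeException e) {
                // A real broker would schedule a delayed redelivery here
            }
        }
        return DLQ_PREFIX + consumerGroup;
    }
}
```

The point to carry over is that a listener which catches and swallows every exception never triggers this flow, because each delivery then counts as a success.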

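Troubleshooting

1. Canal connection problems

Symptom: connecting to the Canal Server fails
Fix:

  • Check that the Canal Server is running: netstat -an | grep 11111

  • Confirm that the username and password are correct

  • Check the Canal Server log: logs/canal/canal.log

2. RocketMQ send failures

Symptom: sending a message times out or fails
Fix:

  • Check the NameServer status: mqadmin clusterList -n localhost:9876

  • Increase the send timeout: rocketmq.producer.send-message-timeout=5000

  • Check network connectivity

3. Message consumption backlog

Symptom: the RocketMQ console shows a message backlog
Fix:

  • Add more consumer instances

  • Tune the consume batch size

  • Optimize the consumer's processing logic

4. Data inconsistency

Symptom: the processed data does not match the source data
Fix:

  • Add idempotent processing logic

  • Implement a data reconciliation mechanism

  • Record the processing status of each message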
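
Idempotent processing usually comes down to remembering which event keys have already been handled and skipping repeats. The sketch below shows the idea with an in-memory set. That is an assumption made to keep the example self-contained: the set is lost on restart and not shared between instances, so a real deployment would typically back it with a shared store. IdempotentProcessor is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory idempotency guard keyed by an event identifier.
final class IdempotentProcessor {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /**
     * Runs the action only the first time a key is seen.
     * Returns true if the action ran, false if the key was a duplicate.
     */
    boolean processOnce(String key, Runnable action) {
        if (!processed.add(key)) {
            return false; // already handled: skip the duplicate delivery
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so a redelivery can try again
            processed.remove(key);
            throw e;
        }
    }
}
```

In the consumer, the key could be the value of event.getUniqueId(), provided that identifier is unique per change.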

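Summary

This tutorial implements a complete Spring Boot project that integrates Canal and RocketMQ to listen to the MySQL binlog:

  1. Canal client: listens for MySQL binlog changes

  2. RocketMQ producer: publishes binlog events to the message queue

  3. RocketMQ consumer: processes change events asynchronously

  4. Complete configuration: covers MySQL, Canal Server, and RocketMQ

Using RocketMQ as the messaging middleware gives the system the following advantages:

  • Decoupling: data processing is separated from data capture

  • Reliability: messages are persisted, which guards against loss

  • Scalability: consumers can scale horizontally

  • Fault tolerance: retry mechanism and dead-letter queue

  • Ordering: changes to the same table stay in order when they are sent with an ordered send and consumed in order

When deploying to production, tune the following to the actual traffic:

  • Canal's batch-size parameter

  • The number of queues for the RocketMQ topic

  • Consumer concurrency

  • Message retention time

This approach suits scenarios that need real-time data synchronization, such as cache updates, search engine synchronization, and real-time analytics.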
