告别数据不一致!Spring Boot+Canal+RocketMQ实现精准的MySQL变更监听
下面我将提供一个完整的 Spring Boot 项目,集成 Canal 和 RocketMQ 实现 MySQL binlog 监听和异步处理。
项目结构
src/ ├── main/ │ ├── java/ │ │ └── com/ │ │ └── example/ │ │ └── canalrocketmqdemo/ │ │ ├── CanalClientRunner.java // Canal 客户端 │ │ ├── BinlogEventProducer.java // RocketMQ 生产者 │ │ ├── BinlogEventConsumer.java // RocketMQ 消费者 │ │ ├── BinlogEvent.java // 事件模型 │ │ ├── config/ │ │ │ └── RocketMQConfig.java // RocketMQ 配置 │ │ └── Application.java // 主应用 │ └── resources/ │ ├── application.yml // 配置文件 │ └── rocketmq.properties // RocketMQ 属性文件
完整代码实现
1. 添加依赖 (pom.xml
)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>canal-rocketmq-demo</artifactId><version>1.0.0</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.4</version><relativePath/></parent><properties><java.version>17</java.version><rocketmq.version>5.1.4</rocketmq.version><canal.version>1.1.7</canal.version><lombok.version>1.18.30</lombok.version></properties><dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><!-- Canal --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- JSON --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 连接池 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
2. 配置文件 (application.yml
)
server:port: 8080spring:application:name: canal-rocketmq-demo# Canal 配置
canal:server: 127.0.0.1:11111destination: examplebatch-size: 1000username: ""password: ""retry:max-attempts: 5backoff: 5000# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876producer:group: canal-producer-groupsend-message-timeout: 3000retry-times-when-send-failed: 3consumer:group: canal-consumer-groupconsume-timeout: 30000
3. RocketMQ 配置类 (RocketMQConfig.java
)
package com.example.canalrocketmqdemo.config;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;import java.util.List;@Configuration
public class RocketMQConfig {@Value("${rocketmq.name-server}")private String nameServer;@Beanpublic RocketMQTemplate rocketMQTemplate() {RocketMQTemplate template = new RocketMQTemplate();template.setNameServer(nameServer);return template;}@Beanpublic RocketMQMessageConverter rocketMQMessageConverter() {RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> converterList = compositeConverter.getConverters();for (MessageConverter messageConverter : converterList) {if (messageConverter instanceof MappingJackson2MessageConverter) {((MappingJackson2MessageConverter) messageConverter).setSerializedPayloadClass(String.class);}}return converter;}
}
4. Binlog 事件模型 (BinlogEvent.java
)
package com.example.canalrocketmqdemo;import lombok.Data;import java.util.List;
import java.util.Map;@Data
public class BinlogEvent {public enum EventType { INSERT, UPDATE, DELETE }private String database;private String table;private EventType eventType;private Long executeTime;private List<Map<String, String>> before; // DELETE/UPDATE 前的数据private List<Map<String, String>> after; // INSERT/UPDATE 后的数据// 获取唯一标识(用于幂等处理)public String getUniqueId() {return database + ":" + table + ":" + executeTime;}
}
5. RocketMQ 生产者 (BinlogEventProducer.java
)
package com.example.canalrocketmqdemo;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class BinlogEventProducer {private final RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.group}")private String producerGroup;@Value("${rocketmq.topic.binlog}")private String binlogTopic;@Autowiredpublic BinlogEventProducer(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}public void sendEvent(BinlogEvent event) {try {rocketMQTemplate.syncSend(binlogTopic, MessageBuilder.withPayload(event).setHeader("EVENT_TYPE", event.getEventType().name()).build());// 可添加日志记录发送情况} catch (Exception e) {// 添加重试或错误处理逻辑throw new RuntimeException("发送消息到RocketMQ失败", e);}}
}
6. Canal 客户端 (CanalClientRunner.java
)
package com.example.canalrocketmqdemo;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Component
public class CanalClientRunner implements CommandLineRunner {@Value("${canal.server}")private String canalServer;@Value("${canal.destination}")private String destination;@Value("${canal.batch-size}")private int batchSize;@Value("${canal.username}")private String username;@Value("${canal.password}")private String password;@Value("${canal.retry.max-attempts}")private int maxRetryAttempts;@Value("${canal.retry.backoff}")private long retryBackoff;@Autowiredprivate BinlogEventProducer eventProducer;private CanalConnector connector;private volatile boolean running = true;@Overridepublic void run(String... args) {// 初始化连接initConnector();// 启动处理线程new Thread(this::process).start();// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));}private void initConnector() {String[] serverParts = canalServer.split(":");String host = serverParts[0];int port = Integer.parseInt(serverParts[1]);connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port),destination,username,password);connectWithRetry();}private void connectWithRetry() {int attempts = 0;while (attempts < maxRetryAttempts) {try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();return;} catch (Exception e) {attempts++;if (attempts >= maxRetryAttempts) {throw new RuntimeException("连接Canal失败,达到最大重试次数", e);}try {Thread.sleep(retryBackoff);} catch (InterruptedException ex) {Thread.currentThread().interrupt();}}}}private void process() {while (running) {try {Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);continue;}processEntries(message.getEntries());connector.ack(batchId);} catch (InterruptedException e) {Thread.currentThread().interrupt();} catch (Exception e) {// 处理异常,尝试重连handleProcessingException(e);}}}private void processEntries(List<Entry> entries) {for (Entry entry : entries) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());Header header = entry.getHeader();BinlogEvent event = new BinlogEvent();event.setDatabase(header.getSchemaName());event.setTable(header.getTableName());event.setExecuteTime(header.getExecuteTime());event.setEventType(mapEventType(rowChange.getEventType()));for (RowData rowData : rowChange.getRowDatasList()) {if (event.getEventType() == BinlogEvent.EventType.UPDATE || event.getEventType() == BinlogEvent.EventType.DELETE) {event.setBefore(convertColumns(rowData.getBeforeColumnsList()));}if (event.getEventType() == BinlogEvent.EventType.INSERT || event.getEventType() == BinlogEvent.EventType.UPDATE) {event.setAfter(convertColumns(rowData.getAfterColumnsList()));}// 发送事件到RocketMQeventProducer.sendEvent(event);}} catch (Exception e) {// 记录错误日志,不要阻断主流程System.err.println("处理binlog条目失败: " + e.getMessage());}}}private List<Map<String, String>> convertColumns(List<Column> columns) {return columns.stream().map(col -> Map.of("name", col.getName(),"value", col.getValue(),"type", col.getMysqlType(),"updated", String.valueOf(col.getUpdated()))).collect(Collectors.toList());}private BinlogEvent.EventType mapEventType(EventType eventType) {switch (eventType) {case INSERT: return BinlogEvent.EventType.INSERT;case UPDATE: return BinlogEvent.EventType.UPDATE;case DELETE: return BinlogEvent.EventType.DELETE;default: return null;}}private void handleProcessingException(Exception e) {System.err.println("处理Canal消息异常: " + e.getMessage());try {Thread.sleep(retryBackoff);connectWithRetry(); // 尝试重新连接} catch (InterruptedException ex) {Thread.currentThread().interrupt();}}private void shutdown() {running = false;if (connector != null) {connector.disconnect();}}
}
7. RocketMQ 消费者 (BinlogEventConsumer.java
)
package com.example.canalrocketmqdemo;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.binlog}",consumerGroup = "${rocketmq.consumer.group}"
)
public class BinlogEventConsumer implements RocketMQListener<BinlogEvent> {private static final Logger logger = LoggerFactory.getLogger(BinlogEventConsumer.class);@Overridepublic void onMessage(BinlogEvent event) {try {switch (event.getEventType()) {case INSERT:handleInsert(event);break;case UPDATE:handleUpdate(event);break;case DELETE:handleDelete(event);break;}logger.info("处理成功: {}.{} {}", event.getDatabase(), event.getTable(), event.getEventType());} catch (Exception e) {logger.error("处理事件失败: {}", event, e);// 这里可以添加重试逻辑或发送到死信队列}}private void handleInsert(BinlogEvent event) {// 实际业务处理:更新缓存、同步到ES、数据仓库等logger.info("INSERT 事件: {}.{}", event.getDatabase(), event.getTable());logger.debug("新增数据: {}", event.getAfter());}private void handleUpdate(BinlogEvent event) {logger.info("UPDATE 事件: {}.{}", event.getDatabase(), event.getTable());logger.debug("变更前: {}", event.getBefore());logger.debug("变更后: {}", event.getAfter());}private void handleDelete(BinlogEvent event) {logger.info("DELETE 事件: {}.{}", event.getDatabase(), event.getTable());logger.debug("删除数据: {}", event.getBefore());}
}
8. 主应用类 (Application.java
)
package com.example.canalrocketmqdemo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
环境准备与部署步骤
1. MySQL 配置
-
修改 MySQL 配置文件 (
my.cnf
或my.ini
):[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1 expire_logs_days=3
-
重启 MySQL 服务
-
创建 Canal 用户:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
2. Canal Server 部署
-
下载 Canal: https://github.com/alibaba/canal/releases
-
解压并修改配置:
cd canal vi conf/example/instance.properties
修改内容:
properties
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal_password canal.instance.filter.regex=.*\\..*
-
启动 Canal:
sh bin/startup.sh # Linux bin/startup.bat # Windows
3. RocketMQ 部署
-
下载 RocketMQ: https://rocketmq.apache.org/download
-
启动 NameServer:
nohup sh bin/mqnamesrv &
-
启动 Broker:
nohup sh bin/mqbroker -n localhost:9876 &
-
创建 Topic:
sh bin/mqadmin updateTopic -n localhost:9876 -t binlog-events -b localhost:10911
4. 启动 Spring Boot 应用
mvn spring-boot:run
5. 测试验证
-
在 MySQL 中执行操作:
-- 创建测试表 CREATE TABLE test_table (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50),email VARCHAR(100) );-- 执行DML操作 INSERT INTO test_table (name, email) VALUES ('John', 'john@example.com'); UPDATE test_table SET email = 'john.doe@example.com' WHERE id = 1; DELETE FROM test_table WHERE id = 1;
-
查看应用日志:
INSERT 事件: test_db.test_table UPDATE 事件: test_db.test_table DELETE 事件: test_db.test_table
-
使用 RocketMQ 控制台查看消息:
http://localhost:9876
生产环境优化建议
1. 性能优化
// 在 CanalClientRunner 中添加批量发送
private void processEntries(List<Entry> entries) {List<BinlogEvent> events = new ArrayList<>();for (Entry entry : entries) {// ... 解析事件 ...events.add(event);}// 批量发送if (!events.isEmpty()) {eventProducer.sendBatchEvents(events);}
}// 在 BinlogEventProducer 中添加批量发送方法
public void sendBatchEvents(List<BinlogEvent> events) {List<Message<BinlogEvent>> messages = events.stream().map(event -> MessageBuilder.withPayload(event).build()).collect(Collectors.toList());rocketMQTemplate.syncSend(binlogTopic, messages);
}
2. 消息过滤
在 application.yml
中添加:
canal:filter:tables: user, product, order.* # 只监听指定表
在 CanalClientRunner
中:
@Value("${canal.filter.tables:}")
private String tableFilter;private void initConnector() {// ...connector.subscribe(tableFilter.isEmpty() ? ".*\\..*" : tableFilter);
}
3. 消息轨迹和监控
// 在发送消息时添加消息轨迹
public void sendEvent(BinlogEvent event) {rocketMQTemplate.syncSendOrderly(binlogTopic, MessageBuilder.withPayload(event).build(), event.getUniqueId());// 添加监控指标Metrics.counter("rocketmq.sent.messages", "table", event.getTable(),"type", event.getEventType().name()).increment();
}
4. 死信队列处理
// 在 RocketMQConfig 中添加死信队列配置
@Bean
public RocketMQListenerContainerFactory rocketMQListenerContainerFactory() {DefaultRocketMQListenerContainerFactory factory = new DefaultRocketMQListenerContainerFactory();factory.setConsumerGroup("${rocketmq.consumer.group}");factory.setNameServerAddress("${rocketmq.name-server}");// 设置重试次数factory.setMaxReconsumeTimes(3);// 设置死信队列主题factory.setDLQTopic("DLQ_${rocketmq.topic.binlog}");return factory;
}
常见问题解决
1. Canal 连接问题
症状: 连接 Canal Server 失败
解决:
-
检查 Canal Server 是否运行:
netstat -an | grep 11111
-
确认用户名/密码是否正确
-
查看 Canal Server 日志:
logs/canal/canal.log
2. RocketMQ 发送失败
症状: 消息发送超时或失败
解决:
-
检查 NameServer 状态:
mqadmin clusterList -n localhost:9876
-
增加发送超时时间:
rocketmq.producer.send-message-timeout=5000
-
检查网络连接
3. 消息消费积压
症状: RocketMQ 控制台显示消息积压
解决:
-
增加消费者实例数量
-
调整批量消费大小
-
优化消费者处理逻辑
4. 数据不一致问题
症状: 处理后的数据与源数据不一致
解决:
-
添加幂等处理逻辑
-
实现数据校对机制
-
记录消息处理状态
总结
本教程完整实现了 Spring Boot 项目整合 Canal 和 RocketMQ 监听 MySQL binlog 的方案:
-
Canal 客户端:负责监听 MySQL binlog 变更
-
RocketMQ 生产者:将 binlog 事件发送到消息队列
-
RocketMQ 消费者:异步处理变更事件
-
完整配置:包含 MySQL、Canal Server 和 RocketMQ 的配置
通过使用 RocketMQ 作为消息中间件,系统获得了以下优势:
-
解耦:数据处理与数据捕获分离
-
可靠性:消息持久化保证不丢失
-
扩展性:消费者可水平扩展
-
容错性:重试机制和死信队列
-
顺序性:相同表的数据变更保持顺序
生产环境部署时,请根据实际流量调整:
-
Canal 的
batch-size
参数 -
RocketMQ 的 Topic 分区数
-
消费者并发数
-
消息存储时间
此方案适用于需要实时数据同步的场景,如缓存更新、搜索引擎同步、实时分析等。