Canal:让 MySQL 数据同步像呼吸一样自然
引言:为什么我们需要 Canal?
在当今的分布式系统架构中,数据一致性和实时性成为越来越重要的挑战。想象一下这样的场景:当用户在电商平台下单后,订单数据写入 MySQL 数据库,此时需要实时更新缓存、同步到搜索服务、推送消息给用户和商家,还要同步到数据仓库进行分析。如果这些操作都通过业务代码硬编码实现,不仅耦合度高,而且难以维护和扩展。
Canal 正是为解决这类问题而生的中间件。它就像一位 "数据搬运工",能够实时捕获 MySQL 数据库的变更,并将这些变更高效地同步到其他系统。本文将带你从理论到实践,全面掌握 Canal 的使用,让数据同步变得简单而高效。
一、Canal 核心原理揭秘
1.1 什么是 Canal?
Canal 是阿里巴巴开源的一款基于 MySQL 二进制日志(binlog)的增量数据同步工具。它最初是为了解决阿里巴巴内部跨机房数据同步问题而开发的,后来开源供社区使用。
简单来说,Canal 的工作原理就是模拟 MySQL 主从复制的过程:
- 伪装成 MySQL 的从库(slave)
- 向 MySQL 主库(master)发送 dump 协议
- 接收并解析 MySQL 主库的 binlog
- 将解析后的数据变更推送给消费者
1.2 MySQL binlog 简介
要理解 Canal,首先需要了解 MySQL 的 binlog。binlog 是 MySQL 服务器端的一种二进制日志,它记录了所有的数据变更操作(DDL 和 DML),并以事件的形式存储。
binlog 有三种格式:
- STATEMENT:记录 SQL 语句,不记录数据,日志体积小,但可能存在数据不一致的问题
- ROW:记录数据的变更,而不是 SQL 语句,日志体积大,但能保证数据一致性
- MIXED:混合模式,MySQL 会根据情况自动选择使用 STATEMENT 或 ROW 模式
Canal 推荐使用 ROW 模式,因为这种模式可以明确记录每行数据的变更细节,最适合数据同步场景。
1.3 Canal 工作流程图
二、环境准备与配置
2.1 版本选择
本文将使用以下最新稳定版本:
- MySQL:8.0.36
- Canal:1.1.7
- JDK:17
- Spring Boot:3.2.5
- MyBatis-Plus:3.5.6
- Lombok:1.18.30
- Fastjson2:2.0.47
2.2 MySQL 配置
首先需要配置 MySQL 以支持 binlog 和主从复制:
-- 查看当前MySQL的binlog配置
show variables like 'log_bin';
show variables like 'binlog_format';
show variables like 'server_id';-- 如果未开启binlog,需要修改my.cnf或my.ini配置文件
[mysqld]
# 开启binlog
log_bin=mysql-bin
# binlog格式为ROW
binlog_format=ROW
# 服务器ID,主从复制时需要,Canal也需要
server_id=1
# 需要同步的数据库,多个用逗号分隔
binlog_do_db=test_canal
# 不需要同步的数据库
binlog_ignore_db=mysql
# binlog过期时间,避免日志过大
expire_logs_days=7
配置完成后重启 MySQL,并创建 Canal 专用账号:
-- 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
-- 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
2.3 Canal 服务端安装与配置
- 下载 Canal 服务端:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
- 解压:
mkdir -p /opt/canal
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/canal
- 修改配置文件
/opt/canal/conf/example/instance.properties:
# MySQL主库地址
canal.instance.master.address=127.0.0.1:3306
# 用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 起始同步的binlog位置,如果是第一次同步可以不填,默认从最新位置开始
canal.instance.master.journal.name=
canal.instance.master.position=
# 需要监听的数据库,多个用逗号分隔
canal.instance.defaultDatabaseName=test_canal
# 表过滤规则,格式:数据库名.表名,支持通配符*
canal.instance.filter.regex=test_canal\\..*
- 启动 Canal 服务:
/opt/canal/bin/startup.sh
- 查看日志确认启动成功:
tail -f /opt/canal/logs/canal/canal.log
tail -f /opt/canal/logs/example/example.log
三、Canal 客户端开发实战
3.1 项目初始化
创建一个 Spring Boot 项目,添加以下依赖到pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/></parent><groupId>com.example</groupId><artifactId>canal-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal-demo</name><description>Canal实战示例</description><properties><java.version>17</java.version><canal.version>1.1.7</canal.version><mybatis-plus.version>3.5.6</mybatis-plus.version><fastjson2.version>2.0.47</fastjson2.version><lombok.version>1.18.30</lombok.version><swagger.version>3.0.0</swagger.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Canal客户端 --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><!-- MySQL驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- Fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version></dependency><!-- Swagger3 --><dependency><groupId>io.springfox</groupId><artifactId>springfox-boot-starter</artifactId><version>${swagger.version}</version></dependency><!-- Google Collections --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.1.0-jre</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
3.2 配置文件
创建application.yml配置文件:
spring:datasource:url: jdbc:mysql://localhost:3306/test_canal?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivermybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.example.canaldemo.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImplcanal:server: 127.0.0.1:11111destination: exampleusername:password:server:port: 8080logging:level:com.example.canaldemo: info
3.3 创建数据库和实体类
首先创建测试数据库:
CREATE DATABASE IF NOT EXISTS test_canal CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE test_canal;-- 创建用户表
CREATE TABLE `user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',`username` varchar(50) NOT NULL COMMENT '用户名',`password` varchar(100) NOT NULL COMMENT '密码',`nickname` varchar(50) DEFAULT NULL COMMENT '昵称',`age` int DEFAULT NULL COMMENT '年龄',`email` varchar(100) DEFAULT NULL COMMENT '邮箱',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';
创建对应的实体类User.java:
package com.example.canaldemo.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;/*** 用户实体类** @author ken*/
@Data
@TableName("user")
@Schema(description = "用户实体")
public class User {@TableId(type = IdType.AUTO)@Schema(description = "主键ID")private Long id;@TableField("username")@Schema(description = "用户名")private String username;@TableField("password")@Schema(description = "密码")private String password;@TableField("nickname")@Schema(description = "昵称")private String nickname;@TableField("age")@Schema(description = "年龄")private Integer age;@TableField("email")@Schema(description = "邮箱")private String email;@TableField("create_time")@Schema(description = "创建时间")private LocalDateTime createTime;@TableField("update_time")@Schema(description = "更新时间")private LocalDateTime updateTime;
}
3.4 编写 Canal 客户端核心代码
创建 Canal 客户端配置类CanalClientConfig.java:
package com.example.canaldemo.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;/*** Canal客户端配置** @author ken*/
@Configuration
public class CanalClientConfig {@Value("${canal.server}")private String canalServer;@Value("${canal.destination}")private String destination;@Value("${canal.username}")private String username;@Value("${canal.password}")private String password;/*** 创建Canal连接器** @return CanalConnector实例*/@Beanpublic CanalConnector canalConnector() {String[] serverAddress = canalServer.split(":");String host = serverAddress[0];int port = Integer.parseInt(serverAddress[1]);return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port),destination,username,password);}
}
创建 Canal 消息处理器接口CanalMessageHandler.java:
package com.example.canaldemo.handler;import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;/*** Canal消息处理器接口** @author ken*/
public interface CanalMessageHandler {/*** 处理Canal消息** @param tableName 表名* @param eventType 事件类型* @param rowDatas 行数据列表*/void handle(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas);
}
创建用户表消息处理器UserTableHandler.java:
package com.example.canaldemo.handler.impl;import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.handler.CanalMessageHandler;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;/*** 用户表数据变更处理器** @author ken*/
@Slf4j
@Component
public class UserTableHandler implements CanalMessageHandler {private static final String TABLE_NAME = "user";@Overridepublic void handle(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {// 只处理用户表的变更if (!TABLE_NAME.equals(tableName)) {return;}log.info("开始处理用户表变更,事件类型:{},变更行数:{}", eventType, rowDatas.size());switch (eventType) {case INSERT:handleInsert(rowDatas);break;case UPDATE:handleUpdate(rowDatas);break;case DELETE:handleDelete(rowDatas);break;default:log.info("不处理的事件类型:{}", eventType);}}/*** 处理插入事件** @param rowDatas 行数据列表*/private void handleInsert(List<CanalEntry.RowData> rowDatas) {List<User> users = Lists.newArrayList();for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> dataMap = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User user = JSON.parseObject(JSON.toJSONString(dataMap), User.class);users.add(user);}if (!CollectionUtils.isEmpty(users)) {log.info("新增用户:{}", JSON.toJSONString(users));// 这里可以添加同步到缓存、搜索等业务逻辑}}/*** 处理更新事件** @param rowDatas 行数据列表*/private void handleUpdate(List<CanalEntry.RowData> rowDatas) {for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> beforeMap = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));Map<String, String> afterMap = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User beforeUser = JSON.parseObject(JSON.toJSONString(beforeMap), User.class);User afterUser = JSON.parseObject(JSON.toJSONString(afterMap), User.class);log.info("更新用户,更新前:{},更新后:{}", JSON.toJSONString(beforeUser), JSON.toJSONString(afterUser));// 这里可以添加同步到缓存、搜索等业务逻辑}}/*** 处理删除事件** @param rowDatas 行数据列表*/private void handleDelete(List<CanalEntry.RowData> rowDatas) {List<User> users = Lists.newArrayList();for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> dataMap = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User user = JSON.parseObject(JSON.toJSONString(dataMap), User.class);users.add(user);}if (!CollectionUtils.isEmpty(users)) {log.info("删除用户:{}", JSON.toJSONString(users));// 这里可以添加从缓存、搜索等删除的业务逻辑}}
}
创建 Canal 客户端服务CanalClientService.java:
package com.example.canaldemo.service;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canaldemo.handler.CanalMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** Canal客户端服务** @author ken*/
@Slf4j
@Service
public class CanalClientService {private final CanalConnector canalConnector;private final List<CanalMessageHandler> messageHandlers;private final ExecutorService executorService = Executors.newSingleThreadExecutor();private volatile boolean running = false;@Autowiredpublic CanalClientService(CanalConnector canalConnector, List<CanalMessageHandler> messageHandlers) {this.canalConnector = canalConnector;this.messageHandlers = messageHandlers;}/*** 启动Canal客户端*/public void start() {if (running) {log.warn("Canal客户端已经在运行中");return;}running = true;executorService.execute(this::process);log.info("Canal客户端启动成功");}/*** 停止Canal客户端*/public void stop() {if (!running) {log.warn("Canal客户端已经停止");return;}running = false;executorService.shutdown();canalConnector.disconnect();log.info("Canal客户端停止成功");}/*** 处理Canal消息*/private void process() {int batchSize = 1000;try {// 连接Canal服务端canalConnector.connect();// 订阅所有消息canalConnector.subscribe();// 回滚到上次处理成功的位置canalConnector.rollback();while (running) {// 获取消息Message message = canalConnector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// 没有消息,休眠一段时间try {Thread.sleep(1000);} catch (InterruptedException e) {// 忽略中断异常}} else {// 处理消息processMessage(message.getEntries());// 确认消息已处理canalConnector.ack(batchId);}}} catch (Exception e) {log.error("Canal客户端处理消息异常", e);try {// 发生异常时回滚canalConnector.rollback();} catch (Exception ex) {log.error("Canal客户端回滚异常", ex);}} finally {canalConnector.disconnect();}}/*** 处理消息条目** @param entries 消息条目列表*/private void processMessage(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 忽略事务开始和结束的消息if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange;try {// 解析消息内容rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {log.error("解析Canal消息失败,entry: {}", entry, e);continue;}// 获取数据库名和表名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();CanalEntry.EventType eventType = rowChange.getEventType();log.info("接收到数据变更,数据库:{},表名:{},事件类型:{}", databaseName, tableName, eventType);// 处理行数据变更List<CanalEntry.RowData> rowDatas = rowChange.getRowDatasList();if (!CollectionUtils.isEmpty(rowDatas) && !CollectionUtils.isEmpty(messageHandlers)) {for (CanalMessageHandler handler : messageHandlers) {try {handler.handle(tableName, eventType, rowDatas);} catch (Exception e) {log.error("处理Canal消息异常,handler: {}", handler.getClass().getName(), e);}}}}}
}
创建启动类CanalDemoApplication.java:
package com.example.canaldemo;import com.example.canaldemo.service.CanalClientService;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Canal示例应用启动类** @author ken*/
@Slf4j
@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
@OpenAPIDefinition(info = @Info(title = "Canal实战示例API", version = "1.0", description = "Canal实战示例接口文档"))
public class CanalDemoApplication implements CommandLineRunner {private final CanalClientService canalClientService;@Autowiredpublic CanalDemoApplication(CanalClientService canalClientService) {this.canalClientService = canalClientService;}public static void main(String[] args) {SpringApplication.run(CanalDemoApplication.class, args);}@Overridepublic void run(String... args) {// 启动Canal客户端canalClientService.start();log.info("Canal示例应用启动成功");}
}
3.5 创建测试接口
为了方便测试,创建用户相关的 Controller、Service 和 Mapper:
UserController.java:
package com.example.canaldemo.controller;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;/*** 用户控制器** @author ken*/
@RestController
@RequestMapping("/api/users")
@Tag(name = "用户管理", description = "用户CRUD接口")
public class UserController {private final UserService userService;@Autowiredpublic UserController(UserService userService) {this.userService = userService;}@PostMapping@Operation(summary = "创建用户", description = "新增一个用户")public ResponseEntity<User> createUser(@RequestBody User user) {StringUtils.hasText(user.getUsername(), "用户名不能为空");StringUtils.hasText(user.getPassword(), "密码不能为空");User savedUser = userService.saveUser(user);return ResponseEntity.ok(savedUser);}@GetMapping("/{id}")@Operation(summary = "查询用户", description = "根据ID查询用户详情")public ResponseEntity<User> getUserById(@Parameter(description = "用户ID", required = true)@PathVariable Long id) {User user = userService.getUserById(id);return ResponseEntity.ok(user);}@PutMapping("/{id}")@Operation(summary = "更新用户", description = "根据ID更新用户信息")public ResponseEntity<User> updateUser(@Parameter(description = "用户ID", required = true)@PathVariable Long id,@RequestBody User user) {user.setId(id);User updatedUser = userService.updateUser(user);return ResponseEntity.ok(updatedUser);}@DeleteMapping("/{id}")@Operation(summary = "删除用户", description = "根据ID删除用户")public ResponseEntity<Void> deleteUser(@Parameter(description = "用户ID", required = true)@PathVariable Long id) {userService.deleteUser(id);return ResponseEntity.noContent().build();}@GetMapping@Operation(summary = "分页查询用户", description = "分页查询用户列表")public ResponseEntity<Page<User>> listUsers(@Parameter(description = "页码,从1开始")@RequestParam(defaultValue = "1") Integer pageNum,@Parameter(description = "每页条数")@RequestParam(defaultValue = "10") Integer pageSize) {Page<User> page = userService.listUsers(pageNum, pageSize);return ResponseEntity.ok(page);}
}
UserService.java:
package com.example.canaldemo.service;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;/*** 用户服务接口** @author ken*/
public interface UserService {/*** 保存用户** @param user 用户实体* @return 保存后的用户实体*/User saveUser(User user);/*** 根据ID查询用户** @param id 用户ID* @return 用户实体*/User getUserById(Long id);/*** 更新用户** @param user 用户实体* @return 更新后的用户实体*/User updateUser(User user);/*** 根据ID删除用户** @param id 用户ID*/void deleteUser(Long id);/*** 分页查询用户** @param pageNum 页码* @param pageSize 每页条数* @return 用户分页列表*/Page<User> listUsers(Integer pageNum, Integer pageSize);
}
UserServiceImpl.java:
package com.example.canaldemo.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.mapper.UserMapper;
import com.example.canaldemo.service.UserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;/*** 用户服务实现类** @author ken*/
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {private final UserMapper userMapper;@Autowiredpublic UserServiceImpl(UserMapper userMapper) {this.userMapper = userMapper;}@Overridepublic User saveUser(User user) {userMapper.insert(user);log.info("保存用户成功,ID: {}", user.getId());return user;}@Overridepublic User getUserById(Long id) {User user = userMapper.selectById(id);if (ObjectUtils.isEmpty(user)) {log.warn("用户不存在,ID: {}", id);}return user;}@Overridepublic User updateUser(User user) {userMapper.updateById(user);log.info("更新用户成功,ID: {}", user.getId());return userMapper.selectById(user.getId());}@Overridepublic void deleteUser(Long id) {userMapper.deleteById(id);log.info("删除用户成功,ID: {}", id);}@Overridepublic Page<User> listUsers(Integer pageNum, Integer pageSize) {Page<User> page = new Page<>(pageNum, pageSize);return userMapper.selectPage(page, new QueryWrapper<>());}
}
UserMapper.java:
package com.example.canaldemo.mapper;import com.example.canaldemo.entity.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;/*** 用户Mapper接口** @author ken*/
@Mapper
public interface UserMapper extends BaseMapper<User> {
}
3.6 添加 MyBatis-Plus 分页插件
创建MyBatisPlusConfig.java:
package com.example.canaldemo.config;import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** MyBatis-Plus配置** @author ken*/
@Configuration
public class MyBatisPlusConfig {/*** 添加分页插件** @return MybatisPlusInterceptor实例*/@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));return interceptor;}
}
四、测试验证
4.1 启动应用
运行CanalDemoApplication的 main 方法启动应用,控制台会输出类似以下日志:
2023-11-05 10:00:00.000 INFO 12345 --- [ main] c.e.c.CanaldemoApplication : Canal客户端启动成功
2023-11-05 10:00:00.000 INFO 12345 --- [ main] c.e.c.CanaldemoApplication : Canal示例应用启动成功
4.2 测试数据变更
可以通过 Swagger 界面(http://localhost:8080/swagger-ui/index.html)或 Postman 等工具调用接口进行测试:
- 创建用户:
POST http://localhost:8080/api/users
Content-Type: application/json{"username": "testuser","password": "123456","nickname": "测试用户","age": 25,"email": "test@example.com"
}
应用控制台会输出 Canal 捕获到的新增事件:
2023-11-05 10:01:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService : 接收到数据变更,数据库:test_canal,表名:user,事件类型:INSERT
2023-11-05 10:01:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 开始处理用户表变更,事件类型:INSERT,变更行数:1
2023-11-05 10:01:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 新增用户:[{"age":25,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"测试用户","password":"123456","updateTime":"2023-11-05T10:01:00","username":"testuser"}]
- 更新用户:
PUT http://localhost:8080/api/users/1
Content-Type: application/json{"nickname": "更新后的测试用户","age": 26
}
应用控制台会输出 Canal 捕获到的更新事件:
2023-11-05 10:02:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService : 接收到数据变更,数据库:test_canal,表名:user,事件类型:UPDATE
2023-11-05 10:02:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 开始处理用户表变更,事件类型:UPDATE,变更行数:1
2023-11-05 10:02:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 更新用户,更新前:[{"age":25,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"测试用户","password":"123456","updateTime":"2023-11-05T10:01:00","username":"testuser"}],更新后:[{"age":26,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"更新后的测试用户","password":"123456","updateTime":"2023-11-05T10:02:00","username":"testuser"}]
- 删除用户:
DELETE http://localhost:8080/api/users/1
应用控制台会输出 Canal 捕获到的删除事件:
2023-11-05 10:03:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService : 接收到数据变更,数据库:test_canal,表名:user,事件类型:DELETE
2023-11-05 10:03:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 开始处理用户表变更,事件类型:DELETE,变更行数:1
2023-11-05 10:03:00.000 INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler : 删除用户:[{"age":26,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"更新后的测试用户","password":"123456","updateTime":"2023-11-05T10:02:00","username":"testuser"}]
五、Canal 高级特性
5.1 数据过滤
Canal 支持通过配置文件进行数据过滤,只同步特定的数据库和表:
# 白名单配置,格式:数据库名.表名,多个用逗号分隔,支持通配符*
canal.instance.filter.regex=test_canal.user,test_canal.order_.*# 黑名单配置,格式同上
canal.instance.filter.black.regex=test_canal\\.user_log
也可以在客户端代码中进行过滤,如前面示例中的UserTableHandler只处理用户表的变更。
5.2 集群部署
在生产环境中,为了保证高可用,通常需要部署 Canal 集群。Canal 的集群部署主要有两种方式:
- 多 Canal 实例连接同一个 MySQL 主库,客户端通过负载均衡访问 Canal 服务。

- Canal 结合 ZooKeeper 实现 HA 模式,多个 Canal 实例通过 ZooKeeper 选举出一个主节点提供服务,其他节点作为备用。
5.3 数据同步到其他存储
Canal 不仅可以同步数据到应用程序,还可以直接同步到其他存储系统,如 Elasticsearch、Redis、Kafka 等。Canal 提供了相应的适配器(canal-adapter)来实现这些功能。
以同步到 Elasticsearch 为例,配置示例:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: user_type: _doc_id: idsql: "select u.id, u.username, u.nickname, u.age, u.email from user u"etlCondition: "where u.id > {}"commitBatch: 3000
5.4 断点续传
Canal 支持断点续传功能,即当 Canal 客户端或服务端重启后,能够从上次处理的位置继续处理,而不是从头开始。这是通过记录 binlog 的文件名和位置实现的。
在客户端代码中,可以通过以下方式指定起始位置:
// 指定从某个binlog位置开始消费
canalConnector.seek(destination, journalName, position);
在服务端配置中,可以通过以下参数设置:
# 初始binlog文件名
canal.instance.master.journal.name=mysql-bin.000001
# 初始binlog位置
canal.instance.master.position=12345
六、性能优化
6.1 调整批量处理大小
在客户端代码中,可以调整每次获取的消息数量:
// 每次获取1000条消息
int batchSize = 1000;
Message message = canalConnector.getWithoutAck(batchSize);
合理设置 batchSize 可以提高处理效率,但不宜过大,以免占用过多内存。
6.2 异步处理
对于耗时的业务逻辑,可以采用异步处理的方式,避免阻塞 Canal 消息的消费:
// 使用线程池异步处理
executorService.submit(() -> {// 处理业务逻辑handleBusinessLogic(data);
});
6.3 索引优化
确保 MySQL 的 binlog 相关表有合适的索引,同时 Canal 客户端处理的目标存储(如 Elasticsearch、Redis)也需要有合理的索引设计,以提高查询和写入性能。
6.4 网络优化
Canal 服务端和客户端之间的网络延迟会影响数据同步性能,应尽量将它们部署在同一个局域网内,减少网络传输时间。
七、常见问题与解决方案
7.1 Canal 连接 MySQL 失败
问题现象:Canal 服务端日志中出现连接 MySQL 失败的错误。
可能原因:
- MySQL 地址或端口不正确
- 用户名或密码错误
- MySQL 未开启 binlog
- 网络不通
- MySQL 用户没有足够的权限
解决方案:
- 检查 MySQL 地址和端口是否正确
- 验证用户名和密码是否正确
- 确认 MySQL 已开启 binlog(
show variables like 'log_bin') - 检查网络连通性(
ping和telnet命令) - 确保用户有
SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限
7.2 数据同步延迟
问题现象:Canal 同步的数据与 MySQL 中的数据存在明显延迟。
可能原因:
- MySQL binlog 生成延迟
- Canal 服务端处理速度慢
- 客户端处理逻辑耗时过长
- 网络传输延迟
- 系统资源不足(CPU、内存、磁盘 IO 等)
解决方案:
- 优化 MySQL 性能,确保 binlog 及时生成
- 增加 Canal 服务端实例
- 优化客户端处理逻辑,采用异步处理
- 优化网络环境
- 增加系统资源
7.3 数据同步丢失
问题现象:MySQL 中的部分数据变更没有被 Canal 捕获到。
可能原因:
- 过滤规则配置不当
- binlog 格式不是 ROW 模式
- Canal 服务端或客户端异常退出
- binlog 被清理
解决方案:
- 检查过滤规则,确保需要同步的表没有被过滤
- 确认 binlog 格式为 ROW 模式
- 确保客户端正确处理异常,及时提交 offset
- 合理设置 binlog 过期时间,避免过早清理
八、总结与展望
Canal 作为一款优秀的增量数据同步工具,凭借其稳定、高效、易用的特点,在分布式系统中得到了广泛应用。本文从原理、配置、开发实战、高级特性、性能优化和问题排查等方面全面介绍了 Canal 的使用。
通过 Canal,我们可以轻松实现 MySQL 数据到其他系统的实时同步,解决分布式系统中的数据一致性问题。无论是缓存更新、搜索引擎同步、数据仓库同步还是跨系统数据集成,Canal 都能发挥重要作用。
附录:参考资料
- Canal 官方文档
- MySQL 官方文档 - binlog
- Spring Boot 官方文档
- MyBatis-Plus 官方文档


