当前位置: 首页 > news >正文

Canal:让 MySQL 数据同步像呼吸一样自然

引言:为什么我们需要 Canal?

在当今的分布式系统架构中,数据一致性和实时性成为越来越重要的挑战。想象一下这样的场景:当用户在电商平台下单后,订单数据写入 MySQL 数据库,此时需要实时更新缓存、同步到搜索服务、推送消息给用户和商家,还要同步到数据仓库进行分析。如果这些操作都通过业务代码硬编码实现,不仅耦合度高,而且难以维护和扩展。

Canal 正是为解决这类问题而生的中间件。它就像一位 "数据搬运工",能够实时捕获 MySQL 数据库的变更,并将这些变更高效地同步到其他系统。本文将带你从理论到实践,全面掌握 Canal 的使用,让数据同步变得简单而高效。

一、Canal 核心原理揭秘

1.1 什么是 Canal?

Canal 是阿里巴巴开源的一款基于 MySQL 二进制日志(binlog)的增量数据同步工具。它最初是为了解决阿里巴巴内部跨机房数据同步问题而开发的,后来开源供社区使用。

简单来说,Canal 的工作原理就是模拟 MySQL 主从复制的过程:

  • 伪装成 MySQL 的从库(slave)
  • 向 MySQL 主库(master)发送 dump 协议
  • 接收并解析 MySQL 主库的 binlog
  • 将解析后的数据变更推送给消费者

1.2 MySQL binlog 简介

要理解 Canal,首先需要了解 MySQL 的 binlog。binlog 是 MySQL 服务器端的一种二进制日志,它记录了所有的数据变更操作(DDL 和 DML),并以事件的形式存储。

binlog 有三种格式:

  • STATEMENT:记录 SQL 语句,不记录数据,日志体积小,但可能存在数据不一致的问题
  • ROW:记录数据的变更,而不是 SQL 语句,日志体积大,但能保证数据一致性
  • MIXED:混合模式,MySQL 会根据情况自动选择使用 STATEMENT 或 ROW 模式

Canal 推荐使用 ROW 模式,因为这种模式可以明确记录每行数据的变更细节,最适合数据同步场景。

1.3 Canal 工作流程图

二、环境准备与配置

2.1 版本选择

本文将使用以下最新稳定版本:

  • MySQL:8.0.36
  • Canal:1.1.7
  • JDK:17
  • Spring Boot:3.2.5
  • MyBatis-Plus:3.5.6
  • Lombok:1.18.30
  • Fastjson2:2.0.47

2.2 MySQL 配置

首先需要配置 MySQL 以支持 binlog 和主从复制:

-- 查看当前MySQL的binlog配置
show variables like 'log_bin';
show variables like 'binlog_format';
show variables like 'server_id';-- 如果未开启binlog,需要修改my.cnf或my.ini配置文件
[mysqld]
# 开启binlog
log_bin=mysql-bin
# binlog格式为ROW
binlog_format=ROW
# 服务器ID,主从复制时需要,Canal也需要
server_id=1
# 需要同步的数据库,多个用逗号分隔
binlog_do_db=test_canal
# 不需要同步的数据库
binlog_ignore_db=mysql
# binlog过期时间,避免日志过大
expire_logs_days=7

配置完成后重启 MySQL,并创建 Canal 专用账号:

-- 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
-- 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;

2.3 Canal 服务端安装与配置

  1. 下载 Canal 服务端:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
  1. 解压:
mkdir -p /opt/canal
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/canal
  1. 修改配置文件/opt/canal/conf/example/instance.properties
# MySQL主库地址
canal.instance.master.address=127.0.0.1:3306
# 用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 起始同步的binlog位置,如果是第一次同步可以不填,默认从最新位置开始
canal.instance.master.journal.name=
canal.instance.master.position=
# 需要监听的数据库,多个用逗号分隔
canal.instance.defaultDatabaseName=test_canal
# 表过滤规则,格式:数据库名.表名,支持通配符*
canal.instance.filter.regex=test_canal\\..*
  1. 启动 Canal 服务:
/opt/canal/bin/startup.sh
  1. 查看日志确认启动成功:
tail -f /opt/canal/logs/canal/canal.log
tail -f /opt/canal/logs/example/example.log

三、Canal 客户端开发实战

3.1 项目初始化

创建一个 Spring Boot 项目,添加以下依赖到pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/></parent><groupId>com.example</groupId><artifactId>canal-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal-demo</name><description>Canal实战示例</description><properties><java.version>17</java.version><canal.version>1.1.7</canal.version><mybatis-plus.version>3.5.6</mybatis-plus.version><fastjson2.version>2.0.47</fastjson2.version><lombok.version>1.18.30</lombok.version><swagger.version>3.0.0</swagger.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Canal客户端 --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><!-- MySQL驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- Fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version></dependency><!-- Swagger3 --><dependency><groupId>io.springfox</groupId><artifactId>springfox-boot-starter</artifactId><version>${swagger.version}</version></dependency><!-- Google Collections --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.1.0-jre</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

3.2 配置文件

创建application.yml配置文件:

spring:datasource:url: jdbc:mysql://localhost:3306/test_canal?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivermybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.example.canaldemo.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImplcanal:server: 127.0.0.1:11111destination: exampleusername:password:server:port: 8080logging:level:com.example.canaldemo: info

3.3 创建数据库和实体类

首先创建测试数据库:

CREATE DATABASE IF NOT EXISTS test_canal CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE test_canal;-- 创建用户表
CREATE TABLE `user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',`username` varchar(50) NOT NULL COMMENT '用户名',`password` varchar(100) NOT NULL COMMENT '密码',`nickname` varchar(50) DEFAULT NULL COMMENT '昵称',`age` int DEFAULT NULL COMMENT '年龄',`email` varchar(100) DEFAULT NULL COMMENT '邮箱',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';

创建对应的实体类User.java

package com.example.canaldemo.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;/*** 用户实体类** @author ken*/
@Data
@TableName("user")
@Schema(description = "用户实体")
public class User {@TableId(type = IdType.AUTO)@Schema(description = "主键ID")private Long id;@TableField("username")@Schema(description = "用户名")private String username;@TableField("password")@Schema(description = "密码")private String password;@TableField("nickname")@Schema(description = "昵称")private String nickname;@TableField("age")@Schema(description = "年龄")private Integer age;@TableField("email")@Schema(description = "邮箱")private String email;@TableField("create_time")@Schema(description = "创建时间")private LocalDateTime createTime;@TableField("update_time")@Schema(description = "更新时间")private LocalDateTime updateTime;
}

3.4 编写 Canal 客户端核心代码

创建 Canal 客户端配置类CanalClientConfig.java

package com.example.canaldemo.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;/*** Canal客户端配置** @author ken*/
@Configuration
public class CanalClientConfig {@Value("${canal.server}")private String canalServer;@Value("${canal.destination}")private String destination;@Value("${canal.username}")private String username;@Value("${canal.password}")private String password;/*** 创建Canal连接器** @return CanalConnector实例*/@Beanpublic CanalConnector canalConnector() {String[] serverAddress = canalServer.split(":");String host = serverAddress[0];int port = Integer.parseInt(serverAddress[1]);return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port),destination,username,password);}
}

创建 Canal 消息处理器接口CanalMessageHandler.java

package com.example.canaldemo.handler;import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;/*** Canal消息处理器接口** @author ken*/
public interface CanalMessageHandler {/*** 处理Canal消息** @param tableName 表名* @param eventType 事件类型* @param rowDatas 行数据列表*/void handle(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas);
}

创建用户表消息处理器UserTableHandler.java

package com.example.canaldemo.handler.impl;import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.handler.CanalMessageHandler;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;/*** 用户表数据变更处理器** @author ken*/
@Slf4j
@Component
public class UserTableHandler implements CanalMessageHandler {private static final String TABLE_NAME = "user";@Overridepublic void handle(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {// 只处理用户表的变更if (!TABLE_NAME.equals(tableName)) {return;}log.info("开始处理用户表变更,事件类型:{},变更行数:{}", eventType, rowDatas.size());switch (eventType) {case INSERT:handleInsert(rowDatas);break;case UPDATE:handleUpdate(rowDatas);break;case DELETE:handleDelete(rowDatas);break;default:log.info("不处理的事件类型:{}", eventType);}}/*** 处理插入事件** @param rowDatas 行数据列表*/private void handleInsert(List<CanalEntry.RowData> rowDatas) {List<User> users = Lists.newArrayList();for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> dataMap = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User user = JSON.parseObject(JSON.toJSONString(dataMap), User.class);users.add(user);}if (!CollectionUtils.isEmpty(users)) {log.info("新增用户:{}", JSON.toJSONString(users));// 这里可以添加同步到缓存、搜索等业务逻辑}}/*** 处理更新事件** @param rowDatas 行数据列表*/private void handleUpdate(List<CanalEntry.RowData> rowDatas) {for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> beforeMap = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));Map<String, String> afterMap = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User beforeUser = JSON.parseObject(JSON.toJSONString(beforeMap), User.class);User afterUser = JSON.parseObject(JSON.toJSONString(afterMap), User.class);log.info("更新用户,更新前:{},更新后:{}", JSON.toJSONString(beforeUser), JSON.toJSONString(afterUser));// 这里可以添加同步到缓存、搜索等业务逻辑}}/*** 处理删除事件** @param rowDatas 行数据列表*/private void handleDelete(List<CanalEntry.RowData> rowDatas) {List<User> users = Lists.newArrayList();for (CanalEntry.RowData rowData : rowDatas) {Map<String, String> dataMap = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));User user = JSON.parseObject(JSON.toJSONString(dataMap), User.class);users.add(user);}if (!CollectionUtils.isEmpty(users)) {log.info("删除用户:{}", JSON.toJSONString(users));// 这里可以添加从缓存、搜索等删除的业务逻辑}}
}

创建 Canal 客户端服务CanalClientService.java

package com.example.canaldemo.service;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canaldemo.handler.CanalMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** Canal客户端服务** @author ken*/
@Slf4j
@Service
public class CanalClientService {private final CanalConnector canalConnector;private final List<CanalMessageHandler> messageHandlers;private final ExecutorService executorService = Executors.newSingleThreadExecutor();private volatile boolean running = false;@Autowiredpublic CanalClientService(CanalConnector canalConnector, List<CanalMessageHandler> messageHandlers) {this.canalConnector = canalConnector;this.messageHandlers = messageHandlers;}/*** 启动Canal客户端*/public void start() {if (running) {log.warn("Canal客户端已经在运行中");return;}running = true;executorService.execute(this::process);log.info("Canal客户端启动成功");}/*** 停止Canal客户端*/public void stop() {if (!running) {log.warn("Canal客户端已经停止");return;}running = false;executorService.shutdown();canalConnector.disconnect();log.info("Canal客户端停止成功");}/*** 处理Canal消息*/private void process() {int batchSize = 1000;try {// 连接Canal服务端canalConnector.connect();// 订阅所有消息canalConnector.subscribe();// 回滚到上次处理成功的位置canalConnector.rollback();while (running) {// 获取消息Message message = canalConnector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// 没有消息,休眠一段时间try {Thread.sleep(1000);} catch (InterruptedException e) {// 忽略中断异常}} else {// 处理消息processMessage(message.getEntries());// 确认消息已处理canalConnector.ack(batchId);}}} catch (Exception e) {log.error("Canal客户端处理消息异常", e);try {// 发生异常时回滚canalConnector.rollback();} catch (Exception ex) {log.error("Canal客户端回滚异常", ex);}} finally {canalConnector.disconnect();}}/*** 处理消息条目** @param entries 消息条目列表*/private void processMessage(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 忽略事务开始和结束的消息if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange;try {// 解析消息内容rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {log.error("解析Canal消息失败,entry: {}", entry, e);continue;}// 获取数据库名和表名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();CanalEntry.EventType eventType = rowChange.getEventType();log.info("接收到数据变更,数据库:{},表名:{},事件类型:{}", databaseName, tableName, eventType);// 处理行数据变更List<CanalEntry.RowData> rowDatas = rowChange.getRowDatasList();if (!CollectionUtils.isEmpty(rowDatas) && !CollectionUtils.isEmpty(messageHandlers)) {for (CanalMessageHandler handler : messageHandlers) {try {handler.handle(tableName, eventType, rowDatas);} catch (Exception e) {log.error("处理Canal消息异常,handler: {}", handler.getClass().getName(), e);}}}}}
}

创建启动类CanalDemoApplication.java

package com.example.canaldemo;import com.example.canaldemo.service.CanalClientService;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Canal示例应用启动类** @author ken*/
@Slf4j
@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
@OpenAPIDefinition(info = @Info(title = "Canal实战示例API", version = "1.0", description = "Canal实战示例接口文档"))
public class CanalDemoApplication implements CommandLineRunner {private final CanalClientService canalClientService;@Autowiredpublic CanalDemoApplication(CanalClientService canalClientService) {this.canalClientService = canalClientService;}public static void main(String[] args) {SpringApplication.run(CanalDemoApplication.class, args);}@Overridepublic void run(String... args) {// 启动Canal客户端canalClientService.start();log.info("Canal示例应用启动成功");}
}

3.5 创建测试接口

为了方便测试,创建用户相关的 Controller、Service 和 Mapper:

UserController.java

package com.example.canaldemo.controller;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;/*** 用户控制器** @author ken*/
@RestController
@RequestMapping("/api/users")
@Tag(name = "用户管理", description = "用户CRUD接口")
public class UserController {private final UserService userService;@Autowiredpublic UserController(UserService userService) {this.userService = userService;}@PostMapping@Operation(summary = "创建用户", description = "新增一个用户")public ResponseEntity<User> createUser(@RequestBody User user) {StringUtils.hasText(user.getUsername(), "用户名不能为空");StringUtils.hasText(user.getPassword(), "密码不能为空");User savedUser = userService.saveUser(user);return ResponseEntity.ok(savedUser);}@GetMapping("/{id}")@Operation(summary = "查询用户", description = "根据ID查询用户详情")public ResponseEntity<User> getUserById(@Parameter(description = "用户ID", required = true)@PathVariable Long id) {User user = userService.getUserById(id);return ResponseEntity.ok(user);}@PutMapping("/{id}")@Operation(summary = "更新用户", description = "根据ID更新用户信息")public ResponseEntity<User> updateUser(@Parameter(description = "用户ID", required = true)@PathVariable Long id,@RequestBody User user) {user.setId(id);User updatedUser = userService.updateUser(user);return ResponseEntity.ok(updatedUser);}@DeleteMapping("/{id}")@Operation(summary = "删除用户", description = "根据ID删除用户")public ResponseEntity<Void> deleteUser(@Parameter(description = "用户ID", required = true)@PathVariable Long id) {userService.deleteUser(id);return ResponseEntity.noContent().build();}@GetMapping@Operation(summary = "分页查询用户", description = "分页查询用户列表")public ResponseEntity<Page<User>> listUsers(@Parameter(description = "页码,从1开始")@RequestParam(defaultValue = "1") Integer pageNum,@Parameter(description = "每页条数")@RequestParam(defaultValue = "10") Integer pageSize) {Page<User> page = userService.listUsers(pageNum, pageSize);return ResponseEntity.ok(page);}
}

UserService.java

package com.example.canaldemo.service;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;/*** 用户服务接口** @author ken*/
public interface UserService {/*** 保存用户** @param user 用户实体* @return 保存后的用户实体*/User saveUser(User user);/*** 根据ID查询用户** @param id 用户ID* @return 用户实体*/User getUserById(Long id);/*** 更新用户** @param user 用户实体* @return 更新后的用户实体*/User updateUser(User user);/*** 根据ID删除用户** @param id 用户ID*/void deleteUser(Long id);/*** 分页查询用户** @param pageNum 页码* @param pageSize 每页条数* @return 用户分页列表*/Page<User> listUsers(Integer pageNum, Integer pageSize);
}

UserServiceImpl.java

package com.example.canaldemo.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.canaldemo.entity.User;
import com.example.canaldemo.mapper.UserMapper;
import com.example.canaldemo.service.UserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;/*** 用户服务实现类** @author ken*/
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {private final UserMapper userMapper;@Autowiredpublic UserServiceImpl(UserMapper userMapper) {this.userMapper = userMapper;}@Overridepublic User saveUser(User user) {userMapper.insert(user);log.info("保存用户成功,ID: {}", user.getId());return user;}@Overridepublic User getUserById(Long id) {User user = userMapper.selectById(id);if (ObjectUtils.isEmpty(user)) {log.warn("用户不存在,ID: {}", id);}return user;}@Overridepublic User updateUser(User user) {userMapper.updateById(user);log.info("更新用户成功,ID: {}", user.getId());return userMapper.selectById(user.getId());}@Overridepublic void deleteUser(Long id) {userMapper.deleteById(id);log.info("删除用户成功,ID: {}", id);}@Overridepublic Page<User> listUsers(Integer pageNum, Integer pageSize) {Page<User> page = new Page<>(pageNum, pageSize);return userMapper.selectPage(page, new QueryWrapper<>());}
}

UserMapper.java

package com.example.canaldemo.mapper;import com.example.canaldemo.entity.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;/*** 用户Mapper接口** @author ken*/
@Mapper
public interface UserMapper extends BaseMapper<User> {
}

3.6 添加 MyBatis-Plus 分页插件

创建MyBatisPlusConfig.java

package com.example.canaldemo.config;import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** MyBatis-Plus配置** @author ken*/
@Configuration
public class MyBatisPlusConfig {/*** 添加分页插件** @return MybatisPlusInterceptor实例*/@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));return interceptor;}
}

四、测试验证

4.1 启动应用

运行CanalDemoApplication的 main 方法启动应用,控制台会输出类似以下日志:

2023-11-05 10:00:00.000  INFO 12345 --- [           main] c.e.c.CanaldemoApplication               : Canal客户端启动成功
2023-11-05 10:00:00.000  INFO 12345 --- [           main] c.e.c.CanaldemoApplication               : Canal示例应用启动成功

4.2 测试数据变更

可以通过 Swagger 界面(http://localhost:8080/swagger-ui/index.html)或 Postman 等工具调用接口进行测试:

  1. 创建用户:
POST http://localhost:8080/api/users
Content-Type: application/json{"username": "testuser","password": "123456","nickname": "测试用户","age": 25,"email": "test@example.com"
}

应用控制台会输出 Canal 捕获到的新增事件:

2023-11-05 10:01:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService         : 接收到数据变更,数据库:test_canal,表名:user,事件类型:INSERT
2023-11-05 10:01:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 开始处理用户表变更,事件类型:INSERT,变更行数:1
2023-11-05 10:01:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 新增用户:[{"age":25,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"测试用户","password":"123456","updateTime":"2023-11-05T10:01:00","username":"testuser"}]
  1. 更新用户:
PUT http://localhost:8080/api/users/1
Content-Type: application/json{"nickname": "更新后的测试用户","age": 26
}

应用控制台会输出 Canal 捕获到的更新事件:

2023-11-05 10:02:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService         : 接收到数据变更,数据库:test_canal,表名:user,事件类型:UPDATE
2023-11-05 10:02:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 开始处理用户表变更,事件类型:UPDATE,变更行数:1
2023-11-05 10:02:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 更新用户,更新前:[{"age":25,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"测试用户","password":"123456","updateTime":"2023-11-05T10:01:00","username":"testuser"}],更新后:[{"age":26,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"更新后的测试用户","password":"123456","updateTime":"2023-11-05T10:02:00","username":"testuser"}]
  1. 删除用户:
DELETE http://localhost:8080/api/users/1

应用控制台会输出 Canal 捕获到的删除事件:

2023-11-05 10:03:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.service.CanalClientService         : 接收到数据变更,数据库:test_canal,表名:user,事件类型:DELETE
2023-11-05 10:03:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 开始处理用户表变更,事件类型:DELETE,变更行数:1
2023-11-05 10:03:00.000  INFO 12345 --- [pool-1-thread-1] c.e.c.h.i.UserTableHandler               : 删除用户:[{"age":26,"createTime":"2023-11-05T10:01:00","email":"test@example.com","id":1,"nickname":"更新后的测试用户","password":"123456","updateTime":"2023-11-05T10:02:00","username":"testuser"}]

五、Canal 高级特性

5.1 数据过滤

Canal 支持通过配置文件进行数据过滤,只同步特定的数据库和表:

# 白名单配置,格式:数据库名.表名,多个用逗号分隔,支持通配符*
canal.instance.filter.regex=test_canal.user,test_canal.order_.*# 黑名单配置,格式同上
canal.instance.filter.black.regex=test_canal\\.user_log

也可以在客户端代码中进行过滤,如前面示例中的UserTableHandler只处理用户表的变更。

5.2 集群部署

在生产环境中,为了保证高可用,通常需要部署 Canal 集群。Canal 的集群部署主要有两种方式:

  1. 多 Canal 实例连接同一个 MySQL 主库,客户端通过负载均衡访问 Canal 服务。

  1. Canal 结合 ZooKeeper 实现 HA 模式,多个 Canal 实例通过 ZooKeeper 选举出一个主节点提供服务,其他节点作为备用。

5.3 数据同步到其他存储

Canal 不仅可以同步数据到应用程序,还可以直接同步到其他存储系统,如 Elasticsearch、Redis、Kafka 等。Canal 提供了相应的适配器(canal-adapter)来实现这些功能。

以同步到 Elasticsearch 为例,配置示例:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: user_type: _doc_id: idsql: "select u.id, u.username, u.nickname, u.age, u.email from user u"etlCondition: "where u.id > {}"commitBatch: 3000

5.4 断点续传

Canal 支持断点续传功能,即当 Canal 客户端或服务端重启后,能够从上次处理的位置继续处理,而不是从头开始。这是通过记录 binlog 的文件名和位置实现的。

在客户端代码中,可以通过以下方式指定起始位置:

// 指定从某个binlog位置开始消费
canalConnector.seek(destination, journalName, position);

在服务端配置中,可以通过以下参数设置:

# 初始binlog文件名
canal.instance.master.journal.name=mysql-bin.000001
# 初始binlog位置
canal.instance.master.position=12345

六、性能优化

6.1 调整批量处理大小

在客户端代码中,可以调整每次获取的消息数量:

// 每次获取1000条消息
int batchSize = 1000;
Message message = canalConnector.getWithoutAck(batchSize);

合理设置 batchSize 可以提高处理效率,但不宜过大,以免占用过多内存。

6.2 异步处理

对于耗时的业务逻辑,可以采用异步处理的方式,避免阻塞 Canal 消息的消费:

// 使用线程池异步处理
executorService.submit(() -> {// 处理业务逻辑handleBusinessLogic(data);
});

6.3 索引优化

确保 MySQL 的 binlog 相关表有合适的索引,同时 Canal 客户端处理的目标存储(如 Elasticsearch、Redis)也需要有合理的索引设计,以提高查询和写入性能。

6.4 网络优化

Canal 服务端和客户端之间的网络延迟会影响数据同步性能,应尽量将它们部署在同一个局域网内,减少网络传输时间。

七、常见问题与解决方案

7.1 Canal 连接 MySQL 失败

问题现象:Canal 服务端日志中出现连接 MySQL 失败的错误。

可能原因

  1. MySQL 地址或端口不正确
  2. 用户名或密码错误
  3. MySQL 未开启 binlog
  4. 网络不通
  5. MySQL 用户没有足够的权限

解决方案

  1. 检查 MySQL 地址和端口是否正确
  2. 验证用户名和密码是否正确
  3. 确认 MySQL 已开启 binlog(show variables like 'log_bin'
  4. 检查网络连通性(pingtelnet命令)
  5. 确保用户有SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限

7.2 数据同步延迟

问题现象:Canal 同步的数据与 MySQL 中的数据存在明显延迟。

可能原因

  1. MySQL binlog 生成延迟
  2. Canal 服务端处理速度慢
  3. 客户端处理逻辑耗时过长
  4. 网络传输延迟
  5. 系统资源不足(CPU、内存、磁盘 IO 等)

解决方案

  1. 优化 MySQL 性能,确保 binlog 及时生成
  2. 增加 Canal 服务端实例
  3. 优化客户端处理逻辑,采用异步处理
  4. 优化网络环境
  5. 增加系统资源

7.3 数据同步丢失

问题现象:MySQL 中的部分数据变更没有被 Canal 捕获到。

可能原因

  1. 过滤规则配置不当
  2. binlog 格式不是 ROW 模式
  3. Canal 服务端或客户端异常退出
  4. binlog 被清理

解决方案

  1. 检查过滤规则,确保需要同步的表没有被过滤
  2. 确认 binlog 格式为 ROW 模式
  3. 确保客户端正确处理异常,及时提交 offset
  4. 合理设置 binlog 过期时间,避免过早清理

八、总结与展望

Canal 作为一款优秀的增量数据同步工具,凭借其稳定、高效、易用的特点,在分布式系统中得到了广泛应用。本文从原理、配置、开发实战、高级特性、性能优化和问题排查等方面全面介绍了 Canal 的使用。

通过 Canal,我们可以轻松实现 MySQL 数据到其他系统的实时同步,解决分布式系统中的数据一致性问题。无论是缓存更新、搜索引擎同步、数据仓库同步还是跨系统数据集成,Canal 都能发挥重要作用。

附录:参考资料

  1. Canal 官方文档
  2. MySQL 官方文档 - binlog
  3. Spring Boot 官方文档
  4. MyBatis-Plus 官方文档
http://www.dtcms.com/a/573897.html

相关文章:

  • PE工具开发实战:非正向与安全解析
  • 进一步加强网站建设管理郑州app拉新项目
  • 徐州百度推广总代理台州专业关键词优化
  • C++ 中模板元编程与 SFINAE 机制
  • 【实用运维工具】一键完成Oracle数据库的健康巡检,生成WORD报告
  • ICLR 2025 Spotlight | 打破AI“黑箱”!最新IIS评分框架:揭示模型性能与可解释性的共生关系
  • Java实现国密算法 SM2 /SM3 /SM4(基于 BouncyCastle)
  • 文件网站建设高新公司网站建设哪家好
  • Zabbix企业级分布式监控系统(上)
  • 深度解析:接口性能优化实战指南
  • 站点创建成功html代码hr表示
  • 一、Netty-高并发IO底层原理(5种主要的IO模型)
  • 网站开发外包 验收c2c有哪些网站
  • Jenkins是什么
  • 建网站公司浩森宇特大连网络营销师招聘网
  • LeetCode - 杨辉三角 / 二叉树的最大深度
  • RV1126 NO.39:OPENCV查找图形轮廓并画框
  • 【第1章>第3节】基于FPGA的图像腐蚀处理算法的Verilog实现
  • 西安电子科技大学信息化建设处网站模板建站排版跟没缓存好似的
  • 神经网络—— 学习与感知器(细节补充)
  • tensorflow 图像分类 之一
  • 自己网站上做淘宝搜索引擎网站开发属于什么行业
  • 查询网站备案号网站如何做免费的推广
  • 编写一个DXE driver 提供遍历pcie设备配置空间的protocol
  • 随笔之工作方法的“术”
  • 淘宝上做进出口网站有哪些我男同同性做视频网站
  • LLM中的选择性注意:从人类聚焦到模型聚焦
  • 从成本到战略:金仓 KingbaseES 的多维度优势与企业数据库选型的核心考量
  • 做pc网站排wordpress载入慢
  • Java注解在Spring Boot中的详细原理与使用情况解析