MySQL数据同步之Canal讲解
文章目录
- 1 Canal搭建
- 1.1 简介
- 1.1.1 概述
- 1.1.2 优点
- 1.1.3 作用&核心组件
- 1.2 搭建 Canal
- 1.2.1 准备工作
- 1.2.1.1 检查配置
- 1.2.1.2 MySQL配置
- 1.2.2 下载并安装 Canal
- 1.2.3 配置 Canal Server
- 1.2.3.1 全局配置
- 1.2.3.2 实例配置
- 1.2.3.3 配置目标系统
- 1.2.3.4 配置 Canal Adapter
- 1.2.4 启动服务
- 1.2.5 使用 Docker 部署 Canal
- 1.3 注意和问题
- 1.3.1 注意事项
- 1.3.2 常见问题
- 1.3.3 为什么既要全局配置又要 实例配置
- 1.3.3.1 Canal核心模型
- 1.3.3.2 支持多实例运行
- 1.3.3.3 模块化与职责分离
- 2 SpringBoot结合使用
- 2.1 pom和配置
- 2.2 实体和mapper
- 2.2.1 实体类
- 2.2.2 sql文件
- 2.2.3 交易Mapper接口
- 2.3 Canal监听器类
- 2.4 测试
1 Canal搭建
1.1 简介
1.1.1 概述
Canal
是阿里巴巴开源的一个用于高效抓取 MySQL
数据库增量变更日志(binlog
)并进行处理的中间件。它可以将 MySQL
的 binlog
解析为结构化的 JSON
格式,并提供多种方式将这些数据推送到下游系统。
1.1.2 优点
为什么选择Canal?
- 实时性:
Canal
基于MySQL
的binlog
机制,能够在毫秒级内完成数据同步。 - 批量获取数据:
Canal
支持批量获取数据库变更数据,减少网络开销和处理时间。 - 多线程处理:
Canal
可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。 - 断点续传:
Canal
支持从断点继续消费数据,确保数据不会丢失。 - 持久化存储:
Canal
可以将消费进度持久化到ZooKeeper
中,保证在故障恢复后能够继续正常工作。 - 容错机制:
Canal
内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。 - 标准协议:
Canal
使用标准化的binlog
协议,易于与其他系统集成。 - 过滤机制:
Canal
支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。 - 动态配置:
Canal
支持动态配置,可以根据实际需求调整监控范围和处理逻辑。 - 自定义处理:
Canal
允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。 - 精确同步:
Canal
能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。 - 事务支持:
Canal
能够处理复杂的事务场景,确保事务的原子性和完整性。 - 冲突解决:
Canal
提供了多种冲突解决策略,避免数据同步过程中的冲突问题。
1.1.3 作用&核心组件
Canal
的作用:Canal
通过伪装成 MySQL
的从库(slave
),从 MySQL
主库(master
)接收 binlog
(二进制日志),解析增量变更数据(插入、更新、删除等),并将其推送或存储到目标系统(如 Kafka、Redis、Elasticsearch 等)。
核心组件:
Canal Server
:负责连接MySQL
,解析binlog
并提供数据订阅服务。Canal Adapter
(可选):用于将Canal Server
的数据直接写入目标存储(如Elasticsearch
、HBase
、MySQL
等)。Canal Client
(可选):如果需要自定义消费逻辑,可以开发客户端代码来订阅Canal Server
的数据。- 场景需求:如果业务需要实时同步
MySQL
数据(例如同步到 Redis 缓存、Elasticsearch 搜索、Kafka 消息队列等),Canal
是一个轻量且高效的解决方案。
1.2 搭建 Canal
1.2.1 准备工作
1.2.1.1 检查配置
在搭建 Canal
之前,需要确保以下条件:
MySQL
配置:
MySQL
必须开启binlog
,且binlog
格式为ROW
(行模式)。
MySQL
需要有一个用户账号,具备SELECT、REPLICATION SLAVE、REPLICATION CLIENT
等权限,用于Canal
连接。- 环境要求:
Java
环境:Canal
是Java
开发的,需要JDK 1.8
或更高版本。
服务器:一台运行Canal Server
的服务器(可以是 ECS 实例或本地机器)。
目标系统:确保目标系统(如 Redis、Kafka、Elasticsearch)已部署并可访问。
1.2.1.2 MySQL配置
MySQL 配置步骤:
- 检查 binlog 是否启用:
SHOW VARIABLES LIKE 'log_bin';
,如果显示OFF
,需要启用binlog
。 - 修改
MySQL
配置文件(通常是/etc/my.cnf
或/etc/mysql/my.cnf
):
[mysqld]
log-bin=mysql-bin # 启用 binlog
binlog-format=ROW # 设置 binlog 格式为 ROW
server-id=1 # 唯一服务器 ID,不能与 Canal 的 slaveId 重复
保存后重启 MySQL:service mysql restart
创建 Canal 用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2.2 下载并安装 Canal
Canal
主要包括 canal.deployer(Canal Server)
,如果需要直接同步到目标存储,可能还需要 canal.adapter
。
访问 Canal
官方 GitHub 发布页面:https://github.com/alibaba/canal/releases
下载最新版本的 canal.deployer
(例如 canal.deployer-1.1.6.tar.gz):
cd ~
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
如果需要 Canal Adapter,下载 canal.adapter 包:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz
解压 canal.deployer:
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal
解压 canal.adapter:
mkdir -p /usr/local/canal-adapter
tar -zxvf canal.adapter-1.1.6.tar.gz -C /usr/local/canal-adapter
1.2.3 配置 Canal Server
Canal Server
的主要配置文件位于 conf/
目录下,包括 canal.properties
(全局配置)和 conf/example/instance.properties
(实例配置)。
1.2.3.1 全局配置
修改全局配置(conf/canal.properties)
打开文件
vi /usr/local/canal/conf/canal.properties确保以下关键配置正确:
# Canal Server 的 ID,Canal Server 的全局标识
canal.id = 1
# Canal 监听的 IP 和端口
canal.ip = 0.0.0.0
canal.port = 11111
# Zookeeper 地址,用于集群模式
canal.zkServers =
# 实例目录
canal.destinations = example
1.2.3.2 实例配置
修改实例配置(conf/example/instance.properties):
打开文件:
vi /usr/local/canal/conf/example/instance.properties配置 MySQL 连接信息和同步规则:
# MySQL 主库地址
canal.instance.master.address = 192.168.XX.XX:3306
# slaveId 是 Canal 实例伪装成 MySQL 从库时的标识,必须与 MySQL 主库的 server-id不同
canal.instance.slaveId = 1234
# MySQL 用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 监听的数据库和表(正则表达式)
canal.instance.filter.regex = .*\\..* # 监听所有库的所有表
# 同步到的目标(如 Kafka 主题,留空如果直接用 Adapter)
canal.mq.topic =
关于 canal.instance.filter.regex
:
- 监听所有表:
.*\\..*
- 监听特定库:
mydb\\..*
- 监听特定表:
mydb\\.mytable
- 多个规则用逗号分隔,例如:
mydb\\.table1,mydb\\.table2
注意
:slaveId
是 Canal
实例伪装成 MySQL
从库时的标识,必须与 MySQL
主库的 server-id
(以及其他从库的 server-id
)不同,否则会导致主从复制冲突。如果未显式配置 slaveId
,Canal
会使用 canal.id
作为默认 slaveId
,这可能导致与 MySQL
的 server-id
冲突
1.2.3.3 配置目标系统
如果同步到 Kafka,需要配置 Kafka 主题:
canal.mq.topic = mysql_test
canal.mq.partition = 0
1.2.3.4 配置 Canal Adapter
如果需要将数据直接同步到目标存储(如 Elasticsearch、Redis、MySQL),需要配置 Canal Adapter
。
修改 Adapter
全局配置(/usr/local/canal-adapter/conf/application.yml
):
canal:#Canal Server 的地址和端口server: canal-server:11111#与 canal.properties 中的 canal.destinations 一致destination: example
为目标系统添加配置:
在 /usr/local/canal-adapter/conf/
下创建目标系统的配置文件,例如同步到 Elasticsearch
:
创建 es7/user.yml
(假设同步 user 表到 Elasticsearch 的 user 索引):
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: user_id: user_idsql: "SELECT id AS user_id, username, fullname FROM user"commitBatch: 3000
确保目标系统驱动:
如果同步到 MySQL 8.x
或 Elasticsearch 7.x
,可能需要更新驱动。例如,替换 MySQL 驱动:
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
unzip mysql-connector-java-8.0.29.zip
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar /usr/local/canal-adapter/lib/
1.2.4 启动服务
启动 Canal Server
cd /usr/local/canal
./bin/startup.sh查看日志文件以确保启动成功:
tail -f logs/canal/canal.log
tail -f logs/example/example.log
启动 Canal Adapter:
cd /usr/local/canal-adapter
./bin/startup.sh检查 Adapter 日志:
tail -f logs/adapter/adapter.log
1.2.5 使用 Docker 部署 Canal
如果使用 Docker,可以简化部署流程:
拉取 Canal 镜像:
docker pull canal/canal-server:v1.1.6运行 Canal 容器:
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.6进入容器配置:
docker exec -it canal bash
vi /home/admin/canal-server/conf/example/instance.properties
1.3 注意和问题
1.3.1 注意事项
binlog
格式:
确保MySQL
的binlog-format
为ROW
,否则Canal
无法正确解析数据。- 权限:
Canal
用户需要REPLICATION SLAVE
和REPLICATION CLIENT
权限,否则无法接收 binlog。 - 网络和防火墙:
确保 Canal Server 能连接到 MySQL(默认端口 3306)和目标系统。
如果在云环境,配置安全组规则开放 Canal 的端口(默认 11111)。 - 性能优化:
对于高并发场景,考虑部署 Canal 集群,并使用 Kafka 确保消息顺序。
调整 commitBatch(Adapter 配置)以平衡性能和实时性。 - 版本兼容性:
确保 Canal 版本与 MySQL 和目标系统兼容。例如,Canal 1.1.4 对 MySQL 8.x 的驱动可能需要手动更新。 - 监控和维护:
定期检查 Canal 日志,防止因异常(如 ECS 重启)导致同步中断。
考虑使用 Canal Admin 提供 Web 管理界面,便于配置和监控。
1.3.2 常见问题
Canal
支持全量同步吗?
Canal
默认只支持增量同步
(基于binlog
)。如果需要全量同步,可以结合DataX
或其他工具先同步全量数据,再用Canal
同步增量数据。
如何同步到多个目标?
配置多个
Canal Adapter
或开发Canal Client
,分别处理不同的目标系统。或者通过Kafka
作为中间件,多个消费者订阅同一主题。
1.3.3 为什么既要全局配置又要 实例配置
1.3.3.1 Canal核心模型
Canal
的核心设计是Server-Instance
模型:
Canal Server
:Canal
的服务端进程,负责管理一个或多个实例,监听客户端连接,提供全局的服务配置(如端口、集群模式等)。Canal Instance
:每个实例对应一个具体的MySQL
数据库实例(或逻辑上的数据源),负责连接 MySQL、解析 binlog、处理数据同步逻辑。
为了支持这种分层架构,Canal 将配置分为两层:
- 全局配置(
canal.properties
):定义Canal Server
级别的通用参数,适用于整个服务进程。 - 实例配置(
instance.properties
):定义每个Instance
的具体行为,针对某个特定的 MySQL 数据源。
1.3.3.2 支持多实例运行
假如 一个 Canal Server
可以同时管理多个 MySQL
数据源(例如,同步多个数据库或多个表的增量数据)。每个数据源的连接信息、过滤规则等可能不同,因此需要为每个数据源单独配置一个 Instance
。
- 全局配置的作用:
定义Canal Server
的运行环境,例如监听的 IP 和端口(canal.ip、canal.port)、集群模式(canal.zkServers)、全局 ID(canal.id)等。
指定所有实例的名称列表(canal.destinations),告诉 Server 要加载哪些 Instance。 - 实例配置的作用:
为每个Instance
指定具体的MySQL
连接信息(如canal.instance.master.address
、canal.instance.dbUsername
)、数据过滤规则(canal.instance.filter.regex)、伪装从库的 ID(canal.instance.slaveId)等。
每个Instance
的配置独立存储在conf/<destination>/instance.properties
中,<destination>
对应canal.destinations
中定义的实例名称(如 example)。
假设要同步两个 MySQL 数据库(db1 和 db2):
全局配置(canal.properties):
canal.destinations = db1,db2
canal.port = 11111
实例配置:
conf/db1/instance.properties:连接 db1,过滤 db1.table1。
conf/db2/instance.properties:连接 db2,过滤 db2.table2。
一个 Canal Server 通过全局配置加载两个 Instance,分别处理 db1 和 db2 的同步
1.3.3.3 模块化与职责分离
全局配置负责 Server 级别的通用设置,与具体的数据源无关,例如:
- 网络设置(IP、端口)。
- 集群配置(Zookeeper 地址)。
- 数据存储模式(内存、文件、数据库)。
- 全局性能参数(如线程池大小)。
实例配置负责与特定 MySQL 数据源相关的设置,例如:
MySQL
连接信息(地址、用户名、密码)。binlog
解析规则(过滤哪些库或表)。- 伪装从库的行为(
slaveId
)。 - 目标系统的推送逻辑(例如 Kafka 主题)
2 SpringBoot结合使用
2.1 pom和配置
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version>
</dependency>
application.properties
# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example
2.2 实体和mapper
2.2.1 实体类
@Data
public class Transaction {private Long id; // 主键IDprivate String transactionId; // 交易IDprivate Double amount; // 交易金额private String status; // 交易状态
}
2.2.2 sql文件
CREATE TABLE transaction (id BIGINT AUTO_INCREMENT PRIMARY KEY,transaction_id VARCHAR(50) NOT NULL,amount DECIMAL(18, 2) NOT NULL,status VARCHAR(20) NOT NULL
);
2.2.3 交易Mapper接口
/*** 交易Mapper接口*/
@Mapper
public interface TransactionMapper {/*** 插入一条新的交易记录*/@Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")void insert(@Param("transaction") Transaction transaction);/*** 更新一条交易记录*/@Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")void update(@Param("transaction") Transaction transaction);
}
2.3 Canal监听器类
/*** Canal监听器类,用于监听数据库的变化并进行相应的处理*/
@Component
public class CanalListener {private final String destination = "example"; // 这个值需要与Canal配置中的destination一致private final String serverIp = "127.0.0.1";private final int port = 11111;@Autowiredprivate TransactionMapper transactionMapper;/*** 在Bean初始化后启动Canal监听器*/@PostConstructpublic void start() {// 创建Canal连接器CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");try {// 连接到Canal服务器connector.connect();// 订阅所有数据库的所有表connector.subscribe(".*\\..*");// 回滚到上次中断的位置connector.rollback();while (true) {// 获取一批消息,最多100条Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// 如果没有消息,则等待1秒Thread.sleep(1000);} else {// 处理消息processMessage(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {// 断开连接connector.disconnect();}}/*** 处理Canal发送过来的消息** @param entryList 消息列表*/private void processMessage(List<CanalEntry.Entry> entryList) {for (CanalEntry.Entry entry : entryList) {// 忽略事务开始和结束事件if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage;try {// 解析RowChange数据rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}CanalEntry.EventType eventType = rowChage.getEventType();// 打印日志System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));// 处理每一行数据变化for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());if (eventType == CanalEntry.EventType.DELETE) {// 处理删除事件(如果需要)} elseif (eventType == CanalEntry.EventType.INSERT) {// 插入新记录transactionMapper.insert(transaction);} else {// 更新现有记录transactionMapper.update(transaction);}}}}/*** 将Canal列数据转换为Transaction对象** @param columns 列数据列表* @return 转换后的Transaction对象*/private Transaction convertToTransaction(List<CanalEntry.Column> columns) {Transaction transaction = new Transaction();for (CanalEntry.Column column : columns) {switch (column.getName()) {case"id":transaction.setId(Long.parseLong(column.getValue()));break;case"transaction_id":transaction.setTransactionId(column.getValue());break;case"amount":transaction.setAmount(Double.parseDouble(column.getValue()));break;case"status":transaction.setStatus(column.getValue());break;}}return transaction;}
}
2.4 测试
插入一条交易记录
curl -X POST http://localhost:8080/api/transactions \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'
更新一条交易记录
curl -X PUT http://localhost:8080/api/transactions/TX123 \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'
观察后台日志
================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1 update=true
transaction_id : TX123 update=true
amount : 100.00 update=true
status : PENDING update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : PENDING update=false
-------> after
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : COMPLETED update=true