当前位置: 首页 > news >正文

MySQL数据同步之Canal讲解

文章目录

  • 1 Canal搭建
    • 1.1 简介
      • 1.1.1 概述
      • 1.1.2 优点
      • 1.1.3 作用&核心组件
    • 1.2 搭建 Canal
      • 1.2.1 准备工作
        • 1.2.1.1 检查配置
        • 1.2.1.2 MySQL配置
      • 1.2.2 下载并安装 Canal
      • 1.2.3 配置 Canal Server
        • 1.2.3.1 全局配置
        • 1.2.3.2 实例配置
        • 1.2.3.3 配置目标系统
        • 1.2.3.4 配置 Canal Adapter
      • 1.2.4 启动服务
      • 1.2.5 使用 Docker 部署 Canal
    • 1.3 注意和问题
      • 1.3.1 注意事项
      • 1.3.2 常见问题
      • 1.3.3 为什么既要全局配置又要 实例配置
        • 1.3.3.1 Canal核心模型
        • 1.3.3.2 支持多实例运行
        • 1.3.3.3 模块化与职责分离
  • 2 SpringBoot结合使用
    • 2.1 pom和配置
    • 2.2 实体和mapper
      • 2.2.1 实体类
      • 2.2.2 sql文件
      • 2.2.3 交易Mapper接口
    • 2.3 Canal监听器类
    • 2.4 测试

1 Canal搭建

1.1 简介

1.1.1 概述

Canal 是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQLbinlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

1.1.2 优点

为什么选择Canal?

  • 实时性: Canal 基于MySQLbinlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

1.1.3 作用&核心组件

Canal 的作用:Canal 通过伪装成 MySQL 的从库(slave),从 MySQL 主库(master)接收 binlog(二进制日志),解析增量变更数据(插入、更新、删除等),并将其推送或存储到目标系统(如 Kafka、Redis、Elasticsearch 等)。
核心组件:

  • Canal Server:负责连接 MySQL,解析 binlog 并提供数据订阅服务。
  • Canal Adapter(可选):用于将 Canal Server 的数据直接写入目标存储(如 ElasticsearchHBaseMySQL 等)。
  • Canal Client(可选):如果需要自定义消费逻辑,可以开发客户端代码来订阅 Canal Server 的数据。
  • 场景需求:如果业务需要实时同步 MySQL 数据(例如同步到 Redis 缓存、Elasticsearch 搜索、Kafka 消息队列等),Canal 是一个轻量且高效的解决方案。

1.2 搭建 Canal

1.2.1 准备工作

1.2.1.1 检查配置

在搭建 Canal 之前,需要确保以下条件:

  • MySQL 配置:
    MySQL 必须开启 binlog,且 binlog 格式为 ROW(行模式)。
    MySQL 需要有一个用户账号,具备 SELECT、REPLICATION SLAVE、REPLICATION CLIENT 等权限,用于 Canal 连接。
  • 环境要求:
    Java 环境:CanalJava 开发的,需要 JDK 1.8 或更高版本。
    服务器:一台运行 Canal Server 的服务器(可以是 ECS 实例或本地机器)。
    目标系统:确保目标系统(如 Redis、Kafka、Elasticsearch)已部署并可访问。
1.2.1.2 MySQL配置

MySQL 配置步骤:

  • 检查 binlog 是否启用:
    SHOW VARIABLES LIKE 'log_bin';,如果显示 OFF,需要启用 binlog
  • 修改 MySQL 配置文件(通常是 /etc/my.cnf/etc/mysql/my.cnf):
[mysqld]
log-bin=mysql-bin          # 启用 binlog
binlog-format=ROW         # 设置 binlog 格式为 ROW
server-id=1               # 唯一服务器 ID,不能与 Canal 的 slaveId 重复

保存后重启 MySQL:service mysql restart

创建 Canal 用户并授权:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

1.2.2 下载并安装 Canal

Canal 主要包括 canal.deployer(Canal Server),如果需要直接同步到目标存储,可能还需要 canal.adapter

访问 Canal 官方 GitHub 发布页面:https://github.com/alibaba/canal/releases
下载最新版本的 canal.deployer(例如 canal.deployer-1.1.6.tar.gz):

cd ~
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

如果需要 Canal Adapter,下载 canal.adapter 包:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz

解压 canal.deployer:

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal

解压 canal.adapter:

mkdir -p /usr/local/canal-adapter
tar -zxvf canal.adapter-1.1.6.tar.gz -C /usr/local/canal-adapter

1.2.3 配置 Canal Server

Canal Server 的主要配置文件位于 conf/ 目录下,包括 canal.properties(全局配置)和 conf/example/instance.properties(实例配置)。

1.2.3.1 全局配置

修改全局配置(conf/canal.properties)

打开文件
vi /usr/local/canal/conf/canal.properties确保以下关键配置正确:
# Canal Server 的 ID,Canal Server 的全局标识
canal.id = 1
# Canal 监听的 IP 和端口
canal.ip = 0.0.0.0
canal.port = 11111
# Zookeeper 地址,用于集群模式
canal.zkServers =
# 实例目录
canal.destinations = example
1.2.3.2 实例配置

修改实例配置(conf/example/instance.properties):

打开文件:
vi /usr/local/canal/conf/example/instance.properties配置 MySQL 连接信息和同步规则:
# MySQL 主库地址
canal.instance.master.address = 192.168.XX.XX:3306
# slaveId 是 Canal 实例伪装成 MySQL 从库时的标识,必须与 MySQL 主库的 server-id不同
canal.instance.slaveId = 1234
# MySQL 用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 监听的数据库和表(正则表达式)
canal.instance.filter.regex = .*\\..*  # 监听所有库的所有表
# 同步到的目标(如 Kafka 主题,留空如果直接用 Adapter)
canal.mq.topic =

关于 canal.instance.filter.regex

  • 监听所有表:.*\\..*
  • 监听特定库:mydb\\..*
  • 监听特定表:mydb\\.mytable
  • 多个规则用逗号分隔,例如:mydb\\.table1,mydb\\.table2

注意slaveIdCanal 实例伪装成 MySQL 从库时的标识,必须与 MySQL 主库的 server-id(以及其他从库的 server-id)不同,否则会导致主从复制冲突。如果未显式配置 slaveIdCanal 会使用 canal.id 作为默认 slaveId,这可能导致与 MySQLserver-id 冲突

1.2.3.3 配置目标系统

如果同步到 Kafka,需要配置 Kafka 主题:

canal.mq.topic = mysql_test
canal.mq.partition = 0
1.2.3.4 配置 Canal Adapter

如果需要将数据直接同步到目标存储(如 Elasticsearch、Redis、MySQL),需要配置 Canal Adapter

修改 Adapter 全局配置(/usr/local/canal-adapter/conf/application.yml):

canal:#Canal Server 的地址和端口server: canal-server:11111#与 canal.properties 中的 canal.destinations 一致destination: example

为目标系统添加配置:
/usr/local/canal-adapter/conf/ 下创建目标系统的配置文件,例如同步到 Elasticsearch
创建 es7/user.yml(假设同步 user 表到 Elasticsearch 的 user 索引):

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: user_id: user_idsql: "SELECT id AS user_id, username, fullname FROM user"commitBatch: 3000

确保目标系统驱动:
如果同步到 MySQL 8.xElasticsearch 7.x,可能需要更新驱动。例如,替换 MySQL 驱动:

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
unzip mysql-connector-java-8.0.29.zip
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar /usr/local/canal-adapter/lib/

1.2.4 启动服务

启动 Canal Server

cd /usr/local/canal
./bin/startup.sh查看日志文件以确保启动成功:
tail -f logs/canal/canal.log
tail -f logs/example/example.log

启动 Canal Adapter:

cd /usr/local/canal-adapter
./bin/startup.sh检查 Adapter 日志:
tail -f logs/adapter/adapter.log

1.2.5 使用 Docker 部署 Canal

如果使用 Docker,可以简化部署流程:

拉取 Canal 镜像:
docker pull canal/canal-server:v1.1.6运行 Canal 容器:
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.6进入容器配置:
docker exec -it canal bash
vi /home/admin/canal-server/conf/example/instance.properties

1.3 注意和问题

1.3.1 注意事项

  • binlog 格式:
    确保 MySQLbinlog-formatROW,否则 Canal 无法正确解析数据。
  • 权限:
    Canal 用户需要 REPLICATION SLAVEREPLICATION CLIENT 权限,否则无法接收 binlog。
  • 网络和防火墙:
    确保 Canal Server 能连接到 MySQL(默认端口 3306)和目标系统。
    如果在云环境,配置安全组规则开放 Canal 的端口(默认 11111)。
  • 性能优化:
    对于高并发场景,考虑部署 Canal 集群,并使用 Kafka 确保消息顺序。
    调整 commitBatch(Adapter 配置)以平衡性能和实时性。
  • 版本兼容性:
    确保 Canal 版本与 MySQL 和目标系统兼容。例如,Canal 1.1.4 对 MySQL 8.x 的驱动可能需要手动更新。
  • 监控和维护:
    定期检查 Canal 日志,防止因异常(如 ECS 重启)导致同步中断。
    考虑使用 Canal Admin 提供 Web 管理界面,便于配置和监控。

1.3.2 常见问题

Canal 支持全量同步吗?

Canal 默认只支持增量同步(基于 binlog)。如果需要全量同步,可以结合 DataX 或其他工具先同步全量数据,再用 Canal 同步增量数据。

如何同步到多个目标?

配置多个 Canal Adapter 或开发 Canal Client,分别处理不同的目标系统。或者通过 Kafka 作为中间件,多个消费者订阅同一主题。

1.3.3 为什么既要全局配置又要 实例配置

1.3.3.1 Canal核心模型

Canal 的核心设计是Server-Instance 模型:

  • Canal ServerCanal 的服务端进程,负责管理一个或多个实例,监听客户端连接,提供全局的服务配置(如端口、集群模式等)。
  • Canal Instance:每个实例对应一个具体的 MySQL 数据库实例(或逻辑上的数据源),负责连接 MySQL、解析 binlog、处理数据同步逻辑。

为了支持这种分层架构,Canal 将配置分为两层:

  • 全局配置(canal.properties):定义 Canal Server 级别的通用参数,适用于整个服务进程。
  • 实例配置(instance.properties):定义每个 Instance 的具体行为,针对某个特定的 MySQL 数据源。
1.3.3.2 支持多实例运行

假如 一个 Canal Server 可以同时管理多个 MySQL 数据源(例如,同步多个数据库或多个表的增量数据)。每个数据源的连接信息、过滤规则等可能不同,因此需要为每个数据源单独配置一个 Instance

  • 全局配置的作用:
    定义 Canal Server 的运行环境,例如监听的 IP 和端口(canal.ip、canal.port)、集群模式(canal.zkServers)、全局 ID(canal.id)等。
    指定所有实例的名称列表(canal.destinations),告诉 Server 要加载哪些 Instance。
  • 实例配置的作用:
    为每个 Instance 指定具体的 MySQL 连接信息(如 canal.instance.master.addresscanal.instance.dbUsername)、数据过滤规则(canal.instance.filter.regex)、伪装从库的 ID(canal.instance.slaveId)等。
    每个 Instance 的配置独立存储在 conf/<destination>/instance.properties 中,<destination> 对应 canal.destinations 中定义的实例名称(如 example)。

假设要同步两个 MySQL 数据库(db1 和 db2):
全局配置(canal.properties):

canal.destinations = db1,db2
canal.port = 11111
实例配置:
conf/db1/instance.properties:连接 db1,过滤 db1.table1。
conf/db2/instance.properties:连接 db2,过滤 db2.table2。

一个 Canal Server 通过全局配置加载两个 Instance,分别处理 db1 和 db2 的同步

1.3.3.3 模块化与职责分离

全局配置负责 Server 级别的通用设置,与具体的数据源无关,例如:

  • 网络设置(IP、端口)。
  • 集群配置(Zookeeper 地址)。
  • 数据存储模式(内存、文件、数据库)。
  • 全局性能参数(如线程池大小)。

实例配置负责与特定 MySQL 数据源相关的设置,例如:

  • MySQL 连接信息(地址、用户名、密码)。
  • binlog 解析规则(过滤哪些库或表)。
  • 伪装从库的行为(slaveId)。
  • 目标系统的推送逻辑(例如 Kafka 主题)

2 SpringBoot结合使用

2.1 pom和配置

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version>
</dependency>

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

2.2 实体和mapper

2.2.1 实体类

@Data
public class Transaction {private Long id;          // 主键IDprivate String transactionId; // 交易IDprivate Double amount;      // 交易金额private String status;      // 交易状态
}

2.2.2 sql文件

CREATE TABLE transaction (id BIGINT AUTO_INCREMENT PRIMARY KEY,transaction_id VARCHAR(50) NOT NULL,amount DECIMAL(18, 2) NOT NULL,status VARCHAR(20) NOT NULL
);

2.2.3 交易Mapper接口

/*** 交易Mapper接口*/
@Mapper
public interface TransactionMapper {/*** 插入一条新的交易记录*/@Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")void insert(@Param("transaction") Transaction transaction);/*** 更新一条交易记录*/@Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")void update(@Param("transaction") Transaction transaction);
}

2.3 Canal监听器类

/*** Canal监听器类,用于监听数据库的变化并进行相应的处理*/
@Component
public class CanalListener {private final String destination = "example"; // 这个值需要与Canal配置中的destination一致private final String serverIp = "127.0.0.1";private final int port = 11111;@Autowiredprivate TransactionMapper transactionMapper;/*** 在Bean初始化后启动Canal监听器*/@PostConstructpublic void start() {// 创建Canal连接器CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");try {// 连接到Canal服务器connector.connect();// 订阅所有数据库的所有表connector.subscribe(".*\\..*");// 回滚到上次中断的位置connector.rollback();while (true) {// 获取一批消息,最多100条Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// 如果没有消息,则等待1秒Thread.sleep(1000);} else {// 处理消息processMessage(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {// 断开连接connector.disconnect();}}/*** 处理Canal发送过来的消息** @param entryList 消息列表*/private void processMessage(List<CanalEntry.Entry> entryList) {for (CanalEntry.Entry entry : entryList) {// 忽略事务开始和结束事件if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage;try {// 解析RowChange数据rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}CanalEntry.EventType eventType = rowChage.getEventType();// 打印日志System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));// 处理每一行数据变化for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());if (eventType == CanalEntry.EventType.DELETE) {// 处理删除事件(如果需要)} elseif (eventType == CanalEntry.EventType.INSERT) {// 插入新记录transactionMapper.insert(transaction);} else {// 更新现有记录transactionMapper.update(transaction);}}}}/*** 将Canal列数据转换为Transaction对象** @param columns 列数据列表* @return 转换后的Transaction对象*/private Transaction convertToTransaction(List<CanalEntry.Column> columns) {Transaction transaction = new Transaction();for (CanalEntry.Column column : columns) {switch (column.getName()) {case"id":transaction.setId(Long.parseLong(column.getValue()));break;case"transaction_id":transaction.setTransactionId(column.getValue());break;case"amount":transaction.setAmount(Double.parseDouble(column.getValue()));break;case"status":transaction.setStatus(column.getValue());break;}}return transaction;}
}

2.4 测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

相关文章:

  • 【Hice入门】Hive性能优化:存储与计算优化深度解析
  • 【C++指南】vector(三):迭代器失效问题详解
  • 前端如何转后端
  • JGQ516Ⅱ数据采集湿法袋式除尘器实验装置
  • Python学习笔记(第二部分)
  • 华为eNSP:IS-IS认证
  • 加载ko驱动模块:显示Arm版本问题解决!
  • 分享5款开源、美观的 WinForm UI 控件库
  • PS学习笔记(一)
  • 【dify—8】Chatflow实战——博客文章生成器
  • 方案精读:58页华为:全面预算管理与实践【附全文阅读】
  • ruoyi-plus Spring Boot + MyBatis 中 BaseEntity 的设计与动态查询实践
  • 【计算机视觉】三维视觉:Nerfstudio:模块化神经辐射场框架的技术突破与实战指南
  • 本文不定期更新,用于收录各种怪异的python脚本
  • 【排序算法】八大经典排序算法详解
  • 17. LangChain流式响应与实时交互:打造“类ChatGPT“体验
  • 某修改版软件,已突破限制!
  • 中国发布Web3计划:区块链列为核心基础技术,不排除发展加密资产应用!
  • 攻防世界 - Web - Level 4 | Confusion1
  • HTTP协议:原理、应用与python实践
  • 全红婵/陈芋汐夺得跳水世界杯总决赛女子双人10米台冠军
  • 中方拟解除对5名欧洲议会议员制裁?外交部:望中欧立法机构相向而行
  • 广东省副省长刘红兵跨省调任湖南省委常委、宣传部长
  • 解放日报:上海深化改革开放,系统集成创新局
  • 78家公募年度业绩比拼:23家营收净利双升,十强座次微调
  • 中国人保不再设监事会,国寿集团未再设置监事长职务