Canal实时同步MySQL数据到Elasticsearch
一、场景概述与核心组件
目标:将MySQL中的 orders 表的任何变更(INSERT, UPDATE, DELETE)近实时地同步到Elasticsearch的对应索引中,并确保数据一致性和处理效率。
核心组件:
- MySQL: 数据源,需开启Binlog。
- Canal Server (Canal-Deployer): 模拟MySQL Slave,解析Binlog,将数据变更事件投递出去。
- Canal Adapter: 作为Canal的客户端,订阅Canal Server的消息,并将其转换成对Elasticsearch的REST API调用。
- Elasticsearch & Kibana: 数据目的地和可视化工具。
- 消息队列 (可选,但强烈推荐): 如Kafka/RocketMQ,作为Canal Server和Canal Adapter之间的缓冲层,提升可靠性。
二、高效与可靠架构设计
MySQL(Binlog解析) → Canal Server → RocketMQ(消息中转) → Canal Adapter(数据转换层) → Elasticsearch(数据存储)
整体流程如下:
- MySQL开启binlog,并配置Canal Server来读取binlog。
- Canal Server将解析后的数据发送到RocketMQ。
- Canal Adapter从RocketMQ中消费数据,并写入Elasticsearch。
为什么需要RocketMQ?
- 可靠性: RocketMQ具有持久化能力。如果Elasticsearch或Canal Adapter宕机,消息会堆积在RocketMQ中,待恢复后继续消费,防止数据丢失。
- 解耦: Canal Server只负责解析和投递到RocketMQ,无需关心下游处理速度和状态。Canal Adapter可以水平扩展,从RocketMQ并行消费,提升吞吐量。
- 缓冲: 应对流量峰值,防止Elasticsearch被写爆。
三、详细部署与配置案例
步骤1: 准备MySQL
开启Binlog,并设置为 ROW 模式。同时创建一个用于Canal的数据库用户,并授权。
-- 1. 开启MySQL Binlog
-- 修改 my.cnf 配置文件
[mysqld]
# 开启二进制日志
log-bin=mysql-bin
# 使用ROW模式
binlog-format=ROW
# 服务器ID
server_id=1
# 需要同步的数据库
binlog-do-db=your_database
# 二进制日志保留天数
expire_logs_days=7
# 最大二进制日志大小
max_binlog_size=100M

-- 2. 创建Canal用户并授权
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

-- 3. 检查Binlog配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
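在部署Canal之前,也可以用一小段JDBC程序自检上述Binlog配置是否生效(示意草图,连接地址与账号为假设值,需按实际环境替换):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogConfigChecker {
    public static void main(String[] args) throws Exception {
        // 假设的连接信息,请替换为实际环境的地址与Canal账号
        String url = "jdbc:mysql://mysql-host:3306/your_database?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "canal", "canal_password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
            while (rs.next()) {
                // 期望输出 log_bin=ON 且 binlog_format=ROW,否则Canal无法正常解析
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}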
步骤2: 部署并配置Canal Server
1. 编写docker-compose-canal-server.yml 部署文件:
# docker-compose-canal-server.yml
version: '3.8'
services:
  canal-server:
    image: canal/canal-server:v1.1.6
    container_name: canal-server
    ports:
      - "11111:11111"
    environment:
      - canal.auto.scan=false
      - canal.destinations=example
      - canal.instance.master.address=mysql-host:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal_password
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=true
      - canal.instance.gtidon=false
    volumes:
      - ./canal-server/conf:/home/admin/canal-server/conf
      - ./canal-server/logs:/home/admin/canal-server/logs
    depends_on:
      - rocketmq-broker
2. 修改 conf/canal.properties - 主要配置RocketMQ连接:
# Canal Server 基础配置
canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111
canal.zkServers =

# destinations
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

# 使用RocketMQ作为MQ
canal.serverMode = rocketMQ

# RocketMQ NameServer地址(生产环境建议配置集群)
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.namespace =

# 生产者配置
rocketmq.producer.group = canal_producer_group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =

# 批量大小
rocketmq.producer.batchSize = 50

# 其他性能参数
rocketmq.producer.retryTimesWhenSendFailed = 3
rocketmq.producer.retryTimesWhenSendAsyncFailed = 3
rocketmq.producer.retryAnotherBrokerWhenNotStoreOK = false
3. 修改 conf/example/instance.properties - 实例级别配置:
# 数据源配置
canal.instance.master.address=mysql-host:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=your_database

# 表过滤规则
canal.instance.filter.regex=your_database\\..*
canal.instance.filter.black.regex=

# RocketMQ Topic配置
canal.mq.topic=canal_topic
# 动态Topic配置(可选)
# canal.mq.dynamicTopic=.*\\..*
# 使用Tag进行表级别路由(推荐)
canal.mq.enableDynamicQueuePartition=false

# 分区配置 - 按表+主键进行分区,保证顺序性
canal.mq.partition=0
canal.mq.partitionsNum=1
# 分区哈希配置,保证同一主键的数据进入同一队列
canal.mq.partitionHash=test.orders:id,test.users:id

# 批量大小
canal.mq.batchSize=50
# 消息延迟投递(毫秒)
canal.mq.delay=100
步骤3: 部署RocketMQ
使用docker-compose部署RocketMQ服务,并创建好Topic。
编写docker-compose-rocketmq.yml 部署文件:
# docker-compose-rocketmq.yml
version: '3.8'
services:
  namesrv:
    image: rocketmqinc/rocketmq:4.9.4
    container_name: rocketmq-namesrv
    ports:
      - "9876:9876"
    environment:
      JAVA_OPT: "-Duser.home=/home/rocketmq -Xms512m -Xmx512m -Xmn128m"
    command: sh mqnamesrv
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
      - ./data/namesrv/store:/home/rocketmq/store
  broker:
    image: rocketmqinc/rocketmq:4.9.4
    container_name: rocketmq-broker
    ports:
      - "10911:10911"
      - "10909:10909"
    environment:
      NAMESRV_ADDR: "rocketmq-namesrv:9876"
      JAVA_OPT: "-Duser.home=/home/rocketmq -Xms1g -Xmx1g -Xmn512m"
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv
  console:
    image: styletang/rocketmq-console-ng:latest
    container_name: rocketmq-console
    ports:
      - "8180:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - namesrv
      - broker
步骤4: 部署并配置Canal Adapter (连接RocketMQ)
1. 目录结构
canal.adapter-1.1.7/
├── bin/
│ ├── startup.sh # 启动脚本
│ └── stop.sh # 停止脚本
├── conf/
│ ├── application.yml # 主配置文件
│ ├── es7/ # ES 映射配置目录
│ └── mq/ # MQ 相关配置
├── lib/ # 依赖库
├── logs/ # 日志目录
└── plugin/ # 插件目录
2. 使用 docker-compose 进行部署
编写docker-compose-canal-adapter.yml 部署文件:
# docker-compose-canal-adapter.yml
version: '3.8'
services:
  canal-adapter:
    image: slpcat/canal-adapter:v1.1.6
    container_name: canal-adapter
    ports:
      - "8081:8081"
    environment:
      - canal.manager.jdbc.url=jdbc:mysql://mysql-host:3306/canal_manager?useUnicode=true
      - canal.manager.jdbc.username=canal
      - canal.manager.jdbc.password=canal_password
    volumes:
      - ./canal-adapter/conf:/opt/canal-adapter/conf
      - ./canal-adapter/logs:/opt/canal-adapter/logs
    depends_on:
      - rocketmq-broker
      - elasticsearch
3. 修改 conf/application.yml 主配置文件:
server:
  port: 8081
  undertow:
    # 线程池配置
    io-threads: 16
    worker-threads: 256
    buffer-size: 1024
    # 是否开启直接内存分配
    direct-buffers: true

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

# Canal Adapter 配置
canal.conf:
  # 模式: tcp, kafka, rocketMQ, rabbitMQ
  mode: rocketMQ
  # 是否异步处理
  async: true
  # 消费批次大小
  consumerProperties:
    canal.topic: canal_topic
    canal.group: canal_adapter_group
    # 批量获取消息数量
    rocketmq.batch.size: 1000
    # 消费线程数
    rocketmq.consume.thread.max: 32
  # 源数据源配置(用于 ETL 查询)
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.100:3306/business_db?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      username: canal_user
      password: Canal@123456
      # 连接池配置
      maxActive: 20
      initialSize: 5
      maxWait: 60000
      minIdle: 5
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
  # 目标数据源配置(支持多个)
  canalAdapters:
    - instance: example   # 对应 canal instance 名称
      groups:
        - groupId: g1
          outerAdapters:
            # Elasticsearch 适配器
            - name: es7
              key: es_business
              hosts: 192.168.1.101:9200,192.168.1.102:9200   # ES 集群节点
              properties:
                mode: rest                                   # 传输模式: rest 或 transport
                security.auth: elastic:Elastic@123456        # 认证信息
                cluster.name: es-cluster                     # 集群名称
                # 连接池配置
                maxActive: 20
                maxTotal: 30
                socketTimeout: 30000
                connectTimeout: 5000
                # 重试配置
                maxRetryTimeout: 30000
                # 压缩配置
                compression: true
            # 可以配置多个适配器,如同步到其他存储
            # - name: logger   # 日志适配器,用于调试
            # - name: rdb      # 关系数据库适配器
            # - name: hbase    # HBase 适配器

# 监控配置
monitor:
  enabled: true
  port: 8082
4. 配置Elasticsearch映射文件
在 conf/es7/ 目录下创建映射配置文件,一个Adapter可以配置多个映射文件:
conf/es7/
├── user.yml
├── order.yml
└── product.yml
user.yml - 用户表同步配置

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: "user_index_v1"    # ES 索引名称
  _type: "_doc"              # 类型(ES7+ 统一为 _doc)
  _id: _id                   # 文档 ID 字段
  upsert: true               # 启用 upsert 操作
  # pk: id                   # 主键字段(用于更新判断)
  # SQL 查询,这里的sql是虚拟的,实际是从binlog事件中提取字段,但会利用这个sql的表达式来映射字段
  sql: "SELECT
          id as _id,         -- 必须:将 id 映射为 _id
          id,
          username,
          email,
          mobile,
          nickname,
          avatar,
          status,
          created_time,
          updated_time,
          DATE_FORMAT(created_time, '%Y-%m-%d %H:%i:%s') as create_time_format
        FROM user"
  # 提交批次大小
  commitBatch: 1000
  # 提交间隔(毫秒)
  commitInterval: 1000
  # 跳过的字段
  skips:
    - password               # 跳过敏感字段
    - salt
    - deleted
  # 对象字段处理
  objFields:
    # 如果 tags 是 JSON 字符串,转换为对象
    # tags: object
    # 如果 extra_info 是 JSON 字符串,转换为对象
    # extra_info: object
  # etl 条件(用于全量同步)
  etlCondition: "where updated_time >= '{}'"
order.yml - 订单表同步配置
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: "order_index_v1"
  _type: "_doc"
  _id: _id
  upsert: true
  sql: "SELECT
          CONCAT('order_', id) as _id,   -- 复合主键处理
          id,
          order_no,
          user_id,
          total_amount,
          actual_amount,
          discount_amount,
          status,
          payment_status,
          shipping_status,
          create_time,
          update_time,
          -- 计算字段
          CASE status
            WHEN 1 THEN '待支付'
            WHEN 2 THEN '已支付'
            WHEN 3 THEN '已发货'
            WHEN 4 THEN '已完成'
            WHEN 5 THEN '已取消'
            ELSE '未知'
          END as status_desc
        FROM `order`"
  commitBatch: 500
  etlCondition: "where update_time >= '{}'"
步骤5: 部署与启动脚本
5.1 一键启动脚本
#!/bin/bash
# start_sync_system.sh

set -e

echo "开始启动数据同步系统..."

# 创建网络
docker network create canal-network || true

# 启动RocketMQ
echo "启动RocketMQ..."
docker-compose -f docker-compose-rocketmq.yml up -d

# 等待RocketMQ就绪
echo "等待RocketMQ就绪..."
sleep 30

# 创建Topic
docker exec rocketmq-broker sh mqadmin updateTopic -t canal_topic -n rocketmq-namesrv:9876 -c DefaultCluster

# 启动Canal Server
echo "启动Canal Server..."
docker-compose -f docker-compose-canal-server.yml up -d

# 启动Canal Adapter
echo "启动Canal Adapter..."
docker-compose -f docker-compose-canal-adapter.yml up -d

echo "数据同步系统启动完成!"
echo "RocketMQ控制台: http://localhost:8180"
5.2 数据初始化脚本
# 创建Elasticsearch索引映射(init_es_mapping,可在Kibana Dev Tools中执行)

PUT /user_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "user_id":        { "type": "long" },
      "user_name":      { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
      "user_email":     { "type": "keyword" },
      "user_age":       { "type": "integer" },
      "user_status":    { "type": "integer" },
      "age_group":      { "type": "keyword" },
      "create_time":    { "type": "date" },
      "update_time":    { "type": "date" },
      "sync_timestamp": { "type": "date" }
    }
  }
}

PUT /order_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "order_id":       { "type": "long" },
      "order_no":       { "type": "keyword" },
      "customer_id":    { "type": "long" },
      "total_amount":   { "type": "double" },
      "order_status":   { "type": "integer" },
      "status_text":    { "type": "keyword" },
      "order_date":     { "type": "date" },
      "sync_timestamp": { "type": "date" }
    }
  }
}
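如果更习惯用代码初始化索引,也可以用 Elasticsearch High Level REST Client 提交同样的请求体。下面是一个最小草图(ES地址为假设值,JSON仅保留片段,实际应使用上面完整的 settings 与 mappings):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class EsIndexInitializer {
    public static void main(String[] args) throws Exception {
        // settings + mappings,与上文 PUT /user_index 请求体一致(此处仅示意片段)
        String userIndexJson = "{ \"settings\": { \"number_of_shards\": 3, \"number_of_replicas\": 1 },"
                + " \"mappings\": { \"properties\": { \"user_id\": { \"type\": \"long\" } } } }";

        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            CreateIndexRequest request = new CreateIndexRequest("user_index");
            request.source(userIndexJson, XContentType.JSON);
            // 创建索引,已存在时会抛出异常,可按需先判断 exists
            client.indices().create(request, RequestOptions.DEFAULT);
        }
    }
}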
五、Canal Server 核心原理解析
Canal Server将MySQL的数据变更事件推送到RocketMQ的过程如下:
- 在Canal Server的配置中,我们需要设置MQ模式(通常为RocketMQ)以及RocketMQ的NameServer地址、Topic等信息。
- Canal Server在启动后,会根据配置的实例(instance)来连接MySQL并解析binlog。
- 当有数据变更时,Canal Server会将解析后的事件组装成CanalEntry.Entry的格式,然后通过RocketMQ生产者发送到指定的Topic。
- 消息格式:发送到RocketMQ的消息内容通常是CanalEntry.Entry的序列化字节数组,或者根据配置转换为JSON格式。每个Entry对应一个binlog事件,包含了变更数据的详细信息,如表名、操作类型(增删改)、变更前的数据(可选)、变更后的数据等。
- 分区规则:为了保持顺序性,通常会将同一张表的数据变更发送到同一个消息队列中,这样可以保证同一张表的数据变更顺序与binlog中的顺序一致。Canal Server在发送消息时,可以指定分区键(例如,表名、主键等),RocketMQ会根据这个分区键将消息路由到对应的队列。
5.1 配置Canal Server连接RocketMQ
首先需要在Canal Server的配置文件中启用RocketMQ作为消息队列:
# canal.properties
canal.serverMode = rocketMQ
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 3
canal.mq.batchSize = 50
canal.mq.maxRequestSize = 1024
5.2 实例配置指定目标Topic
# example/instance.properties
canal.mq.topic = canal_topic
# 动态topic配置,按库表路由
canal.mq.dynamicTopic = .*\\..*
# 分区策略
canal.mq.partitionHash = test\\.user:id
5.3 数据同步核心流程
5.3.1 Binlog监听与解析
// Canal Server作为MySQL Slave,通过binlog dump协议获取数据变更
public class CanalServer {
    public void start() {
        // 1. 连接MySQL,模拟Slave注册
        MysqlConnection connection = new MysqlConnection();
        connection.dump(binlogFileName, binlogPosition);

        // 2. 解析binlog事件
        LogEvent event = connection.readEvent();
        while (event != null) {
            Entry entry = binlogParser.parse(event);
            if (entry != null) {
                // 发送到MQ
                mqProducer.send(entry);
            }
            event = connection.readEvent();
        }
    }
}
5.3.2 消息封装与发送
public class RocketMQProducer {
    public void send(Entry entry) {
        // 1. 构建RocketMQ消息
        Message message = new Message();
        message.setTopic(getTopic(entry));

        // 2. 序列化Entry数据
        byte[] body = EntrySerializer.serialize(entry);
        message.setBody(body);

        // 3. 设置消息属性
        message.putUserProperty("database", entry.getHeader().getSchemaName());
        message.putUserProperty("table", entry.getHeader().getTableName());
        message.putUserProperty("type", entry.getHeader().getEventType().toString());

        // 4. 计算分区(保证同一主键的数据有序)
        String partitionKey = calculatePartitionKey(entry);
        message.setKeys(partitionKey);

        // 5. 发送到RocketMQ
        SendResult result = defaultMQProducer.send(message);

        // 6. 更新位点(确保至少一次投递)
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            positionManager.updatePosition(entry.getHeader());
        }
    }
}
5.3.3 RocketMQ消息结构
{"topic": "canal_topic","keys": "user:12345", // 分区键"tags": "test.user", // 库表标识"body": {"type": "INSERT/UPDATE/DELETE","database": "test","table": "user","executeTime": 1633046400000,"before": { // 变更前数据(UPDATE/DELETE)"id": 12345,"name": "old_name","age": 25},"after": { // 变更后数据(INSERT/UPDATE)"id": 12345,"name": "new_name", "age": 26},"sql": "" // 原始SQL(可选)}
}
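如果在自定义消费者中直接处理上述消息体,可先解析JSON再按事件类型分发。下面是一个使用 fastjson 的最小解析草图(字段名与上文示例一致,仅作示意):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class CanalMessageParser {
    public static void parse(String messageJson) {
        JSONObject msg = JSON.parseObject(messageJson);
        JSONObject body = msg.getJSONObject("body");

        String type = body.getString("type");              // INSERT / UPDATE / DELETE
        String database = body.getString("database");
        String table = body.getString("table");
        JSONObject before = body.getJSONObject("before");  // UPDATE/DELETE 时存在
        JSONObject after = body.getJSONObject("after");    // INSERT/UPDATE 时存在

        // 此处仅打印,实际应按事件类型转换为对应的ES写操作
        System.out.printf("%s.%s %s: before=%s, after=%s%n",
                database, table, type, before, after);
    }
}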
5.4 关键配置参数
5.4.1 性能调优参数
# 批量发送大小
canal.mq.batchSize = 50
# 发送超时时间
canal.mq.sendTimeout = 3000
# 消息压缩
canal.mq.compression = gzip
# 事务支持
canal.mq.transaction = false
5.4.2 容错配置
# 重试次数
canal.mq.retries = 3
# 位点存储(确保故障恢复)
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# 内存队列大小
canal.instance.memory.buffer.size = 16384
5.4.3 顺序性保证
为了保证数据同步的顺序性,Canal采用分区策略:
public class PartitionHashCalculator {
    public static String calculate(Entry entry) {
        // 按表名+主键哈希,确保同一主键的操作发送到同一队列
        String tableName = entry.getHeader().getTableName();
        String primaryKey = extractPrimaryKey(entry);
        return tableName + ":" + primaryKey;
    }
}
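上面的分区键计算只是示意,真实的路由发生在RocketMQ生产者一侧。下面给出一个基于RocketMQ原生客户端的最小顺序发送草图(主题、分组沿用上文配置,消息体仅为占位,并非Canal内部实现):

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedSendDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("canal_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // 分区键:表名 + 主键,与上文 PartitionHashCalculator 的结果形式一致
        String partitionKey = "orders:12345";
        Message msg = new Message("canal_topic", "test_db_orders", partitionKey,
                "{...binlog json...}".getBytes(StandardCharsets.UTF_8));

        // 相同分区键总是被路由到同一个队列,从而保证单行数据的变更顺序
        SendResult result = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message m, Object key) {
                int index = Math.abs(key.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, partitionKey);

        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}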
六、Canal Adapter 核心原理解析
Canal Adapter通过预定义的映射配置,将RocketMQ中来自MySQL的binlog消息转换成对Elasticsearch的写操作,从而实现数据的实时同步。具体步骤如下:
- Canal Adapter订阅RocketMQ:Canal Adapter启动时,会根据配置文件(application.yml)中设置的RocketMQ NameServer地址、Topic和Group ID,创建一个RocketMQ消费者。
- 消息处理:消费者持续从指定的Topic中拉取Canal Server发送的binlog解析消息。将获取到的RocketMQ消息体解析成Canal认识的Message对象,其中包含了:
- 数据库名、表名
- 事件类型(INSERT/UPDATE/DELETE)
- 变更前的数据(before)
- 变更后的数据(after)
- 执行时间戳等元数据
- 映射配置:映射配置中定义了如何将MySQL的表数据映射到Elasticsearch的索引和文档。例如:
- 指定MySQL的表名和Elasticsearch索引的对应关系。
- 指定MySQL表的主键作为Elasticsearch文档的ID。
- 指定字段的映射关系(如果字段名不同,可以配置映射)。
- 指定操作类型(insert、update、delete)对应的处理方式。
- 写入Elasticsearch:根据映射配置,Canal Adapter将消息体中的数据转换成Elasticsearch的文档,然后通过Elasticsearch的API进行写入操作。具体如下:
- 对于insert操作,相当于向Elasticsearch索引中添加一个文档。
- 对于update操作,相当于更新Elasticsearch中对应的文档(根据主键ID)。
- 对于delete操作,相当于删除Elasticsearch中对应的文档。
- 批量提交:为了提高性能,Canal Adapter可能会将多个操作批量提交到Elasticsearch。
6.1 从RocketMQ拉取消息
Canal Adapter作为RocketMQ Consumer,启动后会订阅指定的Topic:
// 伪代码表示Adapter的RocketMQ消费者
public class RocketMQAdapterConsumer {
    public void start() {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("canal_adapter_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("canal_topic", "*"); // 订阅所有Tag

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages,
                                                       ConsumeOrderlyContext context) {
                // 处理批量消息
                processMessages(messages);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }
}
6.2 消息解析与路由
- 消息解析:Adapter接收到RocketMQ消息后,解析消息内容,会将其反序列化为内部的 FlatMessage 或 CanalMessage 对象。
// Message Body 内容 (FlatMessage格式)
{
  "data": [
    {
      "id": "1001",
      "order_code": "ORD20231025001",
      "user_id": "1",
      "amount": "199.99",
      "status": "1",
      "create_time": "2023-10-25 10:00:00",
      "update_time": "2023-10-25 10:00:00"
    }
  ],
  "database": "test_db",
  "table": "orders",
  "type": "INSERT",
  "old": null,
  "es": 1698213600000,
  "ts": 1698213600000
}

- 路由过程:Adapter会根据消息中的 database 和 table 字段(例如 test_db.users),去 conf/es7/ 目录下寻找对应的映射配置文件(例如 user_mapping.yml)。
public class MessageRouter {
    public void route(FlatMessage flatMessage) {
        String database = flatMessage.getDatabase(); // "test_db"
        String table = flatMessage.getTable();       // "orders"

        // 根据库名和表名查找对应的ES映射配置
        // 查找规则:conf/es7/ 目录下的 .yml 文件
        EsMappingConfig mappingConfig = findMappingConfig(database, table);
        if (mappingConfig != null) {
            processWithMapping(flatMessage, mappingConfig);
        }
    }

    private EsMappingConfig findMappingConfig(String database, String table) {
        // 查找 order_mapping.yml, test_db_orders.yml 等配置文件
        String[] possibleFiles = {
            table + "_mapping.yml",
            database + "_" + table + ".yml",
            "default_mapping.yml"
        };
        // ... 实际查找逻辑(此处省略)
        return null;
    }
}
6.3 数据映射处理(核心步骤)
根据映射配置,Adapter采用不同的处理策略:
模式A:SQL映射(功能强大,支持复杂转换)

当在 .yml 文件中配置了 sql 属性时,Adapter会使用此模式。

# order_mapping.yml
esMapping:
  _index: "order_index_v1"
  _id: "_id"
  sql: >
    SELECT
      o.id as _id,
      o.order_code,
      o.amount,
      o.status,
      u.name as user_name
    FROM orders o
    LEFT JOIN users u ON o.user_id = u.id
    WHERE o.id = ?

处理逻辑:
public class SQLMappingProcessor {
    public List<Map<String, Object>> process(FlatMessage message, EsMappingConfig config) {
        List<Map<String, Object>> esDocuments = new ArrayList<>();

        for (Map<String, String> rowData : message.getData()) {
            // 1. 提取主键值作为SQL参数
            String primaryKeyValue = rowData.get("id");

            // 2. 执行SQL查询(回查数据库)
            List<Map<String, Object>> queryResult =
                jdbcTemplate.queryForList(config.getSql(), primaryKeyValue);

            // 3. 构建ES文档
            for (Map<String, Object> dbRow : queryResult) {
                Map<String, Object> esDoc = new HashMap<>();
                esDoc.put("_index", config.getIndex());
                esDoc.put("_id", dbRow.get("_id"));
                esDoc.put("_source", buildSource(dbRow, config));
                esDocuments.add(esDoc);
            }
        }
        return esDocuments;
    }
}
模式B:直接映射(性能更高,配置简单)
当在 .yml 文件中没有配置 sql 属性,并且在 application.yml 中设置了 flatMessage: true 时,Adapter使用此模式。

# user_mapping.yml
esMapping:
  _index: "user_index_v1"
  _id: "id"
  # 无sql配置,使用直接映射

处理逻辑:
public class DirectMappingProcessor {
    public List<Map<String, Object>> process(FlatMessage message, EsMappingConfig config) {
        List<Map<String, Object>> esDocuments = new ArrayList<>();

        for (Map<String, String> rowData : message.getData()) {
            // 直接使用Binlog中的字段数据
            Map<String, Object> esDoc = new HashMap<>();
            esDoc.put("_index", config.getIndex());
            esDoc.put("_id", rowData.get(config.getIdField()));

            // 构建_source文档,直接字段映射
            Map<String, Object> source = new HashMap<>();
            for (Map.Entry<String, String> entry : rowData.entrySet()) {
                source.put(entry.getKey(), convertValue(entry.getValue()));
            }
            esDoc.put("_source", source);
            esDocuments.add(esDoc);
        }
        return esDocuments;
    }
}
6.4 构建Elasticsearch请求
Canal Adapter根据MySQL的binlog事件类型(INSERT/UPDATE/DELETE)和配置的映射规则,生成对应的Elasticsearch操作。
6.4.1 INSERT 事件处理
MySQL执行:
INSERT INTO user (id, name, email, age) VALUES (1, '张三', 'zhangsan@example.com', 25);
binlog消息内容:
{"type": "INSERT","table": "user","data": [{"id": 1,"name": "张三","email": "zhangsan@example.com", "age": 25}]
}
Adapter映射配置:
esMapping:
  _index: user_index
  _id: id
  upsert: true
  sql: "SELECT id, name, email, age FROM user"
生成的Elasticsearch请求:
情况1:upsert: true(推荐)

// 使用update API实现upsert
POST /user_index/_update/1
{
  "doc": {
    "id": 1,
    "name": "张三",
    "email": "zhangsan@example.com",
    "age": 25
  },
  "doc_as_upsert": true
}

特点:
- 如果文档不存在,创建新文档
- 如果文档已存在,更新文档(虽然INSERT场景下不应该存在)
- 具有幂等性,可以防止重复插入
情况2:upsert: false

// 直接创建文档(op_type=create)
PUT /user_index/_create/1
{
  "id": 1,
  "name": "张三",
  "email": "zhangsan@example.com",
  "age": 25
}

风险:如果文档已存在会报错,导致同步失败
6.4.2 UPDATE 事件处理
MySQL执行:
UPDATE user SET name = '李四', age = 26 WHERE id = 1;
binlog消息内容:
{"type": "UPDATE", "table": "user","data": [{"id": 1,"name": "李四","email": "zhangsan@example.com","age": 26}],"old": [{"name": "张三","age": 25}]
}
Adapter映射配置:
esMapping:
  _index: user_index
  _id: id
  upsert: true
  sql: "SELECT id, name, email, age FROM user"
生成的Elasticsearch请求:
情况1:upsert: true(推荐)

// 使用update API进行部分更新
POST /user_index/_update/1
{
  "doc": {
    "id": 1,
    "name": "李四",
    "email": "zhangsan@example.com",
    "age": 26
  },
  "doc_as_upsert": true
}

特点:
- 只更新变化的字段
- 如果文档不存在,会创建新文档(upsert)
- 性能较好,网络传输数据量小
情况2:upsert: false

// 使用index API进行全量替换
PUT /user_index/_doc/1
{
  "id": 1,
  "name": "李四",
  "email": "zhangsan@example.com",
  "age": 26
}

特点:
- 整个文档被替换
- 如果原文档有其他字段,会被覆盖丢失
- 网络传输数据量较大
6.4.3 DELETE 事件处理
MySQL执行:
DELETE FROM user WHERE id = 1;
binlog消息内容:
{"type": "DELETE","table": "user", "data": [{"id": 1,"name": "李四","email": "zhangsan@example.com","age": 26}]
}
Adapter映射配置:
esMapping:
  _index: user_index
  _id: id
  # DELETE操作不受upsert配置影响
  sql: "SELECT id FROM user"   # 通常只需要id字段
生成的Elasticsearch请求:
// 直接删除文档
DELETE /user_index/_doc/1
- 特点:
- DELETE操作简单直接
- 只需要文档_id即可执行删除
- 如果文档不存在,ES会返回404,但通常不影响同步流程
6.5 批量提交
Adapter不会每条消息都立即写入ES,而是积累一批操作后批量提交。
配置控制:

esMapping:
  commitBatch: 1000      # 每积累1000条消息批量提交一次
  commitInterval: 1000   # 每隔1000ms提交一次

示例:

MySQL执行多条INSERT:

INSERT INTO user (id, name) VALUES (1, '张三'), (2, '李四'), (3, '王五');

生成的Elasticsearch批量请求:

POST /_bulk
{"update":{"_index":"user_index","_id":"1"}}
{"doc":{"id":1,"name":"张三"},"doc_as_upsert":true}
{"update":{"_index":"user_index","_id":"2"}}
{"doc":{"id":2,"name":"李四"},"doc_as_upsert":true}
{"update":{"_index":"user_index","_id":"3"}}
{"doc":{"id":3,"name":"王五"},"doc_as_upsert":true}
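若需要在自定义消费端构造与上面 _bulk 等价的批量请求,可参考下面基于 Elasticsearch High Level REST Client 的最小草图(客户端与索引名 user_index 沿用上文示例,rows 为假设的入参):

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class BulkUpsertDemo {
    // rows 为从 FlatMessage.data 中解析出的若干行数据
    public static void bulkUpsert(RestHighLevelClient client,
                                  List<Map<String, Object>> rows) throws IOException {
        BulkRequest bulk = new BulkRequest();
        for (Map<String, Object> row : rows) {
            String id = String.valueOf(row.get("id"));
            // 与上文 _bulk 请求等价:update + doc_as_upsert
            bulk.add(new UpdateRequest("user_index", id)
                    .doc(row)
                    .docAsUpsert(true));
        }
        BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            // 部分失败时打印失败原因,实际应记录并重试
            System.err.println(response.buildFailureMessage());
        }
    }
}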
6.6 高级场景处理
场景1:字段映射转换
配置:
esMapping:
  _index: user_index
  _id: id
  sql: "SELECT id, name, email, age, create_time FROM user"
  fieldMappings:
    - src: name           # MySQL字段名
      dest: username      # ES字段名
    - src: create_time
      dest: createTime
转换结果:
// MySQL: name → ES: username
// MySQL: create_time → ES: createTime
POST /user_index/_update/1
{"doc": {"id": 1,"username": "张三","email": "zhangsan@example.com","age": 25,"createTime": "2023-01-01 10:00:00"},"doc_as_upsert": true
}
场景2:条件过滤
配置:
esMapping:
  _index: user_index
  _id: id
  sql: "SELECT id, name, status FROM user"
  etlCondition: "where status = 1"   # 只同步状态为1的用户
效果:
- 只有status = 1的INSERT/UPDATE才会同步到ES
- status != 1的记录变更会被过滤掉
场景3:复杂SQL转换
配置:
esMapping:
  _index: order_index
  _id: order_id
  sql: "SELECT o.order_id, o.amount, u.username,
               DATE_FORMAT(o.create_time, '%Y-%m-%d') as create_date
        FROM `order` o
        LEFT JOIN user u ON o.user_id = u.id"
说明:
- 支持多表关联查询(从binlog消息中提取所需字段)
- 支持SQL函数处理
- 但不会真正执行SQL,只是用SQL语法描述字段转换规则
七、如何保障数据不丢失
在Canal实时同步MySQL数据到Elasticsearch的整个过程中,通过以下机制来保障数据同步过程中的数据一致性和故障恢复:
-
MySQL Binlog环节:
- 确保MySQL的binlog是开启的,并且设置为ROW模式,因为Canal解析需要ROW模式。
- 保证binlog的保存时间足够长,以便在同步延迟较大时还能获取到数据。
-
Canal Server环节:
- 高可用部署:Canal Server支持集群部署,通过Zookeeper协调多个实例,当其中一个实例宕机时,其他实例可以接管。
- 位点存储:Canal Server会将解析的binlog位点(position)持久化存储(例如到Zookeeper或本地文件),在重启后可以从位点继续解析,避免重复或丢失。
- 事务支持:Canal解析binlog时,会按照事务为单位进行处理,确保事务的原子性。
-
RocketMQ环节:
- 生产者端:Canal Server作为生产者发送消息到RocketMQ,需要确保消息被成功发送。可以使用同步发送并检查发送结果,或者使用事务消息。
- Broker端:RocketMQ Broker通过刷盘策略(同步刷盘)和副本机制(多副本)来保证消息不丢失。
- 消费者端:Canal Adapter作为消费者,需要确保消息被成功消费。使用手动提交位点(ACK机制),只有在处理成功后才提交消费位点,否则重试。
-
Canal Adapter环节:
- 重试机制:当处理消息失败时,Canal Adapter应该具备重试机制。可以设置重试次数,超过重试次数后将消息放入死信队列,然后继续处理后续消息,避免阻塞。
- 幂等性:由于RocketMQ可能重复投递(例如消费端超时提交失败,导致重复消费),所以Canal Adapter需要保证幂等性,即同一消息多次处理不会导致数据不一致。
-
Elasticsearch环节:
- 写入确认:Elasticsearch在写入文档时,可以设置等待刷新(refresh)策略,或者使用事务日志(translog)来保证数据不丢失。
- 集群健康:确保Elasticsearch集群健康,有足够的节点和副本,避免因为节点宕机导致数据丢失。
7.1 MySQL Binlog环节保障
Binlog持久化配置

-- 确保Binlog可靠持久化
SET GLOBAL sync_binlog = 1;                      -- 每次事务提交都同步刷盘
SET GLOBAL innodb_flush_log_at_trx_commit = 1;   -- 每次事务提交都刷新redo log

-- 查看当前配置
SHOW VARIABLES LIKE 'sync_binlog';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';
Binlog保留策略
-- 设置足够的Binlog保留时间
SET GLOBAL expire_logs_days = 7;            -- 保留7天
SET GLOBAL max_binlog_size = 1073741824;    -- 单个Binlog文件1GB
7.2 Canal Server环节保障
位点持久化机制

# canal.properties
# Zookeeper 配置
canal.zkServers = zk1:2181,zk2:2181,zk3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# 位点持久化配置
canal.instance.global.mode = spring
canal.file.data.dir = ${canal.conf:../conf}/${canal.instance.destination:}
canal.file.flush.period = 1000

# MQ 配置
canal.serverMode = rocketMQ
canal.mq.servers = rocketmq-namesrv:9876
canal.mq.topic = canal_topic
canal.mq.partitionsNum = 3
canal.mq.hashPartitions = true
canal.mq.partitionHash = .*\\..*:$pk$   # 按主键hash分区

# 重试配置
canal.mq.retries = 3
canal.mq.batchSize = 50
canal.mq.flatMessage = true
高可用部署
# canal集群部署
version: '3'
services:
  canal-server-1:
    image: canal/canal-server
    environment:
      - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
      - canal.instance.mysql.slaveId=1001
  canal-server-2:
    image: canal/canal-server
    environment:
      - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
      - canal.instance.mysql.slaveId=1002
位点管理服务
@Component
public class CanalPositionManager {

    @Autowired
    private CuratorFramework zkClient;

    private static final String POSITION_PATH = "/canal/positions/%s";

    /**
     * 持久化位点信息
     */
    public void persistPosition(String destination, LogPosition position) {
        try {
            String path = String.format(POSITION_PATH, destination);
            byte[] data = buildPositionData(position);
            if (zkClient.checkExists().forPath(path) == null) {
                zkClient.create().creatingParentsIfNeeded().forPath(path, data);
            } else {
                zkClient.setData().forPath(path, data);
            }
            log.info("持久化位点成功: destination={}, position={}", destination, position);
        } catch (Exception e) {
            log.error("持久化位点失败", e);
            throw new RuntimeException("持久化位点失败", e);
        }
    }

    /**
     * 加载位点信息
     */
    public LogPosition loadPosition(String destination) {
        try {
            String path = String.format(POSITION_PATH, destination);
            if (zkClient.checkExists().forPath(path) != null) {
                byte[] data = zkClient.getData().forPath(path);
                return parsePositionData(data);
            }
        } catch (Exception e) {
            log.error("加载位点失败", e);
        }
        return null;
    }

    /**
     * 备份位点到本地文件(双重保障)
     */
    public void backupPositionToFile(String destination, LogPosition position) {
        try {
            String fileName = String.format("/data/canal/backup/%s.position", destination);
            File file = new File(fileName);
            FileUtils.writeStringToFile(file, position.toString(), "UTF-8");
        } catch (IOException e) {
            log.warn("位点本地备份失败", e);
        }
    }
}
7.3 RocketMQ环节保障
生产者可靠发送

@Component
public class ReliableCanalProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static final int MAX_RETRY_COUNT = 3;
    private static final String CANAL_TOPIC = "canal_topic";

    /**
     * 可靠发送Canal消息
     */
    public SendResult sendReliableMessage(CanalMessage canalMessage) {
        // 构建消息
        Message<String> message = MessageBuilder
                .withPayload(JSON.toJSONString(canalMessage))
                .setHeader(MessageConst.PROPERTY_KEYS, buildMessageKey(canalMessage))
                .setHeader(MessageConst.PROPERTY_TAGS, buildMessageTag(canalMessage))
                .build();

        // 同步发送,最多重试3次
        int retryCount = 0;
        while (retryCount < MAX_RETRY_COUNT) {
            try {
                SendResult sendResult = rocketMQTemplate.syncSend(CANAL_TOPIC, message, 3000);
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    log.info("消息发送成功: msgId={}, keys={}",
                            sendResult.getMsgId(), buildMessageKey(canalMessage));
                    return sendResult;
                } else {
                    log.warn("消息发送状态异常: {}", sendResult.getSendStatus());
                }
            } catch (Exception e) {
                log.error("消息发送失败,重试次数: {}", retryCount + 1, e);
            }

            retryCount++;
            if (retryCount < MAX_RETRY_COUNT) {
                try {
                    Thread.sleep(1000 * retryCount); // 指数退避
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        // 发送失败,持久化到本地等待恢复
        persistFailedMessage(canalMessage);
        throw new RuntimeException("消息发送失败,已持久化到本地");
    }

    /**
     * 构建消息Key(用于去重和追踪)
     */
    private String buildMessageKey(CanalMessage message) {
        return String.format("%s:%s:%s:%s",
                message.getDatabase(),
                message.getTable(),
                message.getPrimaryKey(),
                message.getExecuteTime());
    }

    /**
     * 构建消息Tag(用于过滤)
     */
    private String buildMessageTag(CanalMessage message) {
        return String.format("%s_%s", message.getDatabase(), message.getTable());
    }

    /**
     * 持久化失败消息到本地文件
     */
    private void persistFailedMessage(CanalMessage message) {
        try {
            String fileName = String.format("/data/canal/failed_messages/%s_%s.msg",
                    System.currentTimeMillis(), UUID.randomUUID().toString());
            File file = new File(fileName);
            FileUtils.writeStringToFile(file, JSON.toJSONString(message), "UTF-8");
            log.warn("消息发送失败,已持久化到本地: {}", fileName);
        } catch (IOException e) {
            log.error("持久化失败消息异常", e);
        }
    }

    /**
     * 恢复失败消息(手动或定时任务调用)
     */
    public void recoverFailedMessages() {
        File dir = new File("/data/canal/failed_messages");
        File[] failedFiles = dir.listFiles((d, name) -> name.endsWith(".msg"));
        if (failedFiles != null) {
            for (File file : failedFiles) {
                try {
                    String content = FileUtils.readFileToString(file, "UTF-8");
                    CanalMessage message = JSON.parseObject(content, CanalMessage.class);
                    sendReliableMessage(message);
                    // 发送成功后删除文件
                    FileUtils.forceDelete(file);
                    log.info("恢复失败消息成功: {}", file.getName());
                } catch (Exception e) {
                    log.error("恢复失败消息异常: {}", file.getName(), e);
                }
            }
        }
    }
}
Broker持久化配置
# broker.conf - 可靠存储配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0

# 消息存储
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/store/commitlog

# 刷盘策略 - 同步刷盘
flushDiskType = SYNC_FLUSH

# 主从同步 - 同步复制
brokerRole = SYNC_MASTER

# 事务消息
transactionTimeout = 6000
transactionCheckMax = 15
消费者可靠消费
@Component
@RocketMQMessageListener(
        topic = "canal_topic",
        consumerGroup = "canal_es_adapter_group",
        consumeMode = ConsumeMode.ORDERLY,       // 顺序消费
        messageModel = MessageModel.CLUSTERING,  // 集群模式
        consumeThreadMax = 20,
        consumeTimeout = 30L
)
public class ReliableCanalConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private ElasticsearchRestTemplate esTemplate;

    @Autowired
    private IdempotentProcessor idempotentProcessor;

    @Autowired
    private OffsetManager offsetManager;

    private static final int MAX_RETRY_COUNT = 3;

    @Override
    public void onMessage(MessageExt message) {
        String messageId = message.getMsgId();
        String keys = message.getKeys();

        try {
            // 1. 解析消息
            CanalMessage canalMessage = parseMessage(message);

            // 2. 幂等检查
            if (!idempotentProcessor.checkAndSetProcessed(canalMessage)) {
                log.info("消息已处理,跳过: keys={}", keys);
                return;
            }

            // 3. 处理消息
            boolean success = processToElasticsearch(canalMessage);

            if (success) {
                // 4. 更新消费位点
                offsetManager.updateOffset(message.getQueueOffset());
                log.debug("消息处理成功: msgId={}, keys={}", messageId, keys);
            } else {
                log.error("消息处理失败,等待重试: msgId={}, keys={}", messageId, keys);
                throw new RuntimeException("ES处理失败");
            }
        } catch (Exception e) {
            log.error("消费消息异常: msgId={}, keys={}", messageId, keys, e);

            // 判断重试次数
            int reconsumeTimes = message.getReconsumeTimes();
            if (reconsumeTimes >= MAX_RETRY_COUNT) {
                // 超过重试次数,进入死信队列
                sendToDlq(message, e);
                log.error("消息进入死信队列: msgId={}, 重试次数={}", messageId, reconsumeTimes);
            } else {
                // 抛出异常让RocketMQ重试
                throw new RuntimeException("消费失败,需要重试", e);
            }
        }
    }

    /**
     * 处理消息到Elasticsearch
     */
    private boolean processToElasticsearch(CanalMessage message) {
        try {
            switch (message.getEventType()) {
                case INSERT:
                case UPDATE:
                    return handleUpsert(message);
                case DELETE:
                    return handleDelete(message);
                default:
                    log.warn("未知事件类型: {}", message.getEventType());
                    return true;
            }
        } catch (Exception e) {
            log.error("ES操作失败", e);
            return false;
        }
    }

    /**
     * 处理插入/更新
     */
    private boolean handleUpsert(CanalMessage message) {
        IndexRequest request = new IndexRequest(message.getIndexName())
                .id(message.getPrimaryKey())
                .source(buildSource(message))
                .opType(DocWriteRequest.OpType.INDEX); // UPSERT语义

        try {
            IndexResponse response = esTemplate.getClient().index(request, RequestOptions.DEFAULT);
            return response.status().getStatus() == 200 || response.status().getStatus() == 201;
        } catch (IOException e) {
            throw new RuntimeException("ES索引操作失败", e);
        }
    }

    /**
     * 处理删除
     */
    private boolean handleDelete(CanalMessage message) {
        DeleteRequest request = new DeleteRequest(message.getIndexName())
                .id(message.getPrimaryKey());

        try {
            DeleteResponse response = esTemplate.getClient().delete(request, RequestOptions.DEFAULT);
            // 删除成功或文档不存在都算成功
            return response.status().getStatus() == 200 ||
                    response.getResult() == DocWriteResponse.Result.NOT_FOUND;
        } catch (IOException e) {
            throw new RuntimeException("ES删除操作失败", e);
        }
    }

    /**
     * 发送到死信队列
     */
    private void sendToDlq(MessageExt message, Exception cause) {
        try {
            DlqMessage dlqMessage = new DlqMessage(message, cause.getMessage());
            // 发送到死信队列,后续人工处理
            // dlqProducer.send(dlqMessage);

            // 同时持久化到本地文件
            persistDlqMessage(message, cause);
        } catch (Exception e) {
            log.error("发送死信队列失败", e);
        }
    }
}
7.4 Canal Adapter环节保障
消费位点管理

# application.yml - 消费位点配置
canal.conf:
  mode: rocketMQ
  mqServers: 127.0.0.1:9876
  topic: canal_topic
  groupId: es_adapter_group
  # 消费位点持久化
  offsetPersistence: true
  offsetStore: redis        # 使用Redis存储消费位点
  # 幂等配置
  idempotent:
    enabled: true
    store: redis
    expireTime: 86400       # 24小时
  # 重试策略
  retryCount: 3
  retryInterval: 1000
  # 批量处理
  batchSize: 1000
  commitBatch: 1000
幂等性处理器
@Component
public class IdempotentProcessor {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String IDEMPOTENT_KEY_PREFIX = "canal:idempotent:";
    private static final long DEFAULT_EXPIRE_TIME = 24 * 60 * 60; // 24小时

    /**
     * 检查并设置消息处理状态
     */
    public boolean checkAndSetProcessed(CanalMessage message) {
        String messageId = buildMessageId(message);
        String key = IDEMPOTENT_KEY_PREFIX + messageId;

        // 使用SETNX原子操作
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(key, "processed", Duration.ofSeconds(DEFAULT_EXPIRE_TIME));

        if (Boolean.TRUE.equals(result)) {
            log.debug("消息首次处理: {}", messageId);
            return true;
        } else {
            log.info("消息重复,跳过处理: {}", messageId);
            return false;
        }
    }

    /**
     * 构建唯一消息ID
     */
    private String buildMessageId(CanalMessage message) {
        return String.format("%s:%s:%s:%s:%s",
                message.getDatabase(),
                message.getTable(),
                message.getPrimaryKey(),
                message.getEventType(),
                message.getExecuteTime());
    }

    /**
     * 清理过期的幂等记录(定时任务调用)
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void cleanExpiredIdempotentRecords() {
        try {
            String pattern = IDEMPOTENT_KEY_PREFIX + "*";
            Set<String> keys = redisTemplate.keys(pattern);
            if (keys != null && !keys.isEmpty()) {
                redisTemplate.delete(keys);
                log.info("清理过期幂等记录完成,数量: {}", keys.size());
            }
        } catch (Exception e) {
            log.error("清理过期幂等记录异常", e);
        }
    }
}
7.5 Elasticsearch 环节保障
ES 写入配置优化

# elasticsearch.yml 关键配置
# 索引配置优化
index:
  refresh_interval: 30s        # 降低刷新频率提高写入性能
  number_of_shards: 3
  number_of_replicas: 1
  translog:
    durability: request        # 每次请求都刷translog
    sync_interval: 5s
  unassigned:
    node_left:
      delayed_timeout: 5m

# 集群配置
cluster:
  routing:
    allocation:
      awareness:
        attributes: zone
批量写入优化
@Component
public class ESBatchProcessor {

    // 通过构造器注入,保证构建BulkProcessor时客户端已可用
    private final RestHighLevelClient esClient;

    private static final int BATCH_SIZE = 1000;
    private static final int FLUSH_INTERVAL = 1000; // 1秒

    private final BulkProcessor bulkProcessor;
    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);

    public ESBatchProcessor(RestHighLevelClient esClient) {
        this.esClient = esClient;
        this.bulkProcessor = buildBulkProcessor();
    }

    private BulkProcessor buildBulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.debug("准备执行批量操作,请求数量: {}", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                successCount.addAndGet(request.numberOfActions());
                log.debug("批量操作执行成功,处理数量: {}", request.numberOfActions());

                if (response.hasFailures()) {
                    log.warn("批量操作存在部分失败: {}", response.buildFailureMessage());
                    handleBulkFailures(response);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failureCount.addAndGet(request.numberOfActions());
                log.error("批量操作执行失败", failure);
                // 失败重试或记录日志
                handleBulkFailure(request, failure);
            }
        };

        return BulkProcessor.builder(
                        (request, bulkListener) ->
                                esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                        listener)
                .setBulkActions(BATCH_SIZE)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueMillis(FLUSH_INTERVAL))
                .setConcurrentRequests(2)
                .build();
    }

    /**
     * 添加文档到批量处理器
     */
    public void addDocument(String index, String id, Map<String, Object> source) {
        IndexRequest request = new IndexRequest(index)
                .id(id)
                .source(source)
                .opType(DocWriteRequest.OpType.INDEX);
        bulkProcessor.add(request);
    }

    /**
     * 处理批量操作失败
     */
    private void handleBulkFailures(BulkResponse response) {
        for (BulkItemResponse item : response) {
            if (item.isFailed()) {
                BulkItemResponse.Failure failure = item.getFailure();
                log.error("文档操作失败: index={}, id={}, 原因: {}",
                        item.getIndex(), item.getId(), failure.getMessage());
                // 记录失败信息,后续重试
                recordFailedDocument(item.getIndex(), item.getId(), failure);
            }
        }
    }
}
7.6 补偿机制
数据一致性校验

@Component
public class DataConsistencyChecker {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private ElasticsearchRestTemplate esTemplate;

    /**
     * 定时校验数据一致性
     */
    @Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
    public void checkConsistency() {
        List<String> tables = getSyncTables();
        for (String table : tables) {
            checkTableConsistency(table);
        }
    }

    /**
     * 检查单表数据一致性
     */
    private void checkTableConsistency(String table) {
        try {
            // 1. 查询MySQL数据量
            long mysqlCount = getMySqlCount(table);

            // 2. 查询ES数据量
            long esCount = getEsCount(table);

            // 3. 对比数量
            if (mysqlCount != esCount) {
                log.warn("数据数量不一致: table={}, mysql={}, es={}", table, mysqlCount, esCount);

                // 4. 记录不一致信息
                recordInconsistency(table, mysqlCount, esCount);

                // 5. 触发数据修复
                if (shouldAutoRepair(table)) {
                    triggerDataRepair(table);
                } else {
                    sendInconsistencyAlert(table, mysqlCount, esCount);
                }
            } else {
                log.info("数据一致性检查通过: table={}, count={}", table, mysqlCount);
            }
        } catch (Exception e) {
            log.error("数据一致性检查异常: table={}", table, e);
        }
    }

    /**
     * 获取MySQL数据量
     */
    private long getMySqlCount(String table) {
        String sql = "SELECT COUNT(*) FROM " + table;
        return jdbcTemplate.queryForObject(sql, Long.class);
    }

    /**
     * 获取ES数据量
     */
    private long getEsCount(String table) {
        String indexName = table + "_index";
        CountRequest countRequest = new CountRequest(indexName);
        try {
            CountResponse countResponse = esTemplate.getClient()
                    .count(countRequest, RequestOptions.DEFAULT);
            return countResponse.getCount();
        } catch (IOException e) {
            throw new RuntimeException("查询ES数量失败", e);
        }
    }

    /**
     * 触发数据修复
     */
    private void triggerDataRepair(String table) {
        log.info("开始修复数据: table={}", table);
        try {
            // 1. 暂停增量同步
            pauseIncrementalSync(table);

            // 2. 执行全量同步
            fullSyncService.syncTable(table);

            // 3. 恢复增量同步
            resumeIncrementalSync(table);

            log.info("数据修复完成: table={}", table);
        } catch (Exception e) {
            log.error("数据修复失败: table={}", table, e);
            sendRepairFailureAlert(table, e);
        }
    }
}
7.7 灾备和恢复方案
全量同步服务

@Service
public class FullSyncService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private ElasticsearchRestTemplate esTemplate;

    @Autowired
    private CanalPositionManager positionManager;

    /**
     * 全量同步表数据
     */
    public void syncTable(String tableName) {
        log.info("开始全量同步表: {}", tableName);

        String indexName = tableName + "_index";
        long totalCount = 0;
        long successCount = 0;

        try {
            // 1. 清空ES索引
            clearEsIndex(indexName);

            // 2. 分页查询MySQL数据
            int pageSize = 5000;
            int pageNum = 0;

            while (true) {
                List<Map<String, Object>> dataList = queryDataByPage(tableName, pageNum, pageSize);
                if (dataList.isEmpty()) {
                    break;
                }

                // 3. 批量写入ES
                bulkIndexToEs(indexName, dataList);

                successCount += dataList.size();
                totalCount += dataList.size();
                pageNum++;

                log.info("全量同步进度: table={}, 已处理={}", tableName, totalCount);

                // 4. 防止内存溢出,每批处理完休息一下
                if (pageNum % 100 == 0) {
                    Thread.sleep(1000);
                }
            }

            log.info("全量同步完成: table={}, 总记录数={}", tableName, totalCount);
        } catch (Exception e) {
            log.error("全量同步失败: table={}", tableName, e);
            throw new RuntimeException("全量同步失败", e);
        }
    }

    /**
     * 分页查询数据
     */
    private List<Map<String, Object>> queryDataByPage(String tableName, int pageNum, int pageSize) {
        int offset = pageNum * pageSize;
        String sql = String.format("SELECT * FROM %s ORDER BY id LIMIT %d OFFSET %d",
                tableName, pageSize, offset);
        return jdbcTemplate.queryForList(sql);
    }

    /**
     * 批量索引到ES
     */
    private void bulkIndexToEs(String indexName, List<Map<String, Object>> dataList) {
        BulkRequest bulkRequest = new BulkRequest();

        for (Map<String, Object> data : dataList) {
            String id = data.get("id").toString();
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .id(id)
                    .source(data);
            bulkRequest.add(indexRequest);
        }

        try {
            BulkResponse bulkResponse = esTemplate.getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                log.warn("批量索引存在失败: {}", bulkResponse.buildFailureMessage());
            }
        } catch (IOException e) {
            throw new RuntimeException("批量索引失败", e);
        }
    }
}
灾难恢复脚本
#!/bin/bash
# disaster_recovery.sh

# 配置参数
MYSQL_HOST="mysql"
CANAL_SERVER="canal-server"
ROCKETMQ_NS="rocketmq-namesrv"
ES_HOST="elasticsearch"
BACKUP_DIR="/data/backup"
LOG_FILE="/var/log/disaster_recovery.log"

# 日志函数
log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

# 停止服务
stop_services() {
    log "停止相关服务..."
    docker stop canal-adapter
    docker stop canal-server
    # 保留MySQL和RocketMQ运行
}

# 备份关键数据
backup_critical_data() {
    log "备份关键数据..."
    # 备份Canal位点
    docker exec canal-server tar -czf /tmp/canal_positions.tar.gz /data/canal/positions
    docker cp canal-server:/tmp/canal_positions.tar.gz $BACKUP_DIR/
    # 备份RocketMQ消费位点
    # 这里需要根据实际情况备份RocketMQ的消费进度
}

# 恢复Canal位点
restore_canal_positions() {
    log "恢复Canal位点..."
    if [ -f "$BACKUP_DIR/canal_positions.tar.gz" ]; then
        docker cp $BACKUP_DIR/canal_positions.tar.gz canal-server:/tmp/
        docker exec canal-server tar -xzf /tmp/canal_positions.tar.gz -C /
    else
        log "警告: 未找到Canal位点备份文件"
    fi
}

# 检查数据一致性
check_data_consistency() {
    log "检查数据一致性..."
    # 调用数据一致性检查接口
    curl -X POST http://canal-adapter:8081/check/consistency
    # 等待检查完成
    sleep 30
}

# 启动服务
start_services() {
    log "启动服务..."
    docker start canal-server
    sleep 10
    docker start canal-adapter
    sleep 10
}

# 监控恢复状态
monitor_recovery() {
    log "监控恢复状态..."
    for i in {1..60}; do
        # 检查Canal Server状态
        canal_status=$(curl -s http://canal-server:11112/ | grep -c "OK" || true)
        # 检查消费延迟
        delay=$(get_consume_delay)

        if [ "$canal_status" -gt 0 ] && [ "$delay" -lt 300000 ]; then
            log "恢复完成,系统运行正常"
            return 0
        fi

        log "恢复中... Canal状态: $canal_status, 消费延迟: ${delay}ms"
        sleep 10
    done

    log "错误: 恢复超时"
    return 1
}

# 主恢复流程
main() {
    log "开始灾难恢复流程..."

    stop_services
    backup_critical_data
    restore_canal_positions
    start_services
    check_data_consistency
    monitor_recovery

    if [ $? -eq 0 ]; then
        log "灾难恢复成功完成"
        send_recovery_success_alert
    else
        log "灾难恢复失败"
        send_recovery_failure_alert
        exit 1
    fi
}

# 执行主流程
main "$@"
