
Syncing MySQL Data to Elasticsearch in Real Time with Canal

1. Scenario Overview and Core Components

Goal: propagate every change (INSERT, UPDATE, DELETE) to the orders table in MySQL to the corresponding Elasticsearch index in near real time, while preserving data consistency and processing efficiency.

Core components:

  1. MySQL: the data source; binlog must be enabled.
  2. Canal Server (canal-deployer): impersonates a MySQL slave, parses the binlog, and publishes the change events (a minimal client sketch follows this list).
  3. Canal Adapter: a Canal client that subscribes to Canal Server's messages and turns them into Elasticsearch REST API calls.
  4. Elasticsearch & Kibana: the data destination and its visualization tool.
  5. Message queue (optional but strongly recommended): e.g. Kafka/RocketMQ, a buffering layer between Canal Server and Canal Adapter that improves reliability.
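For reference, the sketch below shows what Canal Server exposes when consumed directly over TCP, without a message queue in between. It is a minimal example assuming the com.alibaba.otter:canal.client dependency and the default "example" destination; the rest of this article uses the RocketMQ mode instead.

import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalTcpClient {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe("your_database\\..*");  // same filter as instance.properties
            connector.rollback();                       // redeliver entries that were never acked
            while (true) {
                Message message = connector.getWithoutAck(100);  // fetch up to 100 entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);                 // nothing new yet
                } else {
                    message.getEntries().forEach(entry ->
                            System.out.println(entry.getHeader().getSchemaName() + "."
                                    + entry.getHeader().getTableName() + " "
                                    + entry.getHeader().getEventType()));
                }
                connector.ack(batchId);                 // advance the consumer position
            }
        } finally {
            connector.disconnect();
        }
    }
}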

2. An Efficient and Reliable Architecture

MySQL → Canal Server → RocketMQ → Canal Adapter → Elasticsearch
        (binlog parsing)  (message relay)  (data transformation)  (data storage)

The overall flow:

  1. Enable binlog in MySQL and point Canal Server at it.
  2. Canal Server sends the parsed change events to RocketMQ.
  3. Canal Adapter consumes the events from RocketMQ and writes them to Elasticsearch.

Why RocketMQ?

  1. Reliability: RocketMQ persists messages. If Elasticsearch or the Canal Adapter goes down, messages accumulate in RocketMQ and are consumed once the component recovers, so no data is lost.
  2. Decoupling: Canal Server only parses and publishes to RocketMQ; it does not care about downstream speed or state. Canal Adapters can be scaled horizontally and consume from RocketMQ in parallel to increase throughput.
  3. Buffering: absorbs traffic spikes and keeps Elasticsearch from being overwhelmed by writes.

3. Deployment and Configuration Walkthrough

Step 1: Prepare MySQL

Enable binlog in ROW mode, then create a dedicated MySQL user for Canal and grant it the required privileges.

-- 1. Enable MySQL binlog
-- Edit the my.cnf configuration file:
[mysqld]
# Enable binary logging
log-bin=mysql-bin
# Use ROW format
binlog-format=ROW
# Server ID
server_id=1
# Database(s) to synchronize
binlog-do-db=your_database
# Binlog retention (days)
expire_logs_days=7
# Maximum binlog file size
max_binlog_size=100M

-- 2. Create the Canal user and grant privileges
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

-- 3. Verify the binlog configuration
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
Step 2: Deploy and configure Canal Server
1. Write the docker-compose-canal-server.yml deployment file:
# docker-compose-canal-server.yml
version: '3.8'
services:
  canal-server:
    image: canal/canal-server:v1.1.6
    container_name: canal-server
    ports:
      - "11111:11111"
    environment:
      - canal.auto.scan=false
      - canal.destinations=example
      - canal.instance.master.address=mysql-host:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal_password
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=true
      - canal.instance.gtidon=false
    volumes:
      - ./canal-server/conf:/home/admin/canal-server/conf
      - ./canal-server/logs:/home/admin/canal-server/logs
    depends_on:
      - rocketmq-broker
2. Edit conf/canal.properties — mainly the RocketMQ connection:
# Canal Server base configuration
canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111
canal.zkServers =

# Destinations
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

# Use RocketMQ as the MQ
canal.serverMode = rocketMQ

# RocketMQ NameServer address (configure a cluster in production)
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.namespace =

# Producer configuration
rocketmq.producer.group = canal_producer_group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =

# Batch size
rocketmq.producer.batchSize = 50

# Other performance parameters
rocketmq.producer.retryTimesWhenSendFailed = 3
rocketmq.producer.retryTimesWhenSendAsyncFailed = 3
rocketmq.producer.retryAnotherBrokerWhenNotStoreOK = false
3. Edit conf/example/instance.properties — instance-level configuration:
# Data source configuration
canal.instance.master.address=mysql-host:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=your_database

# Table filter rules
canal.instance.filter.regex=your_database\\..*
canal.instance.filter.black.regex=

# RocketMQ topic
canal.mq.topic=canal_topic
# Dynamic topic routing (optional)
# canal.mq.dynamicTopic=.*\\..*
# Table-level routing via tags (recommended)
canal.mq.enableDynamicQueuePartition=false

# Partitioning: hash by table + primary key to preserve ordering
canal.mq.partition=0
canal.mq.partitionsNum=1
# Partition hash: rows with the same primary key always go to the same queue
canal.mq.partitionHash=test.orders:id,test.users:id

# Batch size
canal.mq.batchSize=50
# Message delivery delay (ms)
canal.mq.delay=100
Step 3: Deploy RocketMQ

Deploy the RocketMQ services with docker-compose and create the topic.

Write the docker-compose-rocketmq.yml deployment file:

# docker-compose-rocketmq.yml
version: '3.8'
services:
  namesrv:
    image: rocketmqinc/rocketmq:4.9.4
    container_name: rocketmq-namesrv
    ports:
      - "9876:9876"
    environment:
      JAVA_OPT: "-Duser.home=/home/rocketmq -Xms512m -Xmx512m -Xmn128m"
    command: sh mqnamesrv
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
      - ./data/namesrv/store:/home/rocketmq/store
  broker:
    image: rocketmqinc/rocketmq:4.9.4
    container_name: rocketmq-broker
    ports:
      - "10911:10911"
      - "10909:10909"
    environment:
      NAMESRV_ADDR: "rocketmq-namesrv:9876"
      JAVA_OPT: "-Duser.home=/home/rocketmq -Xms1g -Xmx1g -Xmn512m"
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv
  console:
    image: styletang/rocketmq-console-ng:latest
    container_name: rocketmq-console
    ports:
      - "8180:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - namesrv
      - broker
Step 4: Deploy and configure Canal Adapter (connected to RocketMQ)
1. Directory layout
canal.adapter-1.1.7/
├── bin/
│   ├── startup.sh          # start script
│   └── stop.sh             # stop script
├── conf/
│   ├── application.yml     # main configuration file
│   ├── es7/                # ES mapping configuration directory
│   └── mq/                 # MQ-related configuration
├── lib/                    # dependencies
├── logs/                   # log directory
└── plugin/                 # plugin directory
2. Deploy with docker-compose

Write the docker-compose-canal-adapter.yml deployment file:

# docker-compose-canal-adapter.yml
version: '3.8'
services:
  canal-adapter:
    image: slpcat/canal-adapter:v1.1.6
    container_name: canal-adapter
    ports:
      - "8081:8081"
    environment:
      - canal.manager.jdbc.url=jdbc:mysql://mysql-host:3306/canal_manager?useUnicode=true
      - canal.manager.jdbc.username=canal
      - canal.manager.jdbc.password=canal_password
    volumes:
      - ./canal-adapter/conf:/opt/canal-adapter/conf
      - ./canal-adapter/logs:/opt/canal-adapter/logs
    depends_on:
      - rocketmq-broker
      - elasticsearch
3. Edit the main configuration file conf/application.yml:
server:
  port: 8081
  undertow:
    # Thread pool settings
    io-threads: 16
    worker-threads: 256
    buffer-size: 1024
    # Use direct memory buffers
    direct-buffers: true

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

# Canal Adapter configuration
canal.conf:
  # Mode: tcp, kafka, rocketMQ, rabbitMQ
  mode: rocketMQ
  # Asynchronous processing
  async: true
  consumerProperties:
    canal.topic: canal_topic
    canal.group: canal_adapter_group
    # Messages fetched per batch
    rocketmq.batch.size: 1000
    # Maximum consumer threads
    rocketmq.consume.thread.max: 32
  # Source data source (used for ETL queries)
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.100:3306/business_db?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      username: canal_user
      password: Canal@123456
      # Connection pool settings
      maxActive: 20
      initialSize: 5
      maxWait: 60000
      minIdle: 5
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
  # Target adapters (multiple allowed)
  canalAdapters:
    - instance: example              # matches the canal instance name
      groups:
        - groupId: g1
          outerAdapters:
            # Elasticsearch adapter
            - name: es7
              key: es_business
              hosts: 192.168.1.101:9200,192.168.1.102:9200   # ES cluster nodes
              properties:
                mode: rest                               # transport mode: rest or transport
                security.auth: elastic:Elastic@123456    # credentials
                cluster.name: es-cluster                 # cluster name
                # Connection pool settings
                maxActive: 20
                maxTotal: 30
                socketTimeout: 30000
                connectTimeout: 5000
                # Retry settings
                maxRetryTimeout: 30000
                # Compression
                compression: true
            # Additional adapters can be configured, e.g. sync to other stores:
            # - name: logger   # logging adapter, useful for debugging
            # - name: rdb      # relational database adapter
            # - name: hbase    # HBase adapter

# Monitoring
monitor:
  enabled: true
  port: 8082
4. Configure the Elasticsearch mapping files

Create the mapping files under conf/es7/; a single Adapter can host several mapping files (a sketch for triggering a full backfill from these mappings follows the two examples below):

conf/es7/
├── user.yml
├── order.yml
└── product.yml
  • user.yml — sync configuration for the user table

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: "user_index_v1"    # ES index name
      _type: "_doc"              # type (always _doc in ES7+)
      _id: _id                   # document ID field
      upsert: true               # enable upsert
      # pk: id                   # primary key field (used for update matching)
      # The SQL here is "virtual": fields are actually taken from the binlog event,
      # but this expression is used to describe the field mapping
      sql: "SELECT
              id as _id,          -- required: map id to _id
              id,
              username,
              email,
              mobile,
              nickname,
              avatar,
              status,
              created_time,
              updated_time,
              DATE_FORMAT(created_time, '%Y-%m-%d %H:%i:%s') as create_time_format
            FROM user WHERE updated_time >= :sql_condition"
      # Commit batch size
      commitBatch: 1000
      # Commit interval (ms)
      commitInterval: 1000
      # Fields to skip
      skips:
        - password               # skip sensitive fields
        - salt
        - deleted
      # Object field handling
      objFields:
        # If tags is a JSON string, convert it to an object
        # tags: object
        # If extra_info is a JSON string, convert it to an object
        # extra_info: object
      # ETL condition (used for full sync)
      etlCondition: "where updated_time >= '{}'"
    
  • order.yml — sync configuration for the order table

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: "order_index_v1"
      _type: "_doc"
      _id: _id
      upsert: true
      sql: "SELECT
              CONCAT('order_', id) as _id,   -- composite key handling
              id,
              order_no,
              user_id,
              total_amount,
              actual_amount,
              discount_amount,
              status,
              payment_status,
              shipping_status,
              create_time,
              update_time,
              -- computed field
              CASE status
                WHEN 1 THEN '待支付'
                WHEN 2 THEN '已支付'
                WHEN 3 THEN '已发货'
                WHEN 4 THEN '已完成'
                WHEN 5 THEN '已取消'
                ELSE '未知'
              END as status_desc
            FROM `order`"
      commitBatch: 500
      etlCondition: "where update_time >= '{}'"
    
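If the MySQL tables already hold data before synchronization starts, the adapter's ETL endpoint can backfill an index from a mapping file. The sketch below uses Java's built-in HttpClient; the /etl/es7/user.yml path and port 8081 follow the adapter configuration above, but treat the exact endpoint as an assumption to verify against your canal-adapter version.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerFullEtl {
    public static void main(String[] args) throws Exception {
        // POST /etl/{adapter type}/{mapping file} asks the adapter to run a full import
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8081/etl/es7/user.yml"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}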
Step 5: Deployment and startup scripts
5.1 One-click startup script
#!/bin/bash
# start_sync_system.sh
set -e

echo "Starting the data sync system..."

# Create the Docker network
docker network create canal-network || true

# Start RocketMQ
echo "Starting RocketMQ..."
docker-compose -f docker-compose-rocketmq.yml up -d

# Wait for RocketMQ to become ready
echo "Waiting for RocketMQ..."
sleep 30

# Create the topic
docker exec rocketmq-broker sh mqadmin updateTopic -t canal_topic -n rocketmq-namesrv:9876 -c DefaultCluster

# Start Canal Server
echo "Starting Canal Server..."
docker-compose -f docker-compose-canal-server.yml up -d

# Start Canal Adapter
echo "Starting Canal Adapter..."
docker-compose -f docker-compose-canal-adapter.yml up -d

echo "Data sync system started!"
echo "RocketMQ console: http://localhost:8180"
5.2 Index initialization script
# init_es_mapping — create the Elasticsearch index mappings
PUT /user_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "user_id": {"type": "long"},
      "user_name": {
        "type": "text",
        "fields": {"keyword": {"type": "keyword"}}
      },
      "user_email": {"type": "keyword"},
      "user_age": {"type": "integer"},
      "user_status": {"type": "integer"},
      "age_group": {"type": "keyword"},
      "create_time": {"type": "date"},
      "update_time": {"type": "date"},
      "sync_timestamp": {"type": "date"}
    }
  }
}

PUT /order_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "order_id": {"type": "long"},
      "order_no": {"type": "keyword"},
      "customer_id": {"type": "long"},
      "total_amount": {"type": "double"},
      "order_status": {"type": "integer"},
      "status_text": {"type": "keyword"},
      "order_date": {"type": "date"},
      "sync_timestamp": {"type": "date"}
    }
  }
}
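The same index definitions can also be created programmatically. Below is a minimal sketch with the Elasticsearch 7 High Level REST Client (only a subset of the user_index fields is shown; host and port are assumptions to adapt):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateUserIndex {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            CreateIndexRequest request = new CreateIndexRequest("user_index");
            // Same settings as the PUT request above
            request.settings(Settings.builder()
                    .put("number_of_shards", 3)
                    .put("number_of_replicas", 1)
                    .put("refresh_interval", "1s"));
            // A trimmed-down mapping for illustration
            request.mapping(
                    "{ \"properties\": { \"user_id\": {\"type\": \"long\"},"
                    + " \"user_name\": {\"type\": \"text\", \"fields\": {\"keyword\": {\"type\": \"keyword\"}}},"
                    + " \"create_time\": {\"type\": \"date\"} } }",
                    XContentType.JSON);
            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
            System.out.println("acknowledged=" + response.isAcknowledged());
        }
    }
}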

5. Canal Server Internals

Canal Server pushes MySQL change events to RocketMQ as follows:

  1. In the Canal Server configuration we set the MQ mode (RocketMQ here), the RocketMQ NameServer address, the topic, and related settings.
  2. After startup, Canal Server connects to MySQL and parses the binlog according to the configured instance.
  3. When a change occurs, Canal Server assembles the parsed event into a CanalEntry.Entry and sends it to the configured topic through a RocketMQ producer.
  4. Message format: the payload sent to RocketMQ is usually the serialized bytes of CanalEntry.Entry, or JSON if configured. Each Entry corresponds to one binlog event and contains the table name, the operation type (insert/update/delete), the before image (optional), the after image, and other details.
  5. Partitioning rule: to preserve ordering, changes for the same table are usually routed to the same message queue, which keeps the per-table order identical to the binlog order. When sending, Canal Server can specify a partition key (for example the table name or primary key), and RocketMQ routes the message to the corresponding queue based on that key.
5.1 Configure Canal Server to use RocketMQ

First enable RocketMQ as the message queue in the Canal Server configuration file:

# canal.properties
canal.serverMode = rocketMQ
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 3
canal.mq.batchSize = 50
canal.mq.maxRequestSize = 1024
5.2 Instance configuration: target topic
# example/instance.properties
canal.mq.topic = canal_topic
# Dynamic topic routing by database/table
canal.mq.dynamicTopic = .*\\..*
# Partitioning strategy
canal.mq.partitionHash = test\\.user:id
5.3 Core synchronization flow
5.3.1 Binlog listening and parsing
// Canal Server acts as a MySQL slave and receives changes via the binlog dump protocol
public class CanalServer {
    public void start() {
        // 1. Connect to MySQL and register as a slave
        MysqlConnection connection = new MysqlConnection();
        connection.dump(binlogFileName, binlogPosition);

        // 2. Parse binlog events
        LogEvent event = connection.readEvent();
        while (event != null) {
            Entry entry = binlogParser.parse(event);
            if (entry != null) {
                // Send to the MQ
                mqProducer.send(entry);
            }
            event = connection.readEvent();
        }
    }
}
5.3.2 Message assembly and sending
public class RocketMQProducer {
    public void send(Entry entry) {
        // 1. Build the RocketMQ message
        Message message = new Message();
        message.setTopic(getTopic(entry));

        // 2. Serialize the Entry payload
        byte[] body = EntrySerializer.serialize(entry);
        message.setBody(body);

        // 3. Set message properties
        message.putUserProperty("database", entry.getHeader().getSchemaName());
        message.putUserProperty("table", entry.getHeader().getTableName());
        message.putUserProperty("type", entry.getHeader().getEventType().toString());

        // 4. Compute the partition key (keeps changes for the same primary key ordered)
        String partitionKey = calculatePartitionKey(entry);
        message.setKeys(partitionKey);

        // 5. Send to RocketMQ
        SendResult result = defaultMQProducer.send(message);

        // 6. Advance the binlog position (at-least-once delivery)
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            positionManager.updatePosition(entry.getHeader());
        }
    }
}
5.3.3 RocketMQ message structure
{
  "topic": "canal_topic",
  "keys": "user:12345",        // partition key
  "tags": "test.user",         // database.table tag
  "body": {
    "type": "INSERT/UPDATE/DELETE",
    "database": "test",
    "table": "user",
    "executeTime": 1633046400000,
    "before": {                // data before the change (UPDATE/DELETE)
      "id": 12345,
      "name": "old_name",
      "age": 25
    },
    "after": {                 // data after the change (INSERT/UPDATE)
      "id": 12345,
      "name": "new_name",
      "age": 26
    },
    "sql": ""                  // original SQL (optional)
  }
}
5.4 Key configuration parameters
5.4.1 Performance tuning
# Batch send size
canal.mq.batchSize = 50
# Send timeout (ms)
canal.mq.sendTimeout = 3000
# Message compression
canal.mq.compression = gzip
# Transaction support
canal.mq.transaction = false
5.4.2 Fault tolerance
# Retry count
canal.mq.retries = 3
# Position storage (enables recovery after failures)
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# In-memory ring buffer size
canal.instance.memory.buffer.size = 16384
5.4.3 Ordering guarantees

To keep synchronization ordered, Canal uses a partitioning strategy:

public class PartitionHashCalculator {
    public static String calculate(Entry entry) {
        // Hash by table name + primary key so that all operations on the same
        // primary key are sent to the same queue
        String tableName = entry.getHeader().getTableName();
        String primaryKey = extractPrimaryKey(entry);
        return tableName + ":" + primaryKey;
    }
}
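To make the partitioning idea concrete, here is an illustrative sketch (not Canal's actual implementation) of how a key such as test.orders:42 could be mapped onto one of canal.mq.partitionsNum queues, which is what keeps all changes to the same row ordered:

public class PartitionSelector {
    public static int selectQueue(String partitionKey, int partitionsNum) {
        // Mask the sign bit instead of Math.abs, which misbehaves for Integer.MIN_VALUE
        int hash = partitionKey.hashCode() & Integer.MAX_VALUE;
        return hash % partitionsNum;
    }

    public static void main(String[] args) {
        // The same key always lands in the same queue
        System.out.println(selectQueue("test.orders:42", 3));
        System.out.println(selectQueue("test.orders:43", 3));
    }
}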

6. Canal Adapter Internals

Canal Adapter uses predefined mapping configurations to turn the binlog messages arriving from RocketMQ into Elasticsearch write operations, achieving real-time synchronization. The steps:

  1. Subscribe to RocketMQ: on startup, Canal Adapter creates a RocketMQ consumer using the NameServer address, topic, and group ID configured in application.yml.
  2. Message handling: the consumer keeps pulling the binlog messages published by Canal Server from the topic and parses each message body into a Canal Message object containing:
    • the database and table name
    • the event type (INSERT/UPDATE/DELETE)
    • the data before the change (before)
    • the data after the change (after)
    • the execution timestamp and other metadata
  3. Mapping configuration: the mapping files define how MySQL rows map to Elasticsearch indices and documents, for example:
    • which MySQL table maps to which Elasticsearch index
    • which MySQL primary key becomes the Elasticsearch document ID
    • field-level mappings (when field names differ)
    • how each operation type (insert, update, delete) is handled
  4. Write to Elasticsearch: based on the mapping, Canal Adapter converts the message into Elasticsearch documents and calls the Elasticsearch API:
    • an insert adds a document to the index
    • an update modifies the document identified by the primary key
    • a delete removes that document
  5. Batch commits: for throughput, Canal Adapter may submit several operations to Elasticsearch as one batch.
6.1 Pulling messages from RocketMQ

Running as a RocketMQ consumer, Canal Adapter subscribes to the configured topic at startup:

// Pseudocode for the Adapter's RocketMQ consumer
public class RocketMQAdapterConsumer {
    public void start() {
        // Create the consumer instance
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("canal_adapter_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("canal_topic", "*"); // subscribe to all tags

        // Register an ordered message listener
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages,
                                                       ConsumeOrderlyContext context) {
                // Process the batch of messages
                processMessages(messages);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
6.2 Message parsing and routing
  • Message parsing: after receiving a RocketMQ message, the Adapter deserializes its content into an internal FlatMessage or CanalMessage object (a parsing sketch follows this list).
    // Message body (FlatMessage format)
    {
      "data": [{
        "id": "1001",
        "order_code": "ORD20231025001",
        "user_id": "1",
        "amount": "199.99",
        "status": "1",
        "create_time": "2023-10-25 10:00:00",
        "update_time": "2023-10-25 10:00:00"
      }],
      "database": "test_db",
      "table": "orders",
      "type": "INSERT",
      "old": null,
      "es": 1698213600000,
      "ts": 1698213600000
    }
    
  • Routing: based on the database and table fields of the message (for example test_db.users), the Adapter looks for a matching mapping file under conf/es7/ (for example user_mapping.yml).
    public class MessageRouter {
        public void route(FlatMessage flatMessage) {
            String database = flatMessage.getDatabase(); // "test_db"
            String table = flatMessage.getTable();       // "orders"

            // Look up the ES mapping config by database and table name.
            // The .yml files under conf/es7/ are searched.
            EsMappingConfig mappingConfig = findMappingConfig(database, table);
            if (mappingConfig != null) {
                processWithMapping(flatMessage, mappingConfig);
            }
        }

        private EsMappingConfig findMappingConfig(String database, String table) {
            // Candidates such as order_mapping.yml, test_db_orders.yml, etc.
            String[] possibleFiles = {
                table + "_mapping.yml",
                database + "_" + table + ".yml",
                "default_mapping.yml"
            };
            // ... actual lookup logic
        }
    }
    
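As referenced above, here is a minimal sketch of parsing such a FlatMessage body with Jackson. The FlatMsg POJO is illustrative only and is not the actual canal class:

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlatMessageParser {
    // Minimal POJO mirroring the FlatMessage fields shown above (hypothetical, for illustration)
    public static class FlatMsg {
        public List<Map<String, String>> data;
        public List<Map<String, String>> old;
        public String database;
        public String table;
        public String type;
        public Long es;
        public Long ts;
    }

    public static void main(String[] args) throws Exception {
        String body = "{\"data\":[{\"id\":\"1001\",\"order_code\":\"ORD20231025001\"}],"
                + "\"database\":\"test_db\",\"table\":\"orders\",\"type\":\"INSERT\","
                + "\"old\":null,\"es\":1698213600000,\"ts\":1698213600000}";
        ObjectMapper mapper = new ObjectMapper();
        FlatMsg msg = mapper.readValue(body, FlatMsg.class);
        System.out.println(msg.database + "." + msg.table + " -> " + msg.type
                + ", rows=" + msg.data.size());
    }
}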
6.3 Data mapping (the core step)

Depending on the mapping configuration, the Adapter uses one of two strategies:

  • Mode A: SQL mapping (powerful, supports complex transformations)
    Used when the .yml file contains an sql attribute.

    # order_mapping.yml
    esMapping:
      _index: "order_index_v1"
      _id: "_id"
      sql: >
        SELECT
          o.id as _id,
          o.order_code,
          o.amount,
          o.status,
          u.name as user_name
        FROM orders o
        LEFT JOIN users u ON o.user_id = u.id
        WHERE o.id = ?
    

    Processing logic:

    public class SQLMappingProcessor {
        public List<Map<String, Object>> process(FlatMessage message, EsMappingConfig config) {
            List<Map<String, Object>> esDocuments = new ArrayList<>();

            for (Map<String, String> rowData : message.getData()) {
                // 1. Use the primary key value as the SQL parameter
                String primaryKeyValue = rowData.get("id");

                // 2. Run the SQL query (reads back from the database)
                List<Map<String, Object>> queryResult =
                        jdbcTemplate.queryForList(config.getSql(), primaryKeyValue);

                // 3. Build the ES documents
                for (Map<String, Object> dbRow : queryResult) {
                    Map<String, Object> esDoc = new HashMap<>();
                    esDoc.put("_index", config.getIndex());
                    esDoc.put("_id", dbRow.get("_id"));
                    esDoc.put("_source", buildSource(dbRow, config));
                    esDocuments.add(esDoc);
                }
            }
            return esDocuments;
        }
    }
    
  • Mode B: direct mapping (faster, simpler configuration)
    Used when the .yml file has no sql attribute and flatMessage: true is set in application.yml.

    # user_mapping.yml
    esMapping:
      _index: "user_index_v1"
      _id: "id"
      # no sql entry: direct field mapping is used
    

    Processing logic:

    public class DirectMappingProcessor {
        public List<Map<String, Object>> process(FlatMessage message, EsMappingConfig config) {
            List<Map<String, Object>> esDocuments = new ArrayList<>();

            for (Map<String, String> rowData : message.getData()) {
                // Use the fields from the binlog event directly
                Map<String, Object> esDoc = new HashMap<>();
                esDoc.put("_index", config.getIndex());
                esDoc.put("_id", rowData.get(config.getIdField()));

                // Build the _source document with a one-to-one field mapping
                Map<String, Object> source = new HashMap<>();
                for (Map.Entry<String, String> entry : rowData.entrySet()) {
                    source.put(entry.getKey(), convertValue(entry.getValue()));
                }
                esDoc.put("_source", source);
                esDocuments.add(esDoc);
            }
            return esDocuments;
        }
    }
    
6.4 Building the Elasticsearch requests

Based on the binlog event type (INSERT/UPDATE/DELETE) and the configured mapping rules, Canal Adapter generates the corresponding Elasticsearch operation.

6.4.1 INSERT events

MySQL statement:

INSERT INTO user (id, name, email, age) VALUES (1, '张三', 'zhangsan@example.com', 25);

The binlog message:

{
  "type": "INSERT",
  "table": "user",
  "data": [{
    "id": 1,
    "name": "张三",
    "email": "zhangsan@example.com",
    "age": 25
  }]
}

Adapter mapping configuration:

esMapping:
  _index: user_index
  _id: id
  upsert: true
  sql: "SELECT id, name, email, age FROM user"

Resulting Elasticsearch request (a Java client sketch follows the two cases below):

  • Case 1: upsert: true (recommended)

    // Upsert via the update API
    POST /user_index/_update/1
    {
      "doc": {
        "id": 1,
        "name": "张三",
        "email": "zhangsan@example.com",
        "age": 25
      },
      "doc_as_upsert": true
    }
    
    • Characteristics:
      • if the document does not exist, it is created
      • if it already exists, it is updated (which should not happen for an INSERT)
      • the operation is idempotent, so duplicate inserts do no harm
  • Case 2: upsert: false

    // Create the document directly
    PUT /user_index/_doc/1
    {
      "id": 1,
      "name": "张三",
      "email": "zhangsan@example.com",
      "age": 25
    }
    
    • Risk: if the document already exists the request fails, breaking synchronization.
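For comparison, the upsert above expressed with the Elasticsearch 7 High Level REST Client. This is a minimal sketch with an assumed host and port, not the adapter's internal code:

import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class UpsertExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("id", 1);
            doc.put("name", "张三");
            doc.put("email", "zhangsan@example.com");
            doc.put("age", 25);

            UpdateRequest request = new UpdateRequest("user_index", "1")
                    .doc(doc)
                    .docAsUpsert(true);   // create if missing, update if present
            UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
            System.out.println(response.getResult());   // CREATED or UPDATED
        }
    }
}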
6.4.2 UPDATE events

MySQL statement:

UPDATE user SET name = '李四', age = 26 WHERE id = 1;

The binlog message:

{
  "type": "UPDATE",
  "table": "user",
  "data": [{
    "id": 1,
    "name": "李四",
    "email": "zhangsan@example.com",
    "age": 26
  }],
  "old": [{
    "name": "张三",
    "age": 25
  }]
}

Adapter mapping configuration:

esMapping:
  _index: user_index
  _id: id
  upsert: true
  sql: "SELECT id, name, email, age FROM user"

Resulting Elasticsearch request:

  • Case 1: upsert: true (recommended)

    // Partial update via the update API
    POST /user_index/_update/1
    {
      "doc": {
        "id": 1,
        "name": "李四",
        "email": "zhangsan@example.com",
        "age": 26
      },
      "doc_as_upsert": true
    }
    
    • Characteristics:
      • only the changed fields are updated
      • if the document does not exist, it is created (upsert)
      • good performance; little data goes over the wire
  • Case 2: upsert: false

    // Full replacement via the index API
    PUT /user_index/_doc/1
    {
      "id": 1,
      "name": "李四",
      "email": "zhangsan@example.com",
      "age": 26
    }
    
    • Characteristics:
      • the whole document is replaced
      • any extra fields on the existing document are overwritten and lost
      • more data goes over the wire
6.4.3 DELETE events

MySQL statement:

DELETE FROM user WHERE id = 1;

The binlog message:

{
  "type": "DELETE",
  "table": "user",
  "data": [{
    "id": 1,
    "name": "李四",
    "email": "zhangsan@example.com",
    "age": 26
  }]
}

Adapter mapping configuration:

esMapping:
  _index: user_index
  _id: id
  # DELETE is not affected by the upsert setting
  sql: "SELECT id FROM user"   # usually only the id field is needed

Resulting Elasticsearch request:

// Delete the document directly
DELETE /user_index/_doc/1
  • Characteristics (a Java client sketch follows this list):
    • DELETE handling is simple and direct
    • only the document _id is needed
    • if the document does not exist, ES returns 404, which normally does not break the sync flow
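As referenced above, a minimal sketch of the equivalent delete with the High Level REST Client, treating NOT_FOUND the same as a successful delete so that replayed messages stay idempotent (host and port are assumptions):

import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class DeleteExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            DeleteResponse response = client.delete(
                    new DeleteRequest("user_index", "1"), RequestOptions.DEFAULT);
            // Treat "already gone" the same as "deleted"
            boolean ok = response.getResult() == DocWriteResponse.Result.DELETED
                    || response.getResult() == DocWriteResponse.Result.NOT_FOUND;
            System.out.println("delete ok=" + ok);
        }
    }
}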
6.5 Batch commits

The Adapter does not write every message to ES immediately; it accumulates a batch of operations and commits them together.

  • Configuration:

    esMapping:
      commitBatch: 1000      # commit after every 1000 accumulated operations
      commitInterval: 1000   # or every 1000 ms, whichever comes first
    
  • Example (a Java bulk sketch follows this list):
    MySQL runs several INSERTs

    INSERT INTO user (id, name) VALUES (1, '张三'), (2, '李四'), (3, '王五');
    

    Resulting Elasticsearch bulk request:

    POST /_bulk
    {"update":{"_index":"user_index","_id":"1"}}
    {"doc":{"id":1,"name":"张三"},"doc_as_upsert":true}
    {"update":{"_index":"user_index","_id":"2"}}  
    {"doc":{"id":2,"name":"李四"},"doc_as_upsert":true}
    {"update":{"_index":"user_index","_id":"3"}}
    {"doc":{"id":3,"name":"王五"},"doc_as_upsert":true}
    
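As referenced above, the same three upserts expressed as one bulk request with the High Level REST Client. A minimal sketch with an assumed host and port:

import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class BulkUpsertExample {
    private static Map<String, Object> row(int id, String name) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", id);
        doc.put("name", name);
        return doc;
    }

    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            BulkRequest bulk = new BulkRequest();
            bulk.add(new UpdateRequest("user_index", "1").doc(row(1, "张三")).docAsUpsert(true));
            bulk.add(new UpdateRequest("user_index", "2").doc(row(2, "李四")).docAsUpsert(true));
            bulk.add(new UpdateRequest("user_index", "3").doc(row(3, "王五")).docAsUpsert(true));

            BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
            System.out.println("failures=" + response.hasFailures());
        }
    }
}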
6.6 Advanced scenarios
Scenario 1: field name mapping

Configuration:

esMapping:
  _index: user_index
  _id: id
  sql: "SELECT id, name, email, age, create_time FROM user"
  fieldMappings:
    - src: name          # MySQL field name
      dest: username     # ES field name
    - src: create_time
      dest: createTime

Result:

// MySQL: name → ES: username
// MySQL: create_time → ES: createTime
POST /user_index/_update/1
{
  "doc": {
    "id": 1,
    "username": "张三",
    "email": "zhangsan@example.com",
    "age": 25,
    "createTime": "2023-01-01 10:00:00"
  },
  "doc_as_upsert": true
}
Scenario 2: conditional filtering

Configuration:

esMapping:
  _index: user_index
  _id: id
  sql: "SELECT id, name, status FROM user"
  etlCondition: "where status = 1"   # only sync users whose status is 1

Effect:

  • only INSERTs/UPDATEs with status = 1 are synced to ES
  • changes to rows with status != 1 are filtered out
Scenario 3: complex SQL transformations

Configuration:

esMapping:
  _index: order_index
  _id: order_id
  sql: "SELECT o.order_id, o.amount, u.username,
               DATE_FORMAT(o.create_time, '%Y-%m-%d') as create_date
        FROM `order` o LEFT JOIN user u ON o.user_id = u.id"

Notes:

  • multi-table joins are supported (the required fields are extracted from the binlog message)
  • SQL functions are supported
  • the SQL is never actually executed; it only describes the field transformation rules

7. How Data Loss Is Prevented

Throughout the Canal-based MySQL-to-Elasticsearch pipeline, the following mechanisms protect data consistency and enable failure recovery:

  1. MySQL binlog:

    • binlog must be enabled and set to ROW mode, which is what Canal parses.
    • keep the binlog retention long enough that data is still available when synchronization lags badly.
  2. Canal Server:

    • high availability: Canal Server supports cluster deployment coordinated through Zookeeper; when one instance dies, another takes over.
    • position persistence: Canal Server persists the parsed binlog position (for example to Zookeeper or a local file) so it can resume from the right place after a restart, avoiding both duplication and loss.
    • transaction awareness: Canal processes the binlog transaction by transaction, preserving atomicity.
  3. RocketMQ:

    • producer side: Canal Server must confirm that messages are actually delivered, e.g. by using synchronous sends and checking the result, or by using transactional messages.
    • broker side: RocketMQ guards against loss through its flush policy (synchronous flush) and replication (multiple replicas).
    • consumer side: Canal Adapter must confirm successful consumption. With manual ACKs, the consumer offset is committed only after processing succeeds; otherwise the message is retried.
  4. Canal Adapter:

    • retries: when processing fails, the Adapter should retry; once the retry limit is exceeded, move the message to a dead-letter queue and continue with subsequent messages so the pipeline is not blocked.
    • idempotency: because RocketMQ may redeliver messages (for example when an offset commit times out), the Adapter must be idempotent so that processing the same message twice does not corrupt the data.
  5. Elasticsearch:

    • write acknowledgement: Elasticsearch can wait for a refresh or rely on the transaction log (translog) to make writes durable.
    • cluster health: keep the cluster healthy with enough nodes and replicas so a node failure does not lose data.
7.1 MySQL binlog safeguards
  1. Binlog durability

    -- Make binlog writes durable
    SET GLOBAL sync_binlog = 1;                     -- fsync the binlog on every commit
    SET GLOBAL innodb_flush_log_at_trx_commit = 1;  -- flush the redo log on every commit

    -- Check the current settings
    SHOW VARIABLES LIKE 'sync_binlog';
    SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';
    
  2. Binlog retention

    -- Keep enough binlog history
    SET GLOBAL expire_logs_days = 7;          -- keep 7 days
    SET GLOBAL max_binlog_size = 1073741824;  -- 1 GB per binlog file
    
7.2 Canal Server safeguards
  1. Position persistence

    # canal.properties
    # Zookeeper configuration
    canal.zkServers = zk1:2181,zk2:2181,zk3:2181
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    # Position persistence
    canal.instance.global.mode = spring
    canal.file.data.dir = ${canal.conf:../conf}/${canal.instance.destination:}
    canal.file.flush.period = 1000

    # MQ configuration
    canal.serverMode = rocketMQ
    canal.mq.servers = rocketmq-namesrv:9876
    canal.mq.topic = canal_topic
    canal.mq.partitionsNum = 3
    canal.mq.hashPartitions = true
    canal.mq.partitionHash = .*\\..*:$pk$   # hash partition by primary key

    # Retry configuration
    canal.mq.retries = 3
    canal.mq.batchSize = 50
    canal.mq.flatMessage = true
    
  2. High-availability deployment

    # Canal cluster deployment
    version: '3'
    services:
      canal-server-1:
        image: canal/canal-server
        environment:
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.instance.mysql.slaveId=1001
      canal-server-2:
        image: canal/canal-server
        environment:
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.instance.mysql.slaveId=1002
    
  3. Position management service

    @Component
    public class CanalPositionManager {@Autowiredprivate CuratorFramework zkClient;private static final String POSITION_PATH = "/canal/positions/%s";/*** 持久化位点信息*/public void persistPosition(String destination, LogPosition position) {try {String path = String.format(POSITION_PATH, destination);byte[] data = buildPositionData(position);if (zkClient.checkExists().forPath(path) == null) {zkClient.create().creatingParentsIfNeeded().forPath(path, data);} else {zkClient.setData().forPath(path, data);}log.info("持久化位点成功: destination={}, position={}", destination, position);} catch (Exception e) {log.error("持久化位点失败", e);throw new RuntimeException("持久化位点失败", e);}}/*** 加载位点信息*/public LogPosition loadPosition(String destination) {try {String path = String.format(POSITION_PATH, destination);if (zkClient.checkExists().forPath(path) != null) {byte[] data = zkClient.getData().forPath(path);return parsePositionData(data);}} catch (Exception e) {log.error("加载位点失败", e);}return null;}/*** 备份位点到本地文件(双重保障)*/public void backupPositionToFile(String destination, LogPosition position) {try {String fileName = String.format("/data/canal/backup/%s.position", destination);File file = new File(fileName);FileUtils.writeStringToFile(file, position.toString(), "UTF-8");} catch (IOException e) {log.warn("位点本地备份失败", e);}}
    }
    
7.3 RocketMQ safeguards
  1. Reliable sending on the producer side

    @Component
    public class ReliableCanalProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final int MAX_RETRY_COUNT = 3;private static final String CANAL_TOPIC = "canal_topic";/*** 可靠发送Canal消息*/public SendResult sendReliableMessage(CanalMessage canalMessage) {// 构建消息Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(canalMessage)).setHeader(MessageConst.PROPERTY_KEYS, buildMessageKey(canalMessage)).setHeader(MessageConst.PROPERTY_TAGS, buildMessageTag(canalMessage)).build();// 同步发送,最多重试3次int retryCount = 0;while (retryCount < MAX_RETRY_COUNT) {try {SendResult sendResult = rocketMQTemplate.syncSend(CANAL_TOPIC, message, 3000);if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("消息发送成功: msgId={}, keys={}", sendResult.getMsgId(), buildMessageKey(canalMessage));return sendResult;} else {log.warn("消息发送状态异常: {}", sendResult.getSendStatus());}} catch (Exception e) {log.error("消息发送失败,重试次数: {}", retryCount + 1, e);}retryCount++;if (retryCount < MAX_RETRY_COUNT) {try {Thread.sleep(1000 * retryCount); // 指数退避} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}// 发送失败,持久化到本地等待恢复persistFailedMessage(canalMessage);throw new RuntimeException("消息发送失败,已持久化到本地");}/*** 构建消息Key(用于去重和追踪)*/private String buildMessageKey(CanalMessage message) {return String.format("%s:%s:%s:%s", message.getDatabase(),message.getTable(), message.getPrimaryKey(),message.getExecuteTime());}/*** 构建消息Tag(用于过滤)*/private String buildMessageTag(CanalMessage message) {return String.format("%s_%s", message.getDatabase(), message.getTable());}/*** 持久化失败消息到本地文件*/private void persistFailedMessage(CanalMessage message) {try {String fileName = String.format("/data/canal/failed_messages/%s_%s.msg",System.currentTimeMillis(), UUID.randomUUID().toString());File file = new File(fileName);FileUtils.writeStringToFile(file, JSON.toJSONString(message), "UTF-8");log.warn("消息发送失败,已持久化到本地: {}", fileName);} catch (IOException e) {log.error("持久化失败消息异常", e);}}/*** 恢复失败消息(手动或定时任务调用)*/public void recoverFailedMessages() {File dir = new File("/data/canal/failed_messages");File[] failedFiles = dir.listFiles((d, name) -> name.endsWith(".msg"));if (failedFiles != null) {for (File file : failedFiles) {try {String content = FileUtils.readFileToString(file, "UTF-8");CanalMessage message = JSON.parseObject(content, CanalMessage.class);sendReliableMessage(message);// 发送成功后删除文件FileUtils.forceDelete(file);log.info("恢复失败消息成功: {}", file.getName());} catch (Exception e) {log.error("恢复失败消息异常: {}", file.getName(), e);}}}}
    }
    
  2. Broker persistence configuration

    # broker.conf — reliable storage configuration
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0

    # Message storage
    storePathRootDir = /data/rocketmq/store
    storePathCommitLog = /data/rocketmq/store/commitlog

    # Flush policy: synchronous flush
    flushDiskType = SYNC_FLUSH

    # Master/slave replication: synchronous
    brokerRole = SYNC_MASTER

    # Transactional messages
    transactionTimeout = 6000
    transactionCheckMax = 15
    
  3. Reliable consumption on the consumer side

    @Component
    @RocketMQMessageListener(topic = "canal_topic",consumerGroup = "canal_es_adapter_group",consumeMode = ConsumeMode.ORDERLY,  // 顺序消费messageModel = MessageModel.CLUSTERING,  // 集群模式consumeThreadMax = 20,consumeTimeout = 30L
    )
    public class ReliableCanalConsumer implements RocketMQListener<MessageExt> {@Autowiredprivate ElasticsearchRestTemplate esTemplate;@Autowiredprivate IdempotentProcessor idempotentProcessor;@Autowiredprivate OffsetManager offsetManager;private static final int MAX_RETRY_COUNT = 3;@Overridepublic void onMessage(MessageExt message) {String messageId = message.getMsgId();String keys = message.getKeys();try {// 1. 解析消息CanalMessage canalMessage = parseMessage(message);// 2. 幂等检查if (!idempotentProcessor.checkAndSetProcessed(canalMessage)) {log.info("消息已处理,跳过: keys={}", keys);return;}// 3. 处理消息boolean success = processToElasticsearch(canalMessage);if (success) {// 4. 更新消费位点offsetManager.updateOffset(message.getQueueOffset());log.debug("消息处理成功: msgId={}, keys={}", messageId, keys);} else {log.error("消息处理失败,等待重试: msgId={}, keys={}", messageId, keys);throw new RuntimeException("ES处理失败");}} catch (Exception e) {log.error("消费消息异常: msgId={}, keys={}", messageId, keys, e);// 判断重试次数int reconsumeTimes = message.getReconsumeTimes();if (reconsumeTimes >= MAX_RETRY_COUNT) {// 超过重试次数,进入死信队列sendToDlq(message, e);log.error("消息进入死信队列: msgId={}, 重试次数={}", messageId, reconsumeTimes);} else {// 抛出异常让RocketMQ重试throw new RuntimeException("消费失败,需要重试", e);}}}/*** 处理消息到Elasticsearch*/private boolean processToElasticsearch(CanalMessage message) {try {switch (message.getEventType()) {case INSERT:case UPDATE:return handleUpsert(message);case DELETE:return handleDelete(message);default:log.warn("未知事件类型: {}", message.getEventType());return true;}} catch (Exception e) {log.error("ES操作失败", e);return false;}}/*** 处理插入/更新*/private boolean handleUpsert(CanalMessage message) {IndexRequest request = new IndexRequest(message.getIndexName()).id(message.getPrimaryKey()).source(buildSource(message)).opType(DocWriteRequest.OpType.INDEX);  // UPSERT语义try {IndexResponse response = esTemplate.getClient().index(request, RequestOptions.DEFAULT);return response.status().getStatus() == 200 || response.status().getStatus() == 201;} catch (IOException e) {throw new RuntimeException("ES索引操作失败", e);}}/*** 处理删除*/private boolean handleDelete(CanalMessage message) {DeleteRequest request = new DeleteRequest(message.getIndexName()).id(message.getPrimaryKey());try {DeleteResponse response = esTemplate.getClient().delete(request, RequestOptions.DEFAULT);// 删除成功或文档不存在都算成功return response.status().getStatus() == 200 ||response.getResult() == DocWriteResponse.Result.NOT_FOUND;} catch (IOException e) {throw new RuntimeException("ES删除操作失败", e);}}/*** 发送到死信队列*/private void sendToDlq(MessageExt message, Exception cause) {try {DlqMessage dlqMessage = new DlqMessage(message, cause.getMessage());// 发送到死信队列,后续人工处理// dlqProducer.send(dlqMessage);// 同时持久化到本地文件persistDlqMessage(message, cause);} catch (Exception e) {log.error("发送死信队列失败", e);}}
    }
    
7.4 Canal Adapter safeguards
  1. Consumer offset management

    # application.yml — consumer offset configuration
    canal.conf:
      mode: rocketMQ
      mqServers: 127.0.0.1:9876
      topic: canal_topic
      groupId: es_adapter_group
      # Persist the consumer offset
      offsetPersistence: true
      offsetStore: redis        # store consumer offsets in Redis
      # Idempotency
      idempotent:
        enabled: true
        store: redis
        expireTime: 86400       # 24 hours
      # Retry policy
      retryCount: 3
      retryInterval: 1000
      # Batching
      batchSize: 1000
      commitBatch: 1000
    
  2. Idempotency processor

    @Component
    public class IdempotentProcessor {
        @Autowired
        private RedisTemplate<String, String> redisTemplate;

        private static final String IDEMPOTENT_KEY_PREFIX = "canal:idempotent:";
        private static final long DEFAULT_EXPIRE_TIME = 24 * 60 * 60; // 24 hours

        /**
         * Check whether the message was already processed and mark it as processed.
         */
        public boolean checkAndSetProcessed(CanalMessage message) {
            String messageId = buildMessageId(message);
            String key = IDEMPOTENT_KEY_PREFIX + messageId;

            // Atomic SETNX
            Boolean result = redisTemplate.opsForValue()
                    .setIfAbsent(key, "processed", Duration.ofSeconds(DEFAULT_EXPIRE_TIME));

            if (Boolean.TRUE.equals(result)) {
                log.debug("Processing message for the first time: {}", messageId);
                return true;
            } else {
                log.info("Duplicate message, skipping: {}", messageId);
                return false;
            }
        }

        /**
         * Build a unique message ID.
         */
        private String buildMessageId(CanalMessage message) {
            return String.format("%s:%s:%s:%s:%s",
                    message.getDatabase(),
                    message.getTable(),
                    message.getPrimaryKey(),
                    message.getEventType(),
                    message.getExecuteTime());
        }

        /**
         * Clean up idempotency records (scheduled task).
         */
        @Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
        public void cleanExpiredIdempotentRecords() {
            try {
                String pattern = IDEMPOTENT_KEY_PREFIX + "*";
                Set<String> keys = redisTemplate.keys(pattern);
                if (keys != null && !keys.isEmpty()) {
                    redisTemplate.delete(keys);
                    log.info("Cleaned up idempotency records, count: {}", keys.size());
                }
            } catch (Exception e) {
                log.error("Failed to clean up idempotency records", e);
            }
        }
    }
    
7.5 Elasticsearch safeguards
  1. Write configuration tuning (a sketch for changing refresh_interval at runtime follows this list)

    # elasticsearch.yml — key settings
    # Index-level tuning
    index:
      refresh_interval: 30s      # refresh less often for better write throughput
      number_of_shards: 3
      number_of_replicas: 1
      translog:
        durability: request      # fsync the translog on every request
        sync_interval: 5s
      unassigned:
        node_left:
          delayed_timeout: 5m

    # Cluster configuration
    cluster:
      routing:
        allocation:
          awareness:
            attributes: zone
    
  2. Bulk write optimization

    @Component
    public class ESBatchProcessor {@Autowiredprivate RestHighLevelClient esClient;private static final int BATCH_SIZE = 1000;private static final int FLUSH_INTERVAL = 1000; // 1秒private final BulkProcessor bulkProcessor;private final AtomicLong successCount = new AtomicLong(0);private final AtomicLong failureCount = new AtomicLong(0);public ESBatchProcessor() {this.bulkProcessor = buildBulkProcessor();}private BulkProcessor buildBulkProcessor() {BulkProcessor.Listener listener = new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {log.debug("准备执行批量操作,请求数量: {}", request.numberOfActions());}@Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {successCount.addAndGet(request.numberOfActions());log.debug("批量操作执行成功,处理数量: {}", request.numberOfActions());if (response.hasFailures()) {log.warn("批量操作存在部分失败: {}", response.buildFailureMessage());handleBulkFailures(response);}}@Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {failureCount.addAndGet(request.numberOfActions());log.error("批量操作执行失败", failure);// 失败重试或记录日志handleBulkFailure(request, failure);}};return BulkProcessor.builder((request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),listener).setBulkActions(BATCH_SIZE).setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueMillis(FLUSH_INTERVAL)).setConcurrentRequests(2).build();}/*** 添加文档到批量处理器*/public void addDocument(String index, String id, Map<String, Object> source) {IndexRequest request = new IndexRequest(index).id(id).source(source).opType(DocWriteRequest.OpType.INDEX);bulkProcessor.add(request);}/*** 处理批量操作失败*/private void handleBulkFailures(BulkResponse response) {for (BulkItemResponse item : response) {if (item.isFailed()) {BulkItemResponse.Failure failure = item.getFailure();log.error("文档操作失败: index={}, id={}, 原因: {}", item.getIndex(), item.getId(), failure.getMessage());// 记录失败信息,后续重试recordFailedDocument(item.getIndex(), item.getId(), failure);}}}
    }
    
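The refresh_interval shown in item 1 can also be relaxed temporarily while a heavy backfill runs (for example the full synchronization service in section 7.7) and restored afterwards. A minimal sketch with the High Level REST Client, assuming the index names used earlier:

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class RefreshIntervalToggle {
    public static void setRefreshInterval(RestHighLevelClient client, String index, String interval) throws Exception {
        UpdateSettingsRequest request = new UpdateSettingsRequest(index)
                .settings(Settings.builder().put("index.refresh_interval", interval));
        AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
        System.out.println(index + " refresh_interval=" + interval + " acknowledged=" + response.isAcknowledged());
    }

    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            setRefreshInterval(client, "user_index", "-1");   // pause refresh before a heavy backfill
            // ... run the full sync ...
            setRefreshInterval(client, "user_index", "30s");  // restore afterwards
        }
    }
}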
7.6 Compensation mechanisms
  1. Data consistency checking

    @Component
    public class DataConsistencyChecker {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate ElasticsearchRestTemplate esTemplate;/*** 定时校验数据一致性*/@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行public void checkConsistency() {List<String> tables = getSyncTables();for (String table : tables) {checkTableConsistency(table);}}/*** 检查单表数据一致性*/private void checkTableConsistency(String table) {try {// 1. 查询MySQL数据量long mysqlCount = getMySqlCount(table);// 2. 查询ES数据量long esCount = getEsCount(table);// 3. 对比数量if (mysqlCount != esCount) {log.warn("数据数量不一致: table={}, mysql={}, es={}", table, mysqlCount, esCount);// 4. 记录不一致信息recordInconsistency(table, mysqlCount, esCount);// 5. 触发数据修复if (shouldAutoRepair(table)) {triggerDataRepair(table);} else {sendInconsistencyAlert(table, mysqlCount, esCount);}} else {log.info("数据一致性检查通过: table={}, count={}", table, mysqlCount);}} catch (Exception e) {log.error("数据一致性检查异常: table={}", table, e);}}/*** 获取MySQL数据量*/private long getMySqlCount(String table) {String sql = "SELECT COUNT(*) FROM " + table;return jdbcTemplate.queryForObject(sql, Long.class);}/*** 获取ES数据量*/private long getEsCount(String table) {String indexName = table + "_index";CountRequest countRequest = new CountRequest(indexName);try {CountResponse countResponse = esTemplate.getClient().count(countRequest, RequestOptions.DEFAULT);return countResponse.getCount();} catch (IOException e) {throw new RuntimeException("查询ES数量失败", e);}}/*** 触发数据修复*/private void triggerDataRepair(String table) {log.info("开始修复数据: table={}", table);try {// 1. 暂停增量同步pauseIncrementalSync(table);// 2. 执行全量同步fullSyncService.syncTable(table);// 3. 恢复增量同步resumeIncrementalSync(table);log.info("数据修复完成: table={}", table);} catch (Exception e) {log.error("数据修复失败: table={}", table, e);sendRepairFailureAlert(table, e);}}
    }
    
7.7 Disaster recovery
  1. Full synchronization service

    @Service
    public class FullSyncService {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate ElasticsearchRestTemplate esTemplate;@Autowiredprivate CanalPositionManager positionManager;/*** 全量同步表数据*/public void syncTable(String tableName) {log.info("开始全量同步表: {}", tableName);String indexName = tableName + "_index";long totalCount = 0;long successCount = 0;try {// 1. 清空ES索引clearEsIndex(indexName);// 2. 分页查询MySQL数据int pageSize = 5000;int pageNum = 0;while (true) {List<Map<String, Object>> dataList = queryDataByPage(tableName, pageNum, pageSize);if (dataList.isEmpty()) {break;}// 3. 批量写入ESbulkIndexToEs(indexName, dataList);successCount += dataList.size();totalCount += dataList.size();pageNum++;log.info("全量同步进度: table={}, 已处理={}", tableName, totalCount);// 4. 防止内存溢出,每批处理完休息一下if (pageNum % 100 == 0) {Thread.sleep(1000);}}log.info("全量同步完成: table={}, 总记录数={}", tableName, totalCount);} catch (Exception e) {log.error("全量同步失败: table={}", tableName, e);throw new RuntimeException("全量同步失败", e);}}/*** 分页查询数据*/private List<Map<String, Object>> queryDataByPage(String tableName, int pageNum, int pageSize) {int offset = pageNum * pageSize;String sql = String.format("SELECT * FROM %s ORDER BY id LIMIT %d OFFSET %d", tableName, pageSize, offset);return jdbcTemplate.queryForList(sql);}/*** 批量索引到ES*/private void bulkIndexToEs(String indexName, List<Map<String, Object>> dataList) {BulkRequest bulkRequest = new BulkRequest();for (Map<String, Object> data : dataList) {String id = data.get("id").toString();IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(data);bulkRequest.add(indexRequest);}try {BulkResponse bulkResponse = esTemplate.getClient().bulk(bulkRequest, RequestOptions.DEFAULT);if (bulkResponse.hasFailures()) {log.warn("批量索引存在失败: {}", bulkResponse.buildFailureMessage());}} catch (IOException e) {throw new RuntimeException("批量索引失败", e);}}
    }
    
  2. Disaster recovery script

    #!/bin/bash
    # disaster_recovery.sh# 配置参数
    MYSQL_HOST="mysql"
    CANAL_SERVER="canal-server"
    ROCKETMQ_NS="rocketmq-namesrv"
    ES_HOST="elasticsearch"
    BACKUP_DIR="/data/backup"
    LOG_FILE="/var/log/disaster_recovery.log"# 日志函数
    log() {echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
    }# 停止服务
    stop_services() {log "停止相关服务..."docker stop canal-adapterdocker stop canal-server# 保留MySQL和RocketMQ运行
    }# 备份关键数据
    backup_critical_data() {log "备份关键数据..."# 备份Canal位点docker exec canal-server tar -czf /tmp/canal_positions.tar.gz /data/canal/positionsdocker cp canal-server:/tmp/canal_positions.tar.gz $BACKUP_DIR/# 备份RocketMQ消费位点# 这里需要根据实际情况备份RocketMQ的消费进度
    }# 恢复Canal位点
    restore_canal_positions() {log "恢复Canal位点..."if [ -f "$BACKUP_DIR/canal_positions.tar.gz" ]; thendocker cp $BACKUP_DIR/canal_positions.tar.gz canal-server:/tmp/docker exec canal-server tar -xzf /tmp/canal_positions.tar.gz -C /elselog "警告: 未找到Canal位点备份文件"fi
    }# 检查数据一致性
    check_data_consistency() {log "检查数据一致性..."# 调用数据一致性检查接口curl -X POST http://canal-adapter:8081/check/consistency# 等待检查完成sleep 30
    }# 启动服务
    start_services() {log "启动服务..."docker start canal-serversleep 10docker start canal-adaptersleep 10
    }# 监控恢复状态
    monitor_recovery() {log "监控恢复状态..."for i in {1..60}; do# 检查Canal Server状态canal_status=$(curl -s http://canal-server:11112/ | grep -c "OK" || true)# 检查消费延迟delay=$(get_consume_delay)if [ "$canal_status" -gt 0 ] && [ "$delay" -lt 300000 ]; thenlog "恢复完成,系统运行正常"return 0filog "恢复中... Canal状态: $canal_status, 消费延迟: ${delay}ms"sleep 10donelog "错误: 恢复超时"return 1
    }# 主恢复流程
    main() {log "开始灾难恢复流程..."stop_servicesbackup_critical_datarestore_canal_positionsstart_servicescheck_data_consistencymonitor_recoveryif [ $? -eq 0 ]; thenlog "灾难恢复成功完成"send_recovery_success_alertelselog "灾难恢复失败"send_recovery_failure_alertexit 1fi
    }# 执行主流程
    main "$@"
    