当前位置: 首页 > news >正文

使用MySQL的Binlog来同步数据到ES当中

一、技术选型与核心原理

  1. 核心组件
    MySQL Binlog:ROW模式记录数据变更事件(INSERT/UPDATE/DELETE),提供原子性变更流
    Canal/OpenReplicator:伪装MySQL Slave订阅Binlog(本文以Canal 1.1.6为例)
    Kafka:作为消息中间件解耦数据管道,提供削峰填谷能力
    Elasticsearch High Level REST Client:官方推荐写入接口,支持批量提交和重试策略

  2. 同步原理

    MySQL Server → Binlog → Canal Server → Kafka → Consumer → ES Bulk API
    

    伪装从库:Canal通过MySQL Slave协议订阅Binlog
    数据路由:通过Kafka Topic实现分表分索引路由
    最终一致性:通过ACK机制和死信队列保障数据可靠性


二、环境准备与配置

1. MySQL配置(关键步骤)
# my.cnf 配置(需重启MySQL)
[mysqld]
server_id = 1
log_bin = /var/lib/mysql/mysql-bin.log
binlog_format = ROW        # 必须为ROW模式
expire_logs_days = 7       # 避免日志膨胀
binlog_row_image = FULL    # 记录完整行数据
gtid_mode = ON             # 启用GTID(高可用场景)
2. Canal Server部署
# 下载Canal 1.1.6并解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

# 配置conf/canal.properties
canal.serverMode = kafka     # 输出到Kafka
kafka.bootstrap.servers = 192.168.1.100:9092
canal.mq.topic=canal_topic   # 按表名动态路由

# 配置conf/example/instance.properties
canal.instance.master.address=192.168.1.101:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*  # 监控所有库表

三、数据管道搭建

1. Kafka Topic规划
Topic名称分区数副本数用途
canal_raw123原始Binlog事件
es_sync123已处理的ES文档事件
2. 消费者程序设计(Java示例)
// 使用Spring Kafka消费并转换数据
@KafkaListener(topics = "canal_raw")
public void syncToES(ConsumerRecord<String, String> record) {
    CanalMessage message = JSON.parseObject(record.value(), CanalMessage.class);
    
    // 转换逻辑
    List<IndexRequest> requests = message.getData().stream()
        .map(row -> {
            IndexRequest request = new IndexRequest("index_name");
            request.id(row.get("id")); // 基于主键幂等
            request.source(row);
            return request;
        }).collect(Collectors.toList());
    
    // 批量写入ES(Bulk API)
    BulkRequest bulkRequest = new BulkRequest();
    requests.forEach(bulkRequest::add);
    esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
}

关键优化点
• 批量提交:每500条或1秒间隔触发Bulk操作
• 重试策略:指数退避重试 + 死信队列记录失败数据


四、数据建模与映射

1. 关系型到文档型转换
// MySQL表结构
CREATE TABLE user (
  id INT PRIMARY KEY,
  name VARCHAR(50),
  tags JSON  # 需要展平为ES嵌套对象
);

// ES Mapping定义
PUT /user
{
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
      "tags": {
        "type": "nested",  # 处理嵌套结构
        "properties": {
          "tag_name": {"type": "keyword"},
          "create_time": {"type": "date"}
        }
      }
    }
  }
}
2. 同步策略
操作类型处理逻辑
INSERT直接生成IndexRequest
UPDATE根据_id生成UpdateRequest
DELETE生成DeleteRequest(软删除需特殊处理)

五、高可用与监控

  1. 容灾设计
    Canal集群:通过ZooKeeper选举Leader
    Kafka消费者:Consumer Group自动Rebalance
    ES写入:采用跨AZ副本 + 自动故障转移

  2. 监控指标

    # Prometheus监控项
    canal_binlog_lag_seconds  # 同步延迟
    kafka_consumer_lag_total  # 消费堆积
    es_indexing_rate          # 写入吞吐
    
  3. 报警策略
    • Binlog延迟 > 60s
    • 消费堆积 > 10,000条
    • ES Bulk失败率 > 1%


六、扩展性设计

  1. 分库分表同步
    • 通过Canal的canal.instance.filter.regex按正则匹配库表
    • 在Kafka中使用Dynamic Topic Routing(按库表名生成Topic)

  2. 数据清洗中间件

    # 使用Flink处理复杂ETL
    env.addSource(KafkaSource())
      .map(parseCanalMessage)
      .filter(lambda x: x['status'] == 'VALID')  # 数据清洗
      .keyBy(lambda x: x['user_id'])
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce(mergeDuplicateEvents)  # 去重
      .sinkTo(ElasticsearchSink())
    

七、故障恢复与数据校验

  1. 断点续传
    • Canal Server持久化Binlog Position到ZooKeeper
    • Kafka Consumer手动提交Offset

  2. 全量+增量初始化

    # 全量同步流程
    mysqldump --single-transaction > dump.sql
    python transform_to_es_bulk.py dump.sql | es_loader
    
  3. 数据一致性校验
    • 使用Elasticsearch _stats API对比MySQL COUNT
    • 通过checksum比对抽样数据


八、性能压测建议

  1. 基准测试场景
    • 单线程写入 vs 多线程Bulk
    • 不同Bulk Size对吞吐影响(建议256-2048条/批次)
    • ES Refresh Interval调优(从1s调整到30s)

  2. 硬件规格参考

    组件QPS 1万QPS 10万
    Canal Server4C8G8C16G + 独立部署
    Kafka3节点 4C8G6节点 8C32G
    ES集群3节点 8C32G6节点 16C64G

通过该方案可实现毫秒级延迟的数据同步,在日均亿级数据量的生产环境中验证过稳定性。建议在预发布环境进行全链路压测,根据实际业务特征调整参数。

相关文章:

  • Umi-OCR 全家桶
  • vue3:八、登录界面实现-页面初始搭建、基础实现
  • 在小程序中/uni-app中,当没有登录时,点击结算按钮,3s后自动跳转到登录页面
  • 历年云南大学计算机复试上机真题
  • 【安装】kafka单机版升级为3.8.1
  • 各类神经网络学习:(二)RNN 循环神经网络(上集),模型类型和相关知识
  • 分别用树型和UML结构展示java集合框架常见接口和类
  • Swagger 从 .NET 9 中删除:有哪些替代方案
  • java数据结构(复杂度)
  • object.assign和扩展运算法是深拷贝还是浅拷贝,两者区别
  • R语言零基础系列教程-01-R语言初识与学习路线
  • LinuX---Shell正则表达式
  • Redis能否替代MySQL作为主数据库?深入解析两者的持久化差异与适用边界——基于AOF持久化与关系型数据库的对比
  • Java多线程——线程同步
  • 【DeepSeek应用】DeepSeek模型本地化部署方案及Python实现
  • 从零实现Kafka延迟队列:Spring Boot整合实践与原理剖析
  • Golang倒腾一款简配的具有请求排队功能的并发受限服务器
  • 【mysql】centOS7安装mysql详细操作步骤!—通过tar包方式
  • 系统架构设计师—案例分析—数据库篇—关系型数据库设计
  • 蓝桥杯Python赛道备赛——Day5:算术(一)(数学问题)
  • 加强战略矿产出口全链条管控将重点开展哪些工作?商务部答问
  • 杭州钱塘区3宗涉宅用地均以底价成交,共计成交金额25.73亿元
  • 河南信阳拟发文严控预售许可条件:新出让土地开发的商品房一律现房销售
  • 筑牢安全防线、提升应急避难能力水平,5项国家标准发布
  • 铁路部门:确保沿线群众安全,焦柳铁路6个区段将陆续安装防护栅栏
  • 从600名外到跻身大满贯,孙发京:走过的路成就了现在的我