当前位置: 首页 > news >正文

【业务场景实战】数据增量同步

时间过得真快,又是一年求职季,再过几个月我也要找暑假实习了,最近比较慢,所有博客文章落灰很久了。今天我们来讲讲关于数据同步。

        在一些比较大、用户量比较多、实时性要求比较高的的系统中,我们通常需要进行数据同步。这不只是为了提高系统的并发量,降低数据库访问的压力,提升用户的体验。同时也是为了让系统能够稳定运行,满足特定的场景需求。对于一些购物网站实时性和稳定性的要求是非常高的。

一、数据同步方式

对于一些需要提供高精度查询的网站中,需要使用搜索引擎Elasticsearch来存储一些文章、评论、商品信息等数据,通过搜索引擎对一些特定的数据进行查询,进行筛选然后得到最后的结果。

这时候就要体现数据同步的重要性了,有些网站对于同步实时性的要求是比较高的,有些可能不需要这么高。

1、定时任务

一般来说,如果对数据同步的实时性要求不那么高,像这种情况的话,可以使用定时任务实现

比如1分钟1次,找到MySQL中过去几分钟内(至少是定时周期的2倍)发生改变的数据,然后更新到ES。

如果不会出现同步相同的数据,因为数据的ID是唯一的,如果发现ID相同,会更新数据。

优点:简单易懂、占用资源少、不用引入第三方中间件

缺点:有时间差

应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改

使用定时任务对于数据同步实时性不高的网站来说,应该可以应付了。就是可能听起来有点low,没啥技术含量。

2、数据双写

在写数据库的时候同时也写ES,更新删除数据,则又需要将ES原本的数据删除。但这种方式太过麻烦,如果ES在写入时挂了,数据写入失败,就会造成数据不一致。

3、Logstack

Logstack是一个数据库和ES进行全量和增量同步数据的管道。但它的性能似乎不是很好,配置起来也很麻烦。这个我们下次再演示

4、Canal

下面这种方式就是我要隆重为大家推荐的。Canal是alibaba搞的一个提供增量数据订阅和消费的数据同步管道。它是通过解析二进制增量日志来实现ES和MySQL数据同步的。

二、Canal数据同步原理

Canal就是一个用来提供增量数据订阅和消费的数据同步管道工具。

文档:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

这张就是来自官网的示例图

可以看到Canal其实上是作为两个服务的一个中间管道进行数据的传输。

那它是怎么从MySQL数据库中偷走数据却没被发现,并且还只偷发生改变的数据呢?

这就不得不提到数据库二进制增量日志binlog了。这哥们,专门记录数据库中所有的 DDL(数据定义语言)语句和 DML(数据操纵语言)语句,但是不包括数据查询语句。

这个日志对于灾难时的数据恢复起着极其重要的作用,MySQL的主从复制, 就是通过该binlog实现的。可想而知,它的重要性了👍

刚才又提到一个概念:”主从复制“。在一些高并发场景下,一个系统可能有多个数据库实例,这些数据库实例会被分成许多个节点——分为主节点和从节点。一般主节点较少主要负责写,从节点较多主要负责读,也就是常说的一主多从

一个主节点写入数据,然后从节点向主节点发送请求通过解析binlog日志,获取到日志中变化的数据。

那不可以部署多个主节点吗?多个数据库实例来分担写数据的压力,岂不更好?

首先多个主节点写入数据,很可能会出现数据冲突,也很容易出现数据不一致的问题。需要使用分布式事务来保证操作的原子性,增加了系统复杂度,性能也会大幅度降低,这显然是不好的。

Canal工作原理:

Canal把自己伪装成MySQL的从节点,然后模拟从节点的交互协议向主节点发送请求,主节点收到请求后就会推送binlog,然后Canal通过解析binlog获取到增量数据,再将数据传输给ES。

除了ES之外,Canal还可以将数据传输给一些其他的中间件,如Kafka、RocketMQ等等。

三、数据同步实现

1、环境搭建

快速开始:QuickStart · alibaba/canal Wiki · GitHub

环境搭建步骤:

  • 开启binlog日志
  • 创建MySQL账号,授权canal
  • 下载Canal
  • 修改canal配置文件,设置账号密码

先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式

找到本地的MySQL中的my.ini 配置文件,修改文件内容

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载canal:Releases · alibaba/canal · GitHub(这里我选择1.1.6版本)

修改canal目录canal-deployer-1.1.6\conf\example下的instance.properties文件

修改用户名密码

 canal.instance.dbUsername=canal
 canal.instance.dbPassword=canal

如果出现打开Canal失败,出现找不到JAVA_HOME,修改一下startup.bat脚本

set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221
echo %JAVA_HOME%
set PATH=%JAVA_HOME%\bin;%PATH%
echo %PATH%

canal启动成功!!!

2、功能实现

工程示例:ClientExample · alibaba/canal Wiki · GitHub

1)导入依赖

<!--https://github.com/alibaba/canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>

2)代码实现

代码无需改变,直接复制即可

/**
 * Cancal监测MySQL数据库数据变化
 */
public class SimpleCanalClientExample {
public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }
        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                           e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                             entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                             entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                             eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

 这样就可以实现MySQL和ES的数据同步了。

今天的分享就到这里,我们下期再见!

 

 

相关文章:

  • 【计算机网络 第8版】谢希仁编著 第一章概述 课后题10、11、28、34解析
  • 使用 Arduino 和 ESP8266 Wi-Fi 模块发送电子邮件
  • python字符串练习题
  • 【QT】-一文读懂抽象类
  • 限流及熔断的场景?
  • 星越L_ 常规车门解锁方式讲解
  • C#中类‌的核心定义
  • git备份or打补丁
  • TCP为什么可靠?
  • Gartner发布量子网络安全策略指南:2030年量子计算将能够破坏传统的加密算法
  • 每日十题八股-2025年3月13日-关于垃圾回收的笔记
  • 【后端开发面试题】每日 3 题(十二)
  • 用 Qt 动画制作炫酷的界面效果:属性动画教程
  • SSM基础专项复习4——Maven项目管理工具(1)
  • 小记一下Zookeeper配置中心的部分原理
  • 使用服务器如何DNS呢
  • 【eNSP实战】基本ACL实现网络安全
  • 前端怎么测网速?
  • Python数据类型进阶——详解
  • 麒麟v10 ARM64架构系统升级mysql数据库从mysql-5.7.27到mysql-8.4.4图文教程
  • 个人备案网站做淘宝客可以用吗/房地产销售
  • 装修网站建设方案书/长春免费网上推广
  • 旅行社b2b网站建设方案/全国人大常委会副委员长
  • 俄罗斯网站域名注册/seo建站技巧
  • 济南网站建设 贯日/外链生成网站
  • 珠海网站建设及优化/百度网页排名怎么提升