【业务场景实战】数据增量同步
时间过得真快,又是一年求职季,再过几个月我也要找暑假实习了,最近比较慢,所有博客文章落灰很久了。今天我们来讲讲关于数据同步。
在一些比较大、用户量比较多、实时性要求比较高的的系统中,我们通常需要进行数据同步。这不只是为了提高系统的并发量,降低数据库访问的压力,提升用户的体验。同时也是为了让系统能够稳定运行,满足特定的场景需求。对于一些购物网站实时性和稳定性的要求是非常高的。
一、数据同步方式
对于一些需要提供高精度查询的网站中,需要使用搜索引擎Elasticsearch来存储一些文章、评论、商品信息等数据,通过搜索引擎对一些特定的数据进行查询,进行筛选然后得到最后的结果。
这时候就要体现数据同步的重要性了,有些网站对于同步实时性的要求是比较高的,有些可能不需要这么高。
1、定时任务
一般来说,如果对数据同步的实时性要求不那么高,像这种情况的话,可以使用定时任务实现
比如1分钟1次,找到MySQL中过去几分钟内(至少是定时周期的2倍)发生改变的数据,然后更新到ES。
如果不会出现同步相同的数据,因为数据的ID是唯一的,如果发现ID相同,会更新数据。
优点:简单易懂、占用资源少、不用引入第三方中间件
缺点:有时间差
应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
使用定时任务对于数据同步实时性不高的网站来说,应该可以应付了。就是可能听起来有点low,没啥技术含量。
2、数据双写
在写数据库的时候同时也写ES,更新删除数据,则又需要将ES原本的数据删除。但这种方式太过麻烦,如果ES在写入时挂了,数据写入失败,就会造成数据不一致。
3、Logstack
Logstack是一个数据库和ES进行全量和增量同步数据的管道。但它的性能似乎不是很好,配置起来也很麻烦。这个我们下次再演示
4、Canal
下面这种方式就是我要隆重为大家推荐的。Canal是alibaba搞的一个提供增量数据订阅和消费的数据同步管道。它是通过解析二进制增量日志来实现ES和MySQL数据同步的。
二、Canal数据同步原理
Canal就是一个用来提供增量数据订阅和消费的数据同步管道工具。
文档:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
这张就是来自官网的示例图
可以看到Canal其实上是作为两个服务的一个中间管道进行数据的传输。
那它是怎么从MySQL数据库中偷走数据却没被发现,并且还只偷发生改变的数据呢?
这就不得不提到数据库二进制增量日志binlog了。这哥们,专门记录数据库中所有的 DDL(数据定义语言)语句和 DML(数据操纵语言)语句,但是不包括数据查询语句。
这个日志对于灾难时的数据恢复起着极其重要的作用,MySQL的主从复制, 就是通过该binlog实现的。可想而知,它的重要性了👍
刚才又提到一个概念:”主从复制“。在一些高并发场景下,一个系统可能有多个数据库实例,这些数据库实例会被分成许多个节点——分为主节点和从节点。一般主节点较少主要负责写,从节点较多主要负责读,也就是常说的一主多从。
一个主节点写入数据,然后从节点向主节点发送请求通过解析binlog日志,获取到日志中变化的数据。
那不可以部署多个主节点吗?多个数据库实例来分担写数据的压力,岂不更好?
首先多个主节点写入数据,很可能会出现数据冲突,也很容易出现数据不一致的问题。需要使用分布式事务来保证操作的原子性,增加了系统复杂度,性能也会大幅度降低,这显然是不好的。
Canal工作原理:
Canal把自己伪装成MySQL的从节点,然后模拟从节点的交互协议向主节点发送请求,主节点收到请求后就会推送binlog,然后Canal通过解析binlog获取到增量数据,再将数据传输给ES。
除了ES之外,Canal还可以将数据传输给一些其他的中间件,如Kafka、RocketMQ等等。
三、数据同步实现
1、环境搭建
快速开始:QuickStart · alibaba/canal Wiki · GitHub
环境搭建步骤:
- 开启binlog日志
- 创建MySQL账号,授权canal
- 下载Canal
- 修改canal配置文件,设置账号密码
先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式
找到本地的MySQL中的my.ini 配置文件,修改文件内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载canal:Releases · alibaba/canal · GitHub(这里我选择1.1.6版本)
修改canal目录canal-deployer-1.1.6\conf\example下的instance.properties文件
修改用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
如果出现打开Canal失败,出现找不到JAVA_HOME,修改一下startup.bat脚本
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221
echo %JAVA_HOME%
set PATH=%JAVA_HOME%\bin;%PATH%
echo %PATH%
canal启动成功!!!
2、功能实现
工程示例:ClientExample · alibaba/canal Wiki · GitHub
1)导入依赖
<!--https://github.com/alibaba/canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
2)代码实现
代码无需改变,直接复制即可
/**
* Cancal监测MySQL数据库数据变化
*/
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
这样就可以实现MySQL和ES的数据同步了。
今天的分享就到这里,我们下期再见!