h5页面怎么做seo优化技术厂家
时间过得真快,又是一年求职季,再过几个月我也要找暑假实习了,最近比较慢,所有博客文章落灰很久了。今天我们来讲讲关于数据同步。
在一些比较大、用户量比较多、实时性要求比较高的的系统中,我们通常需要进行数据同步。这不只是为了提高系统的并发量,降低数据库访问的压力,提升用户的体验。同时也是为了让系统能够稳定运行,满足特定的场景需求。对于一些购物网站实时性和稳定性的要求是非常高的。
一、数据同步方式
对于一些需要提供高精度查询的网站中,需要使用搜索引擎Elasticsearch来存储一些文章、评论、商品信息等数据,通过搜索引擎对一些特定的数据进行查询,进行筛选然后得到最后的结果。
这时候就要体现数据同步的重要性了,有些网站对于同步实时性的要求是比较高的,有些可能不需要这么高。
1、定时任务
一般来说,如果对数据同步的实时性要求不那么高,像这种情况的话,可以使用定时任务实现
比如1分钟1次,找到MySQL中过去几分钟内(至少是定时周期的2倍)发生改变的数据,然后更新到ES。
如果不会出现同步相同的数据,因为数据的ID是唯一的,如果发现ID相同,会更新数据。
优点:简单易懂、占用资源少、不用引入第三方中间件
缺点:有时间差
应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
使用定时任务对于数据同步实时性不高的网站来说,应该可以应付了。就是可能听起来有点low,没啥技术含量。
2、数据双写
在写数据库的时候同时也写ES,更新删除数据,则又需要将ES原本的数据删除。但这种方式太过麻烦,如果ES在写入时挂了,数据写入失败,就会造成数据不一致。
3、Logstack
Logstack是一个数据库和ES进行全量和增量同步数据的管道。但它的性能似乎不是很好,配置起来也很麻烦。这个我们下次再演示
4、Canal
下面这种方式就是我要隆重为大家推荐的。Canal是alibaba搞的一个提供增量数据订阅和消费的数据同步管道。它是通过解析二进制增量日志来实现ES和MySQL数据同步的。
二、Canal数据同步原理
Canal就是一个用来提供增量数据订阅和消费的数据同步管道工具。
文档:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
这张就是来自官网的示例图
可以看到Canal其实上是作为两个服务的一个中间管道进行数据的传输。
那它是怎么从MySQL数据库中偷走数据却没被发现,并且还只偷发生改变的数据呢?
这就不得不提到数据库二进制增量日志binlog了。这哥们,专门记录数据库中所有的 DDL(数据定义语言)语句和 DML(数据操纵语言)语句,但是不包括数据查询语句。
这个日志对于灾难时的数据恢复起着极其重要的作用,MySQL的主从复制, 就是通过该binlog实现的。可想而知,它的重要性了👍
刚才又提到一个概念:”主从复制“。在一些高并发场景下,一个系统可能有多个数据库实例,这些数据库实例会被分成许多个节点——分为主节点和从节点。一般主节点较少主要负责写,从节点较多主要负责读,也就是常说的一主多从。
一个主节点写入数据,然后从节点向主节点发送请求通过解析binlog日志,获取到日志中变化的数据。
那不可以部署多个主节点吗?多个数据库实例来分担写数据的压力,岂不更好?
首先多个主节点写入数据,很可能会出现数据冲突,也很容易出现数据不一致的问题。需要使用分布式事务来保证操作的原子性,增加了系统复杂度,性能也会大幅度降低,这显然是不好的。
Canal工作原理:
Canal把自己伪装成MySQL的从节点,然后模拟从节点的交互协议向主节点发送请求,主节点收到请求后就会推送binlog,然后Canal通过解析binlog获取到增量数据,再将数据传输给ES。
除了ES之外,Canal还可以将数据传输给一些其他的中间件,如Kafka、RocketMQ等等。
三、数据同步实现
1、环境搭建
快速开始:QuickStart · alibaba/canal Wiki · GitHub
环境搭建步骤:
- 开启binlog日志
- 创建MySQL账号,授权canal
- 下载Canal
- 修改canal配置文件,设置账号密码
先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式
找到本地的MySQL中的my.ini 配置文件,修改文件内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载canal:Releases · alibaba/canal · GitHub(这里我选择1.1.6版本)
修改canal目录canal-deployer-1.1.6\conf\example下的instance.properties文件
修改用户名密码
canal.instance.dbUsername=canalcanal.instance.dbPassword=canal
如果出现打开Canal失败,出现找不到JAVA_HOME,修改一下startup.bat脚本
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221
echo %JAVA_HOME%
set PATH=%JAVA_HOME%\bin;%PATH%
echo %PATH%
canal启动成功!!!
2、功能实现
工程示例:ClientExample · alibaba/canal Wiki · GitHub
1)导入依赖
<!--https://github.com/alibaba/canal--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>
2)代码实现
代码无需改变,直接复制即可
/*** Cancal监测MySQL数据库数据变化*/
public class SimpleCanalClientExample {
public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}
}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
这样就可以实现MySQL和ES的数据同步了。
今天的分享就到这里,我们下期再见!