数据采集与同步
一般,我们说把业务系统数据库如 MySQL、Oracle、MongoDB 的库表数据、日志如埋点等数据以及第三方等其他数据同步至数据仓库之中,称为数据同步,一般同步过来的数据直接原封不动的存放在 ODS(Operational Data Store)贴源层。数据接入是数据入仓的第一关,数据接入后一般可以开发一些数据对账任务(对账系统就是要及时发现哪些数据表缺失,然后系统或者业务人员根据这些缺失数据表及时进行干预处理,确保同步前后保证数据的一致性。),或者配置一些数据质量监控规则,在数据入仓后就做好第一道数据质量防线。
数据同步的方案
1. 直连(JDBC)
object MysqlToHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local").appName("mysql to hive").enableHiveSupport().getOrCreate()// 隐式转换import spark.implicits._// 配置 mysql 数据源val df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/ds_test?useSSl=false").option("driver", "com.mysql.jdbc.Driver").option("user", "ds_read").option("password", "ds#readA0906").option("dbtable", "stud").load()// 创建一个临时视图df.createTempView("stud")// hive 先创建动态分区表spark.sql("create table ds_spark.stud1(name string, age int)")// 动态分区表建好之后就要往里面导入数据spark.sql("insert overwrite table ds_spark.stud1 select name,age from stud")}
}
以上是企业中连接 mysql 的方式,如果你需要连接 Oracle、SQLServer 等数据库,只需要将上述的 jdbc 换成指定的数据库协议即可。
直连同步这也是目前大数据平台离线同步数据的主要手段,但该方式对业务源系统的性能影响可能会较大(因为读取数据表的同时会影响业务库的 qps),比如涉及到大批量数据同步时会拖垮业务系统的性能,如果业务数据库有采用主从库策略的话,那么我们从数据库抽取数据还可以避免对业务系统的影响,但是对于数据量大的表同步,效率将不会太高。
-
优点:实现简单,是大数据平台离线同步的主要手段。
-
缺点:同步数据量大的表,会影响业务系统的性能,谨慎操作,数据库若有主从库策略则要从从库抽数避免对业务系统直接影响,而且还要制定数据接入规范,避免业务高峰期大批量抽数操作,不然随时会带来影响线上服务的风险。
2. 数据文件同步
互联网是不安全的,很多涉密的数据是没有办法直接从公网进行直接拉的,这个时候数据就要以数据文件的形式进行线上或者线下传输。数据文件同步通过约定好的文件编码、大小、格式等,直接从源系统生成数据的文本文件,由专门的文件服务器,如 FTP 服务器传输到目标系统后,加载到目标数据库系统中;甚至涉密级别更高的一些数据直接走线下的 U 盘、硬盘进行来回拷贝。
互联网的日志类数据通常是以文件形式保存的,适合这种同步方式。不过如果通过文件服务器上传下载的话难免会有丢包或者错误的风险,所以在传输数据文件的同时,为了确保数据文件同步的完整性,会一并传输一个校验文件,该校验文件记录了数据文件的数据量以及文件大小等校验信息,以供下游目标系统验证数据同步的准确性。目前在大数据的下载平台上支持三种文件校验算法:asc、md5 及 sha1。
object CsvSource {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local").appName("csv").getOrCreate()// 隐式转换import spark.implicits._// 使用 header 选项// 使用 schema 参数val schema ="""|name string,|age int,|height int|""".stripMarginval df = spark.read// .option("header",true).schema(schema).csv("src/main/resources/data/people.csv")df.write.csv("src/main/resources/data/people1.csv")}}
- 优点:当数据源包含多个异构的数据库系统时,用这种方式比较简单、实用。另外,日志类数据通常是以文本文件形式存在的,也适合使用数据文件同步方式。
- 缺点:通过服务器上传和下载的话难免会出现丢包或者错误的风险,保证数据文件同步的完整性和安全性以及传输效率,可在传输数据文件的同时需要一并传输一个校验文件供目标系统校验,同时在传输存储较大文件时需要对数据文件进行加密和压缩操作
3. 数据库日志解析同步(偏实时)
大多数主流数据库都已经实现了使用日志文件(binlog)进行系统恢复,因为日志文件信息足够丰富,而且数据格式也很稳定,完全可以通过解析日志文件获取发生变更的数据(CDC:Change Database Capture)。因此有了这一机制,我们可以通过监控该文件,通过该文件的变动从而满足全量或者增量数据同步的需求,比如 mysql,一般是通过解析 binlog 日志方式来获取增量的数据更新,并通过消息订阅模式来实现数据的实时同步。
Mysql 的 binlog 一般的产生如下:
优点:
- 日志文件信息足够丰富,数据格式稳定,通过解析日志文件满足增量数据同步的需求。
- 通过网络协议,确保数据文件的正确接收,提供网络传输冗余,保证数据的完整性。
- 实现了实时与准实时同步的能力,延迟控制在毫秒级别。
缺点:
- 当数据更新量超出系统处理峰值,会导致数据延迟。
- 投入较大,需要在源数据库和目标数据库之前部署一个实时数据同步系统。
- 数据漂移和遗漏。
数据同步方式
1. 增量同步
增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。
insert overwrite table ods.table_in partition (dt='${yyyy-MMdd}')
select *
from table_source -- 业务源表
where create_time >= '调度当日时间'
or update_time >= '调度当日时间'
这样每日增量同步过来的数据都放在增量分区表的每日分区内,可以保存数据的历史变更状态,不过要取历史全量数据的数仓最新状态数据对于增量分区表的话就会取数麻烦些。比如要取全量订单记录当前最新状态,需要先扫描增量分区表历史全部分区得到全量订单数据,再取每笔订单数仓最新的那条状态记录。
2. 全量同步
全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。
-- 方式一,很少使用
insert overwrite table ods.table_tm
select *
from table_source -- 业务源表-- 方式二,分区全量表(又称为快照表)
insert overwrite table ods.table_tm partition (dt='${yyyy-MM-dd}')
select *
from table_source -- 业务源表
不过全量同步这种抽取数据的方式不建议使用,数据量小还行,因为每日分区存储一份历史全量数据会对 HDFS 存储造成浪费是一方面,当数据量大的话这样做是绝不可行的,会拖垮业务源系统数据库,对业务源数据库造成压力负载,严重时还会造成线上故障。所以建议一般对于小数据量可以选择全量同步,而对于大数据量的数据则选择增量同步。
3. 增量合并全量同步
基于以上增量同步和全量同步 2 种同步数据方式的利弊,结合起来形成一种同步 T+1的增量数据,再合并历史 T+2 的全量数据的抽取数据的方式,这样就在最新分区表里即满足了数仓的全量最新快照数据,同时也降低了对存储的浪费以及对业务源数据库的压力负载,兼容了上述 2 种策略。
-- 增量合并全量方案一:union all,推荐
insert overwrite table 全量表 partition (dt = '${yyyy-MM-dd}')
select *
from
(select* ,row_number() over (partition by 去重主键 order by 时间字段desc) as rkfrom(select *from 增量分区表where 1 = 1and dt = '${yyyy-MM-dd}' -- 增量表 T+1 当前增量数据union allselect *from 全量分区表where 1 = 1and dt = '${yyyy-MM-dd,-1d}' -- 全量表 T+2 历史全量数据) a
) b
where rk = 1; -- 取最新状态数据-- 增量合并全量方案二:full outer join
insert overwrite table 全量表 partition (dt = '${yyyy-MM-dd}')
select-- 优先取增量表最新数据更新覆盖coalesce(a.主键,b.主键) as 主键,...
from
(select *from 增量分区表where dt = '${yyyy-MM-dd}' -- 增量表 T+1 当前增量数据
) a
full outer join
(select *from 全量分区表where dt = '${yyyy-MM-dd,-1d}' -- 全量表 T+2 历史全量数据
) bon a.主键 = b.主键
方案一和方案二这 2 种解决方案的本质上都是用增量数据来更新覆盖历史全量数据,不过通过 full outer join 方式合并前要确保记录是唯一的,这样关联时才不会发生数据发散。
若说 2 种解决方案的性能对比的话就大差不差吧,数据量大的话方案一可能发生的性能瓶颈主要在合并过程的全局排序去重,方案二可能发生的性能瓶颈主要在合并过程的大表关联上,目前在行业中离线数据接入大致也是这种思路-增量合并全量。
数据同步相关技术
离线
sqoop
Sqoop 工作机制是将导入或导出命令翻译成 MapReduce 程序来实现。
Hadoop 生态系统包括:HDFS、Hive、Hbase 等
RDBMS(关系型数据库) 体系包括:Mysql、Oracle、DB2 等
Sqoop 可以理解为:“SQL 到 Hadoop 和 Hadoop 到 SQL 的工具”。
Sqoop 执行的整个过程只有 map 阶段,没有 reduce阶段。
启动验证
#本命令会列出所有 mysql 的数据库。
sqoop list-databases \
--connect jdbc:mysql://localhost:3306/ \
--username root
--password 123456#本命令会列出所有 mysql 的数据库。
sqoop list-tables \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root \
--password 123456 \
全量导入 mysql 表数据到 HDFS
sqoop import \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--delete-target-dir \
--target-dir /test_demo/sqoop_test/ \
--fields-terminated-by '\t' \
--table stud \
--m 1
- delete-target-dir:如果路径存在就自动删除
- target-dir:可以用来指定导出数据存放至 HDFS 的目录;
- m:是–num-mappers 的缩写,表示启动 N 个 mapper 任务并行,默认 4
默认用逗号,分隔,可以用–fields-terminated-by ‘\t’:来指定分隔符
全量导入 mysql 表数据到 HIVE
-- 先在hive中创建表
sqoop create-hive-table \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table stud \
--hive-table stud-- 同步
sqoop import \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table stud \
--hive-table stud \
--hive-import \
--m 1
如果hive目标表已经存在了,那么创建任务失败。
导入表数据子集(where 过滤)
where 可以指定从关系数据库导入数据时的查询条件。它执行在数据库服务器相应的SQL 查询,并将结果存储在 HDFS 的目标目录。
sqoop import \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table stud \
--where “name =’Tom’” \
--hive-table stud \
--hive-import \
--m 1
默认是是 append 的模式,可以指定–hive-overwrite 进行数据覆盖
增量导入
在实际工作当中,数据的导入,很多时候都是只需要导入增量数据即可,并不需要将表中的数据每次都全部导入到 hive 或者 hdfs 当中去,这样会造成数据重复的问题。因此一般都是选用一些字段进行增量的导入,sqoop 支持增量的导入数据,用于只导入比已经导入行新的数据行。
-
–check-column col
用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似。注意:这些被指定的列的类型不能使任意字符类型,如 char、varchar 等类型都是不可以的,同时-- check-column 可以去指定多个列。
-
–incremental mode
增量导入数据分为两种方式:
i、append:基于递增列的增量数据导入,必须为数值型
Ii、lastmodified:基于时间列的数据增量导入,必须为时间戳类型 -
–last-value value
指定自从上次导入后列的最大值(大于该指定的值),也可以自己设定某一值
append 模式
sqoop import \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table stu_inc \
--m 1 \
--target-dir /test_demo/sqoop_test1 \
--incremental append \
--check-column id \
--last-value 3 // 同步时不包含词条数据
- 注意点: --append and --delete-target-dir can not be used together.
lastmodified 模式
sqoop import \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table stu_last \
--target-dir /test_demo/sqoop_test2 \
--m 1 \
--check-column last_modified \
--incremental lastmodified \
--last-value "2023-11-18 20:52:02" \ // 同步时包含此条数据
sqoop 到处 export
默认情况下,sqoop export 将每行输入记录转换成一条 INSERT 语句,添加到目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果 INSERT 语句失败,导出过程将失败。此模式主要用于将记录导出到可以接收这些结果的空表中。通常用于全表数据导出。
sqoop export \
--connect jdbc:mysql://localhost:3306/ds_test \
--username root\
--password 123456 \
--table employee \
--export-dir /test_demo/emp_data.txt
sqoop缺点
1、Sqoop 属于 Hadoop 生态圈中一员,和 Hadoop 深度的绑定,深受青睐。但随着新的基于内存计算的批处理框架 Spark 的诞生,传统的 MapReduce 逐渐的淡出人们的视野,而 Sqoop 底层与 MapReduce 又是强耦合,不是解耦的。因此,随着企业逐渐 Spark 化,不能与 Spark 结合的 Sqoop 也是被历史淘汰的最大原因之一。
2、Apache Sqoop 在更新最后一个版本 1.4.7 之后就永久从 Apache 退役了,因此如果在使用的过程中如果遇到什么非常棘手的问题,很大可能找不到相应的解决办法,只能通过自己去研读源码,自己去解决相应的问题。这在快速便捷式开发的今天,可能也是被弃用的最大原因之一。
3、只支持常见的 RDBMS 如 MySQL、Oracle 等和常见的 Hadoop 生态圈的 HDFS、Hive 等 , 对于目前上市面上追捧火热的数据分析工具如 StarRocks 、 Doris 、ClickHouse 等的支持为零,主要还是停更的原因。
DataX
https://github.com/alibaba/DataX/blob/master/README.md
https://gitee.com/mirrors/DataX/blob/master/README.md
DataX 是阿里云 DataWorks 数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台,它是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。
当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。DataX 目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NOSQL、大数据计算系统都已经接入。DataX 支持目前市面上几乎所有的数据库类型,也是让他在国内大展头角的原因。
作为离线数据同步框架,采用 Framework + plugin 架构。将数据源读取、写入抽象成为 Reader/Writer 插件,纳入到整个同步框架:
- Reader:数据采集模块,采集数据源的数据,将数据发送给 Framework
- Writer: 数据写入模块,不断向 Framework 取数据,并将数据写入到目的端
- Framework:连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术
相关概念
DataX 完成单个数据同步的作业,称为== Job==,DataX 接受到一个 Job 后,将启动一个进程完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担数据清理、子任务切分(将单一作业计算转化为多个子 Task)、TaskGroup 管理等功能
DataX Job 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作
切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5。
每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动 Reader—
Channel—Writer 的线程来完成任务同步工作
DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0
举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100张分表的 mysql 数据同步到 ods 里面。DataX 的调度决策思路是:
- DataXJob 根据分库分表切分成了 100 个 Task。
- 根据 20 个并发,DataX 计算共需要分配 4 个 TaskGroup。
- 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task
使用案例
任务提交命令:用户需要根据同步数据的数据源和目的地选择相应的 Reader 和Writer,并将 Reader 和 Writer 的信息配置在一个 json 文件中,然后执行命令提交数据同步任务即可,即:python datax.py youPath/job.json
配置文件格式:查看 DataX 配置文件模板可以通过以下命令,如将 mysql 中的数据同步到 hdfs 中可以使用: python datax.py -r mysqlreader -whdfswriter
json 最外层是一个 job,job 包含 setting 和 content 两部分,其中 setting 用于对整个 job 进行配置,content 用户配置数据源和目的地。
读取 MySQL 中的数据存放到 HDFS
MySQLReader 具有两种模式,分别是 TableMode 和 QuerySQLMode,前者使用table,column,where 等属性声明需要同步的数据;后者使用一条 SQL 查询语句声明需要同步的数据。
TableMode
首先创建配置文件:…/job/stud_datax.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id", "name", "age"],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/test_demo"],"table": ["student"]}],"password": "123456","username": "root","where": "name = 'Alice' "}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "name","type": "string"},{"name": "age","type": "int"}],"compress": "gzip","defaultFS": "hdfs://localhost:8020","fieldDelimiter": "\t","fileName": "stud_datax","fileType": "text","path": "/test_demo/mysql_to_datax/","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
注意:HFDS Writer 并未提供 nullFormat 参数:也就是用户并不能自定义 null 值写到 HFDS 文件中的存储格式。默认情况下,HFDS Writer 会将 null 值存储为空字符串(‘’),而 Hive 默认的 null 值存储格式为\N。所以后期将 DataX 同步的文件导入 Hive表就会出现问题。
解决方案:
一是修改 DataX HDFS Writer 的源码,增加自定义 null 值存储格式的逻辑,参考
https://blog.csdn.net/u010834071/article/details/105506580
二是在 Hive 中建表时指定 null 值存储格式为空字符串(‘’),例如:
DROP TABLE IF EXISTS stud;
CREATE EXTERNAL TABLE stud
(`name` STRING ',`age` INT
) COMMENT '学生表'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'NULL DEFINED AS ''LOCATION '/test_demo/stud';
然后就可以提交任务了
- 在 HDFS 创建/test_demo/目录,使用 DataX 向 HDFS 同步数据时,需确保目标路径已存在
- 进入 DataX 的 bin 目录
- 执行命令 python datax.py …/job/stud_datax.json
QuerySQLMode
首先创建配置文件:…/job/stud_datax.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/ds_test"],"querySql": ["select name,age from stud where name = 'Alice'"]}],"password": "root","username": "123456 "}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "name","type": "string"},{"name": "age","type": "int"}],"compress": "gzip","defaultFS": "hdfs://ds-bigdata-001:8020","fieldDelimiter": "\t","fileName": "stud_datax","fileType": "text","path": "/test_demo/mysql_to_datax/","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
其他操作和 TableMode 中类似,不在赘述。
DataX 的性能优化
速度控制
DataX3.0 提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在数据库可承受的范围内达到最佳的同步速度。
注意事项:
- 若配置了总 record 限速,则必须配置单个 channel 的 record 限速
- 若配置了总 byte 限速,则必须配置单个 channe 的 byte 限速
- 若配置了总 record 限速和总 byte 限速,channel 并发数参数就会失效。因为配置了总 record 限速和总 byte 限速之后,实际 channel 并发数是通过计算得到的,计算公式为如下:
{"core": {"transport": {"channel": {"speed": {"byte": 1048576 //单个 channel byte 限速 1M/s}}}},"job": {"setting": {"speed": {"byte": 5242880 //总 byte 限速 5M/s}},...}
}
内存调整
当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX作为数据交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些Buffer,为了防止 OOM 等错误,需调大 JVM 的堆内存。
建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。
调整 JVM xms xmx 参数的两种方式:
一种是直接更改 datax.py 脚本;
另一种是在启动的时候,加上对应的参数,如下:
python datax.py --jvm="-Xms8G -Xmx8G" yourPath/job.json
实时
Maxwell
Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取工具。实时读取MySQL 二进制日志 Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网地址:http://maxwells-daemon.io/
Maxwell 的工作原理很简单,就是把自己伪装成 MySQL 的一个 slave,然后以slave 的身份假装从 MySQL(master)复制数据。
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
- MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志
传递给 slaves 来达到 master-slave 数据一致的目的。 - 自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和DML(除了数据查询语句)语句事件
Maxwell 进程启动
Maxwell 进程启动方式有如下两种:
一、使用命令行参数启动 Maxwell 进程
./maxwell --user='root' --password='123456' --host='localhost' --producer=stdout
其中:–producer 生产者模式(stdout:控制台,kafka:kafka 集群)
备注:在实操的时候发现以下错误,因为就是在 maxwell 1.30.0 开始,对 jdk 最低的要求已经是 11。
上述执行的前提是 mysql 已经开启 binlog,且相关的配置如下:
[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30
二、修改配置文件,定制化启动 Maxwell
[root@localhost bin]$ cp config.properties.exampleconfig.properties
[root@localhost bin]$ vim config.properties
[root@localhost bin]$ bin/maxwell --config ./config.properties
Maxwell 的缺点
1、对 jdk 的要求比较高,目前 maxwell 的版本是 1.40.6,但是从 1.30 开始对 jdk的支持就已经上升到 11,而目前企业中用的最多的还是 jdk8,因此对企业的实际的生产环境有一定的要求
2、maxwell 对数据是无差别的监控,maxwell 的参数并没有提供类似于 database和 table 这样的参数,意味着就监控这某个源的所有数据库和所有的表,因为不适用于大数据量的环境下,只适合使用数据量比较小的情况
3、只能读取 mysql 源,因此对于目前企业中多源异构的数据源的情况并不能做很好的支撑;并且对输出端的支持也不够多,如果要写入 hive 或者 hdfs 之中,需要借助kafka 作为中间件进行转发
Flink CDC
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来(binlog),写入到消息中间件中以供其他服务进行订阅及消费。
传统上,数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是,这种轮询的方式效率低下且不能实时反应变化。而== CDC 技术则通过在数据源上设置一种机制,使得变化的数据可以被实时捕获并传递给下游处理系统,从而实现了实时的数据变动监控==。
CDC 主要分为基于查询和基于 Binlog 两种方式,先主要了解一下这两种之间的区别:
Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors。
使用 Flink CDC,我们可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。
flink 与 flink cdc 的版本对应
CDC 实现的原理
通常来讲,CDC 分为主动查询和事件接收两种技术实现模式。
对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。
- 优点:不涉及数据库底层特性,实现比较通用;
- 缺点:要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。
事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。
- 优点:实时性高,可以精确捕捉上游的各种变动;
- 缺点:部署数据库的事件接收和解析器(例如 Debezium、Canal、Maxwell 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。
综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用 Debezium 来实现变更数据的捕获(下图来自 Debezium 官方文档如果使用的只有 MySQL,则还可以用 Canal)。
为什么选择 Flink
上图可以看到,Debezium 官方架构图中,是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块,因为 Flink 相对 Kafka Streams 而言,有如下优势:
- 强大的流处理引擎: Flink 是一个强大的流处理引擎,具备高吞吐量、低延迟、Exactly-Once 语义等特性。它通过基于事件时间的处理模型,支持准确和有序的数据处理,适用于实时数据处理和分析场景。这使得 Flink 成为实现 CDC 的理想选择。
- 内置的 CDC 功能: Flink 提供了内置的 CDC 功能,可以直接连接到各种数据源,捕获数据变化,并将其作为数据流进行处理。这消除了我们自行开发或集成 CDC 解决方案的需要,使得实现 CDC 变得更加简单和高效,开箱即用。
- 多种数据源的支持: Flink CDC 支持与各种数据源进行集成,如关系型数据库(如MySQL、PostgreSQL)、消息队列(如 Kafka、RabbitMQ)、文件系统等。这意味着无论你的数据存储在哪里,Flink 都能够轻松地捕获其中的数据变化,并进行进一步的实时处理和分析。
- 灵活的数据处理能力: Flink 提供了灵活且强大的数据处理能力,可以通过编写自定义的转换函数、处理函数等来对 CDC 数据进行各种实时计算和分析。同时,Flink 还集成了 SQL 和 Table API,为用户提供了使用 SQL 查询语句或 Table API 进行简单查询和分析的方式
- 完善的生态系统: Flink 拥有活跃的社区和庞大的生态系统,这意味着你可以轻松地获取到丰富的文档、教程、示例代码和解决方案。此外,Flink 还与其他流行的开源项目(如 Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和灵活性。
Flink CDC 特性
- 支持读取数据库快照,即使出现故障也能继续读取 binlog,并进行 Exactly-once 处理
- DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka(已经嵌套)
- Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。
下表显示了连接器的当前特性
用法实例
在使用 flink cdc 开发数据同步时,需要一定的编程基础,相关的依赖如下:
<properties><flink.version>1.13.0</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target>
</properties><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink.version}</version>
</dependency><!-- flink 核心 API -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version>
</dependency>
并在开发前,mysql 已经开启 binlog,且相关的配置如下:
[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30
DataStream API 的用法
public class CDCDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(3000);DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("localhost").port(3306).username("root").password("123456").databaseList("lili")// 这里一定要是 db.table 的形式.tableList("lili.test_cdc").deserializer(new StringDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()) // 全量.build();DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);dataStreamSource.print();env.execute("CDCDemo");}
}
Flink SQL API 的用法
public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(3000);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("CREATE TABLE test_cdc (" +" id int primary key," +" name STRING," +" age int" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'latest-offset'," +" 'hostname' = 'localhost'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'lili'," +" 'table-name' = 'test_cdc'" +")");Table table = tableEnv.sqlQuery("select * from test_cdc");DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);dataStreamSource.print();env.execute("FlinkSQLCDC");}
}