DataX 框架学习笔记
官方仓库:
https://github.com/alibaba/DataX?tab=readme-ov-file
1. 介绍
1.1. 基本介绍:
DadaX 是阿里云 DataWorks 数据集成 的开源版本(异构数据同步、离线数据同步工具 / 平台)。主要抽象为 Reader 和 Writer 插件,管理源数据和目标数据的读和写。
1.2. DataX 3.0 框架(插件式、Reader、writer ):
Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
- Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端
- Framework:Framework 用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。目前支持单机多线程的模式完成同步作业任务。
1.2.1. 官方例子:
- 单个 Job 可以分为多个 task,一个 TaskGroup 可以按设置的并发度执行任务。
- 核心模块介绍:
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 数据来源多、封装成网页平台(数据平台)优先选择 DataX (单进程多线程、日志完善)
1.3. 安装使用
- 由于 datax 依赖于 java 1.8 及 以上,和 python 2 (3) 均可以,需要提前安装 JDK 和 Conda 环境。
- datax 安装:直接下载 DataX工具包:DataX下载地址 下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
# 自检脚本:
$ python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
2. 案例 Demo
2.1. Stream To Stream Demo
- 可以在
datax
目录下使用bin/datax.py -r YOURreader -w YOURwriter
查看当前组合的 json 配置模板。
- 书写配置文件,将配置文件放入
datax/job
路径下,执行python bin/datax.py job/demo.json
命令。
{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"column": [{"type": "string","value": "xy"},{"type": "string","value": "25"}],"sliceRecordCount": "10"}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": "1"}}}
}
2.2. 从 Mysql 读取数据存放到 HDFS
- HDFS 是为大数据而生的分布式文件系统,具备高容错、高吞吐、强扩展性,适合存储海量的结构化和非结构化数据
- mysqlreader 参数解析:
- hdfswrite 参数解析:
- 其参数配置如下:
{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": [],"connection": [{"jdbcUrl": [],"table": [],"querySql":[]}],"password": "","username": "","where": ""}},"writer": {"name": "hdfswriter","parameter": {"column": [],"compress": "","defaultFS": "","fieldDelimiter": "","fileName": "","fileType": "","path": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}
}
- 如何链路中有部分线程的内容失败,datax 会回滚部分成功的数据。
2.2.1. Mysql (准备) 建库建表
$ mysql -uroot -p
create database datax;
use datax;
create table student(id int, name varchar(20));
// 插入数据
insert into student values(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');
2.2.2. hdfs 准备
- 安装 hdfs 需要前置准备 Java 8+。
- 下载并解压 Hadoop:
wget https://downloads.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz
tar -zxvf hadoop-3.4.1.tar.gz -C ~ # 解压到根路径下
cd hadoop-3.4.1
- 配置环境变量:
vim ~/.bashrc
# 加入以下内容:
export HADOOP_HOME=你的路径/hadoop-3.4.1
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATHsource ~/.bashrc
- 配置 HDFS , 进入
$HADOOP_HOME/etc/hadoop/
,修改如下文件 :
core-site.xml
:
<configuration><property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property>
</configuration>
hdfs-site.xml
:
<configuration><property><name>dfs.replication</name><value>1</value> <!-- 单节点设为 1 --></property><property><name>dfs.namenode.name.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/namenode</value></property><property><name>dfs.datanode.data.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/datanode</value></property>
</configuration>
替换 youruser
为当前用户名,并确保创建了对应目录:
mkdir -p ~/hadoopdata/hdfs/namenode
mkdir -p ~/hadoopdata/hdfs/datanode
5. 格式化并启动 HDFS,并验证启动。
# 格式化 HDFS
hdfs namenode -format# 启动 NameNode 和 DataNode
start-dfs.sh# 查看 Java 进程(有 namenode 和 datanode 即成功)
jps# 输出类似:
12345 NameNode
12346 DataNode# 测试命令是否可用:
hdfs dfs -mkdir /demo
hdfs dfs -ls /
6. 测试 JSON,以及测试结果
{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name"],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/datax?useUnicode=true&characterEncoding=UTF-8"],"table": ["student"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "int"},{"name": "name","type": "string"}],"defaultFS": "hdfs://localhost:9000","fieldDelimiter": "|","fileName": "student.txt","fileType": "text","path": "/","writeMode": "append"}}}],"setting": {"speed": {"channel": "1"}}}
}
内部有保证数据一致性的判断,当多线程环境执行完后需要对比是否全部任务成功,否则触发回滚机制。
- 可以看到管理界面下有成功同步的两个 student 文件。
3. DataX 原理分析
3.1. datax.py
- 配置参数:
-j
可以指定 jvm 参数,可以用于配置堆内存。- 在该文件中启动了 java 执行
com.alibaba.datax.core.Engine
启动类。
3.2. JobContainor
有完整的生命周期管理(对应上方结构图的描述):
- init()
-
- reader 初始化。(根据不同的配置文件)
- writer 初始化。
- prepare()
-
- 调用插件,做一些基本准备工作,清理等。
- split()
-
- task 切分逻辑
- schedule()
- post()
3.2.1. Task 切分
- 调整 channel 数量(并发数的确定)
在配置中主要分三个模块 reader writer setting,setting 中可以配置 channel、byte、record(数据条数),源码中选择如果设定了 数据量或带宽的速度,算出来的 channel 以小的为主。直接指定channel 的优先级最低。
3.2.2. Schedule 调度
根据拆分中计算的任务数和并发数取最小值,避免资源浪费。
- assignFairly 将 task 公平的分配到 taskgroup
3.3. 优化(并发、限流)
- job.setting.speed.channel : channel 并发数
- job.setting.speed.record : 全局配置 channel 的 record 限速
- job.setting.speed.byte:全局配置 channel 的 byte 限速
- core.transport.channel.speed.record:单个 channel 的 record 限速
- core.transport.channel.speed.byte:单个 channel 的 byte 限速
3.3.1. 优化1:提升每个 channel 的速度
在 DataX 内部对每个 Channel 会有严格的速度控制,
- 控制每秒同步的记录数 record。
- 每秒同步的字节数 byte,默认的速度限制是 1MB/s,可以根据具体硬件情况设
置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的
速度上限配置为 5MB。
3.3.2. 优化 2:提升 DataX Job 内 Channel 并发数
并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。
提升 job 内 Channel 并发有三种配置方式:
- Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速。
- Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速。
- 直接配置 Channel 个数。
3.3.3. 优化 3:提高 JVM 堆内存
当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据
交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据
交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错
误,调大 JVM 的堆内存。
建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。
调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动
的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
-Xms8G JVM 堆内存 初始大小
-Xmx8G JVM 堆内存 最大大小
设置一致防止内存抖动 。