一、基础
1、简介
阿里内部使用的离线数据同步工具/平台,实现了MySQL、SQLServer、Oracle、HDFS、Hive等各种异构数据源之间高效的数据同步功能。
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取到reader插件,以及向目标端写入数据的writer插件。
对于异构数据源同步的问题,DataX将复杂的网状链路变成了星型的链路。DataX做为中间传输载体,负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
2、框架
DataX本身作为离线数据同步框架,采用frameWork+plugin架构。将数据源读取和写入抽象成为Reader/writer插件,纳入到整个同步框架中。
Reader:作为数据采集模块,负责采集数据源的数据,将数据发给FrameWork。
Writer:作为数据写入模块,负责不断从FrameWork取数据,并写入到端。
FrameWork:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、流控、并
发、数据转换等核心技术。
3、架构核心
3.1、核心模块介绍
DataX支持单机多线程模式完成同步作业运行,下列是一个作业周期的时序图。
- DataX完成单个数据同步的作业(数据库A数据同步到数据库B)称为Job。将启动一个进程来完成整个作业同步过程。Job模块是单个作业的中枢管理节点,承担了数据清洗、子任务切分(将单一作业计算转换为多个子Task)、TaskGroup管理等功能。
- Job启动后,回根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据同步工作。
- 切分多个Task之后,DataXJob会调用schedule模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup。每一个TaskGroup负责一定的并发度运行完毕分配好的所有Task,默认单个任务组的并发数量为5。一个并发就是一个channel,一个任务组就是一个进程。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader->channel->writer的线程来完成任务同步工作。
- DataX作业运行起来后,Job监控并等待多个TaskGroup模块完成,等待所有TaskGroup任务完成后,Job成功退出。否则异常退出,进程退出值非0。
3.2、调度流程
例子:用户提交了一个DataX作业,并配置了20个并发,目的是将100张分表的MySQL数据同步到ODPS里面,DataX的调度决策思路是:
- DataXJob根据分库分表分成了100个Task。
- 根据20个并发,DataX计算共需分配4个TaskGroup。(每个组默认5个,20/5=4)
- 4个TaskGroup平分100个Task,每个TaskGroup负责以5个并发,共计25个Task。
理论上是每一个TaskGroup负责25个Task,但实际执行的过程中,每一个Task所需要处理的数据量是不同的,执行耗时也是不同的,所以有可能TaskGroup会分配的多一些,有的分配的少一些。
二、实际操作
1、一个简单的任务
1.1、编写Job的JSON文件
模板可以在dataX安装目录plugin中分别在reader和writer文件夹中查看各个对应的数据库模板(plugin_job_template.json)。
例子:一个基本的streamJob业务。
{
"job": {
"setting": {
"speed": {
"channel": 1 // 并发数(1 表示单线程)
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 5, // 生成 5 条测试数据
"column": [ // 定义两列数据
{
"type": "long", // 第一列是 long 类型
"value": "10" // 值固定为 10
},
{
"type": "string", // 第二列是 string 类型
"value": "Hello DataX" // 值固定为 "Hello DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8", // 编码格式
"print": true // 在控制台打印输出
}
}
}
]
}
}
1.2、执行任务
执行命令:python datax安装目录/bin/dataX.py 任务job的json文件目录
执行结果:
2、动态传参
在JSON的同步方案中,使用类似变量的方式来定义一些可改变的参数。在执行同步时,可以指定这些参数具体的值。(类似于代码中的函数传参)。
写法:在JSON文件中定义:$参数名。
执行时:…… -p “-D参数名=值” job路径
例子:job的JSON文件
执行命令:
3、并发设置
DataX中的执行流程会将Job划分为多个task,并将task使用不同的taskGroup管理,每个task执行时,都是reader—channel—writer组成,channel的数量就决定了并发度。
设置channel数量:
- 直接通过指定channel数量
- 通过Bps计算channel数量
- 通过tps计算channel数量
3.1、直接指定
在JSON文件中进行配置:job.setting.speed.channel。这种方式,channel的Bps默认为1MBps。
3.2、Bps
通过限制整个Job和单个channel的传输速率来设置channel数量。
数量=channel速率 / job速率
设置方式:Job速率:job.setting.speed.byte (单位是字节)
Channel速率:core.transport.channel.speed.byte (单位字节)
3.3、tps
通过限制Job及channel的tps来限速和channel计算。
数量=Job的tps / channel的tps
设置方式:Job:job.setting.speed.record
Channel:core.transport.channel.speed.record
3.4、优先级
同时配置Bps和tps,以小的为准。
Bps和tps都没设置时,则以channel数量配置为准。