阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门

发布于:2022-11-10 ⋅ 阅读:(1057) ⋅ 点赞:(0)

一、基础概述

1. 什么是DataX?

DataX 是阿里云商用产品 DataWorks 数据集成的开源版本,它是一个异构数据源的离线数据同步工具/平台(ETL工具)。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX.Logo

Tips :

异构即不同类型的应用或者数据源,例如MySQL/Oracle/DB2/MongDB等不同类型的数据库源
离线数据同步以及CDC实时数据复制,前者常用Sqoop以及DataX工具。
ETL(Extract-Transform-Load)工具描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据,其常用在数据仓库,但其对象并不限于数据仓库(DW)。

前面我们说到 DataX 的前身是阿里云商业化产品 DataWorks, 其稳定、高效、支持多样化等优点就不言而喻, DataWorks 致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks 数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。商业版本参见(https://www.aliyun.com/product/bigdata/ide)

DataX它有何特点?

答: DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

Github 仓库: https://github.com/alibaba/DataX
Gitee 国内仓库: https://gitee.com/mirrors/DataX

2.DataX的设计思想

描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

简单得说,DataX就像中间商一样为每一个服务对象进行需求供应。

3.DataX的框架设计

描述: DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。

Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

例如,将MySQL中的数据离线同步到HDFS之中来展示DataX的框架设计结构。

DataX架构设计流程类似source(数据来源)-> channel(数据存储池中转通道) -> sink (目的地)流程,

4.DataX的运行原理

描述: DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

DataX 调度流程:

描述: DataX完成单个数据同步的作业(Job),当DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。在Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出,否则异常退出,并且进程退出值非0。

核心模块解析:

DataX Job 模块 : 是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

DataX Task : 由Job切分而来, 是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作(包含Reader—>Channel—>Writer流程)。

DataX Schedule 模块 : 将Task组成TaskGroup ,注意单个组的默认并发数量为5(动态概念即同时有5个在运行)。

DataX TaskGroup : 负责启动以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

举例说明,当用户提交一个Datax Job 并且配置了20并非数,目的是将一个100张分别的MySQL数据同步到odps中。

(1) 首先根据分库分表切分成为100个Task。

(2) 根据要达到20个并发,此时我们需要分配4个TaskGroup,简单的说20并发除以每个TaskGroup的默认并发5得到4。

(3) 此时每一个TaskGroup负责以5并发数,共计运行25个Task,简单的说100Task除以4个TaskGroup就得到每个组需要执行的Task数。

5.DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下表 (https://github.com/alibaba/DataX#support-data-channels)

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
Kingbase
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件 datahub 读 、写
SLS 读 、写
阿里云图数据库 GDB
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Cassandra
数仓数据存储 StarRocks 读 、
ApacheDoris
ClickHouse
Hive
kudu
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB
TDengine

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

6.为何选择DataX?

描述: 我们可以从DataX 3.0六大核心优势入手了解我们为何选择它。

(1) 可靠的数据质量监控

完美解决数据传输个别类型失真问题

提供作业全链路的流量、数据量运行时监控

提供脏数据探测

(2) 丰富的数据转换功能 : DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。

(3) 精准的速度控制 : 新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

"speed": {
   "channel": 5,  //并发数
   "byte": 1048576,
   "record": 10000
}

(4) 强劲的同步性能 : DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长

(5) 健壮的容错机制

  • 线程内部重试

  • 线程级别重试

(6) 极简的使用体验

易用: 开箱即用支持linux和windows,只需要短短几步骤就可以完成数据的传输

详细: 在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况

7. DataX 与 Sqoop 间的对比(VS)

应用于企业应用中数据迁移备份,以及供不同接入的应用数据库的应用进行数据的访问。

适用于从事数据采集工作人员,以及企业中从0到1建设阶段IT运维、以及DBA运维管理人员。

二、 Datax 安装使用

1.快速开始

描述: DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

系统环境依赖-System Requirements

  • Linux

  • JDK ( 1.8以上,推荐1.8 )

  • Python ( 推荐 Python2.6.X )

# 国内镜像下载: https://npm.taobao.org/mirrors/python/2.6.9/Python-2.6.9.tgz
export PYTHON_HOME="/usr/local/python27"
wget https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz
tar -zxf Python-2.7.18.tgz -C /usr/local/
apt install gcc g++ make
cd /usr/local/Python-2.7.18
./configure --prefix=/usr/local/python27
make && make install
ln -s /usr/local/python27/bin/python2.7 /usr/bin/python
ln -s /usr/local/python27/bin/python2.7 /usr/bin/python2
python --version && python2 --version
  • Apache Maven 3.x (Compile DataX)

2. 安装部署方式

方法一、直接下载 DataX工具包:DataX下载地址

下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

$ export DATAX_HOME="/usr/local/datax"
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
$ tar -zxf datax.tar.gz -C /usr/local/
$ cd ${DATAX_HOME}/bin
$ python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/job.json
$ ln -s ${DATAX_HOME}/bin/datax.py /usr/local/datax.py
方法二、下载DataX源码,自己编译:DataX源码
  • (1)、下载DataX源码:
    $ git clone git@github.com:alibaba/DataX.git

  • (2)、通过maven打包:

$ cd  {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  • (3) 打包成功,日志显示如下:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------

打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/
Datax 解压后其结构如下:

$ cd /usr/local/datax
$ /usr/local/datax# tree -d -L 2
├── bin   # 可执行的Python脚本
├── conf  # Datax 配置文件
├── job   # 离线同步任务
├── lib   # 依赖库
├── log   # 任务执行过程日志
├── log_perf
├── plugin # 各类数据库读写插件
│   ├── reader
│   └── writer
├── script # 脚本存放
└── tmp    # 临时目录

3. 运行测试

描述: 采用 Datax 自带的 job/job.json 进行运行测试验证安装环境。

# (1) 显示机器相关信息(CPU/内存、以及JVM相关信息)
2021-10-26 11:20:54.301 [main] INFO  Engine - the machine info  =>
    osInfo: Eclipse Foundation 16 16.0.2+7
    jvmInfo:        Linux amd64 5.4.0-88-generic
    cpu num:        4
    totalPhysicalMemory:    -0.00G
    freePhysicalMemory:     -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1

    GC Names        [G1 Young Generation, G1 Old Generation]
    MEMORY_NAME                    | allocation_size                | init_size
    CodeHeap 'profiled nmethods'   | 117.21MB                       | 2.44MB
    G1 Old Gen                     | 1,024.00MB                     | 970.00MB
    G1 Survivor Space              | -0.00MB                        | 0.00MB
    CodeHeap 'non-profiled nmethods' | 117.22MB                       | 2.44MB
    Compressed Class Space         | 1,024.00MB                     | 0.00MB
    Metaspace                      | -0.00MB                        | 0.00MB
    G1 Eden Space                  | -0.00MB                        | 54.00MB
    CodeHeap 'non-nmethods'        | 5.57MB                         | 2.44MB

# (2) Job 任务执行情况
2021-10-26 11:21:04.364 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.021s |  All Task WaitReaderTim

# (3) job 任务执行CPU与GC占比信息
2021-10-26 11:21:04.367 [job-0] INFO  JobContainer -
[total cpu info] =>
averageCpu                     | maxDeltaCpu                    | minDeltaCpu
-1.00%                         | -1.00%                         | -1.00%
[total gc info] =>
NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
G1 Young Generation  | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s
G1 Old Generation    | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s

# (4) Job 任务执行完毕总计数据(非常重要) 、可以验证同步的数据是否全部同步成功。
2021-10-26 11:21:04.367 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.021s |  All Task WaitReaderTime 0.041s | Percentage 100.00%
2021-10-26 11:21:04.368 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-10-26 11:20:54
任务结束时刻                    : 2021-10-26 11:21:04
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

上面 Job 读写输出数据为DataX 19890604 1989-06-03 23:00:00 true test

4. 基础使用

从 stream 流读取数据并打印到控制台

描述: 我们可以通过DataX数据源参考指南(https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)来查看具体每个插件需要或者可选的插件。

插件示例获取:

$ ./bin/datax.py -r streamreader -w streamwriter
# (1) 此处将会显示 读写 插件的使用文档说明
Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

# (2) 命令执行示例
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json

# (3) Job 任务配置示例 Json 格式 (以下参数我简单描述)
tee job/stream2stream.json <<'EOF'
{
  "job": {
      "content": [
          {
            // 读插件
            "reader": {
                "name": "streamreader", // 指定插件名称
                "parameter": {
                    "column": [  // 字段类与值 (必须进行指定)
                      {
                        "value": "Young、",
                        "type": "string"
                      },
                      {
                        "value": 2022,
                        "type": "long"
                      },
                      {
                        "value": "2021-01-01 00:00:00",
                        "type": "date"
                      },
                      {
                        "value": true,
                        "type": "bool"
                      },
                      {
                        "value": "test",
                        "type": "bytes"
                      }
                    ],
                    "sliceRecordCount": "10" // 切片记录计数
                }
            },
            // 写插件
            "writer": {
                "name": "streamwriter",  // 指定使用的插件名称
                "parameter": {
                    "encoding": "UTF-8",  // 编码格式
                    "print": true         // 是否终端打印
                }
            }
          }
      ],
      "setting": {
          "speed": {             // 同步速度采用的类型
              "channel": "2"     // 并发数
              //"byte": 10485760 // 字节数
          }
      }
  }
}
EOF

执行结果: (执行时请删除上述备注)

python bin/datax.py job/stream2stream.json
# (1) 两个任务进程
2022-10-26 16:28:33.568 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.

# (2) 每个任务进程执行10条 (即总数20条)
2022-10-26 16:28:33.579 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-10-26 16:28:33.595 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[1] attemptCount[1] is started
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test
Young、       2022    2022-01-01 00:00:00     true    test

# (3) 执行结果信息
2022-10-26 16:28:43.576 [job-0] INFO  StandAloneJobContainerCommunicator - Total 20 records, 520 bytes | Speed 52B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReade        rTime 0.002s | Percentage 100.00%
2022-10-26 16:28:43.576 [job-0] INFO  JobContainer -
任务启动时刻                    : 2022-10-26 16:28:33
任务结束时刻                    : 2022-10-26 16:28:43
任务总计耗时                    :                 10s
任务平均流量                    :               52B/s
记录写入速度                    :              2rec/s
读出记录总数                    :                  20
读写失败总数                    :                   0

执行后的日志除了终端打印还会在本地日志目录中存放(/usr/local/datax/log/2022-10-26/b_stream2stream_json-16_28_33.312.log)文件。

非常执行同步写入的总次数为setting.speed.channel * sliceRecordCount。

开发指南

配置文件参考 https://github.com/alibaba/DataX

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
Kingbase
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件 datahub 读 、写
SLS 读 、写
阿里云图数据库 GDB
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Cassandra
数仓数据存储 StarRocks 读 、
ApacheDoris
ClickHouse
Hive
kudu
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB
TDengine

配置好后 执行 python bin/datax.py job/xxx.json

DataX 使用优化

关键参数

➢ job.setting.speed.channel : channel 并发数

➢ job.setting.speed.record : 2 全局配置 channel 的 record 限速

➢ job.setting.speed.byte:全局配置 channel 的 byte 限速

➢ core.transport.channel.speed.record:单个 channel 的 record 限速

➢ core.transport.channel.speed.byte:单个 channel 的 byte 限速

优化 1:提升每个 channel 的速度

在 DataX 内部对每个 Channel 会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s,可以根据具体硬件情况设置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的速度上限配置为 5MB

优化 2:提升 DataX Job 内 Channel 并发数

并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。提升 job 内 Channel 并发有三种配置方式:

配置全局 Byte 限速以及单 Channel Byte 限速

Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速

{ 

  "core": { 

    "transport": { 

      "channel": { 

        "speed": { 

          "byte": 1048576 

        } 

      } 

    } 

  }, 

  "job": { 

    "setting": { 

      "speed": { 

        "byte" : 5242880 

      } 

    },     ... 

  } 

} 

core.transport.channel.speed.byte=1048576,job.setting.speed.byte=5242880,所以 Channel

个数 = 全局 Byte 限速 / 单 Channel Byte 限速=5242880/1048576=5 个

配置全局 Record 限速以及单 Channel Record 限速

Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速

{ 

  "core": { 

    "transport": { 

      "channel": { 

        "speed": { 

          "record": 100 

        } 

      } 

    } 

  }, 

  "job": { 

    "setting": { 

      "speed": { 

"record" : 500 

      } 

    },     ... 

  } 

} 

core.transport.channel.speed.record=100 , job.setting.speed.record=500, 所以配置全局

Record 限速以及单 Channel Record 限速,Channel 个数 = 全局 Record 限速 / 单 Channel

Record 限速=500/100=5

直接配置 Channel 个数

只有在上面两种未设置才生效,上面两个同时设置是取值小的作为最终的 channel 数。

{ 

  "job": { 

    "setting": { 

      "speed": { 

        "channel" : 5 

      } 

    },     ... 

  } 

} 

直接配置 job.setting.speed.channel=5,所以 job 内 Channel 并发=5 个

优化 3:提高 JVM 堆内存

当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错误,调大 JVM 的堆内存。建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。

调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动

的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到