DataX:(一)入门

发布于:2025-07-12 ⋅ 阅读:(35) ⋅ 点赞:(0)

一、基础

1、简介

        阿里内部使用的离线数据同步工具/平台,实现了MySQL、SQLServer、Oracle、HDFS、Hive等各种异构数据源之间高效的数据同步功能。

        DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取到reader插件,以及向目标端写入数据的writer插件。

        对于异构数据源同步的问题,DataX将复杂的网状链路变成了星型的链路。DataX做为中间传输载体,负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

2、框架

        DataX本身作为离线数据同步框架,采用frameWork+plugin架构。将数据源读取和写入抽象成为Reader/writer插件,纳入到整个同步框架中。

       Reader:作为数据采集模块,负责采集数据源的数据,将数据发给FrameWork。

       Writer:作为数据写入模块,负责不断从FrameWork取数据,并写入到端。

       FrameWork:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、流控、并
              发、数据转换等核心技术。

3、架构核心

3.1、核心模块介绍

         DataX支持单机多线程模式完成同步作业运行,下列是一个作业周期的时序图。

        

  1. DataX完成单个数据同步的作业(数据库A数据同步到数据库B)称为Job。将启动一个进程来完成整个作业同步过程。Job模块是单个作业的中枢管理节点,承担了数据清洗、子任务切分(将单一作业计算转换为多个子Task)、TaskGroup管理等功能。
  2. Job启动后,回根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据同步工作。
  3. 切分多个Task之后,DataXJob会调用schedule模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup。每一个TaskGroup负责一定的并发度运行完毕分配好的所有Task,默认单个任务组的并发数量为5。一个并发就是一个channel,一个任务组就是一个进程。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader->channel->writer的线程来完成任务同步工作。
  5. DataX作业运行起来后,Job监控并等待多个TaskGroup模块完成,等待所有TaskGroup任务完成后,Job成功退出。否则异常退出,进程退出值非0。

3.2、调度流程

        例子:用户提交了一个DataX作业,并配置了20个并发,目的是将100张分表的MySQL数据同步到ODPS里面,DataX的调度决策思路是:

  1. DataXJob根据分库分表分成了100个Task。
  2. 根据20个并发,DataX计算共需分配4个TaskGroup。(每个组默认5个,20/5=4)
  3. 4个TaskGroup平分100个Task,每个TaskGroup负责以5个并发,共计25个Task。

        理论上是每一个TaskGroup负责25个Task,但实际执行的过程中,每一个Task所需要处理的数据量是不同的,执行耗时也是不同的,所以有可能TaskGroup会分配的多一些,有的分配的少一些。

二、实际操作

1、一个简单的任务

1.1、编写Job的JSON文件

        模板可以在dataX安装目录plugin中分别在reader和writer文件夹中查看各个对应的数据库模板(plugin_job_template.json)。

        例子:一个基本的streamJob业务。

{
  "job": {
     "setting": {
          "speed": {
            "channel": 1  // 并发数(1 表示单线程)
          }
      },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 5,  // 生成 5 条测试数据
            "column": [             // 定义两列数据
              {
                "type": "long",    // 第一列是 long 类型
                "value": "10"       // 值固定为 10
              },
              {
                "type": "string",   // 第二列是 string 类型
                "value": "Hello DataX"  // 值固定为 "Hello DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",  // 编码格式
            "print": true         // 在控制台打印输出
          }
        }
      }
    ]
  }
}

1.2、执行任务

        执行命令:python datax安装目录/bin/dataX.py  任务job的json文件目录        

        执行结果: 

2、动态传参 

        在JSON的同步方案中,使用类似变量的方式来定义一些可改变的参数。在执行同步时,可以指定这些参数具体的值。(类似于代码中的函数传参)。
       写法:在JSON文件中定义:$参数名。
                执行时:…… -p “-D参数名=值” job路径

       例子:job的JSON文件

执行命令:

3、并发设置

DataX中的执行流程会将Job划分为多个task,并将task使用不同的taskGroup管理,每个task执行时,都是reader—channel—writer组成,channel的数量就决定了并发度。
       设置channel数量:

  •               直接通过指定channel数量
  •               通过Bps计算channel数量
  •               通过tps计算channel数量

 3.1、直接指定

        在JSON文件中进行配置:job.setting.speed.channel。这种方式,channel的Bps默认为1MBps。

3.2、Bps        

        通过限制整个Job和单个channel的传输速率来设置channel数量。

        数量=channel速率 / job速率

       设置方式:Job速率:job.setting.speed.byte  (单位是字节)
                       Channel速率:core.transport.channel.speed.byte (单位字节)

3.3、tps

        通过限制Job及channel的tps来限速和channel计算。

              数量=Job的tps / channel的tps

       设置方式:Job:job.setting.speed.record
                       Channel:core.transport.channel.speed.record

3.4、优先级

        同时配置Bps和tps,以小的为准。
       Bps和tps都没设置时,则以channel数量配置为准。


网站公告

今日签到

点亮在社区的每一天
去签到