一、同步策略选择
数据同步就是通过同步工具将数据拿到数仓中。
同步策略:
- 全量同步:每次同步将表的所有数据都同步过来。
优点:同步逻辑简单,使用简单
缺点:有数据冗余。
适用于数据量小的情况。
- 增量同步:只同步改变的数据。
增量同步通常在首日需要做全量同步。
优点:没有数据冗余
缺点:同步逻辑相对比较复杂,使用比较麻烦。
适用于数据量大,变化频率低的数据。(如果数据量大且变化频率高,建议全量同步)
针对本快餐项目,哪些表使用增量,哪些使用全量?
全量同步通常使用DataX、Sqoop等基于select的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具, 在本项目中,全量使用DataX,增量使用Maxwell。
二、Datax安装部署
2.1 DataX简介
简介:DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。(把hdfs导入到mysql,把mysql数据导入到hdfs)
总结:DataX就是不同数据源之间的桥梁。
2.2 DataX架构
2.3 DataX运行流程
2.4 dataX调度
2.5 DataX安装部署
安装DataX
- 将datax安装包上传到/opt/software
- 解压到/opt/module并且改名字
datax安装之后不需要进行任何的配置就可以使用,那怎么用?
(根据以往的组件使用经验,一般都是在bin目录下有个脚本,然后运行这个脚本就可以启动或停止)
cd /opt/module/datax/bin python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
-- 注意DataX的作用就是把数据从某个数据源读出来,再写入某个数据源。
使用/opt/module/datax/bin/datax.py 文件,该文件运行的话需要一个配置文件,该配置文件会指明一些参数:reader在哪,writer在哪等。
-- 那该json文件格式是什么?
datax提供了生成json文件的模型,我们通过该命令生成json文件模板,然后再对其进行修改。
python bin/datax.py -r mysqlreader -w hdfswriter
2.6 同步MySQL数据到HDFS案例
案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录
需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。(使用sql语句也会涉及到table,column,where,只不过使用sql语句。)
1.准备json文件:github.com 从该github网址下,mysql读的地方,复制json模板。(tablemodule)
{ "job": { "setting": { "speed": { "channel": 3 # 并发度 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "000000", # mysql的账户和密码 "column": [ "id", "create_time", "phone_number" # 读取哪些列的数据。 ], "splitPk": "id", # 表示使用哪个字段对表进行切片,一般推荐使用主键,不切片就不写,表示一个表一个切片,这是切片会分成多个task读取。 "where":"id<=100 and id>=5" # 条件 "connection": [ { "table": [ "customer" # 导哪张表的数据,如果是多张表必须保证表的字段一样。 ], "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/fast_food" # mysql库的地址 ] } ] } },
2.把该json文件中的writer部分删除,还是在刚才的网址,复制hdfs的writer部分。
"writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://hadoop102:8020", # hdfs地址 "fileType": "text", # 保存数据的形式 "path": "/datas", # 保存数据的路径 "fileName": "customer", # 文件名的前缀,hdfswriter写入时的文件名,实际执行时会在文件名后添加随机的后缀作为实际文件名 # column:读几个字段,写几个字段,不支持部分写入 "column": [ { "name": "id", "type": "string" }, { "name": "create_time", "type": "string" }, { "name": "phone_nuber", "type": "string" } ], "writeMode": "append", # append 有该文件时,追加 |nonconflict 有该文件时报错|truncate 有该文件时,清除再写入 "fieldDelimiter": "\t", } } } ] } }
3.在/opt/module/datax/job目录下,创建该文件,取名test.json
4.启动:python bin/datax.py job/test.json(这样会报错,因为hdfs写数据的哪个目录必须先存在)
5.创建datas目录: hdfs dfs -mkdir /datas,之后在执行。
注意:
- datax读写数据是为了将数据导入到数据仓库,比如从sql中读数据,然后写到hdfs上,后边hive就可以拿hdfs的数据去分析。
- hdfs写数据由两种type:text和orc
- 使用text时,列名可以随便写,因为text的方式,列名不会被保存,但是列的类型要和后续hive建表时的类型一致。
- 使用orc时,列名和类型必须和hive表一致.
QuerySQLMode的形式配置和上边不同的是:不需要where,colunm,table,在connection下添加一个querysql字段:
一个sql语句是一个切片。
"querySql": {"select id ,create_time,phone_number from customer where id >=5 and id <=100"}
2.7 同步HDFS数据到MySQL
案例要求:同步HDFS上的/base_province目录下的数据到MySQL gmall数据库下的test_province表。
需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
2.8 DataX传参
在把数据写到hdfs上时,(如2.6),需要配置一个hdfs的位置来保存这个数据,但是实际生产中,每天上传数据的目录不一样,这时候需要将目录以参数的方式传给DataX的json文件。
1.首先需要在hdfs上创建相应的目录。
hdfs dfs -mkdir /datas/2023-06-16
2.修改hdfs上边写的路径:
"path":"datas/"---> "path":"datas/${dt} # 这里使用shell脚本的方式进行接收参数。
3.将参数传递给配置文件
python bin/datax.py -p "-Ddt=2023-06-16" job/test.json
2.9 DataX的优化
2.9.1 速度调整
这里的限速表示多少条数据。
channel表示并发数,同时跑几个task。
配置示例:
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576 //单个channel byte限速1M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880 //总byte限速5M/s
}
},
...
}
}
2.9.2 内存调整
对应job里边有对应的channel,channel会占用内存,当channel并发比较多时,占用内存就比较多,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
怎样调整?
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
2.10 datax只在Hadoop02上配置了。
三、全量数据同步
3.1 数据通道
全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向,如下图所示。
- 其中,在hdfs上,/origin_data 表示目录,/fast_food/表示不同的库,/db表示业务数据,/*full表示全量同步的某个表的名字,/2023-06-14表示哪天同步的。
3.2 编写相关json脚本
这里已经在上边示例过了,不再过多解释。
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"create_time",
"update_time",
"name",
"phone_number",
"type",
"region_id"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/fast_food?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8"
],
"table": [
"shop"
]
}
],
"password": "000000",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "phone_number",
"type": "string"
},
{
"name": "type",
"type": "bigint"
},
{
"name": "region_id",
"type": "bigint"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "shop",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "truncate",
"nullFormat": ""
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
- vim /job/test4.json 首先把上边内容创建为脚本。
- hdfs dfs -mkdir -p /datas/shop/2023-06-17 在hdfs上边创建目录,因为下一步要把mysql的数据导入到hdfs上的该目录下。
- 查看hadoop102:9870发现确实有创建的目录。
- 接下来开始导入数据:python bin/datax.py -p "-Dtargetdir=/datas/shop/2023-06-17" job/test4.json
我们可以发现,在导不同表中的数据的时候,字段,条件、表名等都要对应相应的表进行更改,所以不同的表json文件不同,所以我们要为每张表编写一个DataX的json配置文件。
3.3 全量表json文件生成。
针对不同的表生成不同的json文件用于导入数据。(拿来就用)
- 创建/opt/module/gen_datax_config。
- 将生成器上传到服务器的/opt/module/gen_datax_config目录。
上传这两个包到该目录下。
- jar包是用来生成json文件的,那生成哪些json文件?在configuration.properties中配置。
- 配置好jar包需要的配置之后,使用java -jar 运行该jar包。
- 在/opt/module/datax/job/import/下会发现生成好了这些表的json文件。
- 随便运行一个可以成功。
- 感觉该jar包是用来生成json文件的,用java写的,该包用python写也可以。
3.4 全量表数据同步脚本
每个表每天都要全量同步,所以为了避免每天都得执行同步命令,所以封装一个全量同步脚本。(拿来就用)
#!bin/bash
#mysql_to_hdfs_full.sh/表名|日期
#1.判断参数是否传入。
if [ $# -lt 1]
then
echo "必须传入参数"
fi
# 2、判断日期是否传入,如果日期传入,那么全量导该日期的数据,否则导前一天的数据。
["$2"] && datastr=$2 || datastr=$(date -d "-1 day" +%F)
# 这里写一个函数,供3使用
function import_data(){
tableNames=$*
for tablename in ${tableNames}
do
#判断hdfs上边对应的目录是否存在,不存在则创建,“hdfs dfs -test -e 目录” 如果存在返回0,不存在返回1
hdfs dfs -test -e /origin_data/fast_food/db/${tablename}_full/${datastr}
if [ $? -eq 1 ]
then
hdfs dfs -mkdir -p /origin_data/fast_food/db/${tablename}_full/${datastr}
fi
python /opt/module/datax/bin/datax.py -p"Dtargetdir=/origin_data/fast_food/db/${tablename}_full/${datastr}" /opt/module/datax/job/import/fast_food.${tablename}.json
done
}
# 3 根据传进来的表名匹配数据
case $1 in
"all")
import data "shop" "region" "promotion" "product_spu_attr" "product_spu_attr_value" "product_spu" "product_sku" "product_group" "product_category" "product_group_sku"
;;
"shop")
import_data $1
;;
"region")
import_data $1
;;
"promotion")
import_data $1
;;
"product_spu_attr")
import_data $1
;;
"product_spu_attr_value")
import_data $1
;;
"product_spu")
import_data $1
;;
"product_sku")
import_data $1
;;
"product_group")
import_data $1
;;
"product_group_sku")
import_data $1
;;
"product_category")
import_data $1
;;
*)
echo "表名输入错误"
- 然后: vim /home/bigdata/bin/mysql_to_hdfs_full.sh 创建该bash文件。
- chmod +x /home/bigdata/bin/mysql_to_hdfs_full.sh
- mysql_to_hdfs_full.sh all 2023-06-05
之后 全量数据 同步只需要每天执行该脚本即可。