Hadoop学习

发布于:2024-12-18 ⋅ 阅读:(103) ⋅ 点赞:(0)

0 小结

一、Hadoop入门

1、常用端口号

hadoop3.x 

	HDFS NameNode 内部通常端口:8020/9000/9820

	HDFS NameNode 对用户的查询端口:9870

	Yarn查看任务运行情况的:8088

	历史服务器:19888

hadoop2.x 

	HDFS NameNode 内部通常端口:8020/9000

	HDFS NameNode 对用户的查询端口:50070

	Yarn查看任务运行情况的:8088

	历史服务器:19888

2、常用的配置文件

3.x core-site.xml  hdfs-site.xml  yarn-site.xml  mapred-site.xml workers

2.x core-site.xml  hdfs-site.xml  yarn-site.xml  mapred-site.xml slaves

二、HDFS

1、HDFS文件块大小(面试重点)

	硬盘读写速度

	在企业中  一般128m(中小公司)   256m (大公司)

2、HDFS的Shell操作(开发重点)

3、HDFS的读写流程(面试重点)

三、Map Reduce

1、InputFormat

	1)默认的是TextInputformat  kv  key偏移量,v :一行内容

	2)处理小文件CombineTextInputFormat 把多个文件合并到一起统一切片

2、Mapper 

	setup()初始化;  map()用户的业务逻辑; clearup() 关闭资源;

3、分区

	默认分区HashPartitioner ,默认按照key的hash值%numreducetask个数

	自定义分区

4、排序

	1)部分排序  每个输出的文件内部有序。

	2)全排序:  一个reduce ,对所有数据大排序。

	3)二次排序:  自定义排序范畴, 实现 writableCompare接口, 重写compareTo方法

		总流量倒序  按照上行流量 正序

5、Combiner 

	前提:不影响最终的业务逻辑(求和 没问题   求平均值)

	提前聚合map  => 解决数据倾斜的一个方法

6、Reducer

	用户的业务逻辑;

	setup()初始化;reduce()用户的业务逻辑; clearup() 关闭资源;

7、OutputFormat

	1)默认TextOutputFormat  按行输出到文件

	2)自定义

四、Yarn

1、Yarn的工作机制(面试题)

2、Yarn的调度器

	1)FIFO/容量/公平

	2)apache 默认调度器  容量; CDH默认调度器 公平

	3)公平/容量默认一个default ,需要创建多队列

	4)中小企业:hive  spark flink  mr  

	5)中大企业:业务模块:登录/注册/购物车/营销

	6)好处:解耦  降低风险  11.11  6.18  降级使用

	7)每个调度器特点:

		相同点:支持多队列,可以借资源,支持多用户

		不同点:容量调度器:优先满足先进来的任务执行

				公平调度器,在队列里面的任务公平享有队列资源

	8)生产环境怎么选:

		中小企业,对并发度要求不高,选择容量

		中大企业,对并发度要求比较高,选择公平。

3、开发需要重点掌握:

	1)队列运行原理	

	2)Yarn常用命令

	3)核心参数配置

	4)配置容量调度器和公平调度器。

	5)tool接口使用。

1 Hadoop概述

1.1 概念

1.2 版本组成区别

1.3 HDFS概述

NameNode、DataNode、2NN

1.4 YARN概述

1.5 MapReduce概述

1.6 HDFS-YARN-MapReduce关系

1.7 大数据生态体系

2 VM与Hadoop入门

所有虚拟机用户名:a33,密码123456

2.1 配置虚拟机网络后yum update不行–换源

2.2 hadoop主要文件介绍

2.3 服务器之间拷贝,同步

拷贝

同步

rsync

xsync

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送

    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)

                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

2.4 ssh免密登录

2.5 集群配置

注意是因为吃资源(内存等),所以分开,可以放在同一台

配置文件说明

自己的目录cd /opt/module/hadoop-3.1.3/

2.6 yarn 后面的wordcount的finalStatus一直失败,如果有显示虚拟内存溢出相关提示的

https://blog.csdn.net/weixin_37600848/article/details/83418646?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=1328706.473.16166803136077011&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control

2.7 集群启动/停止方式总结

停historyserver单节点

mapred --daemon stop historyserver

1**)各个模块分开启动****/停止(配置ssh是前提)常用**

(1)整体启动/停止HDFS

start-dfs.sh/stop-dfs.sh

(2)整体启动/停止YARN

start-yarn.sh/stop-yarn.sh

2**)各个服务组件逐一启动****/**停止

(1)分别启动/停止HDFS组件

hdfs --daemon start/stop namenode/datanode/secondarynamenode

(2)启动/停止YARN

yarn --daemon start/stop resourcemanager/nodemanager

2.8 常用端口号

端口名称 Hadoop2.x Hadoop3.x
NameNode内部通信端口 8020 / 9000 8020 / 9000/9820
NameNode HTTP UI 50070 9870
MapReduce查看执行任务端口 8088 8088
历史服务器通信端口 19888 19888

2.9 常用脚本(自己配置的命令,重要)

myhadoop.sh

cd /home/a33/bin

#!/bin/bash

if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

case $1 in
"start")
        echo " =================== 启动 hadoop集群 ==================="

        echo " --------------- 启动 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
        echo " --------------- 启动 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
        echo " --------------- 启动 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
        echo " =================== 关闭 hadoop集群 ==================="

        echo " --------------- 关闭 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
        echo " --------------- 关闭 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
        echo " --------------- 关闭 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

jpsall

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
        echo =============== $host ===============
        ssh $host jps 
done

3 HDFS

3.1 相关函数代码

package com.atguigu.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

public class HdfsClient {

    private FileSystem fs;

    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
        URI uri = new URI("hdfs://hadoop102:8020");
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        String user = "a33";
        fs = FileSystem.get(uri, configuration, user);
    }

    @After
    public void close() throws IOException {
        fs.close();
    }

    @Test
    // 创建目录
    public void testMkdir() throws URISyntaxException, IOException, InterruptedException {
        fs.mkdirs(new Path("/xiyou/huaguoshan"));
    }

    @Test
    //上传
    /*
     * 参数优先级
     * hdfs-default.xml => hdfs-site.xml => 在项目资源目录下的配置文件 => 代码中的配置
     *
     * */
    public void testPut() throws IOException {
        fs.copyFromLocalFile(false, true,
                new Path("C:\\Users\\33\\Desktop\\HDFSClient\\files\\sunwukong.txt"),
                new Path("/xiyou/huaguoshan"));
    }

    @Test
    // 下载
    public void testGet() throws IOException {
        fs.copyToLocalFile(false, new Path("/xiyou/huaguoshan/sunwukong.txt"),
                new Path("C:\\Users\\33\\Desktop\\HDFSClient\\files\\sunwukong.txt"),
                true);
    }

    @Test
    // 删除
    public void testRm() throws IOException {
        fs.delete(new Path("/xiyou/huaguoshan/sunwukong.txt"), true);
    }

    @Test
    // 文件的更名和移动
    public void testMv() throws IOException {
        // 目录更名
//        fs.rename(new Path("/xiyou/huaguoshan"), new Path("/xiyou/huguo"));
        // 目录移动和更名
        fs.rename(new Path("/xiyou/hanguo"), new Path("/huaguoshan"));
    }

    @Test
    // 获取文件详细信息
    public void testFileDetail() throws IOException {
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            System.out.println("=====" + fileStatus.getPath() + "=====");
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getOwner());
            System.out.println(fileStatus.getGroup());
            System.out.println(fileStatus.getLen());
            System.out.println(fileStatus.getModificationTime());
            System.out.println(fileStatus.getReplication());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPath().getName());
            // 获取块信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println(Arrays.toString(blockLocations));

        }
    }
    @Test
    // 文件和文件夹判断
    public void testFile() throws IOException {
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus status : listStatus) {
            if (status.isFile()) {
                System.out.println("文件:" + status.getPath().getName());
            }else{
                System.out.println("目录:" + status.getPath().getName());
            }
        }
    }

}

3.2 剖析文件写入

写数据

(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。

(2)NameNode返回是否可以上传。

(3)客户端请求第一个Block上传到哪几个DataNode服务器上。

(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。

(6)dn1、dn2、dn3逐级应答客户端。

(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。

(8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

读数据

(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

3.3 NN和2NN 工作机制

1**)第一阶段:NameNode启动**

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志,更新滚动日志。

(4)NameNode在内存中对元数据进行增删改。

2**)第二阶段:Secondary NameNode工作**

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

(2)Secondary NameNode请求执行CheckPoint。

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint。

(7)拷贝fsimage.chkpoint到NameNode。

(8)NameNode将fsimage.chkpoint重新命名成fsimage。

3.4 Fsimage和Edits解析

[atguigu@hadoop102 current]$ pwd

/opt/module/hadoop-3.1.3/data/dfs/name/current

[atguigu@hadoop102 current]$ hdfs oiv -p XML -i fsimage_0000000000000000025 -o /opt/module/hadoop-3.1.3/fsimage.xml

3.5 DataNode

工作机制

(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。

(2)DataNode启动后向NameNode注册,通过后,周期性(6小时)的向NameNode上报所有的块信息。

:::info
xml配置信息是在hdfs-default.xml配置文件里

:::

(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。

(4)集群运行中可以安全加入和退出一些机器。

数据完整性-crc校验

如下是DataNode节点保证数据完整性的方法。

(1)当DataNode读取Block的时候,它会计算CheckSum。

(2)如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。

(3)Client读取其他DataNode上的Block。

(4)常见的校验算法crc(32),md5(128),sha1(160)

(5)DataNode在其文件创建后周期验证CheckSum。

4 MapReduce

4.1 概述

MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

MapReduce优缺点

优点

1**)MapReduce易于编程**

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

2**)良好的扩展性**

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3**)高容错性**

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4**)适合PB级以上海量数据的离线处理**

可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点–spark+flink解决

1**)不擅长实时计算**

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

2**)不擅长流式计算**

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3**)不擅长DAG(有向无环图)计算**

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

核心编程思想

(1)分布式的运算程序往往需要分成至少2个阶段。

(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。

(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。

(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

总结:分析WordCount数据流走向深入理解MapReduce核心思想。

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序的过程调度及状态协调。

(2)MapTask:负责Map阶段的整个数据处理流程。

(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

数据序列化类型

Java****类型 Hadoop Writable****类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritable

MapReduce编程规范

mapper(k是偏移量,v是这一行对应的内容)

reducer

driver

案例

hadoop jar wc.jar

com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/input /user/atguigu/output

4.2 Hadoop序列化

概述

1)什么是序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2)为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

3)为什么不用Java的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

4)Hadoop序列化特点:

1)紧凑**:**高效使用存储空间。

**(2)快速:**读写数据的额外开销小。

**(3)互操作:**支持多语言的交互

自定义bean对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。

具体实现bean对象序列化步骤如下7步。

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {

super();

}

(3)重写序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

(4)重写反序列化方法

@Override

public void readFields(DataInput in) throws IOException {

upFlow = in.readLong();

downFlow = in.readLong();

sumFlow = in.readLong();

}

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

@Override

public int compareTo(FlowBean o) {

// 倒序排列,从大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

4.3 框架原理

InputFormat

并行度决定机制

MapTask****并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

Shuffle

没有reduce的话就没有shuffle

combiner

方案二 直接使用原来的reducer

OutputFormat

4.4 MapTask工作机制

(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

4.5 ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。

ReduceTask并行度决定机制

1)设置ReduceTask****并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4

job.setNumReduceTasks(4);

2)实验:测试ReduceTask****多少合适

(1)实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G

(2)实验结论:

表改变ReduceTask(数据量为1GB)

MapTask =16
ReduceTask 1 5 10 15 16 20 25 30 45 60
总时间 892 146 110 92 88 100 128 101 145 104

3)注意事项

4.6 Join应用

Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

总结

缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

解决方案:Map端实现数据合并。

Map Join

1**)使用场景**

Map Join适用于一张表十分小、一张表很大的场景。

2**)优点**

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3**)具体办法:采用****DistributedCache**

(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在Driver驱动类中加载缓存。

//缓存普通文件到Task运行节点。

job.addCacheFile(new URI(“file:///e:/cache/pd.txt”));

//如果是集群运行,需要设置HDFS路径

job.addCacheFile(new URI(“hdfs://hadoop102:8020/cache/pd.txt”));

4.7 数据清洗(ETL)

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

如:1)需求

去除日志中字段个数小于等于11的日志。

4.9 Hadoop压缩

1**)压缩的好处和坏处**

压缩的优点:以减少磁盘IO、减少磁盘存储空间。

压缩的缺点:增加CPU开销。

2**)压缩原则**

(1)运算密集型的Job,少用压缩

(2)IO密集型的Job,多用压缩

1)压缩算法对比介绍

压缩格式 Hadoop自带? 算法 文件扩展名 是否可切片 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 是,直接使用 Snappy .snappy 和文本处理一样,不需要修改

2)压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。

Gzip压缩

优点:压缩率比较高;

缺点:不支持Split;压缩/解压速度一般;

Bzip2压缩

优点:压缩率高;支持Split;

缺点:压缩/解压速度慢。

Lzo压缩

优点:压缩/解压速度比较快;支持Split;

缺点:压缩率一般;想支持切片需要额外创建索引。

Snappy压缩

优点:压缩和解压缩速度快;

缺点:不支持Split;压缩率一般

压缩位置选择

在Hadoop中启用压缩,可以配置如下参数

参数 默认值 阶段 建议
io.compression.codecs
(在core-site.xml中配置)
无,这个需要在命令行输入hadoop checknative查看 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
map压缩–对于结果不会有变化,因为reduce会解压缩

reduce压缩

5 Yarn

5.1 基础架构

5.2 Yarn的工作机制

(1)MR程序提交到客户端所在的节点。

(2)YarnRunner向ResourceManager申请一个Application。

(3)RM将该应用程序的资源路径返回给YarnRunner。

(4)该程序将运行所需资源提交到HDFS上。

(5)程序资源提交完毕后,申请运行mrAppMaster。

(6)RM将用户的请求初始化成一个Task。

(7)其中一个NodeManager领取到Task任务。

(8)该NodeManager创建容器Container,并产生MRAppmaster。

(9)Container从HDFS上拷贝资源到本地。

(10)MRAppmaster向RM 申请运行MapTask资源。

(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。

(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

(14)ReduceTask向MapTask获取相应分区的数据。

(15)程序运行完毕后,MR会向RM申请注销自己。

5.3 Yarn调度器和调度算法

目前,Hadoop作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。Apache Hadoop3.1.3默认的资源调度器是Capacity Scheduler。

CDH框架默认调度器是Fair Scheduler。

具体设置详见:yarn-default.xml文件

先进先出调度器(FIFO)

容量调度器(Capacity Scheduler)

分配算法

公平调度器(Fair Scheduler)

资源分配方式

5.4 Yarn核心参数


网站公告

今日签到

点亮在社区的每一天
去签到