Hadoop进阶原理(HDFS、MR、YARN的原理)

发布于:2024-11-28 ⋅ 阅读:(24) ⋅ 点赞:(0)

HDFS

hdfs默认文件配置:https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml

为什么需要分布式存储?

假设文件有10T,此时文件太大,单台服务器无法承担。 

 

为什么需要分布式存储?

数据量太大,单机存储能力有上限,需要靠数量来解决问题.

数量的提升带来的是网络传输、磁盘读写、CPU、内存等各方面的综合提升。 分布式组合在一起可以达到1+1>2的效果.

分布式文件存储 - HDFS

HDFS 即Hadoop内提供分布式数据存储的文件系统

HDFS的基础架构

块和副本

 

block块: HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件拆分成一系列的数据块进行存储,这个数据块被称为block,除了最后一个,所有的数据块都是同样大小的。
block 块大小默认: 128M(134217728字节)     注意: 块同样大小方便统一管理
注意: 为了容错,文件的所有block都会有副本。每个文件的数据块大小和副本系数都是可配置的。

副本系数默认:  3个  副本好处: 副本为了保证数据安全(用消耗存储资源方式保证安全,导致了大数据瓶颈是数据存储)

HDFS副本块数量的配置
hdfs默认文件配置:https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
在前面我们了解了HDFS文件系统的数据安全,是依靠多个副本来确保的。

如何设置默认文件上传到HDFS中拥有的副本数量呢?可以在hdfs-site.xml中配置如下属性:

这个属性默认是3,一般情况下,我们无需主动配置(除非需要设置非3的数值)

如果需要自定义这个属性,请修改每一台服务器的hdfs-site.xml文件,并设置此属性。

<property>    

    <name>dfs.replication</name>    

    <value>3</value>

</property>

block配置
可以看到通过fsck命令我们验证了:

文件有多个副本

文件被分成多个块存储在hdfs

对于块(block),hdfs默认设置为128MB一个

块大小可以通过参数:  

<property>    

    <name>dfs.blocksize</name>    

    <value>134217728</value>    

    <description>设置HDFS块大小,单位是b</description>  

</property>

如上,设置为128MB

图解:

块 :默认统一大小128m : 为了方便统一管理

副本: 默认3个的原因: 为了保证数据的安全性

 Hive切块的作用?

大小一致默认为128m,方便统一管理 Hive副本的作用? 默认为3副本,保证数据存储安全

NameNode是如何管理Block块的

edits和fsimage文件

edits文件

在hdfs中,文件是被划分了一堆堆的block块,那如果文件很大、以及文件很多,Hadoop是如何记录和整理文件和block块的关系呢?

答案就在于NameNode

edits文件,是一个流水账文件,记录了hdfs中的每一次操作,以及本次操作影响的文件其对应的block

 

 

fsimage文件 

将全部的edits文件,合并为最终结果,即可得到一个FSImage文件

NameNode元数据管理维护

NameNode基于edits和FSImage的配合,完成整个文件系统文件的管理。

1. 每次对HDFS的操作,均被edits文件记录

2. edits达到大小上限后,开启新的edits记录

3. 定期进行edits的合并操作

        如当前没有fsimage文件,  将全部edits合并为第一个fsimage

        如当前已存在fsimage文件,将新的全部edits和已存在的fsimage进行合并,形成新的fsimage

4. 重复123流程

SecondaryNameNode元数据合并

SecondaryNameNode会通过http从NameNode拉取数据(edits和fsimage)

然后合并完成后提供给NameNode使用。

对于元数据的合并,是一个定时过程,基于:

        dfs.namenode.checkpoint.period,默认3600(秒)即1小时

        dfs.namenode.checkpoint.txns,默认1000000,即100W次事务

只要有一个达到条件就执行。 检查是否达到条件,默认60秒检查一次,基于:

        dfs.namenode.checkpoint.check.period,默认60(秒),来决定

1. NameNode基于

        edits记录每次操作

        fsimage,记录某一个时间节点前的当前文件系统全部文件的状态和信息维护整个文件系统元数据

2. edits文件会被合并到fsimage中,这个合并由:

        SecondaryNameNode来操作

3. fsimage记录的内容是:

三大机制

副本机制: 为了保证数据安全和效率,block块信息存储多个副本,第一副本保存在客户端所在服务器,第二副本保存在和第一副本不同机架服务器上,第三副本保存在和第二副本相同机架不同服务器


负载均衡机制: namenode为了保证不同的datanode中block块信息大体一样,分配存储任务的时候会优先保存在距离近且余量比较大的datanaode上


心跳机制: datanode每隔3秒钟向namenode汇报自己的状态信息,如果某个时刻,datanode连续10次不汇报了,namenode会认为datanode有可能宕机了,namenode就会每5分钟(300000毫秒)发送一次确认消息,连续2次没有收到回复,就认定datanode此时一定宕机了(确认datanode宕机总时间3*10+5*2*60=630秒)

写入数据原理

1.客户端发起写入数据的请求给namenode
2.namenode接收到客户端请求,开始校验(是否有权限,路径是否存在,文件是否存在等),如果校验没问题,就告知客户端可以写入
3.客户端收到消息,开始把文件数据分割成默认的128m大小的的block块,并且把block块数据拆分成64kb的packet数据包,放入传输序列
4.客户端携带block块信息再次向namenode发送请求,获取能够存储block块数据的datanode列表
5.namenode查看当前距离上传位置较近且不忙的datanode,放入列表中返回给客户端
6.客户端连接datanode,开始发送packet数据包,第一个datanode接收完后就给客户端ack应答(客户端就可以传入下一个packet数据包),同时第一个datanode开始复制刚才接收到的数据包给node2,node2接收到数据包也复制给node3(复制成功也需要返回ack应答),最终建立了pipeline传输通道以及ack应答通道
7.其他packet数据根据第一个packet数据包经过的传输通道和应答通道,循环传入packet,直到当前block块数据传输完成(存储了block信息的datanode需要把已经存储的块信息定期的同步给namenode)
8.其他block块数据存储,循环执行上述4-7步,直到所有block块传输完成,意味着文件数据被写入成功(namenode把该文件的元数据保存上)
9.最后客户端和namenode互相确认文件数据已经保存完成(也会汇报不能使用的datanode)
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,适用于面试

读取数据原理

1.客户端发送读取文件请求给namenode
2.namdnode接收到请求,然后进行一系列校验(路径是否存在,文件是否存在,是否有权限等),如果没有问题,就告知可以读取
3.客户端需要再次和namenode确认当前文件在哪些datanode中存储
4.namenode查看当前距离下载位置较近且不忙的datanode,放入列表中返回给客户端
5.客户端找到最近的datanode开始读取文件对应的block块信息(每次传输是以64kb的packet数据包)放到内存缓冲区中
6.接着读取其他block块信息,循环上述3-5步,直到所有block块读取完毕(根据块编号拼接成完整数据)
7.最后从内存缓冲区把数据通过流写入到目标文件中
8.最后客户端和namenode互相确认文件数据已经读取完成(也会汇报不能使用的datanode)
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,适用于面试

内存/文件元数据

namenode和secondarynamenode:  配合完成对元数据的保存
元数据: 描述数据的数据,在hdfs中主要分为内存元数据 和 文件元数据 两种分别在内存和磁盘上
内存元数据: namnode运行过程中产生的元数据会先保存在内存中,再保存到文件元数据中。
内存元数据优缺点:

优点: 因为内存处理数据的速度要比磁盘快。 

缺点: 内存一断电,数据全部丢失
文件元数据: Edits 编辑日志文件和fsimage 镜像文件
Edits编辑日志文件: 存放的是Hadoop文件系统的所有更改操作(文件创建,删除或修改)的日志,文件系统客户端执行的更改操作首先会被记录到edits文件中
Fsimage镜像文件: 是元数据的一个持久化的检查点,包含Hadoop文件系统中的所有目录和文件元数据信息,但不包含文件块位置的信息。文件块位置信息只存储在内存中,是在 datanode加入集群的时候,namenode询问datanode得到的,并且不间断的更新.
fsimage和edits关系: 两个文件都是经过序列化的,只有在NameNode启动的时候才会将fsimage文件中的内容加载到内存中,之后NameNode把增删改查等操作记录同步到edits文件中.使得内存中的元数据和实际的同步,存在内存中的元数据支持客户端的读操作,也是最完整的元数据。

元数据存储的原理

注意: 第一次启动namenode的时候是没有编辑日志文件和镜像文件的,下图主要介绍的是第二次及以后访问的时候情况流程

1.namenode第一次启动的时候先把最新的fsimage文件中内容加载到内存中,同时把edits文件中内容也加载到内存中
2.客户端发起指令(增删改查等操作),namenode接收到客户端指令把每次产生的新的指令操作先放到内存中
3.然后把刚才内存中新的指令操作写入到edits_inprogress文件中
4.edits_inprogress文件中数据到了一定阈值的时候,把文件中历史操作记录写入到序列化的edits备份文件中
5.namenode就在上述2-4步中循环操作...
6.当secondarynamenode检测到自己距离上一次检查点(checkpoint)已经1小时或者事务数达到100w,就触发secondarynamenode询问namenode是否对edits文件和fsimage文件进行合并操作
7.namenode告知可以进行合并
8.secondarynamenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行合并(这个过程称checkpoint)
9.secondarynamenode把刚才合并后的fsimage.checkpoint文件拷贝给namenode
10.namenode把拷贝过来的最新的fsimage.checkpoint文件,重命名为fsimage,覆盖原来的文件
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,适用于面试

1、对于客户端读取HDFS数据的流程中,一定要知道

        不论读、还是写,NameNode都不经手数据,均是客户端和DataNode直接通讯

        不然对NameNode压力太大

2、写入和读取的流程,简单来说就是:

        NameNode做授权判断(是否能写、是否能读)

        客户端直连DataNode进行写入(由DataNode自己完成副本复制)、客户端直连DataNode进行block读取

        写入,客户端会被分配找离自己最近的DataNode写数据

        读取,客户端拿到的block列表,会是网络距离最近的一份

安全模式

安全模式: 不允许HDFS客户端进行任何修改文件的操作,包括上传文件,删除文件等操作。

查看安全模式状态:
[root@node1 /]# hdfs dfsadmin -safemode get  
Safe mode is OFF
开启安全模式:
[root@node1 /]# hdfs dfsadmin -safemode enter
Safe mode is ON
退出安全模式:
[root@node1 /]# hdfs dfsadmin -safemode leave
Safe mode is OFF

归档机制

归档原因: 每个小文件单独存放到hdfs中(占用一个block块),那么hdfs就需要依次存储每个小文件的元数据信息,相对来说浪费资源

归档格式: hadoop archive -archiveName 归档名.har -p 原始文件的目录[hdfs] 归档文件的存储目录[hdfs]

准备工作: HDFS上准备一个目录binzi,里面存储三个文件 1.txt 2.txt 3.txt ...

[root@node1 /]# hadoop archive -archiveName test.har -p /binzi  /
...
[root@node1 /]# hdfs dfs -ls /test1.har
Found 4 items
-rw-r--r--   3 root supergroup          0 2022-12-27 15:56 /test.har/_SUCCESS
-rw-r--r--   3 root supergroup        254 2022-12-27 15:56 /test.har/_index
-rw-r--r--   3 root supergroup         23 2022-12-27 15:56 /test.har/_masterindex
-rw-r--r--   3 root supergroup         47 2022-12-27 15:56 /test.har/part-0
[root@node1 /]# hdfs dfs -cat /test1.har/part-0
...

归档特性:

Hadoop Archives的URI是:har://scheme-hostname:port/路径/归档名.har

scheme-hostname格式为hdfs-域名:端口

如果没有提供scheme-hostname,它会使用默认的文件系统: har:///路径/归档名.har

[root@node1 /]#  hdfs dfs -ls har:///test.har      
Found 3 items
-rw-r--r--   3 root supergroup         15 2022-12-27 15:55 har:///test.har/1.txt
-rw-r--r--   3 root supergroup         16 2022-12-27 15:55 har:///test.har/2.txt
-rw-r--r--   3 root supergroup         16 2022-12-27 15:55 har:///test.har/3.txt
[root@node1 /]#  hdfs dfs -cat  har:///test.har/1.txt  
...
[root@node1 /]#  hdfs dfs -cat  har:///test.har/2.txt  
...
[root@node1 /]#  hdfs dfs -cat  har:///test.har/3.txt  
...

垃圾桶机制

在虚拟机中rm命令删除文件,默认是永久删除

在虚拟机中需要手动设置才能使用垃圾桶回收: 把删除的内容放到: /user/root/.Trash/Current/ 先关闭服务: 在 node1 中执行 stop-all.sh 新版本不关闭服务也没有问题

再修改文件 core-site.xml : 进入/export/server/hadoop-3.3.0/etc/hadoop目录下进行修改

<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

其中,1440 表示 1440分钟,也就是 24小时,一天的时间。

设置了垃圾桶机制好处: 文件不会立刻消失,可以去垃圾桶里把文件恢复,继续使用

没有开启垃圾桶效果
[root@node1 hadoop]# hdfs dfs -rm /binzi/hello.txt
Deleted /binzi/hello.txt
​开启垃圾桶
[root@node1 ~]#cd /export/server/hadoop-3.3.0/etc/hadoop
[root@node1 hadoop]# vim core-site.xml
注意: 放到<configuration>内容</configuration>中间

<property>
   <name>fs.trash.interval</name>
   <value>1440</value>
</property>

开启垃圾桶效果
[root@node1 hadoop]# hdfs dfs -rm -r /test1.har
2023-05-24 15:07:33,470 INFO fs.TrashPolicyDefault: Moved: 'hdfs://node1.itcast.cn:8020/test1.har' to trash at: hdfs://node1.itcast.cn:8020/user/root/.Trash/Current/test1.har
开启垃圾桶后并没有真正删除,还可以恢复
[root@node1 hadoop]# hdfs dfs -mv /user/root/.Trash/Current/test1.har  / 

拓展元数据查看

node1中edits和fsimage文件路径: /export/data/hadoop/dfs/name/current

图解

查看历史编辑文件

命令:  hdfs oev -i edits文件名 -o 自定义文件名.xml

[root@node1 current]# cd /export/data/hadoop/dfs/name/current
[root@node1 current]# hdfs oev -i edits_0000000000000033404-0000000000000033405 -o 405_edit.xml
[root@node1 current]# cat 405_edit.xml

查看镜像文件

命令:  hdfs oiv -i fsimage文件名 -p XML -o 自定义文件名.xml

[root@node1 current]# cd /export/data/hadoop/dfs/name/current
[root@node1 current]# hdfs oiv -i fsimage_0000000000000033405 -p XML -o 405_fsimage.xml
[root@node1 current]# cat 405_fsimage.xml

MR

分布式计算概述

什么是(数据)计算

我们一直在提及:分布式计算, 分布式暂且不论, “计算”到底是指什么呢?

 大数据体系内的计算, 举例:

        销售额统计、区域销售占比、季度销售占比

        利润率走势、客单价走势、成本走势

        品类分析、消费者分析、店铺分析

等等一系列,基于数据得出的结论。  这些就是我们所说的计算。

分布式计算:顾名思义,即以分布式的形式完成数据的统计,得到需要的结果。

 

 

 

 

 

 

 

 

 

 

 1. 什么是计算、分布式计算?

        计算:对数据进行处理,使用统计分析等手段得到需要的结果

        分布式计算:多台服务器协同工作,共同完成一个计算任务

2. 分布式计算常见的2种工作模式

        分散->汇总  (MapReduce就是这种模式)

        中心调度->步骤执行 (大数据体系的Spark、Flink等是这种模式)

MapReduce概述

MapReduce的思想核心: 分而治之

所谓 分而治之 就是把一个复杂的问题按一定的 分解 方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,再把把各部分的解组成整个问题的解。

Map:负责 分,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,几乎没有依赖关系。

Reduce:负责 合,即对map阶段的结果进行全局汇总。

MapReduce是“分散->汇总”模式的分布式计算框架,可供开发人员开发相关程序进行分布式数据计算。

        Map功能接口提供了“分散”的功能, 由服务器分布式对数据进行处理

        Reduce功能接口提供了“汇总(聚合)”的功能,将分布式的处理结果汇总统计

MapReduce执行原理

现在, 我们借助一个案例,简单分析一下,MapReduce是如何完成分布式计算的。

假设有如下文件,内部记录了许多的单词。且已经开发好了一个MapReduce程序,功能是统计每个单词出现的次数。

单词统计流程

已知文件内容:  
hadoop hive hadoop spark hive
flink hive linux hive mysql


MR的天龙八部:
input结果:
       k1(行偏移量)   v1(每行文本内容)
       0            hadoop hive hadoop spark hive
       30           flink hive linux hive mysql
map结果:
       k2(split切割后的单词)  v2(拼接1)    
       hadoop               1
       hive                 1
       hadoop               1
       spark                1
       hive                 1
       flink                1
       hive                 1
       linu                 1
       hive                 1
       mysql                1
分区/排序/规约/分组结果:
       k2(排序分组后的单词)   v2(每个单词数量的集合)
       flink               [1]
       hadoop              [1,1]
       hive                [1,1,1,1]
       linux               [1]
       mysql               [1]
       spark               [1]
reduce结果:
       k3(排序分组后的单词)   v3(聚合后的单词数量)
       flink                1
       hadoop               2
       hive                 4
       linux                1
       mysql                1
       spark                1
output结果:   注意: 输出目录一定不要存在,否则报错
       flink   1
       hadoop  2
       hive    4
       linux   1
       mysql   1
       spark   1

map阶段

第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下Split size 等于 Block size。每一个切片由一个MapTask处理(当然也可以通过参数单独修改split大小)
第二阶段是对切片中的数据按照一定的规则解析成对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)
第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个,调用一次map方法。每次调用map方法会输出零个或多个键值对
第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务
第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。
如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中
第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,用于面试

shuffle阶段

shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等
Spill阶段:当内存中的数据量达到一定的阀值(80%)的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序
Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件
Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上
Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,用于面试

reduce阶段

第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。
注意: 不要死记硬背,要结合自己的理解,转换为自己的话术,用于面试

扩展全流程图:

MR核心八步(天龙八步):

MR核心8步:
1:定义输入组件,负责读取源文件,获得K1,V1
  K1:行偏移量,表示从哪开始读. V1:整行数据.
2: 自定义MapTask任务,负责 拆分的动作,获得K2, V2
  1个切片 = 1个MapTask任务,切片大小默认和Block块大小一致.
  K2:单词, V2:单词的次数,默认记录为1.
[3]: 分区.给数据打标记的,标记将来被哪个ReduceTask任务处理.
4: 排序.默认按照字典顺序,按照K2进行升序排列.
[5]: 规约对MapTask端的数据做局部合并的,降低Reduce拉取的数据量
[6]: 分组..默认按照K2进行分组,K2一致的数据,会被放到一起.
7: 自定义ReduceTask任务,负责合并的动作,获得K3,V3
  1个分区= 1个ReduceTask任务, K3:单词, V3:单词总数.
8:定义输出组件,将结果写到目的地文件中.

[输入] -> [切片] -> [分区] -> [分组] -> [排序] -> [规约] -> [和并] -> [输出]
1          2        3        4         5        6         7       8

1. 什么是MapReduce

        MapReduce是Hadoop中的分布式计算组件

        MapReduce可以以分散->汇总(聚合)模式执行分布式计算任务

2. MapReduce的主要编程接口

        map接口,主要提供“分散”功能,由服务器分布式处理数据

        reduce接口,主要提供“汇总”功能,进行数据汇总统计得到结果

3. MapReduce的运行机制

        将要执行的需求,分解为多个Map Task和Reduce Task

        将Map Task 和 Reduce Task分配到对应的服务器去执行

YARN

资源调度概述

资源调度

什么是资源调度? 我们为什么需要资源调度?

 

 

程序的资源调度
 

 

 

 YARN的资源调度

 为什么需要资源调度?

        将资源统一管控进行分配可以提高资源利用率

程序如何在YARN内运行?

        程序向YARN申请所需资源

        YARN为程序分配所需资源供程序使用

分布式资源调度 - YARN

YARN的资源调度

YARN 管控整个集群的资源进行调度, 那么应用程序在运行时,就是在YARN的监管(管理)下去运行的。

这就像:全部资源都是公司(YARN)的,由公司分配给个人(具体的程序)去使用。

比如,一个具体的MapReduce程序。

我们知道, MapReduce程序会将任务分解为若干个Map任务和Reduce任务。

假设,有一个MapReduce程序, 分解了3个Map任务,和1个Reduce任务,那么如何在YARN的监管(管理)下运行呢?

 

YARN架构 

YARN容器

yarn提交mr流程

1.客户端提交一个MR程序给ResourceManager(校验请求是否合法...)

2.如果请求合法,ResourceManager随机选择一个NodeManager用于生成appmaster(应用程序控制者,每个应用程序都单独有一个appmaster)
3.appmaster会主动向ResourceManager的应用管理器(application manager)注册自己,告知自己的状态信息,并且保持心跳
4.appmaster会根据任务情况计算自己所需要的container资源(cpu,内存...),主动向ResourceManager的资源调度器(resource scheduler)申请并获取这些container资源

5.appmaster获取到container资源后,把对应指令和container分发给其他NodeManager,让NodeManager启动task任务(maptask任务,reducetask任务)
6.NodeManager要和appmaster保持心跳,把自己任务计算进度和状态信息等同步给appmaster,(注意当maptask任务完成后会通知appmaster,appmaster接到消息后会通知reducetask去maptask那儿拉取数据)直到最后任务完成

7.appmaster会主动向ResourceManager注销自己(告知ResourceManager可以把自己的资源进行回收了,回收后自己就销毁了)

调度器

YARN的三大调度器

yarn提供了三大调度器分别为:

先进先出调度器: FIFO Scheduler

公平调度器:Fair Scheduler

容量调度器: Capacity Scheduler

调度器的配置在yarn-site.xml查找,如果没有就去yarn-default.xml中找
网址: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
配置项和默认值如下yarn.resourcemanager.scheduler.class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

先进先出调度器

FIFO Scheduler: 把应用按提交的顺序排成一个队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
好处:  能够保证每一个任务都能拿到充足的资源, 对于大任务的运行非常有好处
弊端:  如果先有大任务后有小任务,会导致后续小任务无资源可用, 长期处于等待状态
应用:  测试环境

公平调度器

Fair Scheduler :不需要保留集群的资源,因为它会动态在所有正在运行的作业之间平衡资源 , 当一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当后面有小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
好处:  保证每个任务都有资源可用, 不会有大量的任务等待在资源分配上
弊端: 如果大任务非常的多, 就会导致每个任务获取资源都非常的有限, 也会导致执行时间会拉长
应用: CDH商业版本的hadoop

容量调度器

Capacity Scheduler: 为每个组织分配专门的队列和一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。在每个队列内部,资源的调度是采用的是先进先出(FIFO)策略。
好处: 可以保证多个任务都可以使用一定的资源, 提升资源的利用率
弊端: 如果遇到非常的大的任务, 此任务不管运行在那个队列中, 都无法使用到集群中所有的资源,  导致大任务执行效率比较低,当任务比较繁忙的时候, 依然会出现等待状态
应用: apache开源版本的hadoop

示例:

调度器的使用是通过yarn-site.xml配置文件中的 yarn.resourcemanager.scheduler.class参数进行配置的,默认采用Capacity Scheduler调度器 下面是一个简单的Capacity调度器的配置文件,文件名为capacity-scheduler.xml

在这个配置中,在root队列下面定义了两个子队列prod和dev,分别占40%和60%的容量

<property>
   <!-- 队列分为两份 prod 和 dev    -->
   <name>yarn.scheduler.capacity.root.queues</name>
   <value>prod,dev</value>
</property>
  <property>
   <!--prod占比 40%-->
   <name>yarn.scheduler.capacity.root.prod.capacity</name>
   <value>40</value>
</property>
<property>
   <!--dev占比 60%-->
   <name>yarn.scheduler.capacity.root.dev.capacity</name>
   <value>60</value>
</property>
<property>
   <!-- dev的最大占比 75%-->
   <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
   <value>75</value>
</property>

prod由于没有设置maximum-capacity属性,它有可能会占用集群全部资源。

dev的maximum-capacity属性被设置成了75%,所以即使prod队列完全空闲dev也不会占用全部集群资源,也就是说,prod队列仍有25%的可用资源用来应急。