Hive第四天

发布于:2024-07-29 ⋅ 阅读:(105) ⋅ 点赞:(0)

3.HIVE 调优:
    需要调优的几个方面:
        1.HIVE语句执行不了
        2.HIVE查询语句,在集群中执行时,数据无法落地
            HIVE执行时,一开始语句检查没有问题,生成了多个JOB,
            并且一开JOB中的Map 及 Reduce 正常运行,之后便报异常包括 OOM 异常等
        3.HIVE查询语句,执行时,Map或者Reduce端数据处理异常慢,导致整个执行效率低


    调优方式:
        1.分区、分桶
            为什么分区或者分桶?
                 分区的好处,在扫描表时,会根据查询语句中的过滤条件,将固定分区中的数据加载至内存中
            避免了表的全表扫描。
            分桶好处? 在获取数据时,根据查询的数据,进行做hash操作,将需要获取的数据指定到具体的桶中
            ,这样只获取固定部分桶数据,减小了数据的加载量
        
        2.使用外部表
            外部表和普通表的区别? 删除数据时,外部表不会将HDFS中对应表路径中的数据删除

        3.选择适当的文件压缩格式
            1.对于刚采集过的源数据,需要用TextFile格式进行保存,需要保证源数据的格式及内容和原先一致
            2.对于处理过的数据,一般对数据进行压缩保存(需要考虑实际情况)

        4.命名要规范
            创建表时,需要遵守:
                如果数据存储在dwd中那么建表时需要将 dwd 放至 表的开端
                同时后面的业务名称需要和库名用 _ 进行分隔

        5.数据分层,表分离,但是也不要分的太散
            数据分层:
                将不同类型的数据,应当存储在不同库中,
                比如 维度表 应当存储在 维度库 、原始数据应当存储在ODS库中专门做管理
            表分离:
                在实际业务过程中,有一些表的维度比较大,单个表的存储压力大
                同时数据读取时,拉去的数据内容比较多,但是所需要的字段较少,浪费计算资源
                可以将表中相同类型的信息切分至多个表中,根据实际业务需要进行读取数据
                如果分的太散,那么也会造成数据冗余,并且加载表过多,计算慢

        6.分区裁剪 where过滤,先过滤,后join
            1.针对分区表数据,可以通过where条件进行过滤数据,之后再进行其他操作
            2.适当的使用一些子查询,将子查询中的数据进行初步过滤,然后再与其他表数据进行关联

        7.mapjoin (1.2以后自动默认启动mapjoin)
            select /*+mapjoin(b)*/ a.xx,b.xxx from a left outer join b on a.id=b.id

CREATE EXTERNAL TABLE IF NOT EXISTS learn4.student1(
id STRING COMMENT "学生ID",
name STRING COMMENT "学生姓名",
age int COMMENT "年龄",
gender STRING COMMENT "性别",
clazz STRING COMMENT "班级"
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";

load data local inpath "/usr/local/soft/hive-3.1.2/data/students.txt" INTO TABLE learn4.student1;


CREATE EXTERNAL TABLE IF NOT EXISTS learn4.score1(
id STRING COMMENT "学生ID",
subject_id STRING COMMENT "科目ID",
score int COMMENT "成绩"
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";
load data local inpath "/usr/local/soft/hive-3.1.2/data/score.txt" INTO TABLE learn4.score1;


CREATE TABLE mapJonTest AS SELECT max.name,min.subject_id,min.score FROM learn4.student1 max JOIN learn4.score1 min ON max.id = min.id;

--建表所需时间:
    INFO  : Completed executing command(queryId=root_20220414144741_3ef066e6-c756-4e8f-b06a-8349a067f134); Time taken: 21.143 seconds
    INFO  : OK
    INFO  : Concurrency mode is disabled, not creating a lock manager
    No rows affected (21.625 seconds)

-- 通过 explain 展示执行计划

explain SELECT max.name,min.subject_id,min.score FROM learn4.student1 max JOIN learn4.score1 min ON max.id = min.id;


| STAGE DEPENDENCIES:                                |  -- 执行stage的依赖
|   Stage-4 is a root stage                          |     Stage-4 表示根流程 --表示最先执行的流程
|   Stage-3 depends on stages: Stage-4               |     Stage-3 依赖 Stage-4
|   Stage-0 depends on stages: Stage-3               |     Stage-0 依赖 Stage-3 依赖 Stage-4
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-4                                   |
|     Map Reduce Local Work                          |
|       Alias -> Map Local Tables:                   |
|         $hdt$_1:min                                |
|           Fetch Operator                           |
|             limit: -1                              |
|       Alias -> Map Local Operator Tree:            |
|         $hdt$_1:min                                |
|           TableScan                                |    TableScan  扫描的表
|             alias: min                             |
|             Statistics: Num rows: 1 Data size: 1385400 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: id is not null (type: boolean) |
|               Statistics: Num rows: 1 Data size: 1385400 Basic stats: COMPLETE Column stats: NONE |
|               Select Operator                      |
|                 expressions: id (type: string), subject_id (type: string), score (type: int) |
|                 outputColumnNames: _col0, _col1, _col2 |
|                 Statistics: Num rows: 1 Data size: 1385400 Basic stats: COMPLETE Column stats: NONE |
|                 HashTable Sink Operator            |
|                   keys:                            |
|                     0 _col0 (type: string)         |
|                     1 _col0 (type: string)         |
|                                                    |
|   Stage: Stage-3                                   |
|     Map Reduce                                     |
|       Map Operator Tree:                           |
|           TableScan                                |
|             alias: max                             |
|             Statistics: Num rows: 1 Data size: 388080000 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: id is not null (type: boolean) |
|               Statistics: Num rows: 1 Data size: 388080000 Basic stats: COMPLETE Column stats: NONE |
|               Select Operator                      |
|                 expressions: id (type: string), name (type: string) |
|                 outputColumnNames: _col0, _col1    |
|                 Statistics: Num rows: 1 Data size: 388080000 Basic stats: COMPLETE Column stats: NONE |
|                 Map Join Operator                  |   -- 不需要做任何操作 默认开启 Map JOIN 操作
|                   condition map:                   |
|                        Inner Join 0 to 1           |
|                   keys:                            |
|                     0 _col0 (type: string)         |
|                     1 _col0 (type: string)         |
|                   outputColumnNames: _col1, _col3, _col4 |
|                   Statistics: Num rows: 1 Data size: 426888009 Basic stats: COMPLETE Column stats: NONE |
|                   Select Operator                  |
|                     expressions: _col1 (type: string), _col3 (type: string), _col4 (type: int) |
|                     outputColumnNames: _col0, _col1, _col2 |
|                     Statistics: Num rows: 1 Data size: 426888009 Basic stats: COMPLETE Column stats: NONE |
|                     File Output Operator           |
|                       compressed: false            |
|                       Statistics: Num rows: 1 Data size: 426888009 Basic stats: COMPLETE Column stats: NONE |
|                       table:                       |
|                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat |
|                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |
|                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|       Execution mode: vectorized                   |
|       Local Work:                                  |
|         Map Reduce Local Work                      |
|                                                    |
|   Stage: Stage-0                                   |
|     Fetch Operator                                 |
|       limit: -1                                    |
|       Processor Tree:                              |
|         ListSink                                   |

explain extended SELECT max.name,min.subject_id,min.score FROM learn4.score1 min JOIN learn4.student1 max  ON max.id = min.id;

Map JOIN 相关设置:
 1)设置自动选择Mapjoin
set hive.auto.convert.join = true; 默认为true
(2)大表小表的阈值设置(默认25M以下认为是小表):
set hive.mapjoin.smalltable.filesize = 25000000;

        8.分区分桶,合并小文件
            为什么小文件需要合并?
                1.小文件过多,MR处理数据时,会产生多个MapTask,然而每个MapTask处理的数据量很少,
                那么导致MapTask启动时间大于执行时间,整体任务时间消耗较大


            如何合并小文件:

            1)在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
                    set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
            
            2)在Map-Reduce的任务结束时合并小文件的设置:

                    2.1 在map-only任务结束时合并小文件,默认true
                    SET hive.merge.mapfiles = true;  
                    -- 如果有一个HIVE表,小文件过多,可以通过select * from 表 只产生一个Map
                    --     任务将原表中的小文件合并并输出至新表
                    

                    2.2 在map-reduce任务结束时合并小文件,默认false
                    SET hive.merge.mapredfiles = true;
                    合并文件的大小,默认256M
                    SET hive.merge.size.per.task = 268435456;
                    当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
                    SET hive.merge.smallfiles.avgsize = 128000000;


            3)--限制Map,Reduce数
                set mapreduce.tasktracker.map.tasks.maximum=30;       --每个nodemanager节点上可运行的最大map任务数,默认值2,可根据实际值调整为10~100;
                set mapreduce.tasktracker.reduce.tasks.maximum=30;    --每个nodemanager节点上可运行的最大reduce任务数,默认值2,可根据实际值调整为10~100;


            4)--将多个小文件打包作为一个整体的inputsplit,减少map任务数     
                set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
                set mapreduce.input.fileinputformat.split.maxsize=128000000;         --切片大小最大值,不设置,则所有输入只启动一个map任务
                set mapreduce.input.fileinputformat.split.minsize.per.node=16000000;  --同一节点的数据块形成切片时,切片大小的最小值

            5)
                -- 最大split大小
                set mapred.max.split.size=128000000;   


        9.排序优化
            order by 
                全局排序操作, 只有一个Reduce任务去对数据进行排序,会造成全部的数据堆积在一个Reduce任务中进行处理
                    经常会出现OOM异常? 一个Reduce任务的内存是有限的,承载不了太多数据


            distribute by + sort by 
                distribute by:是指我们的分区操作
                sort by: 跟上distribute by 之后可以实现在分区内进行排序,
                如果当前的Reduce数量为1,那么也没有优化的效果,可以通过设置reduce的数量对不同分区中的数据进行拆分排序
                 通过 set mapreduce.job.reduces = N; 设置Reduce数量,默认参数值为-1 即根据资源及查询语句情况进行分配reduce,

            cluster by语句:
                如果分区字段和排序字段是同一个字段,那么和 distribute by + sort by 效果一致

        10.数据倾斜优化
            原因:
                1.Key分布不均,导致某一些Key在做Reduce端处理时,执行较慢
                2.数据重复,在关联时,会产生笛卡尔积,致使数据膨胀,严重的可能会导致集群挂掉 
            
            表现:
                任务进度长时间维持在99%(或100%),
                查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大

1.创建带有部分NULL的学生表和成绩表
DROP TABLE learn4.student_null;
CREATE EXTERNAL TABLE IF NOT EXISTS learn4.student_null(
id STRING COMMENT "学生ID",
name STRING COMMENT "学生姓名",
age int COMMENT "年龄",
gender STRING COMMENT "性别",
clazz STRING COMMENT "班级"
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";

load data local inpath "/usr/local/soft/hive-3.1.2/data/student_null.txt" INTO TABLE learn4.student_null;

DROP TABLE learn4.score_null;
CREATE EXTERNAL TABLE IF NOT EXISTS learn4.score_null(
id STRING COMMENT "学生ID",
subject_id STRING COMMENT "科目ID",
score int COMMENT "成绩"
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";

load data local inpath "/usr/local/soft/hive-3.1.2/data/score_null.txt" INTO TABLE learn4.score_null;


select * from learn4.student_null limit 10;
select * from learn4.score_null limit 10;


2.大空表与小空表进行关联

set mapreduce.job.reduces = 5;


SELECT
T1.id
,count(*) as num
FROM learn4.student_null T1 JOIN learn4.score_null T2 ON T1.id = T2.id 
GROUP BY T1.id

-- 如果想展示数据倾斜效果:
    1.保证两张表的数据相差不大
    2.可以关闭MapJOIN优化,防止执行MapJOIN流程

    --查看具体的偏差的结果可以去 Reduce Tasks for job 历史中查看
        -- 可以得到结论:数据倾斜之后 部分Reduce的执行时间明显比其他Reduce任务要长

解决方法:
    1.对会产生笛卡尔积的数据进行初步过滤
SELECT
T1.id
,count(*) as num
FROM learn4.student_null T1 JOIN learn4.score_null T2 ON T1.id = T2.id  
AND (T1.id != "null_student" or T2.id != "null_student")
GROUP BY T1.id


    2.可以对Key比较集中的数据进行加随机数,然后再进行处理

SELECT
T1.id
,count(*) as num
FROM learn4.student_null T1 JOIN learn4.score_null T2 ON concat(T1.id,floor((rand()*10)%5)) = concat(T2.id,floor((rand()*10)%5)) 
GROUP BY T1.id


SELECT
substring(T.id,1,-1) as id
,sum(T.num) --对打散的ID统计过的结果进行汇总
FROM (
SELECT
T1.id 
,count(*) as num    --对打散的ID进行统计
FROM (
SELECT
concat(T1.id,floor((rand()*10)%5)) as id  --将T1表中的数据进行打散
FROM learn4.student_null) T1
JOIN 
(
SELECT
concat(T1.id,floor((rand()*10)%5)) as id  --将T2表中的数据进行打散
FROM learn4.score_null) T2 ON T1.id =T2.id
)T GROUP BY substring(T.id,1,-1)

    3.通过参数进行调整
设置开启Map端的聚合操作
set hive.map.aggr = true

设置groupby的检查点条数
set hive.groupby.mapaggr.checkinterval = 100000

开启GROUP BY 的负载均衡操作
set hive.groupby.skewindata = true 
    通过调整该参数,使得有倾斜的数据在Map reduce过程中 被随机的从Map端发送到Reduce端进行处理,
    reduce会对随机发送过来的数据进行 初步的处理,之后再进行后续的统计


网站公告

今日签到

点亮在社区的每一天
去签到