Hive Logical Optimizers

Published: 2025-06-19

Optimizer

PointLookupOptimizer

Purpose: converts qualifying OR expressions into IN.
The parameter hive.optimize.point.lookup controls whether PointLookupOptimizer is enabled; the default is true.
The parameter hive.optimize.point.lookup.min controls how many OR branches are needed before the rewrite to IN happens; the default is 31.
For example, in the SQL below the OR has 3 children: web_site_sk = 1, web_site_sk = 2, and web_site_sk = 3. Every child must be an equality (=) comparison, and all of them must test the same column (web_site_sk). web_site_sk = 1 or web_site_sk = 2 or web_site_sk = 3 can then be rewritten as `web_site_sk in (1,2,3)`.

set hive.optimize.point.lookup.min=2;
select web_county,count(1) cnt 
from web_site 
where web_site_sk = 1 or web_site_sk = 2 or web_site_sk =3 
group by web_county;

Writing the comparison as 2 = web_site_sk instead of web_site_sk = 2 also qualifies.

  • The following query, where each OR branch constrains multiple columns with AND, can also be converted:
set hive.optimize.point.lookup.min=2;
explain select web_county,count(1) cnt 
from web_site 
where  (web_site_sk = 1 and web_open_date_sk=1) or 
(web_site_sk = 2 and web_open_date_sk=2) or
(web_site_sk = 3 and web_open_date_sk=3)
group by web_county;

The generated execution plan is:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_7]
        Group By Operator [GBY_5] (rows=1 width=109)
          Output:["_col0","_col1"],aggregations:["count(VALUE._col0)"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE]
          SHUFFLE [RS_4]
            PartitionCols:_col0
            Group By Operator [GBY_3] (rows=1 width=109)
              Output:["_col0","_col1"],aggregations:["count(1)"],keys:web_county
              Select Operator [SEL_2] (rows=1 width=117)
                Output:["web_county"]
                Filter Operator [FIL_8] (rows=1 width=117)
                  predicate:(struct(web_open_date_sk,web_site_sk)) IN (const struct(1L,1L), const struct(2L,2L), const struct(3L,3L))
                  TableScan [TS_0] (rows=32 width=117)
                    tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_site_sk","web_open_date_sk","web_county"]
  • struct fields cannot be converted to IN
drop table if exists complex_table;
create table complex_table(
    c_int int,
    c_struct struct<name:string,age:int>
)
row format delimited
FIELDS TERMINATED BY '\t'
collection items terminated by ','
map keys terminated by ':'
lines terminated by '\n'
stored as textfile;


insert into complex_table values(
  1, 
  named_struct("name","Alice", "age",18)
);

select distinct c_int from complex_table where 
(c_struct.name="Alice" and  c_struct.age=18) or 
(c_struct.name="Alice" and  c_struct.age=19) or 
(c_struct.name="Alice" and  c_struct.age=20);

PartitionColumnsSeparator

Purpose: extracts partition columns out of an IN predicate. For example, given STRUCT(T1.a, T1.b, T2.b, T2.c) IN (STRUCT(1, 2, 3, 4), STRUCT(2, 3, 4, 5)), where T1.a, T1.b, and T2.c are partition columns and T2.b is not, the optimizer adds the extra predicates STRUCT(T1.a, T1.b) IN (STRUCT(1, 2), STRUCT(2, 3))
AND
STRUCT(T2.c) IN (STRUCT(4), STRUCT(5))
These extra predicates are used for partition pruning; once partition pruning is done, they are removed from the execution plan.

For example, the following statement lists only the 3 matching partitions, because ws_sold_date_sk is a partition column.

explain extended select distinct ws_item_sk from web_sales 
where (ws_sold_date_sk=2452642 and ws_sold_time_sk=1) 
  or (ws_sold_date_sk=2452641 and ws_sold_time_sk=2) 
  or (ws_sold_date_sk=2452640 and ws_sold_time_sk=3);

PredicateTransitivePropagate (no SQL found that triggers it)

The following statement reaches this optimizer's logic, but the execution plan is not changed:

select sum(ws_net_paid) 
from web_sales join (select * from web_site where web_site_sk < 10) web_site 
where ws_sold_date_sk=2452640 and web_site_sk = 1;

ConstantPropagate

Traverses the DAG in root-to-child order; for each conditional expression it does the following:

  • Fold the expression if it is a UDF and all of its arguments are constants.
  • Sub-expressions in a Filter that are always true are removed.
explain select count(1) from web_site where 1=1 and web_site_sk<10;

You can see that 1=1 is always true, so it is removed.

Filter Operator [FIL_8] (rows=10 width=8)
                  predicate:(web_site_sk < 10L)
  • Constant sub-expressions in a Filter enable short-circuit evaluation: false short-circuits an AND, and true short-circuits an OR.
explain select count(1) from web_site where 1=1 or web_site_sk<10;

Since 1=1 is always true, the whole OR folds to true and none of the expressions need to be evaluated.

  • A null condition is equivalent to false

Both of the following SQL statements return 0:

select count(1) from web_site where null;
select count(1) from web_site where false;
  • Predicate propagation
    For example, in the following statement web_site_sk = 5 can be propagated upward and combined with web_site_sk < 10.
explain select * from (select * from web_site where web_site_sk < 10) t where web_site_sk=5;

The resulting Filter is:

Filter Operator [FIL_6]
        predicate:((web_site_sk < 10L) and (web_site_sk = 5L))

SyntheticJoinPredicate

Purpose: injects synthetic IN predicates, referencing the ReduceSink of the opposite join side, onto each side of a join; they can later be used for runtime filtering such as semijoin reduction.

explain logical 
select sum(ws_net_paid) net_paid 
from  web_sales 
where ws_web_site_sk in(
 select web_site_sk 
 from web_site 
 where web_site_sk < 10 ) 
and ws_sold_date_sk=2452642;

The output is as follows. After web_site's TableScan, the Filter gains (web_site_sk) IN (RS[7]):

predicate: ((web_site_sk < 10L) and (web_site_sk) IN (RS[7])) (type: boolean)

After web_sales, a Filter is added as follows:

alias: web_sales
    Filter Operator (FIL_20)
      predicate: (ws_web_site_sk is not null and (ws_web_site_sk) IN (RS[9])) (type: boolean)

The same synthetic predicates are generated for a plain join:

explain logical 
select sum(ws_net_paid) net_paid 
from  web_sales join web_site on ws_web_site_sk=web_site_sk
where
 ws_sold_date_sk=2452642;

The generated execution plan is:

Explain
LOGICAL PLAN:
web_sales 
  TableScan (TS_0)
    alias: web_sales
    Filter Operator (FIL_16)
      predicate: (ws_web_site_sk is not null and (ws_web_site_sk) IN (RS[5])) (type: boolean)
      // ...
web_site 
  TableScan (TS_1)
    alias: web_site
    Filter Operator (FIL_17)
      predicate: (web_site_sk is not null and (web_site_sk) IN (RS[3])) (type: boolean)
      // ...

SortedDynPartitionOptimizer

Dynamic partition sort optimization. When enabled, rows are sorted by the partition columns (or bucket columns) before being written, so each reducer keeps only one writer open at a time, which reduces reducer memory pressure.

set hive.stats.autogather=false;
create table profile(c1 string) stored as textfile;
load data local inpath '/etc/profile' overwrite into table profile;
create table p_profile(c1 string) partitioned by (len int);
  • Without the sort optimization
set hive.optimize.sort.dynamic.partition=false;
explain logical insert overwrite table  p_profile select c1,length(c1) from profile;

The output is:

LOGICAL PLAN:
profile 
  TableScan (TS_0)
    alias: profile
    Select Operator (SEL_1)
      expressions: c1 (type: string), length(c1) (type: int)
      outputColumnNames: _col0, _col1
      File Output Operator (FS_2)
        compressed: false
        table:
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            name: test.p_profile
  • With the sort optimization
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
explain logical insert overwrite table  p_profile select c1,length(c1) from profile;

A sort step is added:

Explain
LOGICAL PLAN:
profile 
  TableScan (TS_0)
    alias: profile
    Select Operator (SEL_1)
      expressions: c1 (type: string), length(c1) (type: int)
      outputColumnNames: _col0, _col1
      Reduce Output Operator (RS_3)
        key expressions: _col1 (type: int)
        sort order: +
        Map-reduce partition columns: _col1 (type: int)
        value expressions: _col0 (type: string)
        Select Operator (SEL_4)
          expressions: VALUE._col0 (type: string), KEY._col1 (type: int)
          outputColumnNames: _col0, _col1
          File Output Operator (FS_2)
            compressed: false
            Dp Sort State: PARTITION_SORTED
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                name: test.p_profile

SortedDynPartitionTimeGranularityOptimizer

Applies only to plans with a FileSink whose output format is org.apache.hadoop.hive.druid.io.DruidOutputFormat.
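
A minimal sketch of a table that would reach this path, assuming the Hive/Druid integration (DruidStorageHandler) is installed; the table name, column choices, and granularity value are illustrative:

-- hypothetical Druid-backed table; inserts into it go through DruidOutputFormat,
-- so this optimizer can add the time-granularity sort
create table druid_sales
stored by 'org.apache.hadoop.hive.druid.DruidStorageHandler'
tblproperties ("druid.segment.granularity" = "DAY")
as
select cast(ws_sold_date_sk as timestamp) as `__time`, ws_net_paid
from web_sales;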

PartitionPruner (ppr for short) & PartitionConditionRemover (PCR for short)

select sum(ws_net_paid) sum_ws_net_paid
from web_sales 
where ws_sold_date_sk >= 2452640 
 and ws_sold_date_sk <=2452642 
 and ws_net_paid>1.0 ;

PartitionPruner puts the predicate information from the Filter behind each TableScan into opToPartList:

HashMap<TableScanOperator, PrunedPartitionList> opToPartList;

PartitionConditionRemover takes the entry for the TableScan (ts) out of opToPartList, runs PartitionPruner.prune to obtain partsList, and then calls opToPartList.put(ts, partsList).

If the Filter contains only conditions on partition columns, the Filter after the TableScan is removed entirely; otherwise, only the partition-column conditions are removed from the Filter.

ListBucketingPruner

drop table if exists list_bucket_test;
CREATE TABLE list_bucket_test (key int, value int) partitioned by (dt string)
  SKEWED BY (key) ON (1,2) STORED AS DIRECTORIES;

insert overwrite table list_bucket_test 
  partition(dt=20250519) 
  values(1,1),(1,2),(2,3),(2,4),(3,5);

Under the list_bucket_test table directory there are 3 subdirectories: key=1, key=2, and HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME.

drwxr-xr-x  4 houzhizhen  staff  128  5 19 16:32 HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
drwxr-xr-x  4 houzhizhen  staff  128  5 19 16:32 key=1
drwxr-xr-x  4 houzhizhen  staff  128  5 19 16:32 key=2
set hive.optimize.listbucketing=true;
select sum(value) from list_bucket_test where dt=20250519 and key=1;

ListBucketingPruner puts the TableScanOperator -> (Partition -> "GenericUDFOPEqual(Column[key], Const int 1)") information into the following field of ParseContext:

Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner;

The value is dt=20250519 -> "GenericUDFOPEqual(Column[key], Const int 1)".

Then, in GenMapRedUtils, the opToPartToSkewedPruner information is used to generate the paths that each partition actually needs to scan.

GroupByOptimizer

Group-by optimization: if the group keys include all of the bucketing keys and sort keys, in the same order, the group by can be performed entirely on the map side.


drop table if exists test ;
CREATE TABLE test (bkey int,skey int, value int) partitioned by(pkey int)
clustered by(bkey) sorted by (bkey,skey asc) into 8 buckets;


insert overwrite table test partition(pkey=5)
  values(1,1,1),(1,2,3),(2,3,3),(2,3,4);
-- 数据量少,不转成 Fetcher
set hive.fetch.task.conversion=none;
explain select bkey,skey, sum(value) from test where pkey=5 group by bkey,skey ;

The output is below. The group by completes on the map side and there is no reduce stage, which can, however, lower parallelism.

Explain
Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Map 1
      File Output Operator [FS_7]
        Group By Operator [GBY_3] (rows=4 width=16)
          Output:["_col0","_col1","_col2"],aggregations:["sum(value)"],keys:bkey, skey
          Select Operator [SEL_2] (rows=4 width=12)
            Output:["bkey","skey","value"]
            TableScan [TS_0] (rows=4 width=12)
              tpcds_hdfs_orc_3@test,test,Tbl:COMPLETE,Col:COMPLETE,Output:["bkey","skey","value"]

ColumnPruner

Traverses all operators in post-order; each operator keeps only the columns that its child operators use.

explain logical 
select sum(ws_net_paid) net_paid 
from  web_sales 
where ws_sold_date_sk=2452642;

Without this optimization, the TableScan emits all columns, and so does the Select after it. After optimization, each operator carries only the columns its children need.

CountDistinctRewriteProc

This optimization takes effect only on the Tez engine.

count(distinct) must reference exactly one column, and the query may contain only one count(distinct).

Example where it applies:

explain  
select count(distinct ws_web_site_sk)
from  web_sales;

The generated execution plan is:

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 3 vectorized
      File Output Operator [FS_19]
        Group By Operator [GBY_18] (rows=1 width=8)
          Output:["_col0"],aggregations:["count(VALUE._col0)"]
        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized
          PARTITION_ONLY_SHUFFLE [RS_17]
            Group By Operator [GBY_16] (rows=1 width=8)
              Output:["_col0"],aggregations:["count(_col0)"]
              Group By Operator [GBY_15] (rows=360000188 width=934)
                Output:["_col0"],keys:KEY._col0
              <-Map 1 [SIMPLE_EDGE] vectorized
                SHUFFLE [RS_14]
                  PartitionCols:_col0
                  Group By Operator [GBY_13] (rows=720000376 width=934)
                    Output:["_col0"],keys:ws_web_site_sk
                    Select Operator [SEL_12] (rows=720000376 width=934)
                      Output:["ws_web_site_sk"]
                      TableScan [TS_0] (rows=720000376 width=934)
                        tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_web_site_sk"]

How count(distinct) executes:
On the MR engine:

  1. Multiple map tasks each compute distinct ws_web_site_sk and emit the de-duplicated values.
  2. A single reducer then pulls the output of all map tasks and performs the final count(distinct ws_web_site_sk). If each map's distinct set is large, this reducer may OOM.

On the Tez engine:

  1. Multiple map tasks each compute distinct ws_web_site_sk and shuffle the rows to multiple reducers using ws_web_site_sk as the partition key.
  2. Multiple reducers each compute count(distinct ws_web_site_sk) over their share and emit the count.
  3. A single reducer sums the counts from step 2.

Example 1 where it does not apply:

explain 
select count(distinct ws_web_site_sk),
count(distinct ws_sold_time_sk)
from  web_sales;

The generated execution plan is:

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 3 vectorized
      File Output Operator [FS_21]
        Group By Operator [GBY_20] (rows=1 width=16)
          Output:["_col0","_col1"],aggregations:["count(VALUE._col0)","count(VALUE._col1)"]
        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized
          PARTITION_ONLY_SHUFFLE [RS_19]
            Group By Operator [GBY_18] (rows=1 width=16)
              Output:["_col0","_col1"],aggregations:["count(_col0)","count(_col1)"]
              Select Operator [SEL_17] (rows=720000376 width=934)
                Output:["_col0","_col1"]
                Group By Operator [GBY_16] (rows=720000376 width=934)
                  Output:["_col0","_col1","_col2"],keys:KEY._col0, KEY._col1, KEY._col2
                <-Map 1 [SIMPLE_EDGE] vectorized
                  SHUFFLE [RS_15]
                    PartitionCols:_col0, _col1, _col2
                    Group By Operator [GBY_14] (rows=1440000752 width=934)
                      Output:["_col0","_col1","_col2"],keys:_col0, _col1, 0L
                      Select Operator [SEL_13] (rows=720000376 width=934)
                        Output:["_col0","_col1"]
                        TableScan [TS_0] (rows=720000376 width=934)
                          tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_web_site_sk","ws_sold_time_sk"]

Example 2 where it does not apply:

explain 
select count(distinct ws_web_site_sk, ws_sold_time_sk)
from  web_sales;

The generated execution plan is:

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_6]
        Group By Operator [GBY_4] (rows=1 width=16)
          Output:["_col0"],aggregations:["count(DISTINCT KEY._col0:0._col0, KEY._col0:0._col1)"]
        <-Map 1 [SIMPLE_EDGE]
          SHUFFLE [RS_3]
            Group By Operator [GBY_2] (rows=720000376 width=934)
              Output:["_col0","_col1","_col2"],aggregations:["count(DISTINCT ws_web_site_sk, ws_sold_time_sk)"],keys:ws_web_site_sk, ws_sold_time_sk
              Select Operator [SEL_1] (rows=720000376 width=934)
                Output:["ws_sold_time_sk","ws_web_site_sk"]
                TableScan [TS_0] (rows=720000376 width=934)
                  tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_time_sk","ws_web_site_sk"]

SkewJoinOptimizer

Only effective on the MR engine.

drop table if exists skew_web_sales;
CREATE TABLE `skew_web_sales`(
  `ws_sold_time_sk` bigint, 
  `ws_ship_date_sk` bigint, 
  `ws_item_sk` bigint, 
  `ws_bill_customer_sk` bigint, 
  `ws_bill_cdemo_sk` bigint, 
  `ws_bill_hdemo_sk` bigint, 
  `ws_bill_addr_sk` bigint, 
  `ws_ship_customer_sk` bigint, 
  `ws_ship_cdemo_sk` bigint, 
  `ws_ship_hdemo_sk` bigint, 
  `ws_ship_addr_sk` bigint, 
  `ws_web_page_sk` bigint, 
  `ws_web_site_sk` bigint, 
  `ws_ship_mode_sk` bigint, 
  `ws_warehouse_sk` bigint, 
  `ws_promo_sk` bigint, 
  `ws_order_number` bigint, 
  `ws_quantity` int, 
  `ws_wholesale_cost` decimal(7,2), 
  `ws_list_price` decimal(7,2), 
  `ws_sales_price` decimal(7,2), 
  `ws_ext_discount_amt` decimal(7,2), 
  `ws_ext_sales_price` decimal(7,2), 
  `ws_ext_wholesale_cost` decimal(7,2), 
  `ws_ext_list_price` decimal(7,2), 
  `ws_ext_tax` decimal(7,2), 
  `ws_coupon_amt` decimal(7,2), 
  `ws_ext_ship_cost` decimal(7,2), 
  `ws_net_paid` decimal(7,2), 
  `ws_net_paid_inc_tax` decimal(7,2), 
  `ws_net_paid_inc_ship` decimal(7,2), 
  `ws_net_paid_inc_ship_tax` decimal(7,2), 
  `ws_net_profit` decimal(7,2),
  `ws_sold_date_sk` bigint) 
skewed by (ws_web_site_sk) on (1);
insert overwrite table skew_web_sales select * from web_sales where ws_web_site_sk=1;
insert into table skew_web_sales select * from web_sales where ws_sold_date_sk=2452642 and ws_web_site_sk=3 limit 1;

set hive.optimize.skewjoin.compiletime=true;
set hive.execution.engine=mr;
explain  
select ws_web_site_sk,web_class,sum(ws_net_paid) 
from  web_site join  skew_web_sales
on skew_web_sales.ws_web_site_sk=web_site.web_site_sk 
group by ws_web_site_sk,web_class;

The generated execution plan is:

Explain
LOGICAL PLAN:
skew_web_sales 
  TableScan (TS_1)
    alias: skew_web_sales
    Statistics: Num rows: 126827 Data size: 15217112 Basic stats: COMPLETE Column stats: COMPLETE
    Filter Operator (FIL_24)
      predicate: (ws_web_site_sk is not null and (ws_web_site_sk = 1L)) (type: boolean)
      Statistics: Num rows: 63414 Data size: 7608672 Basic stats: COMPLETE Column stats: COMPLETE
      Reduce Output Operator (RS_5)
        key expressions: ws_web_site_sk (type: bigint)
        sort order: +
        Map-reduce partition columns: ws_web_site_sk (type: bigint)
        Statistics: Num rows: 63414 Data size: 7608672 Basic stats: COMPLETE Column stats: COMPLETE
        value expressions: ws_net_paid (type: decimal(7,2))
        Join Operator (JOIN_6)
          condition map:
               Inner Join 0 to 1
          keys:
            0 web_site_sk (type: bigint)
            1 ws_web_site_sk (type: bigint)
          outputColumnNames: _col7, _col42, _col58
          Statistics: Num rows: 63414 Data size: 13379346 Basic stats: COMPLETE Column stats: COMPLETE
          Union (UNION_27)
            Statistics: Num rows: 94130 Data size: 19859414 Basic stats: COMPLETE Column stats: COMPLETE
            Group By Operator (GBY_8)
              aggregations: sum(_col58)
              keys: _col42 (type: bigint), _col7 (type: varchar(50))
              mode: hash
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
              Reduce Output Operator (RS_9)
                key expressions: _col0 (type: bigint), _col1 (type: varchar(50))
                sort order: ++
                Map-reduce partition columns: _col0 (type: bigint), _col1 (type: varchar(50))
                Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
                value expressions: _col2 (type: decimal(17,2))
                Group By Operator (GBY_10)
                  aggregations: sum(VALUE._col0)
                  keys: KEY._col0 (type: bigint), KEY._col1 (type: varchar(50))
                  mode: mergepartial
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
                  File Output Operator (FS_12)
                    compressed: false
                    Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
subquery1:skew_web_sales 
  TableScan (TS_18)
    alias: skew_web_sales
    Statistics: Num rows: 126827 Data size: 15217112 Basic stats: COMPLETE Column stats: COMPLETE
    Filter Operator (FIL_26)
      predicate: (ws_web_site_sk is not null and (not (ws_web_site_sk = 1L))) (type: boolean)
      Statistics: Num rows: 63413 Data size: 7608552 Basic stats: COMPLETE Column stats: COMPLETE
      Reduce Output Operator (RS_20)
        key expressions: ws_web_site_sk (type: bigint)
        sort order: +
        Map-reduce partition columns: ws_web_site_sk (type: bigint)
        Statistics: Num rows: 63413 Data size: 7608552 Basic stats: COMPLETE Column stats: COMPLETE
        value expressions: ws_net_paid (type: decimal(7,2))
        Join Operator (JOIN_21)
          condition map:
               Inner Join 0 to 1
          keys:
            0 web_site_sk (type: bigint)
            1 ws_web_site_sk (type: bigint)
          outputColumnNames: _col7, _col42, _col58
          Statistics: Num rows: 30716 Data size: 6480068 Basic stats: COMPLETE Column stats: COMPLETE
          Union (UNION_27)
            Statistics: Num rows: 94130 Data size: 19859414 Basic stats: COMPLETE Column stats: COMPLETE
subquery1:web_site 
  TableScan (TS_15)
    alias: web_site
    Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
    Filter Operator (FIL_25)
      predicate: (web_site_sk is not null and (not (ws_web_site_sk = 1L))) (type: boolean)
      Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE
      Reduce Output Operator (RS_17)
        key expressions: web_site_sk (type: bigint)
        sort order: +
        Map-reduce partition columns: web_site_sk (type: bigint)
        Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE
        value expressions: web_class (type: varchar(50))
        Join Operator (JOIN_21)
          condition map:
               Inner Join 0 to 1
          keys:
            0 web_site_sk (type: bigint)
            1 ws_web_site_sk (type: bigint)
          outputColumnNames: _col7, _col42, _col58
          Statistics: Num rows: 30716 Data size: 6480068 Basic stats: COMPLETE Column stats: COMPLETE
web_site 
  TableScan (TS_0)
    alias: web_site
    Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
    Filter Operator (FIL_23)
      predicate: (web_site_sk is not null and (ws_web_site_sk = 1L)) (type: boolean)
      Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
      Reduce Output Operator (RS_3)
        key expressions: web_site_sk (type: bigint)
        sort order: +
        Map-reduce partition columns: web_site_sk (type: bigint)
        Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
        value expressions: web_class (type: varchar(50))
        Join Operator (JOIN_6)
          condition map:
               Inner Join 0 to 1
          keys:
            0 web_site_sk (type: bigint)
            1 ws_web_site_sk (type: bigint)
          outputColumnNames: _col7, _col42, _col58
          Statistics: Num rows: 63414 Data size: 13379346 Basic stats: COMPLETE Column stats: COMPLETE

SamplePruner

Example:

drop table if exists t1 ;
create table t1(c1 string) stored as textfile;
load data local inpath '/etc/profile' overwrite into table t1;

drop table if exists t2;
create external table t2(c1 string) CLUSTERED BY (c1) into 8 buckets;
insert overwrite table t2 select * from t1;
set hive.execution.engine=mr;
set hive.fetch.task.conversion=none;
explain select * from t2 TABLESAMPLE(BUCKET 3 OUT OF 8 ON c1) s;

Conditions for SamplePruner to take effect:

  1. t2 is an external table.
  2. The CLUSTERED BY column matches the ON column of TABLESAMPLE in the query; ON rand() does not qualify.
  3. The bucket count in the DDL (into 8 buckets) must equal the OUT OF number in the query (OUT OF 8).
  4. The 3 in (BUCKET 3 OUT OF 8 ON c1) means the 3rd bucket file is read.

MapJoinProcessor

Handles the user-specified MapJoin hint. This approach is no longer the common one; joins are now converted to map joins automatically based on statistics.

set hive.ignore.mapjoin.hint=false;
explain select /*+mapjoin(web_site) */ sum(ws_net_paid) from web_sales join web_site on web_site_sk=ws_web_site_sk 
where ws_sold_date_sk=2452640 ;

BucketMapJoinOptimizer

Conditions:

  1. MR engine.
  2. The mapjoin hint in the SQL takes effect.
  3. hive.optimize.bucketmapjoin is true (the default is false); a sketch satisfying all three conditions is shown below.
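
A sketch under those assumptions; big_t and small_t are hypothetical tables bucketed on the join key (the bucket counts must be multiples of each other):

set hive.execution.engine=mr;
set hive.ignore.mapjoin.hint=false;
set hive.optimize.bucketmapjoin=true;
-- assumes: create table big_t (key int, value string) clustered by (key) into 8 buckets;
--          create table small_t (key int, value string) clustered by (key) into 4 buckets;
explain select /*+ mapjoin(small_t) */ big_t.key, small_t.value
from big_t join small_t on big_t.key = small_t.key;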

BucketMapJoinOptimizer, SortedMergeBucketMapJoinOptimizer

Both take effect only on the MR engine and are disabled by default; a sketch of the sorted-merge variant follows.
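
For the sorted-merge bucket map join, the tables must additionally be sorted on the join key. A sketch assuming hypothetical tables t_a and t_b, each clustered and sorted by key into the same number of buckets:

set hive.execution.engine=mr;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
-- assumes: create table t_a (key int, value string)
--   clustered by (key) sorted by (key asc) into 8 buckets; likewise for t_b
explain select /*+ mapjoin(t_b) */ t_a.key, t_b.value
from t_a join t_b on t_a.key = t_b.key;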

BucketingSortingReduceSinkOptimizer

For insert overwrite table T2 select * from T1;
if T1 and T2 have the same bucket and sort keys and the same number of buckets, no reducer task is needed, as sketched below.
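
A minimal sketch of such a table pair (the DDL is illustrative):

create table T1 (key int, value string)
clustered by (key) sorted by (key asc) into 8 buckets;
create table T2 (key int, value string)
clustered by (key) sorted by (key asc) into 8 buckets;
explain insert overwrite table T2 select * from T1;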

UnionProcessor

If both subqueries of a UNION are map-only, this fact is stored in unionDesc/UnionOperator. If one of the subqueries is a map-reduce job, an FS (FileSink) is inserted before the UNION.

Example where both UNION subqueries are map-only.
The difference between union and union all:

union: after the Union operator there are a group by and a reduce sink, because duplicates must be removed.
union all: after the Union operator there is no group by or reduce sink, because no de-duplication is needed.

explain logical select ws_web_site_sk from web_sales where ws_sold_date_sk=2452640
union
select ws_web_site_sk from web_sales where ws_sold_date_sk=2452641;

Example where one of the subqueries is a map-reduce job:

explain logical select ws_web_site_sk from web_sales where ws_sold_date_sk=2452640
union
select distinct ws_web_site_sk from web_sales where ws_sold_date_sk=2452641;

This information is used in MapReduceCompiler.

BucketVersionPopulator

Different hash algorithms distribute data across N buckets differently.
Hive decides which hash algorithm to use from the bucketingVersion.
A different bucketing version may be used after each ReduceSink, because a full shuffle can redistribute the data under the new hash algorithm.

Hive writes data using the table's bucketing version.
When reading from a table, the table's bucketing_version property is taken into account, as shown below.
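
You can inspect the version a table carries among its table parameters; bv_test is a hypothetical table, and the value shown depends on the Hive version that created it:

create table bv_test (key int) clustered by (key) into 4 buckets;
describe formatted bv_test;
-- Table Parameters typically include a line such as:
--   bucketing_version     2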

ReduceSinkDeDuplication

If two adjacent reduce sinks share common partition/sort columns, in the same order, they can be merged into a single reduce stage.

For example, the two 'partition by' clauses in the following SQL use different columns:

explain select 
avg(sum(web_tax_percentage)) over
          (partition by web_city)
          avg_monthly_sales,
        rank() over
          (partition by web_county, web_city
           order by web_county) rn
from web_site
group by web_county, web_city;

The generated vertices are:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (SIMPLE_EDGE)

By contrast, the following SQL:

explain select 
avg(sum(web_tax_percentage)) over
          (partition by web_county, web_city)
          avg_monthly_sales,
        rank() over
          (partition by web_county, web_city
           order by web_county) rn
from web_site
 group by web_county, web_city;

generates the vertices below, one reducer fewer than the previous SQL, because the partition by columns are the same:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

NonBlockingOpDeDupProc

Merges two adjacent projection (SEL) operators into one, and two adjacent filter (FIL) operators into one.
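
A sketch of a query shape that can yield adjacent SEL/FIL operators before this rule runs; afterwards the plan should carry a single merged Filter combining both conditions:

explain select c1
from (select web_city c1 from web_site where web_site_sk < 20) t
where c1 > '1';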

IdentityProjectRemover

Removes unnecessary projection (SEL) operators. A SEL can be removed when it merely forwards rows, performs no computation (unlike, say, select x+1), and does not rename columns.

explain select web_city 
from web_site  
where web_city > '1' 
group by web_city;

The generated plan contains no Select operator:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_7]
        Group By Operator [GBY_5] (rows=1 width=91)
          Output:["_col0"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE]
          SHUFFLE [RS_4]
            PartitionCols:_col0
            Group By Operator [GBY_3] (rows=1 width=91)
              Output:["_col0"],keys:web_city
              Filter Operator [FIL_8] (rows=10 width=91)
                predicate:(web_city > '1')
                TableScan [TS_0] (rows=32 width=91)
                  tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]

Without the filter:

explain select web_city 
from web_site  
group by web_city;

a Select remains after the TS:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_6]
        Group By Operator [GBY_4] (rows=2 width=91)
          Output:["_col0"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE]
          SHUFFLE [RS_3]
            PartitionCols:_col0
            Group By Operator [GBY_2] (rows=2 width=91)
              Output:["_col0"],keys:web_city
              Select Operator [SEL_1] (rows=32 width=91)
                Output:["web_city"]
                TableScan [TS_0] (rows=32 width=91)
                  tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]

explain select upper(web_city) d_web_city
from web_site  
where web_city > '1' 
group by web_city;

An extra Select appears before the File Output Operator:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_7]
        Select Operator [SEL_6] (rows=1 width=144)
          Output:["_col0"]
          Group By Operator [GBY_5] (rows=1 width=91)
            Output:["_col0"],keys:KEY._col0
          <-Map 1 [SIMPLE_EDGE]
            SHUFFLE [RS_4]
              PartitionCols:_col0
              Group By Operator [GBY_3] (rows=1 width=91)
                Output:["_col0"],keys:web_city
                Filter Operator [FIL_8] (rows=10 width=91)
                  predicate:(web_city > '1')
                  TableScan [TS_0] (rows=32 width=91)
                    tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]

GlobalLimitOptimizer

set hive.limit.optimize.enable=true;
select * from web_sales limit 10;

If web_sales has many partitions with many files in each, a query for 10 rows has no need to open every file; the first file may already contain 10 records, so the amount of input read can be reduced.

CorrelationOptimizer

Only effective on the MR engine. See the paper: YSmart: Yet Another SQL-to-MapReduce Translator (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang).
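
A sketch of a YSmart-style correlated query: both subqueries aggregate web_sales on the same key before the join, so on MR their shuffles can be shared. hive.optimize.correlation (default false) must be enabled:

set hive.execution.engine=mr;
set hive.optimize.correlation=true;
explain select t1.ws_web_site_sk, t1.cnt, t2.sum_paid
from (select ws_web_site_sk, count(1) cnt
      from web_sales group by ws_web_site_sk) t1
join (select ws_web_site_sk, sum(ws_net_paid) sum_paid
      from web_sales group by ws_web_site_sk) t2
on t1.ws_web_site_sk = t2.ws_web_site_sk;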

LimitPushdownOptimizer

group by: the limit is pushed into the reduce sink of the group by.

explain extended select sum(web_tax_percentage)
from web_site  
group by web_city limit 1;

order by: the limit is pushed into the final reduce sink:

explain select ws_web_site_sk
from web_sales  
order by ws_web_site_sk limit 1;

StatsOptimizer

Some query results can be obtained directly from the statistics.
First generate column statistics for the table:

analyze table web_site compute statistics for columns;

Then run the SQL:

explain select min(web_site_sk) from web_site;

The plan that answers from statistics has only a Fetch operator:

Explain
Stage-0
  Fetch Operator
    limit:1
  • Example where the result cannot come from statistics
explain select min(web_site_sk) from web_site where web_site_sk <> 2;

The execution plan for this SQL is:

Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 2
      File Output Operator [FS_7]
        Group By Operator [GBY_5] (rows=1 width=8)
          Output:["_col0"],aggregations:["min(VALUE._col0)"]
        <-Map 1 [CUSTOM_SIMPLE_EDGE]
          PARTITION_ONLY_SHUFFLE [RS_4]
            Group By Operator [GBY_3] (rows=1 width=8)
              Output:["_col0"],aggregations:["min(web_site_sk)"]
              Filter Operator [FIL_8] (rows=32 width=8)
                predicate:(web_site_sk <> 2L)
                TableScan [TS_0] (rows=32 width=8)
                  tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_site_sk"]

AnnotateWithStatistics, AnnotateWithOpTraits

When running explain on the MR engine, these execute during logical plan optimization (Optimizer); on Tez they execute during physical plan optimization.
AnnotateWithStatistics attaches statistics (stats) to each operator.
AnnotateWithOpTraits sets opTraits, which matters for bucketed tables, e.g. a join of two bucketed tables.
OpTraits is defined as follows:

public OpTraits(List<List<String>> bucketColNames, int numBuckets,
      List<List<String>> sortColNames, int numReduceSinks) {
    this.bucketColNames = bucketColNames;
    this.numBuckets = numBuckets;
    this.sortColNames = sortColNames;
    this.numReduceSinks = numReduceSinks;
  }

Example:

explain
select web_site.web_city, sum(ws_ext_tax) ws_ext_tax_sum
from web_site join web_sales on web_site.web_site_sk = web_sales.ws_web_site_sk
where ws_sold_date_sk=2452642 and ws_web_site_sk =1 
group by web_site.web_city;

SimpleFetchOptimizer

For simple single-table operations (no group by, no distinct), no distributed job is launched; a fetch task reads the table and returns the rows directly, which speeds up execution.
Values of hive.fetch.task.conversion:
none: disabled
minimal: SELECT * only, filters on partition columns, LIMIT
more: SELECT, FILTER, LIMIT only (supports TABLESAMPLE and virtual columns)

hive.fetch.task.conversion.threshold: caps the input size eligible for this optimization; the default is 1 GB.
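
For example, with conversion enabled, a simple filtered select should compile to a single Fetch stage with no Map or Reduce vertices:

set hive.fetch.task.conversion=more;
explain select web_city from web_site where web_city > '1' limit 5;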

TablePropertyEnrichmentOptimizer

Disabled by default. Besides the table's own properties, this optimizer can fetch the property information reported by the table's SerDe and merge all of it into the table properties.
The SerDes considered are controlled by hive.optimize.update.table.properties.from.serde.list; by default the list contains only AvroSerDe.
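
A sketch of enabling it, assuming the switch hive.optimize.update.table.properties.from.serde and a hypothetical Avro-backed table avro_t:

set hive.optimize.update.table.properties.from.serde=true;
-- assumes: create table avro_t (key int, value string) stored as avro;
-- the SerDe-reported properties (e.g. the Avro schema) are merged into
-- the table properties visible to the planner
explain select count(1) from avro_t;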

HiveOpConverterPostProc

Runs only when all 3 of the following conditions hold; returnPathEnabled defaults to false, so in practice it does not run.

final boolean cboEnabled = HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED);
final boolean returnPathEnabled = HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP);
final boolean cboSucceeded = pctx.getContext().isCboSucceeded();
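
A sketch of the settings that would make all three conditions true (the return path is experimental):

set hive.cbo.enable=true;
set hive.cbo.returnpath.hiveop=true;
explain select count(1) from web_site;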
