Lab 10: Achieving Performance Optimization by Defining Distribution Columns Appropriately - Distributed Table Joins


Lab Introduction

This lab analyzes the performance bottlenecks of an ordinary query, uses the execution plan to identify candidate optimization points, applies them, and verifies the resulting improvement. The focus is on optimizing queries that involve distributed joins.

Lab Objective

Understand how to optimize distributed joins by defining distribution columns appropriately.

Lab Steps

Step 1: Run the following query

EXPLAIN ANALYZE
SELECT
    l_orderkey,
    o_orderdate,
    o_shippriority
FROM
    customer,
    orders,
    lineitem
WHERE
    c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND o_orderdate < '1995-03-15'::date
    AND l_shipdate > '1995-03-15'::date
ORDER BY
    o_orderdate
LIMIT 10;

Step 2: Modify the distribution keys

Notice that the query joins three tables, customer, orders, and lineitem, with the join conditions c_custkey = o_custkey and l_orderkey = o_orderkey. Considering only this query, and provided the data in those columns is not significantly skewed, customer.c_custkey and lineitem.l_orderkey are the preferred distribution keys, since they reduce data redistribution across the DNs. A rough skew check is sketched below.
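
Once the data is loaded, one rough skew check (a sketch; any equivalent cardinality query works) is to compare the number of distinct key values against the total row count, since a near-unique column hashes evenly across the DNs:

-- A high distinct-to-total ratio suggests an even spread; a low ratio
-- warns that some DNs may end up with disproportionate row counts.
SELECT count(*) AS total_rows,
       count(DISTINCT c_custkey) AS distinct_keys
FROM customer;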

Create the customer and lineitem tables with the statements below, then load the data files customer.csv and lineitem.csv.

DROP TABLE IF EXISTS customer;

CREATE TABLE CUSTOMER (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(25) NOT NULL,
C_ADDRESS VARCHAR(400) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NOT NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(400) NOT NULL
)
DISTRIBUTE BY HASH(c_custkey);

DROP TABLE IF EXISTS lineitem;

CREATE TABLE LINEITEM(
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL)
DISTRIBUTE BY HASH(l_orderkey);

COPY CUSTOMER FROM '/tmp/customer.csv' 
    DELIMITER ',' 
    QUOTE '"' 
    ESCAPE '"' 
    ENCODING 'GBK' 
    CSV;

COPY LINEITEM FROM '/tmp/lineitem.csv' 
    DELIMITER ',' 
    QUOTE '"' 
    ESCAPE '"' 
    ENCODING 'UTF-8' 
    CSV;
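
After loading, it is worth confirming that the rows really did spread evenly across the DNs. A sketch, assuming the xc_node_id system column is available (as in Postgres-XC-derived systems such as GaussDB):

-- Per-DN row counts should be roughly equal if the hash distribution is even.
SELECT xc_node_id, count(*) AS rows_on_dn
FROM lineitem
GROUP BY xc_node_id
ORDER BY rows_on_dn DESC;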

Step 3: Try o_custkey as the distribution key for orders, and load the data file orders.csv.

DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
) WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_custkey);

COPY ORDERS FROM '/tmp/orders.csv' 
    DELIMITER ',' 
    QUOTE '"' 
    ESCAPE '"' 
    ENCODING 'UTF-8' 
    CSV;

Re-run the query from Step 1, after setting the following parameters:

set enable_fast_query_shipping = off;
set enable_stream_operator = on;

Both parameters are session-level and take effect only for the current session. Broadly, turning enable_fast_query_shipping off keeps the optimizer from shipping the whole statement directly to the DNs, while enable_stream_operator = on allows Streaming (redistribute/broadcast) operators to appear in the plan.
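
To confirm that both settings took effect in the current session:

-- Both should report the values set above.
SHOW enable_fast_query_shipping;
SHOW enable_stream_operator;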

jiang=# Explain analyze 
jiang-# select
jiang-# l_orderkey, 
jiang-# o_orderdate, 
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
 id |                      operation                      |      A-time       | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs  
----+-----------------------------------------------------+-------------------+--------+--------+-------------+---------+---------+----------
  1 | ->  Limit                                           | 782.531           |     10 |     10 | 2KB         |         |      16 | 11271.18
  2 |    ->  Streaming (type: GATHER)                     | 782.528           |     10 |     10 | 176KB       |         |      16 | 11271.35
  3 |       ->  Limit                                     | [753.865,768.892] |     30 |     18 | [2KB,2KB]   |         |      16 | 11270.60
  4 |          ->  Sort                                   | [753.861,768.887] |     30 |     18 | [28KB,29KB] | [32,32] |      16 | 11270.60
  5 |             ->  Hash Join (6,7)                     | [752.256,767.242] |  25917 |     17 | [9KB,9KB]   |         |      16 | 11270.51
  6 |                ->  Seq Scan on lineitem             | [85.445,98.007]   | 565702 | 564893 | [41KB,41KB] |         |       4 | 10763.06
  7 |                ->  Hash                             | [594.646,597.109] | 713105 |     10 | [12MB,13MB] | [40,40] |      20 | 28.73
  8 |                   ->  Streaming(type: REDISTRIBUTE) | [373.185,449.521] | 713105 |     10 | [69KB,69KB] |         |      20 | 28.73
  9 |                      ->  Hash Join (10,11)          | [362.084,403.154] | 713105 |     10 | [9KB,9KB]   |         |      20 | 28.44
 10 |                         ->  Seq Scan on customer    | [14.668,18.917]   | 147100 |     30 | [34KB,35KB] |         |       4 | 14.14
 11 |                         ->  Hash                    | [199.532,206.435] | 727305 |     10 | [14MB,15MB] | [48,48] |      28 | 14.18
 12 |                            ->  Seq Scan on orders   | [145.599,150.525] | 727305 |     10 | [33KB,33KB] |         |      28 | 14.18
(12 rows)

                  Predicate Information (identified by plan id)                   
----------------------------------------------------------------------------------
   5 --Hash Join (6,7)
         Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
   6 --Seq Scan on lineitem
         Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized, Jit Execute)
         Rows Removed by Filter: 482874
   9 --Hash Join (10,11)
         Hash Cond: (customer.c_custkey = orders.o_custkey)
  12 --Seq Scan on orders
         Filter: (o_orderdate < '1995-03-15'::date)
         Rows Removed by Filter: 772695
(10 rows)

              Memory Information (identified by plan id)               
-----------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 2MB
 Datanode:
         Max Query Peak Memory: 15MB
         Min Query Peak Memory: 14MB
   4 --Sort
         Sort Method: top-N heapsort  Memory: 26kB ~ 26kB
 Sort Method: top-N heapsort  Disk: 1024kB ~ 0kB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 13010kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 12984kB
  11 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 15310kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 14980kB
(14 rows)

                           User Define Profiling                           
---------------------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
  (actual time=[0.203, 0.232], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
  (actual time=[0.029, 0.029], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator check and update node definition
  (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator serialize plan
  (actual time=[0.904, 0.904], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send query id with sync
  (actual time=[0.220, 0.220], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send begin command
  (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator start transaction and send query
  (actual time=[0.025, 0.025], calls=[1, 1])
 Plan Node id: 3  Track name: Datanode start up stream thread
  (actual time=[0.032, 0.049], calls=[1, 1])
(16 rows)

                                 ====== Query Summary =====                                 
--------------------------------------------------------------------------------------------
 Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [7.810 ms,9.919 ms]
 Datanode executor run time [dn_6007_6008_6009, dn_6004_6005_6006]: [753.898 ms,768.921 ms]
 Datanode executor end time [dn_6007_6008_6009, dn_6001_6002_6003]: [0.082 ms,0.092 ms]
 Coordinator executor start time: 6.973 ms
 Coordinator executor run time: 782.544 ms
 Coordinator executor end time: 0.031 ms
 Planner runtime: 0.904 ms
 Plan size: 6167 byte
 Query Id: 72902018968525132
 Total runtime: 789.565 ms
(10 rows)

Here, because the join columns of orders and customer are each table's distribution key, that join runs locally on each DN; its result set (713,105 rows, plan id=8) is then redistributed, with the join column against lineitem serving as the redistribution key, before the final join. The total runtime is 789.565 ms.
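
To double-check which tables are collocated on their join columns, the distribution metadata can be inspected directly. A sketch, assuming a Postgres-XC-derived catalog such as GaussDB's, where pgxc_class.pcattnum records the attribute number(s) of the distribution column(s) (exact column names vary by version):

-- 'H' in pclocatortype means hash distribution; pcattnum points at the
-- distribution column(s) by attribute number.
SELECT pcrelid::regclass AS table_name, pclocatortype, pcattnum
FROM pgxc_class
WHERE pcrelid IN ('customer'::regclass, 'orders'::regclass, 'lineitem'::regclass);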

Step 4: Try o_orderkey as the distribution column for orders, and reload the data file orders.csv.

DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)
WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_orderkey);

COPY ORDERS FROM '/tmp/orders.csv' 
    DELIMITER ',' 
    QUOTE '"' 
    ESCAPE '"' 
    ENCODING 'UTF-8' 
    CSV;

Run the query from Step 1 again:

jiang=# Explain analyze 
jiang-# select
jiang-# l_orderkey, 
jiang-# o_orderdate, 
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
 id |                      operation                      |      A-time       | A-rows | E-rows |  Peak Memory  | A-width | E-width | E-costs  
----+-----------------------------------------------------+-------------------+--------+--------+---------------+---------+---------+----------
  1 | ->  Limit                                           | 419.741           |     10 |     10 | 2KB           |         |      16 | 13063.96
  2 |    ->  Streaming (type: GATHER)                     | 419.738           |     10 |     10 | 176KB         |         |      16 | 13064.13
  3 |       ->  Limit                                     | [415.101,416.042] |     30 |     18 | [2KB,2KB]     |         |      16 | 13063.38
  4 |          ->  Sort                                   | [415.097,416.037] |     30 |     18 | [28KB,29KB]   | [32,32] |      16 | 13063.38
  5 |             ->  Hash Join (6,7)                     | [413.825,414.770] |  25917 |     17 | [9KB,9KB]     |         |      16 | 13063.29
  6 |                ->  Seq Scan on customer             | [10.589,10.882]   | 147100 | 147100 | [34KB,35KB]   |         |       4 | 1684.33
  7 |                ->  Hash                             | [393.875,394.071] |  26500 |     17 | [840KB,840KB] | [44,44] |      24 | 11254.18
  8 |                   ->  Streaming(type: REDISTRIBUTE) | [387.692,387.929] |  26500 |     17 | [70KB,70KB]   |         |      24 | 11254.18
  9 |                      ->  Hash Join (10,11)          | [360.934,376.886] |  26500 |     17 | [10KB,10KB]   |         |      24 | 11253.60
 10 |                         ->  Seq Scan on lineitem    | [88.832,100.285]  | 565702 | 564893 | [41KB,41KB]   |         |       4 | 10763.06
 11 |                         ->  Hash                    | [199.541,202.174] | 727305 |     10 | [15MB,15MB]   | [48,48] |      28 | 14.18
 12 |                            ->  Seq Scan on orders   | [145.768,147.778] | 727305 |     10 | [33KB,33KB]   |         |      28 | 14.18
(12 rows)

            Predicate Information (identified by plan id)            
---------------------------------------------------------------------
   5 --Hash Join (6,7)
         Hash Cond: (customer.c_custkey = orders.o_custkey)
   9 --Hash Join (10,11)
         Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
  10 --Seq Scan on lineitem
         Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized)
         Rows Removed by Filter: 482874
  12 --Seq Scan on orders
         Filter: (o_orderdate < '1995-03-15'::date)
         Rows Removed by Filter: 772695
(10 rows)

              Memory Information (identified by plan id)               
-----------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 2MB
 Datanode:
         Max Query Peak Memory: 1MB
         Min Query Peak Memory: 1MB
   4 --Sort
         Sort Method: top-N heapsort  Memory: 26kB ~ 26kB
 Sort Method: top-N heapsort  Disk: 1024kB ~ 0kB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 530kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 511kB
  11 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 15162kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 15137kB
(14 rows)

                           User Define Profiling                           
---------------------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
  (actual time=[0.179, 0.226], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
  (actual time=[0.030, 0.030], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator check and update node definition
  (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator serialize plan
  (actual time=[0.881, 0.881], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send query id with sync
  (actual time=[0.246, 0.246], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send begin command
  (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator start transaction and send query
  (actual time=[0.026, 0.026], calls=[1, 1])
 Plan Node id: 3  Track name: Datanode start up stream thread
  (actual time=[0.028, 0.030], calls=[1, 1])
(16 rows)

                                 ====== Query Summary =====                                 
--------------------------------------------------------------------------------------------
 Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [0.337 ms,0.367 ms]
 Datanode executor run time [dn_6007_6008_6009, dn_6001_6002_6003]: [415.130 ms,416.059 ms]
 Datanode executor end time [dn_6001_6002_6003, dn_6004_6005_6006]: [0.069 ms,0.101 ms]
 Coordinator executor start time: 6.699 ms
 Coordinator executor run time: 419.754 ms
 Coordinator executor end time: 0.031 ms
 Planner runtime: 0.679 ms
 Plan size: 6325 byte
 Query Id: 72902018968532990
 Total runtime: 426.499 ms
(10 rows)

From the row with id=8 we can see that orders and lineitem, now sharing orderkey as their distribution column, are joined locally on each DN; only their join result is then redistributed (Streaming type: REDISTRIBUTE) on the join column with customer, so the final join with customer is also local. Because this intermediate result is far smaller than in Step 3 (26,500 rows redistributed instead of 713,105), the total runtime drops from 789.565 ms to 426.499 ms.
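
A caveat visible in both plans: E-rows diverges sharply from A-rows (for example, Seq Scan on orders estimates 10 rows but actually returns 727,305), which usually indicates stale optimizer statistics. Refreshing them is cheap and helps the planner cost joins realistically:

-- Recollect statistics so row estimates (E-rows) track reality (A-rows).
ANALYZE customer;
ANALYZE orders;
ANALYZE lineitem;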

Lab Summary

This lab tuned a query by adjusting table distribution keys. The choice of distribution key has a real impact on query performance and should follow these principles (an in-place alternative to the drop-and-recreate approach used above is sketched after the list):
1) Choose a column without significant skew as the distribution column;
2) Prefer the join columns used by queries as the tables' distribution keys;
3) When several candidate distribution columns satisfy the conditions above, redistribute or broadcast the smaller tables, giving priority to aligning the distribution keys of the large tables.
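
As a side note, dropping and recreating a table, as in Steps 3 and 4, is not the only way to change a distribution key. Some versions support redefining it in place (a sketch; GaussDB(DWS) 8.1.0 and later document this form, so verify support in your version before relying on it):

-- Redefine the distribution key in place; the data is redistributed
-- across the DNs automatically. Check version support first.
ALTER TABLE orders DISTRIBUTE BY HASH (o_orderkey);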

Discussion Question

What principles should guide the choice of distribution column for a row-store table?

Answer:
Recommendations for choosing a distribution column:
The chosen distribution column and distribution algorithm should spread the table's data evenly across the DN nodes;
The column should be one frequently used as a join column in SQL statements;
Data access should cross DN nodes as little as possible.

