实验介绍
本实验通过分析普通查询过程中存在的性能瓶颈点,通过执行计划的分析找到可能的性能优化点并加以实施,最终达到优化的效果,重点关注分布式关联相关查询语句的优化。
实验目的
了解通过合理定义分布列实现分布式关联的性能优化。
实验步骤
步骤1 使用以下语句查询
Explain analyze select
l_orderkey,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
order by
o_orderdate limit 10;
步骤2 修改分布键
观察查询语句中,需要对 customer、order 和 lineitem 三张表进行关联,其中关联条件有c_custkey = o_custkey 和 l_orderkey = o_orderkey。如果只考虑本查询的优化,在该列数据没有显著倾斜的情况下,优先考虑 customer.c_custkey 和 lineitem.l_orderkey 作为分布键,从而减少 DN 数据的重分布。
customer、lineitem 建表语句,并导入数据文件customer.csv、lineitem.csv。
Drop table if exists customer;
CREATE TABLE CUSTOMER (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(25) NOT NULL,
C_ADDRESS VARCHAR(400) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NOT NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(400) NOT NULL
)
DISTRIBUTE BY HASH(c_custkey);
Drop table if exists lineitem;
CREATE TABLE LINEITEM(
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL)
DISTRIBUTE BY HASH(l_orderkey);
COPY CUSTOMER FROM '/tmp/customer.csv'
DELIMITER ','
QUOTE '"'
ESCAPE '"'
ENCODING 'GBK'
CSV;
COPY LINEITEM FROM '/tmp/lineitem_.csv'
DELIMITER ','
QUOTE '"'
ESCAPE '"'
ENCODING 'UTF-8'
CSV;
步骤3 尝试使用 o_custkey 作为 orders 的分布键,并导入数据文件orders.csv。
Drop table if exists orders;
CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_custkey);
COPY ORDERS FROM '/tmp/orders.csv'
DELIMITER ','
QUOTE '"'
ESCAPE '"'
ENCODING 'UTF-8'
CSV;
执行步骤一中的查询语句,设置参数:
set enable_fast_query_shipping = off;
set enable_stream_operator = on;
该两个参数为会话级,只在本次会话期间生效。
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
;jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
id | operation | A-time | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs
----+-----------------------------------------------------+-------------------+--------+--------+-------------+---------+---------+----------
1 | -> Limit | 782.531 | 10 | 10 | 2KB | | 16 | 11271.18
2 | -> Streaming (type: GATHER) | 782.528 | 10 | 10 | 176KB | | 16 | 11271.35
3 | -> Limit | [753.865,768.892] | 30 | 18 | [2KB,2KB] | | 16 | 11270.60
4 | -> Sort | [753.861,768.887] | 30 | 18 | [28KB,29KB] | [32,32] | 16 | 11270.60
5 | -> Hash Join (6,7) | [752.256,767.242] | 25917 | 17 | [9KB,9KB] | | 16 | 11270.51
6 | -> Seq Scan on lineitem | [85.445,98.007] | 565702 | 564893 | [41KB,41KB] | | 4 | 10763.06
7 | -> Hash | [594.646,597.109] | 713105 | 10 | [12MB,13MB] | [40,40] | 20 | 28.73
8 | -> Streaming(type: REDISTRIBUTE) | [373.185,449.521] | 713105 | 10 | [69KB,69KB] | | 20 | 28.73
9 | -> Hash Join (10,11) | [362.084,403.154] | 713105 | 10 | [9KB,9KB] | | 20 | 28.44
10 | -> Seq Scan on customer | [14.668,18.917] | 147100 | 30 | [34KB,35KB] | | 4 | 14.14
11 | -> Hash | [199.532,206.435] | 727305 | 10 | [14MB,15MB] | [48,48] | 28 | 14.18
12 | -> Seq Scan on orders | [145.599,150.525] | 727305 | 10 | [33KB,33KB] | | 28 | 14.18
(12 rows)
Predicate Information (identified by plan id)
----------------------------------------------------------------------------------
5 --Hash Join (6,7)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
6 --Seq Scan on lineitem
Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized, Jit Execute)
Rows Removed by Filter: 482874
9 --Hash Join (10,11)
Hash Cond: (customer.c_custkey = orders.o_custkey)
12 --Seq Scan on orders
Filter: (o_orderdate < '1995-03-15'::date)
Rows Removed by Filter: 772695
(10 rows)
Memory Information (identified by plan id)
-----------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 2MB
Datanode:
Max Query Peak Memory: 15MB
Min Query Peak Memory: 14MB
4 --Sort
Sort Method: top-N heapsort Memory: 26kB ~ 26kB
Sort Method: top-N heapsort Disk: 1024kB ~ 0kB
7 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 13010kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 12984kB
11 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 15310kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 14980kB
(14 rows)
User Define Profiling
---------------------------------------------------------------------------
Segment Id: 3 Track name: Datanode build connection
(actual time=[0.203, 0.232], calls=[1, 1])
Plan Node id: 2 Track name: coordinator get datanode connection
(actual time=[0.029, 0.029], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator check and update node definition
(actual time=[0.001, 0.001], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator serialize plan
(actual time=[0.904, 0.904], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator send query id with sync
(actual time=[0.220, 0.220], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator send begin command
(actual time=[0.001, 0.001], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator start transaction and send query
(actual time=[0.025, 0.025], calls=[1, 1])
Plan Node id: 3 Track name: Datanode start up stream thread
(actual time=[0.032, 0.049], calls=[1, 1])
(16 rows)
====== Query Summary =====
--------------------------------------------------------------------------------------------
Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [7.810 ms,9.919 ms]
Datanode executor run time [dn_6007_6008_6009, dn_6004_6005_6006]: [753.898 ms,768.921 ms]
Datanode executor end time [dn_6007_6008_6009, dn_6001_6002_6003]: [0.082 ms,0.092 ms]
Coordinator executor start time: 6.973 ms
Coordinator executor run time: 782.544 ms
Coordinator executor end time: 0.031 ms
Planner runtime: 0.904 ms
Plan size: 6167 byte
Query Id: 72902018968525132
Total runtime: 789.565 ms
(10 rows)
此时,orders 和 customer 两表由于关联列都在各自的分布键上,所以可以本地进行关联,然后其结果集再根据和 lineitem 的关联条件作为重分布的分布列进行重分布。
步骤4 尝试使用 o_orderkey 作为 orders 的分布列,并导入数据文件orders.csv。
Drop table if exists orders;
CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)
WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_orderkey);
COPY ORDERS FROM '/tmp/orders.csv'
DELIMITER ','
QUOTE '"'
ESCAPE '"'
ENCODING 'UTF-8'
CSV;
执行步骤 1 中查询语句:
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
id | operation | A-time | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs
----+-----------------------------------------------------+-------------------+--------+--------+---------------+---------+---------+----------
1 | -> Limit | 419.741 | 10 | 10 | 2KB | | 16 | 13063.96
2 | -> Streaming (type: GATHER) | 419.738 | 10 | 10 | 176KB | | 16 | 13064.13
3 | -> Limit | [415.101,416.042] | 30 | 18 | [2KB,2KB] | | 16 | 13063.38
4 | -> Sort | [415.097,416.037] | 30 | 18 | [28KB,29KB] | [32,32] | 16 | 13063.38
5 | -> Hash Join (6,7) | [413.825,414.770] | 25917 | 17 | [9KB,9KB] | | 16 | 13063.29
6 | -> Seq Scan on customer | [10.589,10.882] | 147100 | 147100 | [34KB,35KB] | | 4 | 1684.33
7 | -> Hash | [393.875,394.071] | 26500 | 17 | [840KB,840KB] | [44,44] | 24 | 11254.18
8 | -> Streaming(type: REDISTRIBUTE) | [387.692,387.929] | 26500 | 17 | [70KB,70KB] | | 24 | 11254.18
9 | -> Hash Join (10,11) | [360.934,376.886] | 26500 | 17 | [10KB,10KB] | | 24 | 11253.60
10 | -> Seq Scan on lineitem | [88.832,100.285] | 565702 | 564893 | [41KB,41KB] | | 4 | 10763.06
11 | -> Hash | [199.541,202.174] | 727305 | 10 | [15MB,15MB] | [48,48] | 28 | 14.18
12 | -> Seq Scan on orders | [145.768,147.778] | 727305 | 10 | [33KB,33KB] | | 28 | 14.18
(12 rows)
Predicate Information (identified by plan id)
---------------------------------------------------------------------
5 --Hash Join (6,7)
Hash Cond: (customer.c_custkey = orders.o_custkey)
9 --Hash Join (10,11)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
10 --Seq Scan on lineitem
Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized)
Rows Removed by Filter: 482874
12 --Seq Scan on orders
Filter: (o_orderdate < '1995-03-15'::date)
Rows Removed by Filter: 772695
(10 rows)
Memory Information (identified by plan id)
-----------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 2MB
Datanode:
Max Query Peak Memory: 1MB
Min Query Peak Memory: 1MB
4 --Sort
Sort Method: top-N heapsort Memory: 26kB ~ 26kB
Sort Method: top-N heapsort Disk: 1024kB ~ 0kB
7 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 530kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 511kB
11 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 15162kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 15137kB
(14 rows)
User Define Profiling
---------------------------------------------------------------------------
Segment Id: 3 Track name: Datanode build connection
(actual time=[0.179, 0.226], calls=[1, 1])
Plan Node id: 2 Track name: coordinator get datanode connection
(actual time=[0.030, 0.030], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator check and update node definition
(actual time=[0.001, 0.001], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator serialize plan
(actual time=[0.881, 0.881], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator send query id with sync
(actual time=[0.246, 0.246], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator send begin command
(actual time=[0.001, 0.001], calls=[1, 1])
Plan Node id: 2 Track name: Coordinator start transaction and send query
(actual time=[0.026, 0.026], calls=[1, 1])
Plan Node id: 3 Track name: Datanode start up stream thread
(actual time=[0.028, 0.030], calls=[1, 1])
(16 rows)
====== Query Summary =====
--------------------------------------------------------------------------------------------
Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [0.337 ms,0.367 ms]
Datanode executor run time [dn_6007_6008_6009, dn_6001_6002_6003]: [415.130 ms,416.059 ms]
Datanode executor end time [dn_6001_6002_6003, dn_6004_6005_6006]: [0.069 ms,0.101 ms]
Coordinator executor start time: 6.699 ms
Coordinator executor run time: 419.754 ms
Coordinator executor end time: 0.031 ms
Planner runtime: 0.679 ms
Plan size: 6325 byte
Query Id: 72902018968532990
Total runtime: 426.499 ms
(10 rows)
从 id=8 所在行可以看到,orders 和 customer 表进行关联时由于分布列不同,customer 选择了广播的计划,再和orders 表进行管理。之后orders+customer 的结果集具有orders 相同的分布列,所以可以和lineitem 在本地进行关联。
实验总结
本实验通过调整数据表分布键的方法,对查询进行了调优,选择不同的分布键对查询性能会产生一定的影响,主要遵循以下几个原则:
1)选择没有显著倾斜性的字段作为分布列;
2)尽可能选择查询中的关联列作为表的分布键;
3)存在多个满足上面条件的分布列时,尽可能选择数据量较少的表进行重分布或广播,优先满足大表间的分布键统一。
思考题
选择行存表的分布列的原则有哪些?
答案:
分布列选择有以下建议:
选择的分布列字段和分布算法能够将表数据在均匀分布到各个 DN 节点;
该分布字段在执行 SQL 时经常被用于作为连接字段;
进行数据访问时最大限度地减少跨 DN 节点数据访问。