This topic describes how to use computation pushdown to speed up queries whose computation on Tablestore data is slow.
Background information
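In Presto, a query usually pulls data from the data source and performs the computation on the Presto side. When a large volume of data must be pulled, computation efficiency suffers significantly. Computation pushdown moves part of the computation to the data source: the data source performs that computation and returns only the results to Presto. In some scenarios, such as aggregation, this can greatly reduce the amount of data transferred between the data source and Presto, and therefore improve computation efficiency.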
Supported operators
| Operator | Specific types |
|---|---|
| FILTER | |
| LIMIT | |
| AGGREGATE | |
Limits
Computation pushdown has the following limits:
- Only simple expressions can be pushed down. For example, a>10, sum(a), and avg(a) can be pushed down, whereas a+b>10, abs(a)>10, and sum(a+b) cannot.
- All columns involved must be in the same Tablestore index. Otherwise, only some of the operators are pushed down, or none are.
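- A LIKE pattern can be at most 20 characters long, including the % wildcards. For example, a like '%123456789123456789%' is exactly at the limit: 18 digits plus two % characters.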
- GROUP BY currently supports at most 4 columns, for example group by a1,a2,a3,a4. If a GROUP BY clause has more than 4 columns, it cannot be pushed down.
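- GROUP BY pushdown is disabled by default. When GROUP BY is pushed down, Tablestore returns at most 2,000 rows, so if the aggregation produces more groups than that, the result is incorrect. For how to enable pushdown, see Enable computation pushdown and GROUP BY pushdown.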
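Some of the limits above are mechanical enough to check before running a query. The sketch below is a hypothetical client-side helper, not part of Tablestore or Presto: it assumes you describe the query by hand (referenced columns, LIKE patterns, GROUP BY columns) and know the column list of the target index. It covers the same-index, LIKE-length, and GROUP BY column-count rules; it does not check whether expressions are "simple", because that would require parsing the SQL.

```python
from dataclasses import dataclass, field

# Limits copied from the list above. QueryShape and pushdown_warnings are
# hypothetical names used only for illustration.
MAX_LIKE_PATTERN_LENGTH = 20  # includes the % wildcards
MAX_GROUP_BY_COLUMNS = 4


@dataclass
class QueryShape:
    """Hand-built, simplified description of a query."""
    columns: set[str]  # every column the query references
    like_patterns: list[str] = field(default_factory=list)
    group_by: list[str] = field(default_factory=list)


def pushdown_warnings(query: QueryShape, index_columns: set[str]) -> list[str]:
    """Return the documented limits that this query violates (empty list if none)."""
    warnings = []
    # All referenced columns must be in the same index.
    missing = query.columns - index_columns
    if missing:
        warnings.append(f"columns not in index: {sorted(missing)}")
    # LIKE patterns, including %, must be at most 20 characters.
    for pattern in query.like_patterns:
        if len(pattern) > MAX_LIKE_PATTERN_LENGTH:
            warnings.append(f"LIKE pattern too long ({len(pattern)} chars): {pattern!r}")
    # GROUP BY supports at most 4 columns.
    if len(query.group_by) > MAX_GROUP_BY_COLUMNS:
        warnings.append(f"GROUP BY has {len(query.group_by)} columns (max {MAX_GROUP_BY_COLUMNS})")
    return warnings
```

For example, a query on columns a1 and c with the pattern '%123456789123456789%' against an index containing both columns returns an empty list, because the pattern is exactly 20 characters. An empty list only means none of these three rules is violated; EXPLAIN is still the way to confirm what was actually pushed down.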
Enable computation pushdown
Computation pushdown for Tablestore data sources is disabled by default. To enable it, add the ots-index-first=auto and ots-pushdown-enabled=true hints to the query. Examples:
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 order by l_commitdate desc limit 10;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select count(l_orderkey),sum(l_partkey), avg(l_linenumber) from lineitem;
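If queries are generated by application code, the hint prefix used in the examples above can be added in one place. The following is a minimal sketch with a hypothetical function name; it only builds the query string in the format shown above and does not talk to Presto or Tablestore.

```python
# Hints that enable computation pushdown, as used in the examples above.
PUSHDOWN_HINTS = ["ots-index-first=auto", "ots-pushdown-enabled=true"]


def with_pushdown_hints(sql: str) -> str:
    """Prefix a SQL statement with the /*+...*/ pushdown hint block."""
    return "/*+" + ",".join(PUSHDOWN_HINTS) + "*/ " + sql.strip()
```

For example, with_pushdown_hints("select * from lineitem where l_orderkey > 100;") produces the first example statement above.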
GROUP BY pushdown
To push down GROUP BY, add the ots-groupby-pushdown-enabled=true hint in addition to the hints that enable computation pushdown. Example:
/*+ots-index-first=auto,ots-pushdown-enabled=true,ots-groupby-pushdown-enabled=true*/
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-12-01'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
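Because a pushed-down GROUP BY returns at most 2,000 rows and supports at most 4 columns, it is worth enabling the GROUP BY hint only when both conditions hold. The sketch below is a hypothetical helper that makes that decision. It assumes the caller supplies an estimate of the number of groups (for example, from an earlier count of distinct key combinations); neither Presto nor Tablestore provides this function.

```python
MAX_PUSHED_DOWN_ROWS = 2000  # Tablestore returns at most 2,000 rows for a pushed-down GROUP BY
MAX_GROUP_BY_COLUMNS = 4     # GROUP BY with more columns is not pushed down


def groupby_hint_block(group_by_columns: list[str], estimated_groups: int) -> str:
    """Build the hint block, adding GROUP BY pushdown only when it is safe."""
    hints = ["ots-index-first=auto", "ots-pushdown-enabled=true"]
    if (len(group_by_columns) <= MAX_GROUP_BY_COLUMNS
            and estimated_groups <= MAX_PUSHED_DOWN_ROWS):
        hints.append("ots-groupby-pushdown-enabled=true")
    # Otherwise GROUP BY runs in Presto, which avoids truncated results.
    return "/*+" + ",".join(hints) + "*/"
```

For the query above, grouping by l_returnflag and l_linestatus with a small group count yields the same hint block as the example. If the estimate is unreliable, leaving the GROUP BY hint off is the conservative choice.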
Verify computation pushdown
You can use EXPLAIN to check whether a SQL statement is pushed down to Tablestore. Example:
explain select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-12-01'
group by
l_returnflag,
l_linestatus;
The following output is returned:
- Output[l_returnflag, l_linestatus, sum_qty, sum_base_price, avg_qty, avg_price, avg_disc, count_order] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
sum_qty := sum
sum_base_price := sum_7
avg_qty := avg
avg_price := avg_8
avg_disc := avg_9
count_order := count
- RemoteStreamingMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- LocalMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- PartialSort[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- TableScan[TableHandle {connectorId='ots', connectorHandle='table=ots20201208_mzl.lineitem', layout='Optional[table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
LAYOUT: table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]
avg_8 := OtsColumnHandle{columnName=avg(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
avg := OtsColumnHandle{columnName=avg(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
sum := OtsColumnHandle{columnName=sum(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
avg_9 := OtsColumnHandle{columnName=avg(l_discount), mappedName=L_DISCOUNT, primaryKey=false, columnType=double}
count := OtsColumnHandle{columnName=count(*), mappedName=_ID, primaryKey=false, columnType=bigint}
l_returnflag := OtsColumnHandle{columnName=l_returnflag, mappedName=L_RETURNFLAG, primaryKey=false, columnType=varchar}
sum_7 := OtsColumnHandle{columnName=sum(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
l_linestatus := OtsColumnHandle{columnName=l_linestatus, mappedName=L_LINESTATUS, primaryKey=false, columnType=varchar}
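The execution plan above no longer contains an Aggregate node, and the OtsQueryGeneratorContext in the TableScan node carries the pushdown information (filter, aggregationLists, and groupByColumns). This indicates that the query has been pushed down to Tablestore.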
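The check described above can be automated, for example in a regression test, by inspecting the EXPLAIN text. The following is a heuristic sketch with a hypothetical function name: it assumes the plan format shown above, where plan nodes appear as lines starting with "- " and pushed-down work appears inside OtsQueryGeneratorContext. It is a string check, not an API of Presto or Tablestore, and may need adjusting if the plan format differs.

```python
import re

# A plan node line such as "- Aggregate(FINAL)[...]". Note that "Aggregation"
# (as in SumAggregation) does not match because of the word boundary.
AGGREGATE_NODE = re.compile(r"^\s*-\s*Aggregate\b", re.MULTILINE)


def looks_pushed_down(plan: str) -> bool:
    """True if the plan has pushdown info and no Aggregate node left in Presto."""
    has_ots_context = "OtsQueryGeneratorContext" in plan
    has_aggregate_node = AGGREGATE_NODE.search(plan) is not None
    return has_ots_context and not has_aggregate_node
```

Applied to the output above, the function returns True: the TableScan line contains OtsQueryGeneratorContext, and the SumAggregation and AvgAggregation entries do not count as Aggregate nodes.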