Data Lake Analytics - Deprecated: Tablestore computation pushdown

Last updated: Jul 06, 2024

This topic describes how to use computation pushdown to speed up queries that compute slowly over Tablestore data.

Background

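In Presto, a query usually pulls data from the data source and performs the computation on the Presto side. When a large amount of data has to be pulled, query performance suffers badly. Computation pushdown moves part of the computation to the data source: the source computes first and returns only the result to Presto. In some scenarios, such as aggregation, this greatly reduces the data transferred between the data source and Presto and therefore speeds up the query.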
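The effect on data transfer can be illustrated with a toy model. The Python sketch below assumes a table whose column values are spread over several splits and counts how many rows reach Presto for a SUM with and without pushdown. It is a simplified illustration of the principle, not how DLA actually schedules splits; the names are hypothetical.

```python
from dataclasses import dataclass
from typing import List

# Toy model (assumption): the source holds one column's values in several splits.


@dataclass
class QueryStats:
    result: float
    rows_transferred: int  # rows sent from the data source to Presto


def sum_without_pushdown(splits: List[List[float]]) -> QueryStats:
    # Presto pulls every row and aggregates on its own side.
    rows = [v for split in splits for v in split]
    return QueryStats(result=sum(rows), rows_transferred=len(rows))


def sum_with_pushdown(splits: List[List[float]]) -> QueryStats:
    # The source computes one partial SUM per split; Presto only merges the partials.
    partials = [sum(split) for split in splits]
    return QueryStats(result=sum(partials), rows_transferred=len(partials))


if __name__ == "__main__":
    splits = [[float(i) for i in range(k * 1000, (k + 1) * 1000)] for k in range(4)]
    pulled, pushed = sum_without_pushdown(splits), sum_with_pushdown(splits)
    print(pulled.rows_transferred, pushed.rows_transferred)  # 4000 4
    print(pulled.result == pushed.result)                    # True
```

SUM, MIN, MAX, and COUNT partials can be merged this way. An average, in general, cannot be merged by averaging per-split averages; a merge step needs each split's sum and count.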

Supported operators

Operator | Supported pushdown types
FILTER
  • OR/AND conditions
  • LIKE/NOT LIKE
  • IS NULL/IS NOT NULL
  • BETWEEN XX AND XX
  • Comparison expressions: =, <, <=, >, >=, <>
  • IN/NOT IN
LIMIT
  • LIMIT
  • ORDER BY XX LIMIT N
AGGREGATE
  • MIN(COL)
  • MAX(COL)
  • SUM(COL)
  • COUNT(*), COUNT(COL), COUNT(DISTINCT COL)
  • AVG(COL)
  • Aggregation with GROUP BY

Pushdown limitations

Computation pushdown has the following limitations:

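  • Only simple expressions can be pushed down. For example, a>10, sum(a), and avg(a) can be pushed down; a+b>10, abs(a)>10, and sum(a+b) cannot.
  • All columns involved must be in the same Tablestore index. Otherwise, only some of the operators, or none of them, are pushed down.
  • A LIKE pattern can be at most 20 characters long, including the % wildcards. For example, the pattern in a like '%123456789123456789%' is exactly 20 characters.
  • GROUP BY currently supports at most 4 columns (levels), for example group by a1,a2,a3,a4. A GROUP BY with more than 4 columns is not pushed down.
  • GROUP BY pushdown is disabled by default. When a GROUP BY is pushed down, Tablestore returns at most 2,000 rows, so if the aggregation produces more rows than that, the results are incorrect. For details on enabling pushdown, see Enable computation pushdown.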
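Some of these limits can be checked on the client before a query is submitted. The following Python sketch encodes the documented numbers (20-character LIKE patterns, 4 GROUP BY columns, the 2,000-row cap, same-index columns). `check_pushdown_limits` is a hypothetical helper, not part of DLA or the Tablestore SDK, and it assumes column names match case-insensitively. It does not try to detect non-simple expressions such as a+b>10.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Sequence

# Limits copied from the list above; the checker itself is hypothetical.
MAX_LIKE_PATTERN_LEN = 20   # counts the % wildcards too
MAX_GROUP_BY_COLUMNS = 4
GROUP_BY_ROW_CAP = 2000     # max rows Tablestore returns for a pushed-down GROUP BY


@dataclass
class PushdownCheck:
    problems: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.problems


def check_pushdown_limits(
    referenced_columns: Iterable[str],
    index_columns: Iterable[str],
    like_patterns: Sequence[str] = (),
    group_by_columns: Sequence[str] = (),
    expected_groups: Optional[int] = None,
) -> PushdownCheck:
    """Flag queries that would violate the documented pushdown limits."""
    check = PushdownCheck()
    # Assumption: names compare case-insensitively (plans show L_SHIPDATE for l_shipdate).
    index = {c.lower() for c in index_columns}
    missing = sorted({c.lower() for c in referenced_columns} - index)
    if missing:
        check.problems.append(f"columns outside the index: {missing}")
    for pattern in like_patterns:
        if len(pattern) > MAX_LIKE_PATTERN_LEN:
            check.problems.append(
                f"LIKE pattern longer than {MAX_LIKE_PATTERN_LEN} chars: {pattern!r}")
    if len(group_by_columns) > MAX_GROUP_BY_COLUMNS:
        check.problems.append(
            f"GROUP BY has {len(group_by_columns)} columns (max {MAX_GROUP_BY_COLUMNS})")
    if group_by_columns and expected_groups is not None and expected_groups > GROUP_BY_ROW_CAP:
        check.problems.append(
            f"{expected_groups} groups exceed the {GROUP_BY_ROW_CAP}-row cap; results would be wrong")
    return check


if __name__ == "__main__":
    idx = ["L_ORDERKEY", "L_SHIPDATE", "L_COMMENT"]
    result = check_pushdown_limits(["l_orderkey", "l_comment"], idx,
                                   like_patterns=["%123456789123456789%"])
    print(result.ok)  # True: the pattern is exactly 20 characters
```

The estimate passed as `expected_groups` has to come from the caller (for example, from knowledge of the data); the sketch only compares it with the cap.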

Enable computation pushdown

Computation pushdown for Tablestore data sources is disabled by default. Enable it with the hint ots-index-first=auto,ots-pushdown-enabled=true, as in the following examples:

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 order by l_commitdate desc limit 10;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select count(l_orderkey), sum(l_partkey), avg(l_linenumber) from lineitem;
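If queries are generated by an application, the hint can be prepended programmatically. The Python sketch below reuses the hint keys from the examples above; `with_ots_pushdown` is a hypothetical helper, not a DLA API, and it assumes the statement does not already carry a hint comment.

```python
from typing import Tuple

# Hint keys copied from the examples above; the helper itself is hypothetical.
BASE_HINTS: Tuple[str, ...] = ("ots-index-first=auto", "ots-pushdown-enabled=true")


def with_ots_pushdown(sql: str, extra_hints: Tuple[str, ...] = ()) -> str:
    """Prefix a SQL statement with a /*+...*/ comment that enables Tablestore pushdown."""
    hints = ",".join(BASE_HINTS + tuple(extra_hints))
    return f"/*+{hints}*/ {sql.strip()}"


if __name__ == "__main__":
    print(with_ots_pushdown("select * from lineitem where l_orderkey > 100 limit 100;"))
    # /*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;
```

Passing `("ots-groupby-pushdown-enabled=true",)` as `extra_hints` produces the hint used for GROUP BY pushdown.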

GROUP BY pushdown

To push down GROUP BY, add the ots-groupby-pushdown-enabled=true hint to the hints that enable computation pushdown. Example:

/*+ots-index-first=auto,ots-pushdown-enabled=true,ots-groupby-pushdown-enabled=true*/
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
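Because a pushed-down GROUP BY returns at most 2,000 rows from Tablestore, it can be worth comparing its results against a local computation on a small sample table. The Python sketch below computes the same aggregates as the query above. `groupby_reference` is hypothetical, and it assumes rows are dicts keyed by lowercase lineitem column names with l_shipdate stored as an ISO 'YYYY-MM-DD' string.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def groupby_reference(rows: Iterable[dict], shipdate_cutoff: str = "1998-12-01") -> List[dict]:
    """Compute the GROUP BY query above locally, ordered by (l_returnflag, l_linestatus)."""
    # Per group: [sum(l_quantity), sum(l_extendedprice), sum(l_discount), row count]
    acc: Dict[Tuple[str, str], list] = defaultdict(lambda: [0.0, 0.0, 0.0, 0])
    for r in rows:
        if r["l_shipdate"] <= shipdate_cutoff:  # ISO date strings sort chronologically
            a = acc[(r["l_returnflag"], r["l_linestatus"])]
            a[0] += r["l_quantity"]
            a[1] += r["l_extendedprice"]
            a[2] += r["l_discount"]
            a[3] += 1
    out = []
    for (flag, status), (qty, price, disc, n) in sorted(acc.items()):
        out.append({
            "l_returnflag": flag, "l_linestatus": status,
            "sum_qty": qty, "sum_base_price": price,
            "avg_qty": qty / n, "avg_price": price / n, "avg_disc": disc / n,
            "count_order": n,
        })
    return out
```

Each AVG is derived from a per-group sum and count, which is also what makes it safe to merge partial results.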

Check whether computation is pushed down

You can use EXPLAIN to check whether a SQL statement is pushed down to Tablestore. Example:

explain select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

The output is as follows:

- Output[l_returnflag, l_linestatus, sum_qty, sum_base_price, avg_qty, avg_price, avg_disc, count_order] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        sum_qty := sum
        sum_base_price := sum_7
        avg_qty := avg
        avg_price := avg_8
        avg_disc := avg_9
        count_order := count
    - RemoteStreamingMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        - LocalMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
            - PartialSort[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                - RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                    - TableScan[TableHandle {connectorId='ots', connectorHandle='table=ots20201208_mzl.lineitem', layout='Optional[table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                            LAYOUT: table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]
                            avg_8 := OtsColumnHandle{columnName=avg(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            avg := OtsColumnHandle{columnName=avg(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            sum := OtsColumnHandle{columnName=sum(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            avg_9 := OtsColumnHandle{columnName=avg(l_discount), mappedName=L_DISCOUNT, primaryKey=false, columnType=double}
                            count := OtsColumnHandle{columnName=count(*), mappedName=_ID, primaryKey=false, columnType=bigint}
                            l_returnflag := OtsColumnHandle{columnName=l_returnflag, mappedName=L_RETURNFLAG, primaryKey=false, columnType=varchar}
                            sum_7 := OtsColumnHandle{columnName=sum(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            l_linestatus := OtsColumnHandle{columnName=l_linestatus, mappedName=L_LINESTATUS, primaryKey=false, columnType=varchar}

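The execution plan above no longer contains an Aggregate node, and the OtsQueryGeneratorContext in the TableScan node carries the pushdown information (filter, aggregationLists, and groupByColumns). This shows that the query has been pushed down to Tablestore.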
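The same check can be automated when EXPLAIN output is collected as text. The Python sketch below looks for the markers discussed above: OtsQueryGeneratorContext, a non-empty aggregationLists, a non-empty groupByColumns, and any remaining Presto-side Aggregate node. `inspect_ots_plan` is a hypothetical helper, and its patterns are inferred only from the sample output above; other versions may format plans differently.

```python
import re
from typing import Dict

# Patterns inferred from the sample EXPLAIN output above (assumption).
_AGG_LIST = re.compile(r"aggregationLists=Optional\[\[(?!\])")  # non-empty list
_GROUP_BY = re.compile(r"groupByColumns=\{([^}]*)\}")
_AGG_NODE = re.compile(r"^\s*- Aggregate", re.MULTILINE)         # Presto-side Aggregate node


def inspect_ots_plan(plan: str) -> Dict[str, bool]:
    """Report which parts of a query appear to be pushed down to Tablestore."""
    group_by = _GROUP_BY.search(plan)
    return {
        "pushdown_info": "OtsQueryGeneratorContext{" in plan,
        "aggregation_pushed": _AGG_LIST.search(plan) is not None,
        "group_by_pushed": bool(group_by and group_by.group(1).strip()),
        "aggregate_node": _AGG_NODE.search(plan) is not None,
    }
```

Under these assumptions, an aggregation is fully pushed down when `aggregation_pushed` is True and `aggregate_node` is False, which matches the plan shown above.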