全部產品
Search
文件中心

MaxCompute:Range Clustering

更新時間:Feb 28, 2024

Range Clustering作為一種新的資料切分方式,提供了一個全域有序的資料分布,一是可以避免Hash Clustering可能造成的資料扭曲問題;二是在資料有序分布的前提下,建立兩級索引(Index),支援對Clustering Key的地區查詢以及多鍵的組合查詢等情境。本文為您介紹如何在MaxCompute中使用Range Clustering。

背景資訊

雜湊聚簇(Hash Clustering)表有以下優點:

  • 對於等值的列條件查詢,可以利用Hash演算法直接定位到對應的雜湊桶(Bucket Pruning),如果桶內資料排序儲存,還可以進一步利用索引定位,從而減少資料掃描量,提高查詢效率。

  • 如果對不同表某一列上做Join,其中一張表因為已經對其中一列做Hash分布,可以省掉Shuffle的步驟(稱之為Shuffle Remove),進而節省計算資源。

關於Hash Clustering功能的詳細介紹請參見Hash Clustering

但是Hash Clustering也有一些局限性:

  • 使用Hash演算法分桶,有可能產生Data Skew的問題。和Join Skew一樣,這是Hash演算法本身固有的局限性,輸入資料存在某些特定的資料分布時,可能造成傾斜,進而導致各個雜湊桶之間資料量差異較大。因為Hash Clustering之後,並發處理單位往往是一個桶,如果雜湊桶資料量不一致,往往容易造成長尾現象。

  • Bucket Pruning只支援等值查詢。因為使用雜湊分桶方法,對於區間查詢比如使用某列值大於0這樣的條件,無法在雜湊桶層級定位,只能把查詢下發到所有桶內進行。

  • 對於多個Cluster Key的組合查詢,只有所有Cluster Key都出現並且都為等值條件,才能達到最佳化效果。

    比如,對於下表只有查詢條件包含C1=x AND C2=y才可以最佳化。單獨的C1=x或者C2=y查詢條件皆無法利用Hash Clustering特性加速。這是因為對於按鍵組合的情況做了Combine Hash,要求查詢的時候一定要成對出現,否則無法定位雜湊桶,也無法做Bucket Pruning。

    CREATE TABLE T2 (C1 int, C2 int, C3 string)
       CLUSTERED BY (C1, C2)
          SORTED by (C1, C2)
           INTO 1024 BUCKETS;

針對這些局限性,MaxCompute推出了新的Clustering方法,即Range Clustering。

功能簡介

Range Clustering在對Cluster Key全排序的基礎上,將其範圍空間切分成若干個不連續範圍(Disjointed Range),每個Range作為一個Bucket,並且滿足如下條件:

  • 相同數值存在於同一個Bucket裡面。

  • 每個Bucket包含數值個數儘可能接近。

舉一個簡單的例子:假設有表T,定義如下。

CREATE TABLE T (C1 int) 
    RANGE CLUSTERED BY (C1) 
    SORTED BY (c1) 
    INTO 3 BUCKETS;

同時,C1列取值為{ 1, 8, -3, 2, 4, 1, 1, 3, 8, 20, -8, 9 }

在Range Clustering之後,得到如下3個Bucket:

  • Bucket 0 : { -8, -3, 1, 1, 1 }

  • Bucket 1 : { 2, 3, 4 }

  • Bucket 2 : { 8, 8, 9, 20 }

說明
  • 每個Bucket所代表的Range可能是不連續的(Disjointed)。例如Bucket 1的Range是[2, 4],而Bucket 2的Range是[8, 20](4, 8)這個區間是沒有數值的。

  • Range切分的目標是讓每個桶的大小盡量接近,而不是Range大小接近。在運算的時候,每個桶是一個並發的單位,桶的大小一致保證了資料沒有長尾。但是由於資料在各個區間分布密度不一致,桶的大小一致並不代表Range大小一致。

Range的切分過程是MaxCompute自動完成的,不需要手動指定每一個Range。因為在巨量資料情境下手動指定Range不高效,往往也不現實。MaxCompute內部會自動對資料進行排序、採樣,對各個區間的資料密度建立長條圖,最後通過合并計算各個區間長條圖,來實現最終理想的Range切分。

此外RANGE CLUSTERED BY可以和SORTED BY結合,從而保證了資料全域有序,在這個基礎上,MaxCompute將自動建立兩級索引:Global Index和File Index,用於快速定位和尋找索引值,如下圖所示:range clustering

和Hash Clustering相比,Range Clustering優勢如下:

  • 支援區間查詢。

    假設有查詢條件c < 3,則可以根據Global Index快速排除掉Bucket 2和Bucket 3,從而只在Bucket 0和Bucket 1中尋找。Hash Clustering只能對等值查詢做Bucket Pruning。

  • 支援多個Cluster列的組合尋找。

    假設使用RANGE CLUSTERED BY (c1, c2, c3) SORTED BY (c1, c2, c3),那麼在Range切分和資料存放區都是按照c1、c2、c3排序,這就可以允許做更複雜的組合查詢,比如c1 = 100 AND c2 > 0或者c1 = 100 AND c2 = 50 AND c3 < 5,這個也是Hash Clustering無法支援的。

    重要

    在多鍵組合查詢時,條件需要按順序列出排序鍵,並且只有最後一個Key允許使用區間條件。

  • Range Clustering提供了一個高效的全域ORDER BY實現。

    在沒有Range Clustering之前為了保證全域有序,MaxCompute只能通過一個Instance進行ORDER BY排序,效率很低。使用Range Clustering之後做Range切分之後,各個Range可以並發分別排序,最後在組合到一起,效率極大提高。

使用說明

Range Clustering的文法和Hash Clustering類似,比較大的區別是RANGE關鍵字以及Bucket數目對於Range Clustering是可以省略的。

建立Range Clustering表

您可以使用以下語句建立Range Clustering表。您需要指定Range Cluster Key、Bucket數目(可選)。Sort是可選項,但在大多數情況下,建議和Range Cluster Key一致,以便取得最佳的最佳化效果。

  • 命令文法

    CREATE TABLE [IF NOT EXISTS] <table_name>
                 [(<col_name> data_type [comment <col_comment>], ...)]
                 [comment table_comment]
                 [PARTITIONED BY (<col_name> data_type [comment <col_comment>], ...)]
                 [RANGE CLUSTERED BY (<col_name> [, <col_name>, ...])
                 [SORTED BY (<col_name> [ASC | DESC]
                 [, <col_name> [ASC | DESC] ...])]
                 [INTO <number_of_buckets> BUCKETS]]
                 [AS select_statement]
  • 使用樣本。

    • 非分區表。

      CREATE TABLE T1 (a string, b string, c int)
                   RANGE CLUSTERED BY (c)
                   SORTED by (c)
                   INTO 1024 BUCKETS;
    • 分區表。

      CREATE TABLE T1 (a string, b string, c int)
                   PARTITIONED BY (dt int)
                   RANGE CLUSTERED BY (c)
                   SORTED by (c)
                   INTO 1024 BUCKETS;
  • 屬性說明

    • RANGE CLUSTERED BY

      指定Range Cluster Key,MaxCompute將對指定一列或者多列進行排序和採樣,並根據指定Bucket數目,切分到儘可能理想的若干個Range裡面。為避免資料扭曲、避免熱點,取得較好的並存執行效果,Clustered By列適宜選擇取值範圍大、重複索引值少的列。此外為了達到最佳化目的,也應該考慮選取常用的Aggregation Key或者Filter Key。

    • SORTED BY

      指定在Bucket內欄位的排序方式,建議Sorted By和Clustered By一致,以取得較好的效能。此外當Sorted By子句指定之後,MaxCompute將自動產生索引(Global Index和File Index),並且在查詢的時候利用索引來加速執行。

    • INTO number_of_buckets BUCKETS

      和Hash Clustering不一樣,對於Range ClusteringINTO ... BUCKETS是可選項。如果不指定Bucket數目,MaxCompute將根據資料大小自動決定分區數目。在大多數情況下,建議指定Bucket數目,以便根據實際情況來做最佳化選擇。

      對於Bucket數目的選擇,和Hash Clustering類似,建議保持在每個Bucket 512MB ~ 1GB這樣的資料量,對於特別大的表,Bucket 數目可以大些,但建議不要超過4000。

更改表的Hash Clustering屬性

對於分區表支援通過ALTER TABLE語句,來增加或者去除Range Clustering屬性。

  • 命令語句

    --更改表為Range Clustering表
    ALTER TABLE <table_name> [RANGE CLUSTERED BY (<col_name> [, <col_name>, ...])
                             [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])]
                             [INTO <number_of_buckets> BUCKETS];
    --更改Range Clustering表為非Range Clustering表
    ALTER TABLE <table_name> NOT CLUSTERED;
  • 注意事項

    • ALTER TABLE語句改變聚集屬性,只對於分區表有效,非分區表一旦聚集屬性建立就無法改變。

    • ALTER TABLE語句只會影響分區表的建立分區(包括insert overwrite產生的),新分區將按新的聚集屬性儲存區,老的資料分區保持不變。

    • 由於ALTER TABLE語句隻影響新分區,所以該語句不可以再指定PARTITION。

ALTER TABLE語句適用於存量表,在增加了新的聚簇屬性之後,新的分區將做Range Clustering儲存。

表屬性顯式驗證

在建立Range Clustering Table之後,可以通過如下命令來查看錶屬性,Range Clustering屬性將顯示在Extended Info裡面。

DESC EXTENDED <table_name>;

返回結果樣本如下圖所示。表屬性驗證對於分區表,除了使用以上命令查看錶屬性之後,還可以通過以下命令查看分區的屬性。

DESC EXTENDED <table_name> partition(<pt_spec>);

返回結果樣本如下圖所示。分區表range屬性驗證

使用情境

過濾查詢最佳化

Range Clustering保證了資料全域有序,在這個基礎上MaxCompute自動建立了Global Index和File Index,利用資料的儲存特性,可以加快資料過濾(Filter)的效率。不僅僅可以最佳化等值查詢,也可以最佳化區間查詢。

例如對於一個簡單的查詢條件id < 3,先在最佳化器裡面將查詢條件抽取出來,並轉化成範圍空間(-∞, 3)。這個時候就可以利用Global Index做Bucket Pruning,把不在這個區間的Bucket 2和Bucket 3都去掉。最後再利用每個Bucket檔案內建的Index,快速在檔案內部定位,這個過程稱之為謂詞下推(Predicate Pushdown),示意圖如下:過濾查詢最佳化TPC-H Q6在100GB資料集上使用Range Clustering後進行查詢,查詢語句如下。Q6是一個區間過濾的基礎上再做的彙總操作,使用Range Clustering可以利用兩級Index快速定位元據,無論是執行時間、CPU使用率和記憶體使用量率的效能都有數倍的提升。

select sum(l_extendedprice * l_discount) as revenue
  from tpch_lineitem l
 where l_shipdate >= '1994-01-01'
   and l_shipdate < '1995-01-01'
   and l_discount >= 0.05
   and l_discount <= 0.07
   and l_quantity < 24;

多鍵組合查詢

為了比較好的理解多鍵組合查詢情境,使用如下命令將mf_tab 表改造成Range Clustering表。

ALTER TABLE mf_project.mf_tab 
            RANGE CLUSTERED BY (project_name, name)
            SORTED BY (project_name, name)
            INTO 1024 BUCKETS;

這樣做的好處是,既可以在Project層級上做一些彙總查詢,樣本命令如下:

SELECT COUNT(*)
  from mf_project.mf_tab
 WHERE project_name="xxxdw"
   AND ds="20180115"
   AND type="TABLE";

又可以用按鍵組合來精確定位某張表,樣本命令如下:

SELECT count(*)
  from mf_project.mf_tab
 WHERE project_name="xxxdw"
   AND name="adm_ctu_cle_kba_midun_trade_dd"
   AND type="TABLE";

甚至可以用於地區查詢,比如統計以adm開頭的表:

SELECT count(*)
  from mf_project.mf_tab
 WHERE project_name="xxxdw"
   AND name>="adm"
   AND name < "adn"
   AND type="TABLE";

以上查詢都可以充分利用Range Clustering全域排序的特性,下推查詢謂詞,減少表掃描的IO量以及過濾計算的CPU和記憶體消耗。

對於多個Range Cluster Key組合的情境,是有一定要求的。對於RANGE CLUSTERED BY k0, k1, ..., kn,如果查詢用到了km,則k0, k1, ..., km-1都必須在查詢條件中且都是等值條件,才能取得索引加速最佳效果。

假設表T的Range Cluster Key為k1, k2 ,則有:

  • 查詢條件為k1 < 5 :可以Index加速。

  • k1 = 10 AND k2 = 20:可以Index加速。

  • k1 = 10 AND k2 < 0:可以Index加速。

  • k2 < 0:不可以Index加速,k1缺失。

  • k1 < 0 AND k2 > 0:可以Index加速部分,Index只過濾k1<0的條件,k2>0條件需要掃描。

Group By最佳化

對於Range Clustering,由於資料全域有序,並且相同的索引值在做Range切分的時候,放到了同一個Bucket裡面,Aggregation計算就可以利用這個資料的物理屬性,節省掉Shuffle的步驟。

例如,對於如下對錶T資料的Group By操作可以直接在一個Mapper Stage裡面完成。

CREATE TABLE T (department int, team string, employee string)
       RANGE CLUSTERED BY (department, team)
       SORTED BY (c1, c2)
       INTO 1024 BUCKETS;

SELECT COUNT(*) from T GROUP BY department, team;
說明

如果要取得最佳的GROUP BY最佳化效果,GROUP BY Key需要和RANGE CLUSTERED BY Key一致。

Aggregate 最佳化

foo結構如下。

create table foo(a bigint, b bigint, c bigint)
       range clustered by (a,b)
       sorted by(a,b) into 3 buckets;

假設表foo的Range為:

Bucket 0: [1,1 : 3,3]
Bucket 1: [5,5 : 7,7]
Bucket 2: [8,8 : 9,9]

以上Bucket Range可以理解為:Bucket N: [lower bound values : upper bound values]。如果按照a來彙總的話,每個Bucket的範圍就變成:

Bucket 0: [1 : 3]
Bucket 1: [5 : 7]
Bucket 2: [8 : 9]

可以直接產生類似按照a、b彙總的執行計畫(Plan),啟動3個Instance對每個Bucket彙總,然後輸出結果後結束。

但是對於a的值橫跨多個Bucket的情況下,就會得到錯誤的結果,樣本如下。

Bucket 0: [1,1 : 3,3]
Bucket 1: [3,5 : 7,7]
Bucket 2: [7,8 : 9,9]

a有兩個值3和7,分別橫跨兩個Bucket,只有把擁有相同的a值的tuple放到同一個Instance裡面去進行彙總才能得到正確的結果。也就是像下面按照紅色虛線重新切分Bucket,指定每個Instance讀取的資料的範圍才行:歸併最佳化但是該從哪個地方切分, 這時候就需要長條圖的協助。對於Range Clustering表,在Clustering Key和Sort Key相同的情況下,當往Range Clustering表插入資料的時候,每個Bucket對應的Worker會每10000行採樣一個tuple,取其Clustering Keys的值,並儲存到長條圖中,最後每個Bucket的長條圖會存到Clustermeta檔案中。 這種長條圖被稱為等高長條圖(equi-depth histogram)。

說明

目前僅在Clustering Key和Sort Key一樣的情況下才會採樣。

有了每個Bucket的長條圖就可以為每個Worker重新切分Bucket。切分原則如下:

  • 擁有相同Grouping Key的tuple必須被劃分到同一個Bucket中。

  • 每個Bucket中的資料盡量均勻的切分。

有了每個新的Bucket的起始值,每個Worker就可以讀取正確的資料範圍,並返回正確的結果。

使用TPC-H資料集中1TB的partsupp表來測試效能提升如何。首先使用如下命令將partsupp表改造為Range Clustering表:

CREATE TABLE partsupp ( PS_PARTKEY BIGINT NOT NULL,
                        PS_SUPPKEY BIGINT NOT NULL,
                        PS_AVAILQTY BIGINT NOT NULL,
                        PS_SUPPLYCOST  DECIMAL(15,2)  NOT NULL,
                        PS_COMMENT     VARCHAR(199) NOT NULL)
             RANGE CLUSTERED BY(PS_PARTKEY, PS_SUPPKEY)
             SORTED BY(PS_PARTKEY, PS_SUPPKEY) INTO 128 BUCKETS;

使用下面的Query進行測試:

SELECT ps_partkey, count(*) c FROM partsupp GROUP BY ps_partkey;
  • 使用如下命令關閉最佳化:

     set odps.optimizer.enable.range.partial.repartitioning=false;

    結果樣本:關閉最佳化

  • 使用如下命令開啟最佳化:

    set odps.optimizer.enable.range.partial.repartitioning=true;

    結果樣本:開啟最佳化

由上圖可以看出,針對此查詢最佳化後速度提升了57%、CPU的使用率降低了52%、記憶體的使用降低了71%。當然針對不同資料量、不同的查詢,可能得到效能提升的程度也不同。

Range表的Join最佳化

  • 假設有兩個表:

    create table t1(a bigint, b bigint, c bigint, d bigint)
           range clustered by(a,b,c)
           sorted by(a,b,c) into 3 buckets;
    
    create table t2(a bigint, b bigint, c bigint, d bigint)
           range clustered by(a,b,c)
           sorted by(a,b,c) into 3 buckets;

    然後分別往兩個表裡插入不同的資料。

    對於Hash Clustering表,只要兩個相Join的Hash Clustering表擁有相同數目的Bucket,那麼兩個表對應的Bucket裡的資料就可以Join。與Hash Clustering表不同的是:即使相Join的兩個Range Clustering表擁有相同數目的Bucket,也不能簡單地直接根據Bucket Id來進行Join,因為每個Range Bucket所儲存的資料範圍(Boundary)很有可能會不同。兩個Range Clustering表相Join始終會產生如下圖所示有Shuffle的查詢計劃:hash clustering對此需要進行改進,通過對兩個Range表的Boundary進行對齊來重新劃分每個表的Bucket,即重新定義每個Instance要讀取的資料範圍。

  • 假設有兩個表:

    create table t1(a bigint, b bigint, c bigint, d bigint)
           range clustered by(a,b,c)
           sorted by(a,b,c) into 5 buckets;
    
    create table t2(a bigint, b bigint, c bigint, d bigint)
           range clustered by(a,b,c)
           sorted by(a,b,c) into 3 buckets;

    當插入一定量的資料後得到如下的Bucket Boundary:bucket boundary對於如下Query:

    SELECT * FROM t1 JOIN t2 ON t1.a=t2.a AND t1.b=t2.b AND t1.c=t2.c;

    最佳化器會取擁有較多Bucket數目的Boundary來和另外一個表的Boundary進行對齊,然後得到每個表的新的Boundary,即如下圖所示:bucket最佳化後這樣就產生了沒有Shuffle的查詢計劃:查詢計劃對於下面的查詢:

    SELECT * FROM t1 JOIN t2 ON t1.a=t2.a AND t1.b=t2.b;

    最佳化器會先對每個表按照a、b兩個列重新劃分Bucket,然後再將切分好的Bucket進行對齊再次切分,從而得到每個表的每個Bucket所需要讀取的資料範圍,進而產生像上圖那樣沒有Shuffle的查詢計劃。

  • 效能測試

    • 表改造

      用TPC-H Q2在1TB的資料表進行測試,對part表和partsupp表進行Range Clustering的改造,其他表保持不變:

      CREATE TABLE PARTSUPP ( PS_PARTKEY BIGINT NOT NULL,
                              PS_SUPPKEY BIGINT NOT NULL,
                              PS_AVAILQTY BIGINT NOT NULL,
                              PS_SUPPLYCOST  DECIMAL(15,2)  NOT NULL,
                              PS_COMMENT VARCHAR(199) NOT NULL)
                   RANGE CLUSTERED BY(PS_PARTKEY, PS_SUPPKEY)
                   SORTED BY(PS_PARTKEY, PS_SUPPKEY)
                   INTO 128 BUCKETS;
      CREATE TABLE PART ( P_PARTKEY BIGINT NOT NULL,
                          P_NAME VARCHAR(55) NOT NULL,
                          P_MFGR CHAR(25) NOT NULL,
                          P_BRAND CHAR(10) NOT NULL,
                          P_TYPE  VARCHAR(25) NOT NULL,
                          P_SIZE   BIGINT NOT NULL,
                          P_CONTAINER   CHAR(10) NOT NULL,
                          P_RETAILPRICE DECIMAL(15,2) NOT NULL,
                          P_COMMENT VARCHAR(23) NOT NULL)
                   RANGE CLUSTERED BY(P_PARTKEY)
                   SORTED BY(P_PARTKEY)
                   INTO 64 BUCKETS;

      TPC-H Q2的內容如下:

      select s_acctbal,
             s_name,
             n_name,
             p_partkey,
             p_mfgr,
             s_address,
             s_phone,
             s_comment
        from part,
             supplier,
             partsupp,
             nation,
             region
       where p_partkey = ps_partkey
         and s_suppkey = ps_suppkey
         and p_size = 15
         and p_type like '%BRASS'
         and s_nationkey = n_nationkey
         and n_regionkey = r_regionkey
         and r_name = 'EUROPE'
         and ps_supplycost = (select min(ps_supplycost)
                                from partsupp, supplier, nation, region
                               where p_partkey = ps_partkey
                                 and s_suppkey = ps_suppkey
                                 and s_nationkey = n_nationkey
                                 and n_regionkey = r_regionkey
                                 and r_name = 'EUROPE')
        order by s_acctbal desc, n_name, s_name, p_partkey limit 100;
    • 測試結果

      • 使用如下命令關閉最佳化:

         set odps.optimizer.enable.range.partial.repartitioning=false;

        結果樣本:關閉最佳化

      • 使用如下命令開啟最佳化:

        set odps.optimizer.enable.range.partial.repartitioning=true;

        結果樣本:開啟最佳化

      對比可以看出,最佳化後不但有兩個Stage的減少,速度提升了約21.4%,而且CPU的消耗減少了約35.4%,記憶體的使用也減少了約54.6%。

全域排序加速

Range Clustering還可以用來做全域排序加速。在普通的ORDER BY情境,為保證全域有序,所有的排序資料合併到一個單獨的Instance運行,這就無法發揮平行處理的優勢。利用Range Clustering的paritition步驟,可以實現並發多路全排序。首先對資料取樣並劃分Range,然後對各個Range做並發排序,最後得到的就是全域有序的結果。

需要注意的是,排序結束以後,修改表或分區儲存時仍然包括了多個檔案(Buckets),在消費資料時,如果要保證全域有序,需要按Bucket順序讀取檔案。

Range Clustering的全排序加速預設關閉,如果需要開啟,請使用以下Flag。

set odps.optimizer.distribute.ordering.enable=true;

局限性和注意事項

Range Clustering和Hash Clustering比較,有優勢的同時在某些方面也有一些局限:

  • Range Clustering在資料產生的代價比Hash Clustering要高。Hash Clustering只是做一個簡單的Hash操作和排序,但是對於Range而言,需要做資料採樣,排序,以及Histogram的合并等等,總體的代價(已耗用時間、CPU使用率、Memory Cost)比Hash Clustering高。所以在Hash Clustering可以解決的問題,就不需要用Range Clustering。

  • 不支援DYNAMIC PARTITION,以及INSERT INTO情境。

  • 對Join的支援方面只支援Inner Join、Left/Right Outer Join、Semi Join,不支援Anti Join和Full Outer Join。

  • Range Clustering表的Range Cluster Key必須和Sort Key相同,比如對於表foo中range clustered by (a,b) sorted by (a,b),本文中的最佳化可以生效;但是對於表bar中range clustered by(a,b) sorted by (b,a),本文中的最佳化就不會生效。

  • Join Key和Group Key必須為Range Clustering key的全部或者首碼。比如range clustered by(a,b,c) sorted by(a,b,c),對於Join Key和Group Key為aa,ba,b,c的查詢才能應用本文中的最佳化;對於Join Key或Group Key為ba,c的查詢就無法應用本文中的最佳化。

  • 對於分區表,如果讀取的Range Clustering表涉及兩個或兩個以上的分區無法應用本文中的最佳化。目前本文中的最佳化只針對單個分區的分區表和非分區表有效。