Range Clustering作為一種新的資料切分方式,提供了一個全域有序的資料分布,一是可以避免Hash Clustering可能造成的資料扭曲問題;二是在資料有序分布的前提下,建立兩級索引(Index),支援對Clustering Key的地區查詢以及多鍵的組合查詢等情境。本文為您介紹如何在MaxCompute中使用Range Clustering。
背景資訊
雜湊聚簇(Hash Clustering)表有以下優點:
對於等值的列條件查詢,可以利用Hash演算法直接定位到對應的雜湊桶(Bucket Pruning),如果桶內資料排序儲存,還可以進一步利用索引定位,從而減少資料掃描量,提高查詢效率。
如果對不同表某一列上做Join,其中一張表因為已經對其中一列做Hash分布,可以省掉Shuffle的步驟(稱之為Shuffle Remove),進而節省計算資源。
關於Hash Clustering功能的詳細介紹請參見Hash Clustering。
但是Hash Clustering也有一些局限性:
使用Hash演算法分桶,有可能產生Data Skew的問題。和Join Skew一樣,這是Hash演算法本身固有的局限性,輸入資料存在某些特定的資料分布時,可能造成傾斜,進而導致各個雜湊桶之間資料量差異較大。因為Hash Clustering之後,並發處理單位往往是一個桶,如果雜湊桶資料量不一致,往往容易造成長尾現象。
Bucket Pruning只支援等值查詢。因為使用雜湊分桶方法,對於區間查詢比如使用某列值大於0這樣的條件,無法在雜湊桶層級定位,只能把查詢下發到所有桶內進行。
對於多個Cluster Key的組合查詢,只有所有Cluster Key都出現並且都為等值條件,才能達到最佳化效果。
比如,對於下表只有查詢條件包含
C1=x AND C2=y
才可以最佳化。單獨的C1=x
或者C2=y
查詢條件皆無法利用Hash Clustering特性加速。這是因為對於按鍵組合的情況做了Combine Hash,要求查詢的時候一定要成對出現,否則無法定位雜湊桶,也無法做Bucket Pruning。CREATE TABLE T2 (C1 int, C2 int, C3 string) CLUSTERED BY (C1, C2) SORTED by (C1, C2) INTO 1024 BUCKETS;
針對這些局限性,MaxCompute推出了新的Clustering方法,即Range Clustering。
功能簡介
Range Clustering在對Cluster Key全排序的基礎上,將其範圍空間切分成若干個不連續範圍(Disjointed Range),每個Range作為一個Bucket,並且滿足如下條件:
相同數值存在於同一個Bucket裡面。
每個Bucket包含數值個數儘可能接近。
舉一個簡單的例子:假設有表T,定義如下。
CREATE TABLE T (C1 int)
RANGE CLUSTERED BY (C1)
SORTED BY (c1)
INTO 3 BUCKETS;
同時,C1列取值為{ 1, 8, -3, 2, 4, 1, 1, 3, 8, 20, -8, 9 }
。
在Range Clustering之後,得到如下3個Bucket:
Bucket 0 : { -8, -3, 1, 1, 1 }
Bucket 1 : { 2, 3, 4 }
Bucket 2 : { 8, 8, 9, 20 }
每個Bucket所代表的Range可能是不連續的(Disjointed)。例如Bucket 1的Range是
[2, 4]
,而Bucket 2的Range是[8, 20]
,(4, 8)
這個區間是沒有數值的。Range切分的目標是讓每個桶的大小盡量接近,而不是Range大小接近。在運算的時候,每個桶是一個並發的單位,桶的大小一致保證了資料沒有長尾。但是由於資料在各個區間分布密度不一致,桶的大小一致並不代表Range大小一致。
Range的切分過程是MaxCompute自動完成的,不需要手動指定每一個Range。因為在巨量資料情境下手動指定Range不高效,往往也不現實。MaxCompute內部會自動對資料進行排序、採樣,對各個區間的資料密度建立長條圖,最後通過合并計算各個區間長條圖,來實現最終理想的Range切分。
此外RANGE CLUSTERED BY
可以和SORTED BY
結合,從而保證了資料全域有序,在這個基礎上,MaxCompute將自動建立兩級索引:Global Index和File Index,用於快速定位和尋找索引值,如下圖所示:
和Hash Clustering相比,Range Clustering優勢如下:
支援區間查詢。
假設有查詢條件
c < 3
,則可以根據Global Index快速排除掉Bucket 2和Bucket 3,從而只在Bucket 0和Bucket 1中尋找。Hash Clustering只能對等值查詢做Bucket Pruning。支援多個Cluster列的組合尋找。
假設使用
RANGE CLUSTERED BY (c1, c2, c3) SORTED BY (c1, c2, c3)
,那麼在Range切分和資料存放區都是按照c1、c2、c3排序,這就可以允許做更複雜的組合查詢,比如c1 = 100 AND c2 > 0
或者c1 = 100 AND c2 = 50 AND c3 < 5
,這個也是Hash Clustering無法支援的。重要在多鍵組合查詢時,條件需要按順序列出排序鍵,並且只有最後一個Key允許使用區間條件。
Range Clustering提供了一個高效的全域ORDER BY實現。
在沒有Range Clustering之前為了保證全域有序,MaxCompute只能通過一個Instance進行ORDER BY排序,效率很低。使用Range Clustering之後做Range切分之後,各個Range可以並發分別排序,最後在組合到一起,效率極大提高。
使用說明
Range Clustering的文法和Hash Clustering類似,比較大的區別是RANGE關鍵字以及Bucket數目對於Range Clustering是可以省略的。
建立Range Clustering表
您可以使用以下語句建立Range Clustering表。您需要指定Range Cluster Key、Bucket數目(可選)。Sort是可選項,但在大多數情況下,建議和Range Cluster Key一致,以便取得最佳的最佳化效果。
命令文法
CREATE TABLE [IF NOT EXISTS] <table_name> [(<col_name> data_type [comment <col_comment>], ...)] [comment table_comment] [PARTITIONED BY (<col_name> data_type [comment <col_comment>], ...)] [RANGE CLUSTERED BY (<col_name> [, <col_name>, ...]) [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])] [INTO <number_of_buckets> BUCKETS]] [AS select_statement]
使用樣本。
非分區表。
CREATE TABLE T1 (a string, b string, c int) RANGE CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
分區表。
CREATE TABLE T1 (a string, b string, c int) PARTITIONED BY (dt int) RANGE CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
屬性說明
RANGE CLUSTERED BY
指定Range Cluster Key,MaxCompute將對指定一列或者多列進行排序和採樣,並根據指定Bucket數目,切分到儘可能理想的若干個Range裡面。為避免資料扭曲、避免熱點,取得較好的並存執行效果,Clustered By列適宜選擇取值範圍大、重複索引值少的列。此外為了達到最佳化目的,也應該考慮選取常用的Aggregation Key或者Filter Key。
SORTED BY
指定在Bucket內欄位的排序方式,建議Sorted By和Clustered By一致,以取得較好的效能。此外當Sorted By子句指定之後,MaxCompute將自動產生索引(Global Index和File Index),並且在查詢的時候利用索引來加速執行。
INTO number_of_buckets BUCKETS
和Hash Clustering不一樣,對於Range Clustering
INTO ... BUCKETS
是可選項。如果不指定Bucket數目,MaxCompute將根據資料大小自動決定分區數目。在大多數情況下,建議指定Bucket數目,以便根據實際情況來做最佳化選擇。對於Bucket數目的選擇,和Hash Clustering類似,建議保持在每個Bucket 512MB ~ 1GB這樣的資料量,對於特別大的表,Bucket 數目可以大些,但建議不要超過4000。
更改表的Hash Clustering屬性
對於分區表支援通過ALTER TABLE語句,來增加或者去除Range Clustering屬性。
命令語句
--更改表為Range Clustering表 ALTER TABLE <table_name> [RANGE CLUSTERED BY (<col_name> [, <col_name>, ...]) [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])] [INTO <number_of_buckets> BUCKETS]; --更改Range Clustering表為非Range Clustering表 ALTER TABLE <table_name> NOT CLUSTERED;
注意事項
ALTER TABLE語句改變聚集屬性,只對於分區表有效,非分區表一旦聚集屬性建立就無法改變。
ALTER TABLE語句只會影響分區表的建立分區(包括insert overwrite產生的),新分區將按新的聚集屬性儲存區,老的資料分區保持不變。
由於ALTER TABLE語句隻影響新分區,所以該語句不可以再指定PARTITION。
ALTER TABLE語句適用於存量表,在增加了新的聚簇屬性之後,新的分區將做Range Clustering儲存。
表屬性顯式驗證
在建立Range Clustering Table之後,可以通過如下命令來查看錶屬性,Range Clustering屬性將顯示在Extended Info
裡面。
DESC EXTENDED <table_name>;
返回結果樣本如下圖所示。對於分區表,除了使用以上命令查看錶屬性之後,還可以通過以下命令查看分區的屬性。
DESC EXTENDED <table_name> partition(<pt_spec>);
返回結果樣本如下圖所示。
使用情境
過濾查詢最佳化
Range Clustering保證了資料全域有序,在這個基礎上MaxCompute自動建立了Global Index和File Index,利用資料的儲存特性,可以加快資料過濾(Filter)的效率。不僅僅可以最佳化等值查詢,也可以最佳化區間查詢。
例如對於一個簡單的查詢條件id < 3
,先在最佳化器裡面將查詢條件抽取出來,並轉化成範圍空間(-∞, 3)
。這個時候就可以利用Global Index做Bucket Pruning,把不在這個區間的Bucket 2和Bucket 3都去掉。最後再利用每個Bucket檔案內建的Index,快速在檔案內部定位,這個過程稱之為謂詞下推(Predicate Pushdown),示意圖如下:TPC-H Q6在100GB資料集上使用Range Clustering後進行查詢,查詢語句如下。Q6是一個區間過濾的基礎上再做的彙總操作,使用Range Clustering可以利用兩級Index快速定位元據,無論是執行時間、CPU使用率和記憶體使用量率的效能都有數倍的提升。
select sum(l_extendedprice * l_discount) as revenue
from tpch_lineitem l
where l_shipdate >= '1994-01-01'
and l_shipdate < '1995-01-01'
and l_discount >= 0.05
and l_discount <= 0.07
and l_quantity < 24;
多鍵組合查詢
為了比較好的理解多鍵組合查詢情境,使用如下命令將mf_tab
表改造成Range Clustering表。
ALTER TABLE mf_project.mf_tab
RANGE CLUSTERED BY (project_name, name)
SORTED BY (project_name, name)
INTO 1024 BUCKETS;
這樣做的好處是,既可以在Project層級上做一些彙總查詢,樣本命令如下:
SELECT COUNT(*)
from mf_project.mf_tab
WHERE project_name="xxxdw"
AND ds="20180115"
AND type="TABLE";
又可以用按鍵組合來精確定位某張表,樣本命令如下:
SELECT count(*)
from mf_project.mf_tab
WHERE project_name="xxxdw"
AND name="adm_ctu_cle_kba_midun_trade_dd"
AND type="TABLE";
甚至可以用於地區查詢,比如統計以adm
開頭的表:
SELECT count(*)
from mf_project.mf_tab
WHERE project_name="xxxdw"
AND name>="adm"
AND name < "adn"
AND type="TABLE";
以上查詢都可以充分利用Range Clustering全域排序的特性,下推查詢謂詞,減少表掃描的IO量以及過濾計算的CPU和記憶體消耗。
對於多個Range Cluster Key組合的情境,是有一定要求的。對於RANGE CLUSTERED BY k0, k1, ..., kn
,如果查詢用到了km
,則k0, k1, ..., km-1
都必須在查詢條件中且都是等值條件,才能取得索引加速最佳效果。
假設表T的Range Cluster Key為k1, k2
,則有:
查詢條件為
k1 < 5
:可以Index加速。k1 = 10 AND k2 = 20
:可以Index加速。k1 = 10 AND k2 < 0
:可以Index加速。k2 < 0
:不可以Index加速,k1缺失。k1 < 0 AND k2 > 0
:可以Index加速部分,Index只過濾k1<0
的條件,k2>0
條件需要掃描。
Group By最佳化
對於Range Clustering,由於資料全域有序,並且相同的索引值在做Range切分的時候,放到了同一個Bucket裡面,Aggregation計算就可以利用這個資料的物理屬性,節省掉Shuffle的步驟。
例如,對於如下對錶T資料的Group By操作可以直接在一個Mapper Stage裡面完成。
CREATE TABLE T (department int, team string, employee string)
RANGE CLUSTERED BY (department, team)
SORTED BY (c1, c2)
INTO 1024 BUCKETS;
SELECT COUNT(*) from T GROUP BY department, team;
如果要取得最佳的GROUP BY最佳化效果,GROUP BY Key需要和RANGE CLUSTERED BY Key一致。
Aggregate 最佳化
表foo
結構如下。
create table foo(a bigint, b bigint, c bigint)
range clustered by (a,b)
sorted by(a,b) into 3 buckets;
假設表foo
的Range為:
Bucket 0: [1,1 : 3,3]
Bucket 1: [5,5 : 7,7]
Bucket 2: [8,8 : 9,9]
以上Bucket Range可以理解為:Bucket N: [lower bound values : upper bound values]
。如果按照a
來彙總的話,每個Bucket的範圍就變成:
Bucket 0: [1 : 3]
Bucket 1: [5 : 7]
Bucket 2: [8 : 9]
可以直接產生類似按照a、b彙總的執行計畫(Plan),啟動3個Instance對每個Bucket彙總,然後輸出結果後結束。
但是對於a的值橫跨多個Bucket的情況下,就會得到錯誤的結果,樣本如下。
Bucket 0: [1,1 : 3,3]
Bucket 1: [3,5 : 7,7]
Bucket 2: [7,8 : 9,9]
列a
有兩個值3和7,分別橫跨兩個Bucket,只有把擁有相同的a
值的tuple放到同一個Instance裡面去進行彙總才能得到正確的結果。也就是像下面按照紅色虛線重新切分Bucket,指定每個Instance讀取的資料的範圍才行:但是該從哪個地方切分, 這時候就需要長條圖的協助。對於Range Clustering表,在Clustering Key和Sort Key相同的情況下,當往Range Clustering表插入資料的時候,每個Bucket對應的Worker會每10000行採樣一個tuple,取其Clustering Keys的值,並儲存到長條圖中,最後每個Bucket的長條圖會存到Clustermeta檔案中。 這種長條圖被稱為等高長條圖(equi-depth histogram)。
目前僅在Clustering Key和Sort Key一樣的情況下才會採樣。
有了每個Bucket的長條圖就可以為每個Worker重新切分Bucket。切分原則如下:
擁有相同Grouping Key的tuple必須被劃分到同一個Bucket中。
每個Bucket中的資料盡量均勻的切分。
有了每個新的Bucket的起始值,每個Worker就可以讀取正確的資料範圍,並返回正確的結果。
使用TPC-H資料集中1TB的partsupp表來測試效能提升如何。首先使用如下命令將partsupp表改造為Range Clustering表:
CREATE TABLE partsupp ( PS_PARTKEY BIGINT NOT NULL,
PS_SUPPKEY BIGINT NOT NULL,
PS_AVAILQTY BIGINT NOT NULL,
PS_SUPPLYCOST DECIMAL(15,2) NOT NULL,
PS_COMMENT VARCHAR(199) NOT NULL)
RANGE CLUSTERED BY(PS_PARTKEY, PS_SUPPKEY)
SORTED BY(PS_PARTKEY, PS_SUPPKEY) INTO 128 BUCKETS;
使用下面的Query進行測試:
SELECT ps_partkey, count(*) c FROM partsupp GROUP BY ps_partkey;
使用如下命令關閉最佳化:
set odps.optimizer.enable.range.partial.repartitioning=false;
結果樣本:
使用如下命令開啟最佳化:
set odps.optimizer.enable.range.partial.repartitioning=true;
結果樣本:
由上圖可以看出,針對此查詢最佳化後速度提升了57%、CPU的使用率降低了52%、記憶體的使用降低了71%。當然針對不同資料量、不同的查詢,可能得到效能提升的程度也不同。
Range表的Join最佳化
假設有兩個表:
create table t1(a bigint, b bigint, c bigint, d bigint) range clustered by(a,b,c) sorted by(a,b,c) into 3 buckets; create table t2(a bigint, b bigint, c bigint, d bigint) range clustered by(a,b,c) sorted by(a,b,c) into 3 buckets;
然後分別往兩個表裡插入不同的資料。
對於Hash Clustering表,只要兩個相Join的Hash Clustering表擁有相同數目的Bucket,那麼兩個表對應的Bucket裡的資料就可以Join。與Hash Clustering表不同的是:即使相Join的兩個Range Clustering表擁有相同數目的Bucket,也不能簡單地直接根據Bucket Id來進行Join,因為每個Range Bucket所儲存的資料範圍(Boundary)很有可能會不同。兩個Range Clustering表相Join始終會產生如下圖所示有Shuffle的查詢計劃:對此需要進行改進,通過對兩個Range表的Boundary進行對齊來重新劃分每個表的Bucket,即重新定義每個Instance要讀取的資料範圍。
假設有兩個表:
create table t1(a bigint, b bigint, c bigint, d bigint) range clustered by(a,b,c) sorted by(a,b,c) into 5 buckets; create table t2(a bigint, b bigint, c bigint, d bigint) range clustered by(a,b,c) sorted by(a,b,c) into 3 buckets;
當插入一定量的資料後得到如下的Bucket Boundary:對於如下Query:
SELECT * FROM t1 JOIN t2 ON t1.a=t2.a AND t1.b=t2.b AND t1.c=t2.c;
最佳化器會取擁有較多Bucket數目的Boundary來和另外一個表的Boundary進行對齊,然後得到每個表的新的Boundary,即如下圖所示:這樣就產生了沒有Shuffle的查詢計劃:對於下面的查詢:
SELECT * FROM t1 JOIN t2 ON t1.a=t2.a AND t1.b=t2.b;
最佳化器會先對每個表按照a、b兩個列重新劃分Bucket,然後再將切分好的Bucket進行對齊再次切分,從而得到每個表的每個Bucket所需要讀取的資料範圍,進而產生像上圖那樣沒有Shuffle的查詢計劃。
效能測試
表改造
用TPC-H Q2在1TB的資料表進行測試,對part表和partsupp表進行Range Clustering的改造,其他表保持不變:
CREATE TABLE PARTSUPP ( PS_PARTKEY BIGINT NOT NULL, PS_SUPPKEY BIGINT NOT NULL, PS_AVAILQTY BIGINT NOT NULL, PS_SUPPLYCOST DECIMAL(15,2) NOT NULL, PS_COMMENT VARCHAR(199) NOT NULL) RANGE CLUSTERED BY(PS_PARTKEY, PS_SUPPKEY) SORTED BY(PS_PARTKEY, PS_SUPPKEY) INTO 128 BUCKETS; CREATE TABLE PART ( P_PARTKEY BIGINT NOT NULL, P_NAME VARCHAR(55) NOT NULL, P_MFGR CHAR(25) NOT NULL, P_BRAND CHAR(10) NOT NULL, P_TYPE VARCHAR(25) NOT NULL, P_SIZE BIGINT NOT NULL, P_CONTAINER CHAR(10) NOT NULL, P_RETAILPRICE DECIMAL(15,2) NOT NULL, P_COMMENT VARCHAR(23) NOT NULL) RANGE CLUSTERED BY(P_PARTKEY) SORTED BY(P_PARTKEY) INTO 64 BUCKETS;
TPC-H Q2的內容如下:
select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 15 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'EUROPE' and ps_supplycost = (select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'EUROPE') order by s_acctbal desc, n_name, s_name, p_partkey limit 100;
測試結果
使用如下命令關閉最佳化:
set odps.optimizer.enable.range.partial.repartitioning=false;
結果樣本:
使用如下命令開啟最佳化:
set odps.optimizer.enable.range.partial.repartitioning=true;
結果樣本:
對比可以看出,最佳化後不但有兩個Stage的減少,速度提升了約21.4%,而且CPU的消耗減少了約35.4%,記憶體的使用也減少了約54.6%。
全域排序加速
Range Clustering還可以用來做全域排序加速。在普通的ORDER BY情境,為保證全域有序,所有的排序資料合併到一個單獨的Instance運行,這就無法發揮平行處理的優勢。利用Range Clustering的paritition步驟,可以實現並發多路全排序。首先對資料取樣並劃分Range,然後對各個Range做並發排序,最後得到的就是全域有序的結果。
需要注意的是,排序結束以後,修改表或分區儲存時仍然包括了多個檔案(Buckets),在消費資料時,如果要保證全域有序,需要按Bucket順序讀取檔案。
Range Clustering的全排序加速預設關閉,如果需要開啟,請使用以下Flag。
set odps.optimizer.distribute.ordering.enable=true;
局限性和注意事項
Range Clustering和Hash Clustering比較,有優勢的同時在某些方面也有一些局限:
Range Clustering在資料產生的代價比Hash Clustering要高。Hash Clustering只是做一個簡單的Hash操作和排序,但是對於Range而言,需要做資料採樣,排序,以及Histogram的合并等等,總體的代價(已耗用時間、CPU使用率、Memory Cost)比Hash Clustering高。所以在Hash Clustering可以解決的問題,就不需要用Range Clustering。
不支援DYNAMIC PARTITION,以及INSERT INTO情境。
對Join的支援方面只支援Inner Join、Left/Right Outer Join、Semi Join,不支援Anti Join和Full Outer Join。
Range Clustering表的Range Cluster Key必須和Sort Key相同,比如對於表foo中
range clustered by (a,b) sorted by (a,b)
,本文中的最佳化可以生效;但是對於表bar中range clustered by(a,b) sorted by (b,a)
,本文中的最佳化就不會生效。Join Key和Group Key必須為Range Clustering key的全部或者首碼。比如
range clustered by(a,b,c) sorted by(a,b,c)
,對於Join Key和Group Key為a
、a,b
或a,b,c
的查詢才能應用本文中的最佳化;對於Join Key或Group Key為b
或a,c
的查詢就無法應用本文中的最佳化。對於分區表,如果讀取的Range Clustering表涉及兩個或兩個以上的分區無法應用本文中的最佳化。目前本文中的最佳化只針對單個分區的分區表和非分區表有效。