全部產品
Search
文件中心

PolarDB:列存索引中TopK運算元的實現

更新時間:Jul 06, 2024

在海量資料上求TopK是一個很經典的問題,特別是衍生出的深翻頁查詢,給AnalyticDB帶來了很大的挑戰。本文將介紹PolarDB MySQL版的列存索引(In Memory Column Index,IMCI)特性如何應對這樣的挑戰。

背景

業務系統中普遍存在這樣一種情境:根據給定條件式篩選一批記錄,這些記錄按使用者指定的條件排序,以分頁的方式展示。例如,篩選出某個商家在售的商品,按商品銷量排序,以分頁的方式展示。上述情境在資料庫中,通常以ORDER BY column LIMIT n, m這樣的查詢語句實現。

假設業務系統中每頁展示100條記錄:

  • 通過ORDER BY column LIMIT 0, 100可以展示第1頁的資料;

  • 通過ORDER BY column LIMIT 1000000, 100可以展示第10001頁的資料。

在沒有索引的情況下,此類查詢在資料庫中是通過基於堆的經典TopK演算法來實現的,邏輯如下:在記憶體中維護一個大小為K的堆,堆頂元素是最小的元素,將遍曆到的資料與堆頂元素比較,如果比堆頂元素大,替換堆頂元素,並重建堆。遍曆完資料後,堆中的元素即為前K個最大的元素。當翻頁較淺時(如上文中展示第1頁),K較小,基於堆的TopK演算法非常高效。

然而,業務情境中也存在翻頁較深的情境(下文中簡稱“深翻頁”),例如上文中展示第10001頁。該情境下的K非常大,記憶體中可能無法緩衝大小為K的堆,也就無法使用上述方式獲得查詢結果。即便記憶體充足,由於維護堆的操作訪問記憶體是亂序的,當堆非常大時,TopK演算法的訪存效率較差,最終的效能表現也差強人意。

PolarDB MySQL版IMCI最初也採用了上述方式來實現分頁查詢,並在記憶體不足以緩衝大小為K的堆時,採用了全表排序後取相應的位置記錄的方式,所以在深翻頁時的效能表現也不是非常理想。為此,在分析了深翻頁情境的特點和傳統方案存在的問題,並調研了相關研究和工業界現有方案後,PolarDB MySQL版重新設計了IMCI的Sort/TopK運算元。在測試情境中,重新設計的Sort/TopK運算元顯著提升了IMCI在深翻頁情境的效能表現。

業界方案調研

TopK是一個非常經典的問題,存在多種方案來高效地實現TopK查詢,這些方案的核心都在於減少對非結果集資料的操作。已經在工業界中應用的方案主要有如下三種:

基於Priority Queue的TopK演算法

請參見背景部分內容描述。

歸併排序時基於offset和limit做truncate

當記憶體不足以緩衝大小為K的Priority Queue時,一些資料庫會使用歸併排序來處理TopK問題(例如PolarDB IMCI、ClickHouse、SQL Server、DuckDB)。因為TopK演算法只需要擷取排在第[offset, offset+limit)位的記錄,所以在每一次歸併排序時,不需要對所有資料做排序,而是僅輸出長度為offset+limit的新的sorted run即可。上述merge時的truncation可以在保證結果正確性的同時減少對非結果集資料的操作。

image.png

Self-sharpening Input Filter

該方案最初是在Goetz Graefe的論文中提出的,雲資料庫ClickHouse目前採用了這種方案。該方案在執行過程中會維護一個cutoff value,並且保證大於cutoff value的記錄一定不會出現在TopK的結果集中。在產生new sorted run時,方案會使用當前的cutoff value對資料進行過濾。在產生new sorted run之後,如果K小於new sorted run的長度,則會使用new sorted run中第K條記錄替換當前cutoff value。由於new sorted run中的資料都是經過old cutoff value過濾的,因此必定有new cutoff value <= old cutoff value,即cutoff value是一個不斷sharpening的值。最後只需要合并這些過濾後的sorted run即可得到結果集。

通過一個簡單的例子來說明上述演算法:假設當前TopK查詢中K=3,讀取第一批資料後產生的sorted run為(1, 2, 10, 15, 21),則cutoff value更新為10。接下來使用cutoff value=10過濾第二批資料,產生的第二個sorted run為(2, 3, 5, 6, 8),則cutoff value更新為5。然後讀取並過濾第三批資料,產生的第三個sorted run為(1, 2, 3, 3, 3),則cutoff value更新為3。依此類推,不斷sharpen cutoff value從而在接下來過濾更多的資料。

如果TopK查詢中K大於單個sorted run的長度,該方案會積累足夠的sorted run(包含的記錄數量大於K),然後對這些sorted run提前進行merge,從而獲得cutoff value。接下來會使用cutoff value進行過濾並繼續積累足夠的sorted run,從而獲得更小的cutoff value,依此類推。整個執行過程和K小於單個sorted run的情況是類似的,區別在於需要merge足夠的sorted run才能獲得cutoff value。

image.png

問題分析

深翻頁是TopK問題中一個較為特殊的情境,特殊之處在於所求的K特別大,但實際結果集很小。例如上文中展示第10001頁的例子,對於ORDER BY column LIMIT 1000000, 100,K=1,000,100,但最終結果集只包含100條記錄。該特點會給上一節中所述方案帶來如下挑戰:

  • 當記憶體充足時,如果採用基於Priority Queue的TopK演算法,則需要維護一個非常大的Priority Queue,隊列操作對記憶體的訪問操作是亂序的,訪存效率較差,影響演算法實際啟動並執行效能。

  • 當記憶體不足時,如果使用歸併排序並基於offset和limit做truncate,則在歸併排序的前期階段,sorted run的長度可能小於offset+limit,無法進行truncate,所有資料都將參與排序,truncate的實際效果受到影響。

重要

本文中的記憶體充足是指,演算法中用於管理至少K條記錄的資料結構可以在執行記憶體中緩衝,而不是TopK查詢的輸入資料可以在執行記憶體中緩衝。實際上本文討論的情境,TopK查詢的輸入資料都是遠大於執行記憶體的。

另外,從系統設計的角度上看,設計深翻頁的解決方案時還應考慮如下兩點:

  • 是否採用不同方案來實現深翻頁和淺翻頁?如果需要使用不同的方案來處理兩種情境,如何判斷深淺翻頁的界線?

  • 如何根據可用執行記憶體的大小自適應地選擇記憶體演算法或磁碟演算法?

方案設計

總體設計

綜合上述調研和分析,基於現有的成熟方案,針對深翻頁問題,PolarDB MySQL版重新設計了IMCI的Sort/TopK運算元:

  • 記憶體充足時,採用如下的記憶體演算法:

    • 採用Self-sharpening Input Filter的設計,避免訪存效率的問題。

    • 並在此基礎上利用SIMD指令提升過濾效率。

    • 深淺翻頁均採用該記憶體演算法,不需要判斷深淺翻頁的界線。

  • 記憶體不足時,採用如下的磁碟演算法:

    • 採用歸併排序時基於offset和limit做truncate的方案。

    • 並在此基礎上利用ZoneMap在歸併排序的前期階段做pruning,儘可能地減少對非結果集資料的操作。

  • 動態選擇記憶體磁碟演算法:不依賴固定的閾值來選擇使用記憶體演算法或磁碟演算法,而是在執行過程中根據可用執行記憶體的大小,動態調整所用演算法。

由於Self-sharpening Input Filter和歸併排序時基於offset和limit做truncate的方案在前文中已經介紹,因此接下來僅介紹選擇這兩種方案的原因,並介紹利用SIMD指令提升過濾效率、利用ZoneMap做pruning以及動態選擇記憶體磁碟演算法的部分。

SIMD Accelerated Self-sharpening Input Filter

在記憶體充足時,直接採用Self-sharpening Input Filter的設計,主要基於兩個原因:

  • Self-sharpening Input Filter不管是使用cutoff value進行過濾,還是pre-merge,訪問記憶體的模式都是順序的,可以避免Priority Queue訪存效率的問題。

  • 該設計無論翻頁深淺都具有優異的效能,在應用時不需要考慮深淺翻頁的界線。

實際上,Self-sharpening Input Filter在某種程度上和基於Priority Queue的演算法是類似的,cutoff value類似堆頂,都用於過濾後續讀取的資料,兩者的不同之處在於,基於Priority Queue的演算法會即時更新堆頂,而Self-sharpening Input Filter則將資料積累在sorted run中,以batch的方式更新cutoff value。

使用cutoff value進行過濾是Self-sharpening Input Filter中很重要的過程,涉及資料比較,操作簡單重複但非常頻繁,因此使用SIMD指令來加速這一過程。由於cutoff value過濾和TableScan中使用Predicate過濾是類似的,因此在具體實現中直接複用處理Predicate的運算式,提升過濾的效率,減少計算TopK的時間。

Zonemap-based Pruning

在記憶體不足時,採用歸併排序,並基於offset和limit做truncate,主要原因如下:

  • 如果在記憶體不足時繼續使用Self-sharpening Input Filter的設計,就需要將積累的sorted run落盤,並且在pre-merge時同樣使用外排序演算法,產生大量的讀寫磁碟的操作,相對於記憶體充足情境下的Self-sharpening Input Filter有額外的開銷。當K非常大時,pre-merge時的外排序可能還會涉及大量非結果集資料,因為最終只需要擷取排在第[offset, offset + limit)位的記錄,而不關心排在第[0, offset)位的記錄。

  • 在這種情境下,可以使用歸併排序,在產生sorted run的階段僅將sorted run落盤,然後使用統計資訊進行pruning,避免不必要的讀寫磁碟的操作,也可以儘可能地避免對非結果集資料的操作。

以下圖為例來說明使用統計資訊進行pruning的原理。下圖中,箭頭表示數軸,代表sorted run的矩形的左右兩端在數軸上對應的位置表示sorted run的min/max值,Barrier表示pruning所依賴的一個閾值。 ARRIER

image.png

  • 任意一個Barrier可以將所有sorted run分為三類:

    • 類型A:min value of sorted run<Barrier&&max value of sorted run<Barrier,如上圖中Run1,Run2。

    • 類型B:min value of sorted run<Barrier&&max value of sorted run>Barrier,如上圖中Run3。

    • 類型C:min value of sorted run>Barrier&&max value of sorted run>Barrier,如上圖中Run4,Run5。

  • 對於任意一個Barrier,如果類型A和類型B中的資料量<TopK查詢中的offset,那麼類型A中的資料必然排在第[0, offset)位,類型A中的sorted run可以不參與後續的merge。

  • 對於任意一個Barrier,如果類型A中的資料量>TopK查詢中的offset+limit,那麼類型C中的資料必然排在第[offset+limit, N)位,類型C中的sorted run可以不參與後續的merge。

根據上述原理,使用統計資訊進行pruning的具體流程如下:

  1. 構建包含sorted run的min/max資訊的Zonemap。

  2. 基於Zonemap尋找一個儘可能大的Barrier1滿足類型A和類型B中的資料量<TopK查詢中的offset。

  3. 基於Zonemap尋找一個儘可能小的Barrier2滿足類型A中的資料量>TopK查詢中的offset+limit。

  4. 使用Barrier1和Barrier2對相關sorted run進行pruning。

動態選擇記憶體磁碟演算法

記憶體演算法和磁碟演算法不同,如果使用一個固定的閾值來作為選擇記憶體演算法或磁碟演算法的依據(比如K小於閾值時使用記憶體演算法,否則使用磁碟演算法),那麼針對不同的可用執行記憶體就需要設定不同的閾值,帶來了人工幹預的開銷。

因此PolarDB MySQL版設計了一個簡單的回退機制,可以在執行過程中根據可用執行記憶體的大小,動態調整所用演算法:

  • 無論可用執行記憶體有多大,首先嘗試以記憶體演算法計算TopK。

  • 在記憶體演算法的執行過程中,如果記憶體始終都是充足的,直接使用記憶體演算法完成整個計算過程。

  • 在記憶體演算法的執行過程中,如果出現記憶體不足的情況(例如,K比較大時,可用執行記憶體不足以緩衝足夠的sorted run使其包含的記錄數量大於K,或者可用執行記憶體不足以完成pre-merge的過程),那麼執行回退機制。

  • 回退機制:採集記憶體中已積累的sorted run的min/max資訊,以便於後續使用Zonemap進行pruning,然後將sorted run落盤,這些sorted run將會參與磁碟演算法的計算過程。

  • 執行完回退機制後,使用磁碟演算法完成整個計算過程。

由於記憶體演算法和磁碟演算法採用相同的資料群組織格式,因此回退機制並不需要對資料進行重新組織,開銷較小。另外,記憶體演算法只會過濾非結果集的資料,因此直接使用記憶體演算法已積累的sorted run參與磁碟演算法的計算過程不會有正確性的問題。

其他

延遲物化

延遲物化是一個工程實現方面的最佳化,指的是在產生sorted run時僅物化RowID和ORDER BY相關的運算式(列),在計算出TopK的結果集後,再根據結果集中的RowID從儲存上擷取查詢需要輸出的列。延遲物化相比於在產生sorted run時就物化查詢需要輸出的所有列有兩個優勢:

  • 物化RowID的空間佔用更小,在可用執行記憶體一定的情況下,可以使用記憶體演算法處理更大的資料量。

  • 計算TopK的過程需要調整資料順序,涉及對資料的Copy/Swap。如果在產生sorted run時就物化查詢需要輸出的所有列,則計算過程中對一條記錄的Copy/Swap需要對每一列都進行相應操作,帶來很大的overhead。而如果僅物化RowID,則可以降低Copy/Swap的代價。

延遲物化的不足之處在於根據結果集中的RowID從儲存上擷取查詢需要輸出的列時,可能會產生一些隨機IO。分析後發現深翻頁情境雖然K特別大,但實際結果集很小,因此使用延遲物化時隨機IO產生的overhead較小。

計算下推

應用Self-sharpening Input Filter時,會將不斷更新的cutoff value下推至table scan運算元,作為SQL中一個新的predicate,在table scan運算元擷取資料時根據這個新的predicate,複用pruner對pack(或稱為row group)進行過濾。

計算下推可以從兩個方面提升TopK查詢的效能:

  • 減少IO:table scan時避免讀取僅包含非結果集資料的pack/row group。

  • 減少計算:被過濾的pack/row group中的資料將不再參與table scan上層運算元的後續計算。

測試結果

在TPCH 100 GB的資料集上對方案進行簡單的驗證

select
    l_orderkey,
    sum(l_quantity)
from
    lineitem
group by
    l_orderkey
order by
    sum(l_quantity) desc
limit
    1000000, 100;

測試結果如下:

PolarDB IMCI

ClickHouse

MySQL

11.63 sec

23.07 sec

353.15 sec