Compaction可以把所有的資料檔案按照一定策略進行Merge操作,可提升查詢效率。
功能介紹
Delta Table支援近即時增量寫入和time travel查詢特性,在資料頻繁寫入的情境中,必然會引入大量的小檔案,需要設計合理高效的合并策略來對小檔案進行合并以及資料去重,解決大量小檔案讀寫IO低效以及緩解儲存系統的壓力,但也要避免頻繁Compact引發嚴重的寫放大和衝突失敗。
目前主要支援兩種資料合併方式:
Clustering:只是把Commit的DeltaFile合并成一個大檔案,不改變資料內容。系統內部會根據新增的檔案大小、檔案數量等因素周期性地執行,不需要使用者手動操作。主要解決小檔案IO讀寫效率和穩定性問題。
Compaction:會把所有的資料檔案按照一定策略進行Merge操作,產生一批新的BaseFile,相同PK的資料行只儲存最新的狀態,不包含任何歷史狀態,也不會包含任何系統列資訊,主要用於提升查詢效率。
命令格式
alter table <table_name> [partition
(<partition_key> = '<partition_value>' [, ...])
]
compact major;
注意事項
手動執行Compaction需要依賴設定一個flag。
set odps.merge.task.mode=service;
Compaction會產生新的檔案,雖然可以提升查詢效率,但也要承擔對應的資料檔案儲存成本,您需要根據業務情境進行權衡,選擇合適的觸發頻率。
Compaction會使用計算資源對資料進行讀取重寫,若您是隨用隨付模式,將產生Compaction費用,其計費方式為掃描量*1*單價
。若您是訂用帳戶付費模式,那麼Compaction會使用訂用帳戶的計算資源額度。
使用樣本
--建立表
create table mf_dt (pk bigint not null primary key, val bigint not null)
partitioned by (dd string, hh string)
tblproperties ("transactional"="true");
--插入資料
insert into table mf_dt partition(dd='01', hh='01') values (1, 1), (2, 2);
insert into table mf_dt partition(dd='01', hh='01') values (2, 20), (3, 3);
insert into table mf_dt partition(dd='01', hh='01') values (3, 30), (4, 4);
--執行compaction,以及操作完成後,可繼續查詢歷史資料。
set odps.merge.task.mode=service;
alter table mf_dt partition(dd='01', hh='01') compact major;
select * from mf_dt timestamp as of get_latest_timestamp('mf_dt') where dd='01' and hh='01';
select * from mf_dt timestamp as of get_latest_timestamp('mf_dt', 2) where dd='01' and hh='01';