本文為您介紹Flink批處理的一些基本原理和配置調優。
背景資訊
作為支援流處理和批處理的統一計算架構,Flink能夠同時處理兩種不同的資料模式。儘管Flink在流處理和批處理模式下共用許多核心執行機制,但兩種模式在作業執行機制、配置參數和效能調優方面存在一些關鍵差異。本文將專門針對Flink批次工作,為您介紹其獨特的執行機制、配置參數。通過深入理解這些差異,您將能夠更加高效地對作業進行調優,以及排查和解決在使用Flink批次工作中遇到的問題。
Realtime ComputeFlink版也對Flink批處理進行了專門的支援,提供了作業開發、作業營運、作業編排、資源隊列管理、資料結果探查等能力,您可以通過Flink批處理快速入門快速地瞭解上手。
批作業和流作業的比較
在介紹Flink批次工作的配置參數和調優方法之前,首先需要瞭解Flink批處理與流處理作業在執行機制上的差異。
執行模式
流處理作業:流處理模式專註於處理持續不斷的無界資料流,其核心在於實現低延遲的資料處理。在這種模式下,資料會以流水線模式在節點間即時傳遞並被處理。因此,流處理作業所有節點的子任務會同時部署和執行。
批次工作:批處理模式專註於處理有界資料集,重點在於提供高輸送量的資料處理。在這種執行模式下,作業通常由多個階段組成,互不依賴的階段可以並存執行,以提高資源使用率;對於存在資料依賴的階段,下遊任務需等待上遊任務完成後才能啟動。
資料轉送
流處理作業:為了實現低延遲,流處理作業的中間資料保留在記憶體中並直接通過網路進行傳輸,不會持久化。如果下遊節點處理能力不足,則可能會導致上遊節點遭遇反壓。
批次工作:批次工作的中間結果會寫入到外部儲存系統中以供下遊使用。預設情況下,這些結果檔案儲存在TaskManager的本地磁碟;如果使用遠端Shuffle服務,則資料檔案會儲存在遠端Shuffle服務中。
資源需求
流處理作業:流處理作業在啟動時需要預先分配所有資源,以確保所有子任務能夠同時部署並運行。
批次工作:批次工作在運行時不需要一次性擷取所有資源。Flink可以分批調度輸入資料已經就緒的任務,從而能夠更高效地利用現有資源,即使在資源受限(甚至單Slot)的情況下也能順利執行。
任務失敗重啟
流處理作業:流處理作業在遇到故障時可以從最近的檢查點或儲存點恢複,這樣作業進度回退的程度較小。但因為中間結果不持久化,恢複時需要重新啟動所有任務。
批次工作:批次工作的中間結果資料會落盤,因此當任務出錯重啟時,這些中間結果能夠被再次利用,這意味著只需重啟失敗的任務以及它的下遊任務即可,無需全域回溯。這樣可以減少因故障而需要重新執行的任務數,提高恢複效率。不過由於批作業沒有檢查點機制,這些重啟的任務需要從頭開始運行。
關鍵配置參數及調優方式
本章節將為您介紹Flink批作業的關鍵配置。
資源配置
CPU和Memory
在作業的資源配置視窗中,您可以為作業設定單個JobManager和TaskManager的CPU和記憶體資源,以下是一些配置建議:
JobManager資源配置:建議為JobManager分配1個CPU核心和至少4 GiB的記憶體資源,以確保其順利執行作業調度與管理。
TaskManager資源配置:建議根據Slot數量分配相應的資源。具體來說,建議為每個Slot配備1個CPU核心和4 GiB記憶體。如果一個TaskManager擁有n個Slot,那麼總共應為其分配n個CPU核心和4n*GiB記憶體。
Realtime Compute引擎中的批次工作預設為每個TaskManager分配一個Slot。為了降低調度和管理TaskManager的開銷,您可以考慮將每個TaskManager的Slot數量增加到2或4。
然而需要注意的是,每個TaskManager的可用磁碟空間是有限的,與其分配的CPU核心數是成比例的。具體來說,會給每個CPU核心配額20 GiB的磁碟空間。TaskManager最低磁碟空間為20 GiB,最大磁碟空間為200 GiB。
因此增加每個TaskManager上的Slot數量意味著更多的任務將在同一TaskManager節點上運行,這可能會加劇本地磁碟空間的緊張狀況,甚至可能導致磁碟空間不足。如果磁碟空間不足,則會導致作業失敗並重啟。
對於規模較大或拓撲結構複雜的作業,JobManager和TaskManager可能需要更高規格的資源配置。在這些情況下,應根據作業的具體需求適當提高資源配置,以確保作業能夠高效且穩定地運行。
此外,如果您在作業執行過程中遇到資源相關的問題,可以參考此文檔進行故障診斷和解決:
Apache Flink Memory Troubleshooting。
為了保證作業穩定運行,每個JobManager和TaskManager至少需要配置0.5個CPU核心和2 GiB記憶體。
最大Slot個數
配置Flink作業允許分配的最大slot數量。由於Flink批作業在資源受限的情況下也可以運行,在資源受限的環境中,通過設定最大Slot數量,可以限制Flink批作業所使用的最大資源量。這有助於避免批作業佔用過多資源,從而影響其他作業的運行。
並行度配置
在作業的資源配置中,支援為作業設定全域並行度或自動推斷並行度。
全域並行度:全域並行度決定作業中任務的最大並存執行數量。您可以直接在頁面上填寫作業的並行度,作業將使用該值作為全域預設並行度。
自動推斷:配置自動推斷後,Flink批作業將通過分析每個節點的消費總資料量和每個子任務期望處理的平均資料量來自動推導並行度,協助您最佳化並行度配置。
此外,Realtime Compute引擎VVR 8.0及以上版本提供了以下配置項(作業運行參數配置地區配置),使您能夠對自動並行度推導進行更精細的調優:
在Realtime Compute引擎VVR 8.0及以上版本中,Flink批作業預設開啟自動推導並行度功能,並使用您配置的全域並行度作為自動推導並行度的上限。建議您使用Realtime Compute引擎VVR 8.0及以上版本,以獲得Flink批次工作更優的效能表現。
配置項 | 說明 | 預設值 |
execution.batch.adaptive.auto-parallelism.enabled | 是否啟用自動並行度推導。 | true |
execution.batch.adaptive.auto-parallelism.min-parallelism | 允許自動化佈建的並行度最小值。 | 1 |
execution.batch.adaptive.auto-parallelism.max-parallelism | 允許自動化佈建的並行度的最大值。如果未配置此參數,將採用全域並行度作為預設值。 | 128 |
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task | 期望每個任務平均處理的資料量大小。Flink將根據此配置和節點實際需要處理的資料量來動態決定節點並行度。 | 16MiB |
execution.batch.adaptive.auto-parallelism.default-source-parallelism | Source運算元的預設並行度。目前Flink無法很好的感知到Source節點要讀取的資料量,因此需要您自行配置其並行度。如果未配置,會取全域並行度。 | 1 |
常見問題
並行度與Slot數的區別
並行度是指作業中能夠同時執行的任務執行個體的最大數量。它反映了作業處理能力的理論上限。Slot是Flink作業中的資源分派單元,Slot的數量決定了Flink作業能夠同時處理多少個任務執行個體。
由於流處理作業一次性擷取全部的資源來同時運行所有任務,在預設開啟Slot-sharing的情況下,申請的Slot數量通常與作業全域並行度數一致,以確保所有任務都能獲得必要的資源。
批次工作處理的是有限資料集,不需要所有任務一次性擷取所有資源。全域並行度表示作業各節點的最大並行任務數,而實際的並存執行數量取決於當前可用的Slot。
批作業運行卡住如何定位
您可以參見查看作業效能文檔瞭解如何監控TaskManager的記憶體、CPU和線程使用方式。
記憶體問題排查:首先檢查記憶體使用量情況,判斷是否存在記憶體不足導致的頻繁記憶體回收(GC)。如果確認存在記憶體不足,應增加TaskManager的記憶體配置,以減少因頻繁記憶體回收導致的效能問題。
CPU流量分析:檢查是否存在個別線程佔用了大量CPU資源,這可能是導致作業卡頓的原因。
線程棧跟蹤:利用線程棧資訊,分析當前節點啟動並執行瓶頸所在。
報錯No space left on device
當您在Realtime Compute引擎中運行批次工作時,如果遇到No space left on device的報錯,這通常意味著TaskManager用於儲存中間結果檔案的本地磁碟空間已被耗盡。每個TaskManager的可用磁碟空間是有限的,與其分配的CPU核心數是成比例的。具體來說,會給每個CPU核心配額20GiB的磁碟空間。TaskManager最低磁碟空間為20GiB,最大磁碟空間為200GiB。
解決建議:
減少每個TaskManager上的Slot數量,可以降低單個節點上的並行任務數,從而減少對本地磁碟空間的需求。
提高TaskManager的CPU核心數,從而提高TaskManager的磁碟空間大小。
相關文檔
利用Realtime ComputeFlink版關鍵功能進行資料批處理快速入門,請參見Flink批處理快速入門。
運行參數配置方法,請參見如何配置作業運行參數?