全部產品
Search
文件中心

Realtime Compute for Apache Flink:Task快速重啟配置

更新時間:Sep 13, 2024

本文為您介紹,如何配置Task快速重啟,從而降低Failover對作業的影響。

背景資訊

重要

此功能為實驗性功能,請在生產環境下謹慎使用,如遇問題請及時提交工單和支援人員部門取得聯絡。

通常,當Flink流作業中的某個Task發生異常時,為了保證資料一致性,同一個PipelineRegion的所有Task都會進行Failover。作業Failover後,Source節點需要從上一個Checkpoint位點開始消費資料。然而,在一些作業中,Task Failover後還需要下載大資源檔或者State資料。如果作業並發很高,所有Task進行一輪Failover的調度時間可能也會比較長。這些都會導致作業出現延遲或阻塞,一段時間內無法正常消費資料等問題,恢複正常運行所需的時間會更長。

Task快速重啟配置可以有效緩解上述問題。配置Task快速重啟後,當某個Task發生異常時,可以只重啟失敗的Task。從而避免由於非Source Task異常導致的Source Task回退到上一個Checkpoint位點重新消費資料的情況,減少由於Task取消、重啟、追資料導致作業無法正常消費資料的時間長度。此外,還可以緩解高並發的情況下,所有Task進行一輪Failover的調度時間和初始化給叢集造成的壓力,從而降低Failover對作業的影響。

目前Task快速重啟支援兩種一致性語義:APPROXIMATE(不保證資料不丟失和重複)和AT_LEAST_ONCE(保證資料不丟失,不保證資料不重複)。其中APPROXIMATE語義無效能開銷,而AT_LEAST_ONCE語義存在效能開銷且需要額外使用Object Storage Service。

使用限制

  • 不能在有限資料來源的流作業中使用。

  • 無法與Unaligned Checkpoint共同使用。

  • 無法在批作業中使用。

  • 同一個Session叢集內無法同時運行未配置和配置了Task快速重啟的作業。

  • 若作業中有運算元實現了prepareSnapshotPreBarrier方法,或在運行中會發送與Checkpoint相關的資訊,則不可使用AT_LEAST_ONCE語義。

注意事項

語義

注意事項

APPROXIMATE

  • 當某個Task進行Failover時,其上遊Task將無法繼續向該Task發送資料,資料將會造成反壓。因此Task Failover期間和之後一段時間出現的反壓和numRecordsInPerSecond指標歸零都是正常情況,在Task Failover完成後會恢複。

    如果作業僅有Rebalance或Rescale邊,且每個邊的下遊均有多個並發,可以配合Dynamic Rebalance功能,將資料發送給未出錯的任務處理,從而實現Failover期間資料處理不受影響。

  • 如果Task異常是由TaskManager異常退出或網路異常導致的,則配置Task快速重啟後,作業Failover的耗時可能不會有明顯的縮短,極端情況下可能會更長。

  • 由於單個Task Failover期間作業Checkpoint會失敗,建議調高容忍Checkpoint失敗的次數。

  • 目前,配置Task快速重啟後,如果發生作業Failover,將會出現資料丟失或重複。因此請您一定要先確保您的業務可以允許出現資料的丟失或重複,再配置Task快速重啟。

  • 啟用Task快速重啟後,您可以忽略Flink UI頁面上顯示的Checkpoint一致性語義。在Flink UI頁面上,Checkpoint一致性語義固定顯示為AT_LEAST_ONCE,但實際為APPROXIMATE,該語義不保證資料不丟失和重複。

AT_LEAST_ONCE

  • 目前的實現下,開啟後作業效能會有一定下降。請關注效能和延遲情況,適當擴充資源。

  • 當某個Task進行Failover時,或AT_LEAST_ONCE語義下Failover後回追資料過程中,其上遊Task將無法繼續向該Task發送資料,資料將會造成反壓。因此Task Failover期間和之後一段時間出現的反壓和numRecordsInPerSecond指標歸零都是正常情況,在Task Failover完成後會恢複。

    如果作業僅有Rebalance或Rescale邊,且每個邊的下遊均有多個並發,可以配合Dynamic Rebalance功能,將資料發送給未出錯的任務處理,從而實現Failover期間資料處理不受影響。

  • 如果Task異常是由TaskManager異常退出或網路異常導致的,則配置Task快速重啟後,作業Failover的耗時可能不會有明顯的縮短,極端情況下可能會更長。

  • 由於單個Task Failover期間作業Checkpoint會失敗,建議調高容忍Checkpoint失敗的次數。

  • 開啟後各個Task將以較高頻率儲存自身的狀態,同時會記錄輸出的資料,這都會增加作業讀寫OSS的IO量。不建議在State較大或資料流量較大的作業上開啟。

    說明
    • 您可以在EMR控制台查看OSS使用的頻寬和QPS。

    • 作業當前總IO量可用各節點 Bytes Received的總和 / 已耗用時間估算。

  • 某個作業開啟該功能導致OSS的IO量增加,可能導致使用同一帳號下的OSS的作業受到影響,反之亦然。因此,請同時關注其他作業情況和OSS使用方式。

  • 部分情況下,包括首次Checkpoint完成前,作業異常會回退為正常的Failover,屬於正常情況。

  • 啟用AT_LEAST_ONCE語義後作業的Checkpoint ID不再為連續數值。

  • 由於作業Failover次數為獨立計數,如果出現全域性異常導致大量Task同時Failover,Failover計數會按照發生異常的Task數量相應增加。

操作步驟

  1. 進入Task快速重啟配置入口。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 營運中心 > 作業營運頁面,單擊目標作業名稱。

    4. 部署詳情頁簽,單擊運行參數配置地區右側的編輯

  2. 其他配置中,增加如下代碼資訊。

    • 若使用APPROXIMATE語義:

      individual-task-failover.enabled: enabled_approximate
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
    • 若使用AT_LEAST_ONCE語義:

      individual-task-failover.enabled: enabled
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
      individual-task-failover.intermediate-checkpointing.interval: 適當的間隔,一般建議為cp周期的1/5到1/10,單位為ms。
      classloader.check-leaked-classloader: false

    如果您的作業使用的是Session模式的叢集,則也需要在Session叢集的配置中增加上述代碼。Session叢集配置詳情請參見步驟一:建立Session叢集

  3. 單擊儲存

  4. 在頁面頂部,單擊停止

  5. 啟動作業,詳情請參見作業啟動