全部產品
Search
文件中心

Realtime Compute for Apache Flink:系統檢查點或作業快照

更新時間:Mar 01, 2025

本文為您介紹Realtime ComputeFlink版系統檢查點或作業快照相關的常見問題。

開啟minibatch,table.exec.state.ttl到期後,為什麼無新資料更新?

當minibatch開啟時,資料是以批量方式進行計算並儲存在State中,而State中的資料是基於之前的全量計算結果。如果State由於TTL到期而被清除,之前的累積計算結果也消失了,導致無法根據minibatch的結果更新資料。

相反,如果未開啟minibatch,當State由於TTL到期時,對應到期key下的資料將重新開始累計計算並輸出,不會存在無資料更新的情況,但因為資料更新頻率增加會導致資料處理延遲等其他影響。

因此,您需要根據自身業務情境來配置minibatch與TTL的使用方式。

如何計算下一次周期性Checkpoint的開始時間?

目前間隔時間和最小間隔兩個參數能夠影響下一次Checkpoint開始時間。當某一時刻同時滿足以下兩個條件時,下一次Checkpoint開始觸發。其中:

  • 間隔時間:<上一次開始時間,下一次開始時間> 的最小時間差。

  • 最小間隔:<上一次結束時間,下一次開始時間> 的最小時間差。

以兩個情境進行說明,兩個情境Checkpoint間隔時間為3分鐘,最小間隔時間為3分鐘,逾時時間為10分鐘。

  • 情境一:作業正常運行(Checkpoint每次都成功)

    12:00第一次開始執行Checkpoint,12:00:02 Checkpoint成功,第二次Checkpoint開始時間為12:03:00。

  • 情境二:作業不正常(Checkpoint因某些原因逾時或者失敗,本情境以逾時為例)

    12:00第一次開始執行Checkpoint,12:00:02 Checkpoint成功,12:03:00第二次開始執行Checkpoint,12:13:00逾時建立失敗,第三次Checkpoint開始時間為12:16:00。

Checkpoint最小間隔時間設定詳情請參見Tuning Checkpoint

VVR 8.x和VVR 6.x使用的GeminiStateBackend有什麼區別?

Realtime ComputeFlink版計算引擎VVR 6.x預設使用V3版本的GeminiStateBackend,VVR 8.x預設使用V4版本的GeminiStateBackend。

分類

詳情

基礎能力

  • 舊版(V3):支援的功能包括KV分離、存算分離、標準或原生格式作業快照、狀態懶載入等。

  • 新版(V4):基於Realtime Compute的情境特點,對舊版Gemini核心架構和功能進行了改造升級,在支援舊版Gemini所有功能的基礎上,擁有更優的State訪問效能、更快速的擴縮容。

狀態懶載入參數

  • 新版:state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore

  • 舊版:state.backend.gemini.file.cache.lazy-restore: ON

Managed Memory使用差異

僅在RSS(Resident Set Size)指標上有區別:

  • 新版:在真正使用到記憶體時,才會向作業系統申請並體現到RSS指標上。

  • 舊版:直接向作業系統申請state's managed memory * 80%,自己做記憶體管理,這部分會在作業一啟動時體現在RSS指標上。

說明

關於Managed Memory的更多解釋請參見TaskManager Memory

全量Checkpoint與增量Checkpoint的大小一致,是否正常?

如果您在使用Flink的情況下,觀察到全量Checkpoint與增量Checkpoint的大小一致,您需要:

  • 檢查增量快照是否正常配置並生效。

  • 是否為特定情況。在特定情況下,這種現象是正常的,例如:

    1. 在資料注入前(18:29之前),作業沒有處理任何資料,此時Checkpoint只包含了初始化的源(Source)狀態資訊。由於沒有其他狀態資料,此時的Checkpoint實際上是一個全量Checkpoint。

    2. 在18:29時注入了100萬條資料。假設資料在接下來的Checkpoint間隔時間(3分鐘)內被完全處理,並且期間沒有其他資料注入,此時發生的第一個增量Checkpoint將會包含這100萬條資料產生的所有狀態資訊。

    在這種情況下,全量Checkpoint和增量Checkpoint的大小一致是符合預期的。因為第一個增量Checkpoint需要包含全量資料狀態,以確保能夠從該點恢複整個狀態,這導致它實際上也是一個全量Checkpoint。

    增量Checkpoint通常是從第二個Checkpoint開始體現出來的,在資料穩定輸入且沒有大規模的狀態變更時,後續的增量Checkpoint應該顯示出大小上的差異,表明系統正常地只對狀態的增量部分進行快照。如果仍然一致,則需要進一步審查系統狀態和行為,確認是否存在問題。

Python作業,如果Checkpoint慢怎麼辦?

  • 問題原因

    Python運算元內部有一定的緩衝,在進行Checkpoint時,需要將緩衝中的資料全部處理完。因此,如果Python UDF的效能較差,則會導致Checkpoint時間變長,從而影響作業執行。

  • 解決方案

    將緩衝調小,您需要在其他配置中設定以下參數,具體操作請參見如何配置自訂的作業運行參數?

    python.fn-execution.bundle.size:預設值為100000,單位是條數。
    python.fn-execution.bundle.time:預設值為1000,單位是毫秒。

    參數的詳細資料請參見Flink Python配置

作業出現Checkpoint異常,該如何排查?

  1. 診斷異常類型

    監控警示頁簽或者狀態集管理中查看Checkpoint歷史資訊,確認異常類型。例如,Checkpoint逾時或者寫入失敗等。

    image

  2. 分類定位與處理

    • 情境一:頻繁Checkpoint逾時:需排查作業是否存在反壓,分析反壓根源,定位慢運算元並進行對應處理(資源調整或者配置調整),排查方法詳情請參見如何排查作業反壓問題?

    • 情境二:Checkpoint寫入失敗:按以下步驟尋找TM日誌,並根據日誌提示進行原因分析和定位。

      1. 在作業日誌頁簽的Checkpoints介面,單擊Checkpoints歷史。

        image

      2. 點擊異常Checkpoints左側的+號後,查看Operator的Checkpoint情況。

      3. 點擊異常的Operators左側的+號後,點擊異常的SubTasks ID,單擊ID跳轉至對應的TM中。

        image

報錯:You are using the new V4 state engine to restore old state data from a checkpoint

  • 報錯詳情

    從VVR 6.x升級到VVR 8.x時,報You are using the new V4 state engine to restore old state data from a checkpoint

  • 報錯原因

    VVR 6.x與8.x使用的Gemini狀態後端版本不一致,Checkpoint不相容。

  • 解決方案

    您可以採用以下任何一種方式解決:

    • 建立標準格式的作業快照並從該狀態啟動作業,詳情請參見手動建立作業快照作業啟動

    • 無狀態重啟作業。

    • (不推薦)繼續使用舊版本Gemini。需要配置state.backend.gemini.engine.type: STREAMING後重啟作業才會生效。參數配置方法請參見如何配置作業運行參數?

    • (不推薦)繼續使用 VVR 6.x 版本的引擎啟動作業。

報錯:java.lang.NegativeArraySizeException

  • 報錯詳情

    當作業使用了List State,在作業運行過程中,會出現以下異常。

    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • 報錯原因

    List State中單個key對應的State資料過大,即超過了2 GB。State資料過大產生的過程如下:

    1. 在作業正常運行時,List State中單個Key下Append的Value會通過Merge進行組合(例如在包含Window的List State中),State資料不斷累積。

    2. 在State資料累積到一定程度時,一開始會觸發OOM。而在作業從故障中恢複之後,List State的Merge過程會進一步導致StateBackend申請的臨時Byte數組的大小超過可用的限制,從而出現該異常。

    說明

    RocksDBStateBackend也會遇到類似的問題並觸發ArrayIndexOutOfBoundsException或者Segmentation fault。詳情請參見The EmbeddedRocksDBStateBackend

  • 解決方案

    • 如果是Windows運算元導致的State資料過大,則建議減小視窗大小。

    • 如果是作業邏輯不合理,則建議調整作業邏輯,例如將Key進行拆分。

報錯:org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.

  • 報錯詳情

    org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
  • 報錯原因

    該錯誤是在使用Kafka作為Sink時,連續多次的Checkpoint失敗導致。

  • 解決方案

    通過execution.checkpointing.timeout參數調整Checkpoint的逾時時間長度,以確保它不會因為逾時而失敗。參數配置詳情請參見如何配置自訂的作業運行參數?

報錯:Exceeded checkpoint tolerable failure threshold

  • 報錯詳情

    org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
      at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
  • 報錯原因

    設定的Checkpoint容忍次數過低,導致超過該次數的Checkpoint失敗後作業觸發Failover。未設定該參數時預設無法容忍任何Checkpoint失敗。

  • 解決方案

    設定execution.checkpointing.tolerable-failed-checkpoints: num參數的num值來調整任務允許Checkpoint失敗的次數。num需要為0或正整數。如果num為0,則表示不允許存在任何Checkpoint異常或者失敗。參數配置詳情請參見如何配置自訂的作業運行參數?