本文為您介紹Realtime ComputeFlink版系統檢查點或作業快照相關的常見問題。
兩次Checkpoint最小間隔時間計算方式
最小間隔時間是按照上次成功的Checkpoint開始計算。配置間隔時間為3,最小間隔時間為5,這種情況下,間隔時間會調整為5。
以兩個情境進行說明,兩個情境Checkpoint間隔時間為3分鐘,逾時時間為10分鐘,最小間隔時間為5分鐘。
情境一:作業正常運行(Checkpoint每次都成功)
12:00第一次開始執行Checkpoint,12:00:02 Checkpoint成功,第二次Checkpoint開始時間為12:05:02。
情境二:作業不正常(Checkpoint因某些原因逾時或者失敗,本次情境以逾時為例)
12:00第一次開始執行Checkpoint,12:00:02 Checkpoint成功,12:05:02第二次開始執行Checkpoint,12:15:02逾時失敗,第三次Checkpoint開始時間為12:15:02。
Checkpoint最小間隔時間設定詳情請參見Tuning Checkpoint。
VVR 8.x和VVR 6.x使用的GeminiStateBackend有什麼區別?
Realtime ComputeFlink版計算引擎VVR 6.x預設使用V3版本的GeminiStateBackend,VVR 8.x預設使用V4版本的GeminiStateBackend。
分類 | 詳情 |
基礎能力 |
|
狀態懶載入參數 |
|
Managed Memory使用差異 | 僅在RSS(Resident Set Size)指標上有區別:
說明 關於Managed Memory的更多解釋請參見TaskManager Memory。 |
報錯:org.apache.flink.util.SerializedThrowable
報錯詳情
報錯原因
使用舊版Gemini在進行快照時,有極小機率會遇到NullPointerException(NPE)報錯。該報錯通常是由於內部記憶體結構引用為0,但尚未及時回收導致的。
解決方案
通常,該問題在系統運行一段時間後或進行重啟後,可以恢複正常。這個問題不會影響資料的正確性,只會導致Checkpoint失敗。您可以適當增加Checkpoint失敗時的重啟容忍次數。
將VVR版本升級到8.0.1及以上版本,詳情請參見作業引擎版本升級。
報錯:You are using the new V4 state engine to restore old state data from a checkpoint
報錯詳情
從VVR 6.x升級到VVR 8.x時,報
You are using the new V4 state engine to restore old state data from a checkpoint
。報錯原因
新版Gemini和舊版Gemini的Checkpoint無法相容。
解決方案
您可以採用以下任何一種方式解決:
無狀態重啟作業。
(不推薦)繼續使用舊版本Gemini。需要配置
state.backend.gemini.engine.type: STREAMING
後重啟作業才會生效。參數配置方法請參見如何配置作業運行參數?
報錯:No space left on device
報錯詳情
在作業運行過程中,出現類似以下異常。
java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102] at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102] at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?] at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?] at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
報錯原因
本地磁碟空間不足。目前單Pod磁碟大小限制為20 GB,在該限制下,造成本地磁碟空間不足的原因通常有以下幾點:
狀態資料堆積。
計算節點的非狀態資料(例如日誌)堆積。
異常導致的舊狀態資料堆積。
解決方案
您可以通過快照裡的State Size判斷狀態資料是不是過大。如果確定因為狀態資料過大造成該報錯,則可以採用以下解決方案:
VVR 4.x和VVR 6.x版本最佳化方案
您可以採用以下任何一種方案:
啟用存算分離功能(在VVR 4.0.12及以上版本已預設啟用存算分離功能)
即配置state.backend.gemini.file.cache.type和state.backend.gemini.file.cache.preserved-space參數。詳情請參見存算分離配置。
增加並發度。
如果原來並發度是1,只能用一個pod,磁碟總空間是20 GB。如果並發加到4,作業就可以使用4個Pod,磁碟總空間相當於是80 GB。
基於State TTL(Time To Live)進行磁碟清理。
當您設定了State TTL,如果時間超過了State的到期時間,則State資料就到期了,系統就會自動清理掉到期的State資料,即可釋放一定的磁碟空間。
VVR 3.x.x版本最佳化方案
您可以採用以下任何一種方案:
對State進行壓縮。
VVR 3.0.x版本配置state.backend.gemini.page.flush.local.compression: Lz4參數,本地State會有壓縮,能降低本地磁碟空間,但一定程度上會降低作業效能。
啟用存算分離功能。
VVR 3.0.3及以上版本配置state.backend.gemini.file.cache.type: LIMITED。本地碟會有一個18 GB的State限制,超過18 GB的資料會被儲存到遠程DFS,下次讀取該部分資料時,會從DFS讀取,相當於把本地碟當作一個本地檔案快取。
報錯:java.lang.IllegalArgumentException: Illegal Capacity: -1
報錯詳情
在作業使用Map State的遍曆操作時,在作業運行過程中,有小機率會觸發到以下異常。
java.lang.IllegalArgumentException: Illegal Capacity: -1 at java.util.ArrayList.<init>(ArrayList.java:156) at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83) at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489) at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102) at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:834)
報錯原因
產品已知缺陷,僅在VVR 4.0.10版本出現。
解決方案
將VVR版本升級到4.0.11及以上。
報錯:java.lang.NegativeArraySizeException
報錯詳情
當作業使用了List State,在作業運行過程中,會出現以下異常。
Caused by: java.lang.NegativeArraySizeException at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
報錯原因
List State中單個key對應的State資料過大,即超過了2 GB。State資料過大產生的過程如下:
在作業正常運行時,List State中單個Key下Append的Value會通過Merge進行組合(例如在包含Window的List State中),State資料不斷累積。
在State資料累積到一定程度時,一開始會觸發OOM。而在作業從故障中恢複之後,List State的Merge過程會進一步導致StateBackend申請的臨時Byte數組的大小超過可用的限制,從而出現該異常。
說明RocksDBStateBackend也會遇到類似的問題並觸發ArrayIndexOutOfBoundsException或者Segmentation fault。詳情請參見The EmbeddedRocksDBStateBackend。
解決方案
如果是Window運算元導致的State資料過大,則建議減小視窗大小。
如果是作業邏輯不合理,則建議調整作業邏輯,例如將Key進行拆分。
全量Checkpoint與增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情況下,觀察到全量Checkpoint與增量Checkpoint的大小一致,您需要:
檢查增量快照是否正常配置並生效。
是否為特定情況。在特定情況下,這種現象是正常的,例如:
在資料注入前(18:29之前),作業沒有處理任何資料,此時Checkpoint只包含了初始化的源(Source)狀態資訊。由於沒有其他狀態資料,此時的Checkpoint實際上是一個全量Checkpoint。
在18:29時注入了100萬條資料。假設資料在接下來的Checkpoint間隔時間(3分鐘)內被完全處理,並且期間沒有其他資料注入,此時發生的第一個增量Checkpoint將會包含這100萬條資料產生的所有狀態資訊。
在這種情況下,全量Checkpoint和增量Checkpoint的大小一致是符合預期的。因為第一個增量Checkpoint需要包含全量資料狀態,以確保能夠從該點恢複整個狀態,導致它實際上也是一個全量Checkpoint。
增量Checkpoint通常是從第二個Checkpoint開始體現出來的,在資料穩定輸入且沒有大規模的狀態變更時,後續的增量Checkpoint應該顯示出大小上的差異,表明系統正常地只對狀態的增量部分進行快照。如果仍然一致,則需要進一步審查系統狀態和行為,確認是否存在問題。