全部产品
Search
文档中心

实时计算Flink版:系统检查点或作业快照

更新时间:Nov 12, 2024

本文为您介绍实时计算Flink版系统检查点或作业快照相关的常见问题。

两次Checkpoint最小间隔时间计算方式

最小间隔时间是按照上次成功的Checkpoint开始计算。配置间隔时间为3,最小间隔时间为5,这种情况下,间隔时间会调整为5。

以两个场景进行说明,两个场景Checkpoint间隔时间为3分钟,超时时间为10分钟,最小间隔时间为5分钟。

  • 场景一:作业正常运行(Checkpoint每次都成功)

    12:00第一次开始执行Checkpoint,12:00:02 Checkpoint成功,第二次Checkpoint开始时间为12:05:02。

  • 场景二:作业不正常(Checkpoint因某些原因超时或者失败,本次场景以超时为例)

    12:00第一次开始执行Checkpoint,12:00:02 Checkpoint成功,12:05:02第二次开始执行Checkpoint,12:15:02超时失败,第三次Checkpoint开始时间为12:15:02。

Checkpoint最小间隔时间设置详情请参见Tuning Checkpoint

VVR 8.x和VVR 6.x使用的GeminiStateBackend有什么区别?

实时计算Flink版计算引擎VVR 6.x默认使用V3版本的GeminiStateBackend,VVR 8.x默认使用V4版本的GeminiStateBackend。

分类

详情

基础能力

  • 旧版(V3):支持的功能包括KV分离、存算分离、标准或原生格式作业快照、状态懒加载等。

  • 新版(V4):基于流计算的场景特点,对旧版Gemini核心架构和功能进行了改造升级,在支持旧版Gemini所有功能的基础上,拥有更优的State访问性能、更快速的扩缩容。

状态懒加载参数

  • 新版:state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore

  • 旧版:state.backend.gemini.file.cache.lazy-restore: ON

Managed Memory使用差异

仅在RSS(Resident Set Size)指标上有区别:

  • 新版:在真正使用到内存时,才会向操作系统申请并体现到RSS指标上。

  • 旧版:直接向操作系统申请state's managed memory * 80%,自己做内存管理,这部分会在作业一启动就体现在RSS指标上。

说明

关于Managed Memory的更多解释请参见TaskManager Memory

报错:org.apache.flink.util.SerializedThrowable

  • 报错详情

    image

  • 报错原因

    使用旧版Gemini在进行快照时,有极小概率会遇到NullPointerException(NPE)报错。该报错通常是由于内部内存结构引用为0,但尚未及时回收导致的。

  • 解决方案

    • 通常,该问题在系统运行一段时间后或进行重启后,可以恢复正常。这个问题不会影响数据的正确性,只会导致Checkpoint失败。您可以适当增加Checkpoint失败时的重启容忍次数。

    • 将VVR版本升级到8.0.1及以上版本,详情请参见作业引擎版本升级

报错:You are using the new V4 state engine to restore old state data from a checkpoint

  • 报错详情

    从VVR 6.x升级到VVR 8.x时,报You are using the new V4 state engine to restore old state data from a checkpoint

  • 报错原因

    新版Gemini和旧版Gemini的Checkpoint无法兼容。

  • 解决方案

    您可以采用以下任何一种方式解决:

报错:No space left on device

  • 报错详情

    在作业运行过程中,出现类似以下异常。

    java.io.IOException: No space left on device
      at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102]
      at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102]
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
      at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
  • 报错原因

    本地磁盘空间不足。目前单Pod磁盘大小限制为20 GB,在该限制下,造成本地磁盘空间不足的原因通常有以下几点:

    • 状态数据堆积。

    • 计算节点的非状态数据(例如日志)堆积。

    • 异常导致的旧状态数据堆积。

  • 解决方案

    您可以通过快照里的State Size判断状态数据是不是过大。如果确定因为状态数据过大造成该报错,则可以采用以下解决方案:

    • VVR 4.x和VVR 6.x版本优化方案

      您可以采用以下任何一种方案:

      • 启用存算分离功能(在VVR 4.0.12及以上版本已默认启用存算分离功能)

        即配置state.backend.gemini.file.cache.typestate.backend.gemini.file.cache.preserved-space参数。详情请参见存算分离配置

      • 增加并发度。

        如果原来并发度是1,只能用一个pod,磁盘总空间是20 GB。如果并发加到4,作业就可以使用4个Pod,磁盘总空间相当于是80 GB。

      • 基于State TTL(Time To Live)进行磁盘清理。

        当您设置了State TTL,如果时间超过了State的过期时间,则State数据就过期了,系统就会自动清理掉过期的State数据,即可释放一定的磁盘空间。

    • VVR 3.x.x版本优化方案

      您可以采用以下任何一种方案:

      • 对State进行压缩。

        VVR 3.0.x版本配置state.backend.gemini.page.flush.local.compression: Lz4参数,本地State会有压缩,能降低本地磁盘空间,但一定程度上会降低作业性能。

      • 启用存算分离功能。

        VVR 3.0.3及以上版本配置state.backend.gemini.file.cache.type: LIMITED。本地盘会有一个18 GB的State限制,超过18 GB的数据会被存储到远程DFS,下次读取该部分数据时,会从DFS读取,相当于把本地盘当作一个本地文件缓存。

报错:java.lang.IllegalArgumentException: Illegal Capacity: -1

  • 报错详情

    在作业使用Map State的遍历操作时,在作业运行过程中,有小概率会触发到以下异常。

    java.lang.IllegalArgumentException: Illegal Capacity: -1
      at java.util.ArrayList.<init>(ArrayList.java:156)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83)
      at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489)
      at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102)
      at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
      at java.lang.Thread.run(Thread.java:834)
  • 报错原因

    产品已知缺陷,仅在VVR 4.0.10版本出现。

  • 解决方案

    将VVR版本升级到4.0.11及以上。

报错:java.lang.NegativeArraySizeException

  • 报错详情

    当作业使用了List State,在作业运行过程中,会出现以下异常。

    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • 报错原因

    List State中单个key对应的State数据过大,即超过了2 GB。State数据过大产生的过程如下:

    1. 在作业正常运行时,List State中单个Key下Append的Value会通过Merge进行组合(例如在包含Window的List State中),State数据不断累积。

    2. 在State数据累积到一定程度时,一开始会触发OOM。而在作业从故障中恢复之后,List State的Merge过程会进一步导致StateBackend申请的临时Byte数组的大小超过可用的限制,从而出现该异常。

    说明

    RocksDBStateBackend也会遇到类似的问题并触发ArrayIndexOutOfBoundsException或者Segmentation fault。详情请参见The EmbeddedRocksDBStateBackend

  • 解决方案

    • 如果是Window算子导致的State数据过大,则建议减小窗口大小。

    • 如果是作业逻辑不合理,则建议调整作业逻辑,例如将Key进行拆分。

全量Checkpoint与增量Checkpoint的大小一致,是否正常?

如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致,您需要:

  • 检查增量快照是否正常配置并生效。

  • 是否为特定情况。在特定情况下,这种现象是正常的,例如:

    1. 在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包含了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint实际上是一个全量Checkpoint。

    2. 在18:29时注入了100万条数据。假设数据在接下来的Checkpoint间隔时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包含这100万条数据产生的所有状态信息。

    在这种情况下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint需要包含全量数据状态,以确保能够从该点恢复整个状态,导致它实际上也是一个全量Checkpoint。

    增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。