This topic provides answers to some frequently asked questions about checkpoints or savepoints of a fully managed Flink deployment.
How is the minimum time interval between two checkpoints calculated?
What are the differences between GeminiStateBackend in VVR 8.X and GeminiStateBackend in VVR 6.X?
What do I do if the error message "org.apache.flink.util.SerializedThrowable" appears?
What do I do if the error message "No space left on device" appears?
What do I do if the error message "java.lang.NegativeArraySizeException" appears?
What do I do if the size of an incremental checkpoint is the same as the size of a full checkpoint?
How is the minimum time interval between two checkpoints calculated?
The minimum time interval between two checkpoints is calculated from the last successful checkpoint. If the minimum duration between checkpoint attempts is set to 3 and the minimum checkpoint interval is set to 5, the checkpoint interval is adjusted to 5.
In the following scenarios, the minimum duration between checkpoint attempts is 3 minutes, the timeout period is 10 minutes, and the minimum checkpoint interval is 5 minutes.
Scenario 1: The deployment runs as expected. Each checkpoint is successful.
The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:05:02.
Scenario 2: The deployment is abnormal. (A checkpoint times out or fails due to specific reasons. In this example, the checkpoint times out.)
The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:05:02 but fails at 12:15:02 because the checkpoint times out. The third checkpoint is triggered at 12:15:02.
For more information about the setting of the minimum checkpoint interval, see Tuning Checkpointing.
What are the differences between GeminiStateBackend in VVR 8.X and GeminiStateBackend in VVR 6.X?
By default, GeminiStateBackend V3 is used in Ververica Runtime (VVR) 6.X of Realtime Compute for Apache Flink, and GeminiStateBackend V4 is used in VVR 8.X of Realtime Compute for Apache Flink.
Item | Description |
Basic capabilities |
|
State lazy loading configuration |
|
Managed memory | The two versions differ only in the resident set size (RSS) metric in terms of the managed memory.
Note For more information about managed memory, see Set up TaskManager Memory. |
What do I do if the error message "org.apache.flink.util.SerializedThrowable" appears?
Error details
Cause
When you use GeminiStateBackend V3 to create savepoints, the NullPointerException (NPE) error may occur at an extremely low probability. In most cases, this error occurs because the internal memory structure reference is 0 but is not reclaimed in a timely manner.
Solution
In most cases, this error may be fixed after the system runs for a specific period of time or after a restart. This error only causes checkpoint failures but does not affect the correctness of data. You can increase the maximum number of restarts that are allowed when a checkpoint failure occurs.
Upgrade the VVR version to 8.0.1 or later. For more information, see Upgrade the engine version of deployments.
What do I do if the error message "You are using the new V4 state engine to restore old state data from a checkpoint" appears?
Problem description
When I upgrade the VVR version from 6.X to 8.X, the error message "
You are using the new V4 state engine to restore old state data from a checkpoint
" appears.Cause
The checkpoints of GeminiStateBackend V3 and GeminiStateBackend V4 are incompatible.
Solution
Use one of the following methods to resolve the issue:
Create a savepoint in the standard format for your deployment and start the deployment from the state of the savepoint. For more information, see Manually create a savepoint and Start a deployment.
Restart your deployment without states.
(Not recommended) Use GeminiStateBackend V3. In this case, you must configure
state.backend.gemini.engine.type: STREAMING
and restart your deployment. For more information about how to configure parameters, see How do I configure parameters for deployment running?
What do I do if the error message "No space left on device" appears?
Problem description
When a deployment is running, an error message that is similar to the following information appears:
java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102] at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102] at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102] at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?] at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?] at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?] at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT] at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
Cause
The local disk space is insufficient. The maximum disk size for a single pod is 20 GB. If one of the following types of data is accumulated, the local disk space may become insufficient:
State data
Non-state data of compute nodes, such as logs
Old state data that is accumulated due to exceptions
Solution
You can determine whether the state data is excessively large based on the state size in savepoints. If the error message appears because the state data is excessively large, you can use one of the following solutions to fix the issue based on the VVR version of the deployment:
Solutions for VVR 4.X and VVR 6.X
Use one of the following solutions:
Enable the compute-storage separation feature. By default, this feature is enabled in VVR 4.0.12 and later.
To enable this feature, you can configure the state.backend.gemini.file.cache.type and state.backend.gemini.file.cache.preserved-space parameters. For more information, see Parameters for compute-storage separation.
Increase the degree of parallelism.
If the original degree of parallelism is 1, only one pod can run in a deployment, and the total disk space is 20 GB. If you increase the degree of parallelism to 4, four pods can run in a job, and the total disk space is 80 GB.
Remove expired state values from disks based on time-to-live (TTL).
If you configure TTL and a state value expires based on TTL, the system automatically removes the expired state value. This way, disk space is released.
Solutions for VVR 3.X.X
Use one of the following solutions:
Compress the state data.
For VVR 3.0.X, configure the state.backend.gemini.page.flush.local.compression: Lz4 parameter to compress the state data in your local disks. This way, less space is occupied on your local disks. However, deployment performance deteriorates.
Enable the compute-storage separation feature.
For VVR 3.0.3 or later, configure the state.backend.gemini.file.cache.type: LIMITED parameter. Local disks can store up to 18 GB of state data. If the data size exceeds 18 GB, the excess data is stored in a remote distributed file system (DFS). In this case, the system reads the excess data from the DFS the next time the system reads the data. Each local disk is used as an on-premises file cache.
What do I do if the error message "java.lang.IllegalArgumentException: Illegal Capacity: -1" appears?
Problem description
If a deployment uses the map state to traverse data, the following exception may occur at a low probability when the deployment is running:
java.lang.IllegalArgumentException: Illegal Capacity: -1 at java.util.ArrayList.<init>(ArrayList.java:156) at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83) at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501) at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489) at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107) at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102) at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279) at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134) at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:834)
Cause
This is a known issue. This issue occurs only in VVR 4.0.10.
Solution
Update the VVR version to 4.0.11 or later.
What do I do if the error message "java.lang.NegativeArraySizeException" appears?
Problem description
If a deployment uses the list state, the following exception occurs when the deployment is running:
Caused by: java.lang.NegativeArraySizeException at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
Cause
The size of the state data of a single key in the list state exceeds 2 GB. A large amount of state data is generated in the following process:
When a deployment runs as expected, the values that are appended to the values for a key in the list state are combined in the merge process. For example, the state data is continuously accumulated for a deployment that uses the list state of window operators.
When the size of the state data accumulates to a specific threshold, an out-of-memory (OOM) error is triggered. After the deployment recovers from the failure, the merge process of the list state causes the size of the temporary byte array that is requested by the state backend to exceed the limit. As a result, this exception occurs.
NoteWhen you use RocksDBStateBackend, this issue may also occur and the error message "ArrayIndexOutOfBoundsException" or "Segmentation fault" appears. For more information, see The EmbeddedRocksDBStateBackend.
Solution
If window operators generate an excessively large amount of state data, we recommend that you reduce the window size.
If the deployment logic generates an excessively large amount of state data, we recommend that you modify the deployment logic. For example, you can split keys.
What do I do if the size of an incremental checkpoint is the same as the size of a full checkpoint?
If the size of an incremental checkpoint is the same as the size of a full checkpoint when you use fully managed Flink, perform the following operations to check whether an issue occurs:
Check whether the incremental savepoint is properly configured and takes effect.
Check whether the deployment runs in special scenarios. In some special scenarios, the size of an incremental checkpoint is expected to be the same as the size of the full checkpoint. Example:
Before data is injected into a deployment at 18:29, the deployment does not process any data. In this case, the checkpoint contains only the initialized state data of the data source. The checkpoint is a full checkpoint because no state data exists.
A total of 1,000,000 data records are injected into the deployment at 18:29. If the data records are completely processed within the current checkpoint interval and no other data is injected into the deployment during the interval, the first incremental checkpoint that is generated during the interval contains all state data of the 1,000,000 data records. The checkpoint interval is 3 minutes.
In this case, the size of the incremental checkpoint is the same as the size of the full checkpoint. The first incremental checkpoint must contain the state of full data to ensure that the entire state can be restored from the incremental checkpoint. Therefore, the first incremental checkpoint is actually a full checkpoint.
In most cases, if the data input is stable and no large-scale state changes occur, the sizes of the second and subsequent incremental checkpoints are different from the size of the full checkpoint. This indicates that the system creates savepoints only for the incremental data of the state data as expected. If the size of the second checkpoint or a subsequent checkpoint is the same as the size of the full checkpoint, check the status and behavior of the system to determine whether a system issue occurs.