All Products
Search
Document Center

Realtime Compute for Apache Flink:Optimize batch processing of Realtime Compute for Apache Flink

Last Updated:Jul 19, 2024

This topic describes basic principles and configuration optimization methods for batch processing of Realtime Compute for Apache Flink.

Background information

As a computing framework with unified stream processing and batch processing, Flink can process batch data and streaming data at the same time. The stream processing and batch processing modes of Flink share multiple core execution mechanisms. However, the two modes have key differences in terms of deployment execution mechanisms, configuration parameters, and performance optimization. This topic describes the execution mechanisms and configuration parameters of batch deployments of Realtime Compute for Apache Flink. You can optimize deployments more efficiently after you understand the key differences between the stream and batch processing modes. This helps troubleshoot issues that you may encounter when you run batch deployments of Realtime Compute for Apache Flink.

Note

Realtime Compute for Apache Flink supports batch processing in features such as draft development, deployment O&M, workflows, queue management, and data profiling. For more information about how to perform batch processing, see Getting started with batch processing of Realtime Compute for Apache Flink.

Differences between batch and streaming deployments

You must understand the differences between the execution mechanisms of batch and streaming deployments before you learn about the configuration parameters and optimization methods of batch deployments.

Execution mode

  • Streaming deployment: The stream processing mode focuses on processing continuous and unbounded data streams. This mode is used to achieve low-latency data processing. In this mode, data is instantly passed between operators and processed in pipeline mode. Therefore, the subtasks of all streaming deployment operators are deployed and run at the same time.

image.png

  • Batch deployment: The batch processing mode focuses on processing bounded datasets. This mode is used to achieve a high throughput when processing data. In this mode, a deployment is usually run in multiple phases. For phases that are independent of each other, subtasks can run in parallel to improve resource utilization. For phases that have data dependencies, downstream subtasks can be started only after the upstream subtasks are complete.

image.png

Data transmission

  • Streaming deployment: To process data with low latency, the intermediate data of a streaming deployment is temporarily stored in the memory for direct data transmission. The intermediate data is not persisted. If the processing capability of the downstream operators is insufficient, the upstream operators may have backpressure.

  • Batch deployment: The intermediate result data of a batch deployment is written to an external storage system for downstream operators to use. By default, the result files are stored in the local disk of the TaskManagers. If the remote shuffle service is used, the result files are stored in the remote shuffle service.

Resource requirements

  • Streaming deployment: All resources must be allocated before a streaming deployment starts. This ensures that all subtasks of the deployment can be deployed and run at the same time.

  • Batch deployment: The allocation of all resources is not required before a batch deployment starts. Realtime Compute for Apache Flink can batch schedule subtasks whose input data is prepared. This efficiently utilizes existing resources and ensures smooth execution of batch deployments in the scenario where resources are limited (even in the scenario where only one slot is available).

Restart of a failed subtask

  • Streaming deployment: A streaming deployment can be resumed from the last checkpoint or savepoint if a failure occurs. This way, the deployment can be resumed at the lowest cost. When a deployment resumes, all subtasks of the deployment are restarted because the intermediate result data of streaming deployments is not persisted.

  • Batch deployment: The intermediate result data of a batch deployment is stored in disks. Therefore, the intermediate result data can be reused when a subtask restarts due to a failure. Only the failed subtask and its downstream subtasks need to be restarted without the need to backtrack all data. This reduces the number of subtasks that need to be restarted after a failure occurs and improves the efficiency of deployment resuming. The subtasks are restarted from the beginning because batch deployments do not have a checkpoint mechanism.

Key configuration parameters and optimization methods

This section describes the key configurations of batch deployments of Realtime Compute for Apache Flink.

Resource configurations

CPU and memory resources

In the Resources section of the Configuration tab of a deployment, you can configure CPU and memory resources for a single JobManager and a single TaskManager. Take note of the following configuration suggestions:

  • JobManager: We recommend that you configure one CPU core and at least 4 GiB of memory for the JobManager of a deployment. This ensures smooth scheduling and management of the deployment.

  • TaskManager: We recommend that you configure resources for a TaskManager based on the number of slots. Specifically, we recommend that you configure 1 CPU core and 4 GiB of memory for each slot. For a TaskManager with n slots, n CPU cores and 4n GiB of memory are recommended.

Note

By default, one slot is allocated to each TaskManager in a batch deployment of Realtime Compute for Apache Flink. To reduce the overhead from TaskManager scheduling and management, you can increase the number of slots for each TaskManager to 2 or 4.

The disk space of each TaskManager is limited and proportional to the number of CPU cores that you configure. Specifically, each CPU core is allocated 20 GiB of disk space. A minimum of 20 GiB and a maximum of 200 GiB of disk space can be allocated to a TaskManager.

If you increase the number of slots on each TaskManager, more subtasks are run on the same TaskManager node. This occupies more local disk space and may lead to insufficient disk space. If the disk space is insufficient, the deployment fails and restarts.

The JobManager and TaskManagers of a large-scale deployment or a deployment with a complex network topology may require higher resource configurations. In this case, you must configure more resources for the deployment based on your business requirements to ensure the efficient and stable running of the deployment.

If you encounter a resource-related issue when you run a deployment, you can troubleshoot the issue by following the instructions provided in the following document:

Troubleshooting

Important

To ensure the stable running of deployments, you must configure at least 0.5 CPU core and 2 GiB of memory for each JobManager and TaskManager.

Maximum number of slots

You must configure the maximum number of slots that can be allocated to a Realtime Compute for Apache Flink deployment. Batch deployments of Realtime Compute for Apache Flink can run in scenarios where resources are limited. You can configure the maximum number of slots in a batch deployment to specify the maximum number of resources that can be used by the deployment. This helps prevent batch deployments from occupying an excessive number of resources and affecting the running of other deployments.

Parallelism

You can configure the global parallelism or the automatic parallelism inference feature in the Resources section of the Configuration tab of a deployment.

  • Global parallelism: The global parallelism determines the maximum number of subtasks that can be run in parallel in a deployment. You can enter a value in the Parallelism field in the Resources section of the Configuration tab of a deployment. This value is used as the default global parallelism.

  • Automatic parallelism inference: After you configure the automatic parallelism inference feature for a batch deployment of Realtime Compute for Apache Flink, the deployment automatically infers the parallelism by analyzing the total amount of data consumed by each operator and the average amount of data expected to be processed by each subtask. This helps you optimize the parallelism configuration.

In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0 or later, you can configure the parameters that are described in the following table to further optimize the automatic parallelism inference feature. You can configure the parameters in the Parameters section of the Configuration tab of a deployment.

Note

In Realtime Compute for Apache Flink that uses VVR 8.0 or later, the automatic parallelism inference feature is enabled by default for batch deployments. The global parallelism that you configure is used as the upper limit of the automatically inferred parallelism. We recommend that you use Realtime Compute for Apache Flink that uses VVR 8.0 or later to improve the performance of batch deployments.

Parameter

Description

Default value

execution.batch.adaptive.auto-parallelism.enabled

Specifies whether to enable the automatic parallelism inference feature.

true

execution.batch.adaptive.auto-parallelism.min-parallelism

The minimum value of the automatically inferred parallelism.

1

execution.batch.adaptive.auto-parallelism.max-parallelism

The maximum value of the automatically inferred parallelism. If this parameter is not configured, the global parallelism is used as the default value.

128

execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task

The average amount of data expected to be processed by each subtask. Realtime Compute for Apache Flink determines the parallelism of an operator based on the configuration of this parameter and the amount of data to be processed by the operator.

16MiB

execution.batch.adaptive.auto-parallelism.default-source-parallelism

The default parallelism of the source operator. Realtime Compute for Apache Flink cannot accurately perceive the amount of data to be read by the source operator. Therefore, you need to configure the parallelism of the source operator. If this parameter is not configured, the global parallelism is used.

1

FAQ

What is the difference between parallelism and slots?

Parallelism specifies the maximum number of subtask instances that can be run at the same time in a deployment. It reflects the theoretical upper limit of the data processing performance of a deployment. A slot is a resource allocation unit in a Realtime Compute for Apache Flink deployment. The number of slots determines how many subtask instances can be run at the same time in a deployment.

A streaming deployment obtains all resources at once to run all subtasks at the same time. If the slot sharing mechanism is enabled by default, the number of slots to be applied for is the same as the value of the global parallelism of a streaming deployment in most cases. This ensures that all subtasks can obtain the required resources.

Batch deployments process a limited number of datasets without the need to obtain all resources that are required for all subtasks at one time. The global parallelism specifies the maximum number of parallel subtasks on each operator of a deployment. The actual number of parallel subtasks depends on the available slots.

What do I do if a batch deployment becomes stuck when it is running?

You can monitor the memory usage, CPU utilization, and thread usage of the TaskManagers of a deployment. For more information, see Monitor deployment performance.

  • Troubleshoot memory issues: Check the memory usage to determine whether garbage collection (GC) frequently occurs due to insufficient memory. If the memory of the TaskManagers is insufficient, increase memory resources for the TaskManagers to reduce performance issues caused by frequent GC.

  • Analyze CPU usage: Check whether specific threads consume a large number of CPU resources.

  • Trace thread stacks: Analyze execution bottlenecks of the current operator based on thread stack information.

What do I do if the error message "No space left on device" appears?

If the error message "No space left on device" appears when you run a batch deployment of Realtime Compute for Apache Flink, the local disk space that is used by the TaskManagers of the deployment to store intermediate result files is used up. The available disk space of each TaskManager is limited and proportional to the number of CPU cores that you configure. Specifically, each CPU core is allocated 20 GiB of disk space. A minimum of 20 GiB and a maximum of 200 GiB of disk space can be allocated to a TaskManager.

Solutions:

  • Reduce the number of slots on each TaskManager. This reduces the number of parallel subtasks on a single operator and thereby reduces the demand for local disk space.

  • Increase the number of CPU cores of a TaskManager. This expands the disk space of the TaskManager.

References