
Realtime Compute for Apache Flink: Performance optimization

Last Updated: Jun 28, 2024

This topic describes how to optimize primary key tables and append scalable tables of Apache Paimon (Paimon) in different scenarios.

Limits

Paimon tables are supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later.

Primary key tables

Data writing

In most cases, write operations in a Paimon table are blocked by the compaction of small files. If a bucket of the table contains a large number of small files or the changelog-producer parameter is set to lookup for the table, the compaction of small files must be completed at checkpointing. If the compaction process requires a long period of time, checkpoints may time out, which leads to backpressure and affects processing efficiency.

To address the preceding issue, use the following methods:

  • Adjust the parallelism of a Paimon sink

    Use SQL Hints to configure the sink.parallelism parameter of the Paimon sink. Please note that changes in parallelism may affect resource usage.
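    For example, a sink parallelism of 4 can be set with a hint on the INSERT target. The table names below are hypothetical sketches; adjust them to your own schema:

    ```sql
    -- Hypothetical table names; sets the Paimon sink parallelism for this statement only.
    INSERT INTO my_paimon_table /*+ OPTIONS('sink.parallelism' = '4') */
    SELECT id, name, dt FROM my_source_table;
    ```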

  • Adjust the Flink checkpoint configuration

    The checkpoint interval affects the data latency in Paimon. Data latency refers to the time required for the written data to become available for consumption. If your business allows for a higher data latency, you can increase the checkpoint interval to enhance write performance.

    • Configure the execution.checkpointing.interval parameter to increase the checkpoint interval. For more information, see How do I configure parameters for deployment running?

    • Add the execution.checkpointing.max-concurrent-checkpoints: 3 configuration, which allows up to three checkpoints to run concurrently. This reduces the impact of long-tail checkpoints.

    • Consider running a batch deployment.
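    For example, the two checkpoint settings above can be combined in the deployment's runtime configuration. The interval value below is illustrative; choose it based on the data latency that your business tolerates:

    ```yaml
    # Illustrative values; a longer interval improves write performance
    # at the cost of higher data latency.
    execution.checkpointing.interval: 2min
    execution.checkpointing.max-concurrent-checkpoints: 3
    ```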

  • Enable completely asynchronous compaction in Paimon

    Completely asynchronous compaction of small files does not block checkpoints.

    To enable completely asynchronous compaction, configure the following parameters by using the ALTER TABLE statement or SQL Hints:

    'num-sorted-run.stop-trigger' = '2147483647',
    'sort-spill-threshold' = '10',
    'changelog-producer.lookup-wait' = 'false'
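    For example, the options can be applied to an existing table with an ALTER TABLE statement (the table name is hypothetical):

    ```sql
    -- Hypothetical table name; the options take effect for subsequent writes.
    ALTER TABLE my_paimon_table SET (
      'num-sorted-run.stop-trigger' = '2147483647',
      'sort-spill-threshold' = '10',
      'changelog-producer.lookup-wait' = 'false'
    );
    ```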

    The following describes these parameters:

    • num-sorted-run.stop-trigger (Integer; default value: 5)

      If the number of small files in a bucket exceeds the value of this parameter, data writing is stopped for the bucket until the small files are compacted. This prevents an uncontrollable increase in the number of small files due to slow compaction. The number of small files significantly impacts the efficiency of batch consumption and ad hoc queries for Online Analytical Processing (OLAP), but has minimal impact on stream consumption.

      If you set this parameter to an excessively large value, data writing continues regardless of the number of small files. This allows small files to be compacted only when resources are sufficient and achieves completely asynchronous compaction. To monitor the number of small files in a bucket, query the files system table provided by Paimon.

    • sort-spill-threshold (Integer; default value: N/A)

      By default, merge sort is used to compact small files in memory. The sort reader of each small file occupies a specific amount of heap memory. As the number of small files increases, the heap memory may become insufficient.

      You can configure this parameter to prevent heap memory shortage. If the number of small files exceeds the value of this parameter, merge sort is replaced with external sorting.

    • changelog-producer.lookup-wait (Boolean; default value: true)

      Specifies whether to wait for changelog generation at checkpointing when the changelog-producer parameter is set to lookup. Changelog generation involves compaction of small files. Valid values:

      • true: waits for changelog generation. You can estimate the processing latency based on the speed of checkpoint creation. Then, you can determine whether to scale resources or switch to completely asynchronous compaction.

      • false: does not wait for changelog generation. This allows concurrent tasks that have completed file compaction to continue processing subsequent data, improves CPU utilization, and does not affect the generated changelogs. In this case, the speed of checkpoint creation does not indicate the latency of data processing.

  • Change the file format in Paimon

    If your business focuses on batch or stream consumption and does not involve ad hoc queries for OLAP, configure the following parameters to change the data file format and disable statistics collection. This improves the efficiency of write operations.

    'file.format' = 'avro',
    'metadata.stats-mode' = 'none'
    Note: You must configure the preceding parameters when you create a table. You cannot change the data file format of an existing table.
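    For example, the two options can be specified in the WITH clause at table creation. The schema below is a hypothetical sketch:

    ```sql
    -- Hypothetical schema; 'file.format' cannot be changed after creation.
    CREATE TABLE my_pk_table (
      id BIGINT,
      payload STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'file.format' = 'avro',
      'metadata.stats-mode' = 'none'
    );
    ```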

Data consumption

  • Adjust the parallelism of a Paimon source

    Use SQL Hints to configure the scan.parallelism parameter of the Paimon source.
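    For example, a scan parallelism of 8 can be set with a hint on the source table (the table name is hypothetical):

    ```sql
    -- Hypothetical table name; sets the Paimon source parallelism for this query only.
    SELECT id, payload
    FROM my_paimon_table /*+ OPTIONS('scan.parallelism' = '8') */;
    ```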

  • Use the read-optimized table provided by Paimon

    During batch consumption, the full scan phase of stream consumption, and ad hoc queries for OLAP, the performance of Paimon source tables is mainly affected by small files. Paimon source tables need to compact data from small files in the memory, and a large number of small files reduces the efficiency of merge sort during compaction. Small files also affect the efficiency of write operations. You need to balance write efficiency and consumption efficiency.

    To improve write efficiency, configure the parameters as described in Enable completely asynchronous compaction. If you do not need to consume the most recent data, you can use the read-optimized table to improve consumption efficiency.
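    Paimon exposes the read-optimized view of a table as a system table with the $ro suffix. A minimal sketch of querying it, assuming a table named my_paimon_table:

    ```sql
    -- Reads only fully compacted data, so it avoids merge costs from small files
    -- but may lag behind the most recent writes.
    SELECT * FROM `my_paimon_table$ro`;
    ```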

Append scalable tables

Data writing

In most cases, write performance in an append scalable table is determined by the sink parallelism and the bandwidth of the file system or Object Storage Service (OSS). To eliminate potential performance bottlenecks, ensure that the file system you use has sufficient bandwidth for data writing. Then, use the following methods to optimize an append scalable table:

  • Adjust the parallelism of a Paimon sink

    Use SQL Hints to configure the sink.parallelism parameter of the Paimon sink. Please note that changes in parallelism may affect resource usage.

  • Check data skew

    Upstream data is not shuffled before being written to append scalable tables. If the upstream data is significantly skewed, the resource utilization of specific data writers may be low. This reduces the write efficiency. To resolve this issue, set the sink.parallelism parameter to a value different from the parallelism of the upstream node. You can verify the effect of this configuration in the development console of Realtime Compute for Apache Flink. If the sink operator and its upstream node are in different subtasks, data is shuffled.
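    For example, a sketch of forcing a shuffle by setting a sink parallelism that differs from the upstream parallelism (all names are hypothetical):

    ```sql
    -- If the upstream operator runs with a parallelism other than 8, this hint
    -- prevents operator chaining, so the skewed data is redistributed before writing.
    INSERT INTO my_append_table /*+ OPTIONS('sink.parallelism' = '8') */
    SELECT * FROM my_skewed_source;
    ```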

Data consumption

  • Adjust the parallelism of a Paimon source

    Use SQL Hints to configure the scan.parallelism parameter of the Paimon source.

  • Use data sorting

    Data order significantly impacts the efficiency of batch processing and ad hoc queries for OLAP. You can use data sorting to improve the query efficiency of append scalable tables. To use data sorting, you must complete the required configurations. For more information, see Data management configuration. You must also run the Flink deployment in batch mode and configure the parameters in the Entry Point Main Arguments field.

    For example, if you want to sort the data in a partition based on the date and type fields, add the following configurations in the Entry Point Main Arguments field:

    compact
    --warehouse 'oss://your-bucket/data-warehouse'
    --database 'your_database'
    --table 'your_table'
    --order_strategy 'zorder'
    --order_by 'date,type'
    --partition 'dt=20240311,hh=08;dt=20240312,hh=09'
    --catalog_conf 'fs.oss.endpoint=oss-cn-hangzhou-internal.aliyuncs.com'
    --table_conf 'write-buffer-size=256 MB'
    --table_conf 'your_table.logRetentionDuration=7 days'

    The following describes the parameters:

    • warehouse

      The OSS directory of the data warehouse to which the catalog that contains the Paimon table belongs.

    • database

      The name of the database that contains the Paimon table.

    • table

      The name of the Paimon table.

    • order_strategy

      The strategy used to sort data in the Paimon table. Valid values:

      • zorder: recommended for range queries that filter on fewer than five fields.

      • hilbert: recommended for range queries that filter on five or more fields.

      • order: recommended if the filter contains only equality conditions.

    • order_by

      The columns by which the data is sorted. Separate multiple columns with commas (,).

    • partition

      The partitions that you want to sort. Separate multiple partitions with semicolons (;). If the table is not partitioned, ignore this parameter.

    • catalog_conf

      The parameters in the WITH clause configured for the catalog that contains the Paimon table. Specify each parameter on a separate line.

    • table_conf

      The temporary configuration of the Paimon table. This configuration is equivalent to SQL hints. Specify each parameter on a separate line.