Realtime Compute for Apache Flink: Write data to and consume data from a Paimon table

Last Updated: Jul 01, 2024

This topic describes how to perform insert, update, overwrite, and delete operations on Apache Paimon (Paimon) tables by using the development console of Realtime Compute for Apache Flink. This topic also describes how to consume data from Paimon tables based on a specific offset.

Prerequisites

A Paimon catalog and a Paimon table are created. For more information, see Manage Apache Paimon catalogs.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later supports Paimon tables.

Write data to a Paimon table

Use the CTAS or CDAS statement to synchronize data and schema changes

For information about how to use the CREATE TABLE AS (CTAS) or CREATE DATABASE AS (CDAS) statement, see Manage Apache Paimon catalogs.

Use the INSERT INTO statement to insert or update data
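
The following is a minimal sketch. It assumes a hypothetical primary key table my_table with columns id, name, and price and PRIMARY KEY (id), and a hypothetical source table src with matching columns. With Paimon's default deduplicate merge engine, a row whose primary key already exists in my_table updates the existing row, and a row with a new primary key is inserted.

```sql
-- Hypothetical tables: my_table has PRIMARY KEY (id) NOT ENFORCED; src has matching columns.
-- Rows with an existing id update my_table (deduplicate merge engine keeps the latest row);
-- rows with a new id are inserted.
INSERT INTO my_table SELECT id, name, price FROM src;
```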

Use the INSERT OVERWRITE statement to overwrite data

Overwriting replaces old data with new data. The old data is removed and can no longer be accessed. You can use the INSERT OVERWRITE statement to overwrite a Paimon table partially or completely. The following examples use a table named my_table.

Note
  • The INSERT OVERWRITE statement applies only to batch deployments.

  • By default, changes resulting from the INSERT OVERWRITE statement cannot be consumed by downstream operators in streaming mode. For information about how to consume the changes in streaming mode, see Consume results of the INSERT OVERWRITE statement.

Examples:

  • Overwrite all data in an unpartitioned table.

    INSERT OVERWRITE my_table SELECT ...;
  • Overwrite a specific partition of the table, such as the dt=20240108,hh=06 partition.

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • Overwrite only the partitions that appear in the query result. Dynamic partition overwrite is enabled by default, so only the partitions that the SELECT statement writes to are overwritten and all other partitions remain unchanged.

    INSERT OVERWRITE my_table SELECT ...;
  • Overwrite all data in a partitioned table by disabling dynamic partition overwrite.

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
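
To make the dynamic partition overwrite case concrete, here is a sketch. It assumes that my_table is partitioned by dt and hh and that staging is a hypothetical source table. Both the column names id and name and the table staging are illustrative, not from the source.

```sql
-- Hypothetical: my_table is partitioned by (dt, hh); staging is a hypothetical source table.
-- The query only produces rows for dt='20240108' with hh='06' or hh='07', so with
-- dynamic partition overwrite (the default) only those two partitions are replaced.
INSERT OVERWRITE my_table
SELECT id, name, dt, hh
FROM staging
WHERE dt = '20240108' AND hh IN ('06', '07');
```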

Use the DELETE statement to delete data

You can use the DELETE statement to delete data from a primary key table. You must execute the DELETE statement in a script.

-- Delete all data records whose currency field is 'UNKNOWN' from the my_table table. 
DELETE FROM my_table WHERE currency = 'UNKNOWN';

Ignore DELETE changes

If a primary key table receives a DELETE change, the corresponding data is deleted by default. To ignore the DELETE changes and prevent data deletion, use SQL hints to set the ignore-delete parameter to true.

Parameter | Description | Data type | Default value
ignore-delete | Specifies whether to ignore DELETE changes from upstream. | Boolean | false
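
The hint goes on the sink table. The following sketch assumes a hypothetical source table s that emits DELETE changes and a Paimon primary key table t:

```sql
-- Hypothetical tables s (source with DELETE changes) and t (Paimon primary key table).
-- DELETE changes from s are dropped instead of deleting the matching rows in t.
INSERT INTO t /*+ OPTIONS('ignore-delete' = 'true') */ SELECT * FROM s;
```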

Adjust sink parallelism

You can use SQL hints to manually change the parallelism of the sink operator. The following table describes the parameter.

Parameter | Description | Data type | Default value
sink.parallelism | The parallelism of the sink operator. | Integer | N/A

In the following example, the parallelism of the sink operator is set to 10.

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

Consume data from a Paimon table

Use a streaming deployment

Note

Before you use a streaming deployment to consume data from a primary key table, configure a changelog producer for the table.
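
For example, you can set a changelog producer in the WITH clause when you create the table. This is a sketch only. The table and column names are hypothetical, and 'lookup' is one of Paimon's changelog producer values ('none', 'input', 'lookup', 'full-compaction'). It was picked for illustration, not because the source recommends it.

```sql
-- Hypothetical primary key table; 'lookup' is an illustrative choice of changelog producer.
CREATE TABLE orders (
  order_id BIGINT,
  status   STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'lookup'
);
```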

By default, the source operator produces all data in the Paimon table at the start time of the deployment and continues to produce subsequent incremental data in the Paimon table.

Configure a consumer offset

To consume data from a Paimon table based on a specific offset, use the following methods:

  • If you want to consume only the incremental data after the start time of the deployment, use SQL hints to specify 'scan.mode' = 'latest'.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • If you want to consume only the incremental data after a specific point in time, use SQL hints to specify the scan.timestamp-millis parameter. The parameter specifies the number of milliseconds that have elapsed since the Unix epoch (1970-01-01 00:00:00 UTC).

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • If you want to consume all written data from a specific point in time and subsequent incremental data, use the following methods:

    Note

    Because Paimon compacts small files into larger ones, a data file created after the specified point in time may still contain a small amount of data that was written before that point. If your business requires it, filter out such data by adding a WHERE condition to the SQL statement.

    • Configure the Specify source's start time parameter when you start the deployment in the development console.

    • Use SQL hints to specify the scan.file-creation-time-millis parameter.

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • If you want to consume only the incremental data starting from a specific snapshot file, use SQL hints to specify the scan.snapshot-id parameter. The parameter specifies the ID of the snapshot file that you want to use.

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • If you want to consume all data contained in a specific snapshot file and subsequent incremental data, use SQL hints to include the 'scan.mode' = 'from-snapshot-full' configuration and specify the scan.snapshot-id parameter. The scan.snapshot-id parameter specifies the ID of the snapshot file that you want to use.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
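
The WHERE filter recommended for scan.file-creation-time-millis can look like the sketch below. It assumes that table t has a hypothetical TIMESTAMP column event_time recording when each row was written, expressed in UTC like the literal. Both the column and its time zone are assumptions, not from the source.

```sql
-- Hypothetical column event_time (UTC). 1678883047356 ms = 2023-03-15 12:24:07.356 UTC.
-- Read data files created after that instant, then drop older rows that
-- compaction may have carried into those files.
SELECT *
FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */
WHERE event_time >= TIMESTAMP '2023-03-15 12:24:07.356';
```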

Specify a consumer ID

A consumer ID is used to record the consumer offset in a Paimon table.

  • If you modify the compute logic in the SQL statements, the deployment topology may change and the consumer offset may fail to be recovered from the state saved by Realtime Compute for Apache Flink. In this case, you can specify a consumer ID to save the consumer offset in the metadata files of the Paimon table. This way, the consumer can resume from the previous offset even if the deployment is restarted in a stateless manner.

  • If a snapshot file expires and is deleted before it is consumed, an error occurs. You can specify a consumer ID to prevent unconsumed snapshot files from being deleted when they expire.

To specify a consumer ID for the source operator, set the consumer-id parameter to a string. The first time you specify a consumer ID, the consumer offset is determined as described in Configure a consumer offset. When you specify the same consumer ID in subsequent operations, the consumer can resume from the saved consumer offset of the Paimon table.

For example, you can use the following SQL statement to specify a consumer ID named test-id for the source operator. If you want to reset the consumer offset of a consumer ID, specify 'consumer.ignore-progress' = 'true'.

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
Note

When you use consumer IDs, snapshot files and the corresponding historical data files are not deleted after they expire, which may waste storage space. To avoid this issue, specify the consumer.expiration-time parameter to delete inactive consumer IDs. For example, 'consumer.expiration-time' = '3d' specifies that consumer IDs that have not been used for three consecutive days are deleted.
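
The sketch below combines these options for the table t and the consumer ID test-id. The first statement discards the progress saved for test-id, so the starting offset is determined again as described in Configure a consumer offset. The second statement sets consumer.expiration-time with ALTER TABLE. Setting it as a table option rather than as a SQL hint is an assumption, because the source does not say where the parameter must be specified.

```sql
-- Reset the saved progress of consumer ID test-id and start from a freshly resolved offset.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id', 'consumer.ignore-progress' = 'true') */;

-- Assumption: set as a table option so that consumer IDs idle for 3 days are removed.
ALTER TABLE t SET ('consumer.expiration-time' = '3d');
```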

Consume results of the INSERT OVERWRITE statement

By default, changes resulting from the INSERT OVERWRITE statement are not consumed by downstream operators in streaming mode. To consume these changes, use SQL hints to specify 'streaming-read-overwrite' = 'true'.

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

Use a batch deployment

By default, the source operator of a batch deployment reads the latest snapshot file of the Paimon table to produce the most recent state data.

Batch time travel

To query the state of a Paimon table at a specific point in time, use SQL hints to specify the scan.timestamp-millis parameter. The parameter specifies the number of milliseconds that have elapsed since the Unix epoch (1970-01-01 00:00:00 UTC).

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
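
As a worked check of the millisecond unit, the value 1678883047356 used in the examples decodes to 2023-03-15 12:24:07.356 UTC:

```latex
\begin{aligned}
\text{2023-01-01 00:00:00 UTC} &= 1672531200\ \text{s} \\
\text{2023-03-15 00:00:00 UTC} &= 1672531200 + (31 + 28 + 14) \times 86400 = 1678838400\ \text{s} \\
\text{12:24:07.356} &= 12 \times 3600 + 24 \times 60 + 7.356 = 44647.356\ \text{s} \\
1678838400 + 44647.356 &= 1678883047.356\ \text{s} = 1678883047356\ \text{ms}
\end{aligned}
```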

To query the state of a Paimon table when a specific snapshot file is generated, use SQL hints to specify the scan.snapshot-id parameter. The parameter specifies the ID of the snapshot file that you want to use.

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

Query incremental changes between two snapshots

To query the incremental changes to a Paimon table between two specific snapshots, use SQL hints to specify the incremental-between parameter. For example, the following SQL statement queries the data that changed between snapshot 12 and snapshot 20.

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
Note

By default, DELETE changes are discarded in a batch deployment. To consume the DELETE changes in a batch deployment, query the audit log system table that Paimon provides. Example: SELECT * FROM `t$audit_log` /*+ OPTIONS('incremental-between' = '12,20') */;
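
To keep only the deletions, you can filter on the change type. In Paimon's audit_log system table, the rowkind column holds the change type of each row ('+I', '-U', '+U', or '-D'). The following sketch uses that column for the same snapshot range:

```sql
-- Return only the rows deleted in the snapshot range 12,20.
SELECT * FROM `t$audit_log` /*+ OPTIONS('incremental-between' = '12,20') */
WHERE rowkind = '-D';
```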

Adjust source parallelism

By default, Paimon automatically derives the parallelism of the source operator based on information such as the number of partitions and buckets. You can use SQL hints to manually change the parallelism of the source operator.

Parameter | Data type | Default value | Remarks
scan.parallelism | Integer | N/A | The parallelism of the source operator.
scan.infer-parallelism | Boolean | true | Specifies whether to automatically derive the parallelism of the source operator.
scan.infer-parallelism.max | Integer | 1024 | The maximum parallelism derived by the source operator.

In the following example, the parallelism of the source operator is set to 10.

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;
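
If you prefer to keep automatic derivation but limit how high it can go, you can lower scan.infer-parallelism.max instead. The cap of 16 below is an arbitrary illustrative value.

```sql
-- Keep automatic derivation enabled, but cap the derived source parallelism at 16.
SELECT * FROM t /*+ OPTIONS('scan.infer-parallelism' = 'true', 'scan.infer-parallelism.max' = '16') */;
```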

Use a Paimon table as a dimension table

You can use a Paimon table as a dimension table. For information about how to write the JOIN clause, see Syntax.
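
A dimension table is typically joined with Flink's lookup join syntax (FOR SYSTEM_TIME AS OF). The following sketch uses hypothetical names throughout. orders is a streaming source that declares a processing-time attribute as `proc_time AS PROCTIME()`, and dim_products is a Paimon table.

```sql
-- Hypothetical tables and columns: orders (stream with proc_time AS PROCTIME()),
-- dim_products (Paimon table used as the dimension table).
SELECT o.order_id, o.product_id, d.product_name
FROM orders AS o
JOIN dim_products FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.product_id = d.product_id;
```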

References