This topic describes how to perform insert, update, overwrite, and delete operations on Apache Paimon (Paimon) tables by using the development console of Realtime Compute for Apache Flink. This topic also describes how to consume data from Paimon tables based on a specific offset.
Prerequisites
A Paimon catalog and a Paimon table are created. For more information, see Manage Apache Paimon catalogs.
Limits
Paimon tables are supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later.
Write data to a Paimon table
Use the CTAS or CDAS statement to synchronize data and schema changes
For information about how to use the CREATE TABLE AS (CTAS) or CREATE DATABASE AS (CDAS) statement, see Manage Apache Paimon catalogs.
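As a minimal sketch (the catalog, database, and table names here are hypothetical), a CTAS statement that synchronizes data and subsequent schema changes from a source table into a Paimon table might look like this:

```sql
-- Create a Paimon table from a source table and keep both data and
-- schema changes synchronized. All identifiers below are illustrative.
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`my_db`.`orders`
AS TABLE `mysql_catalog`.`source_db`.`orders`;
```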
Use the INSERT INTO statement to insert or update data
A primary key table supports all types of changes (INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE) as inputs. Records that share the same primary key are merged based on the merge engine configuration.
An append-only table (without primary keys) supports only INSERT-type changes as inputs.
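As a minimal sketch (the table name and schema are hypothetical), inserting into a primary key table upserts records that share a primary key:

```sql
-- Hypothetical primary key table; rows with the same primary key are
-- merged according to the table's merge engine (deduplicate by default).
CREATE TABLE orders (
  order_id BIGINT,
  currency STRING,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
);

-- Inserting a row whose primary key already exists merges (updates) the record.
INSERT INTO orders VALUES (1, 'USD', 9.99);
```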
Use the INSERT OVERWRITE statement to overwrite data
Overwriting is the process of replacing old data with new data. The old data is removed and can no longer be accessed. You can use the INSERT OVERWRITE statement to partially or completely overwrite a Paimon table. In the following examples, a table named my_table is used.
The INSERT OVERWRITE statement applies only to batch deployments.
By default, changes resulting from the INSERT OVERWRITE statement cannot be consumed by downstream operators in streaming mode. For information about how to consume the changes in streaming mode, see Consume results of the INSERT OVERWRITE statement.
Overwrite all data in an unpartitioned table.
INSERT OVERWRITE my_table SELECT ...;
Overwrite a single partition of the table, such as the dt=20240108,hh=06 partition.
INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
Overwrite certain partitions of the table. Specify the partitions that you want to overwrite in the SELECT statement.
INSERT OVERWRITE my_table SELECT ...;
Overwrite all data in a partitioned table.
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
Use the DELETE statement to delete data
You can use the DELETE statement to delete data from a primary key table. You must execute the DELETE statement in a script.
-- Delete all data records whose currency field is 'UNKNOWN' from the my_table table.
DELETE FROM my_table WHERE currency = 'UNKNOWN';
Ignore DELETE changes
If a primary key table receives a DELETE change, the corresponding data is deleted by default. To ignore the DELETE changes and prevent data deletion, use SQL hints to set the ignore-delete parameter to true.
| Parameter | Description | Data type | Default value |
| --- | --- | --- | --- |
| ignore-delete | Specifies whether to ignore DELETE changes from upstream. | Boolean | false |
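For example, the following hint (the table names t and s are illustrative) makes the sink ignore upstream DELETE changes:

```sql
-- DELETE changes from the upstream table s are ignored; existing rows
-- in t are not deleted.
INSERT INTO t /*+ OPTIONS('ignore-delete' = 'true') */ SELECT * FROM s;
```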
Adjust sink parallelism
You can use SQL hints to manually change the parallelism of the sink operator. The following table describes the parameter.
| Parameter | Description | Data type | Default value |
| --- | --- | --- | --- |
| sink.parallelism | The parallelism of the sink operator. | Integer | N/A |
In the following example, the parallelism of the sink operator is set to 10.
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;
Consume data from a Paimon table
Use a streaming deployment
Before you use a streaming deployment to consume data from a primary key table, complete the changelog producer configuration.
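As a hedged sketch (the table name is illustrative, and the appropriate value depends on your latency and cost requirements), the changelog producer can be configured as a table property:

```sql
-- 'input' reuses input records as the changelog; other supported values
-- such as 'lookup' and 'full-compaction' make different trade-offs.
ALTER TABLE my_table SET ('changelog-producer' = 'input');
```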
By default, the source operator reads the full data of the Paimon table as of the deployment start time and then continuously produces subsequent incremental data in the Paimon table.
Configure a consumer offset
To consume data from a Paimon table based on a specific offset, use the following methods:
If you want to consume only the incremental data written after the start time of the deployment, use SQL hints to specify 'scan.mode' = 'latest'.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
If you want to consume only the incremental data after a specific point in time, use SQL hints to specify the scan.timestamp-millis parameter. The parameter specifies the number of milliseconds that have elapsed since 1970-01-01 00:00:00 UTC.
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
If you want to consume all written data from a specific point in time and subsequent incremental data, use the following methods:
Note: The data files read by the source operator may contain a small amount of data that was written before the specified point in time because Paimon compacts small files for downstream consumption. You can use a WHERE condition in the SQL statement to filter data based on your business requirements.
Configure the Specify source's start time parameter when you start the deployment on the GUI.
Use SQL hints to specify the scan.file-creation-time-millis parameter.
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
If you want to consume only the incremental data starting from a specific snapshot file, use SQL hints to specify the scan.snapshot-id parameter. The parameter specifies the ID of the snapshot file that you want to use.
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
If you want to consume all data contained in a specific snapshot file and subsequent incremental data, use SQL hints to include the 'scan.mode' = 'from-snapshot-full' configuration and specify the scan.snapshot-id parameter, which specifies the ID of the snapshot file that you want to use.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
Specify a consumer ID
A consumer ID is used to record the consumer offset in a Paimon table.
If you modify the compute logic in the SQL statements, the deployment topology may change and the consumer offset may fail to be recovered from the state saved by Realtime Compute for Apache Flink. In this case, you can specify a consumer ID to save the consumer offset in the metadata files of the Paimon table. This way, the consumer can resume from the previous offset even if the deployment is restarted in a stateless manner.
If an expired snapshot file is deleted before it is consumed, an error occurs. You can specify a consumer ID to prevent deletion of expired snapshot files that are not consumed.
To specify a consumer ID for the source operator, set the consumer-id parameter to a string. The first time you specify a consumer ID, the consumer offset is determined as described in Configure a consumer offset. When you specify the same consumer ID in subsequent operations, the consumer resumes from the consumer offset saved in the Paimon table.
For example, the following SQL statement specifies a consumer ID named test-id for the source operator. If you want to reset the consumer offset of a consumer ID, specify 'consumer.ignore-progress' = 'true'.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
When you use consumer IDs, snapshot files and the corresponding historical data files are not deleted after expiration. This may waste storage space. To resolve this issue, you can specify the consumer.expiration-time parameter to delete inactive consumer IDs. For example, 'consumer.expiration-time' = '3d' specifies that consumer IDs that are not used for three consecutive days are deleted.
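As a sketch that reuses the table name t and consumer ID test-id from earlier examples, both parameters can be combined in a single hint:

```sql
-- Track the consumer offset under 'test-id' and clean up the consumer ID
-- if it stays inactive for three consecutive days.
SELECT * FROM t /*+ OPTIONS(
  'consumer-id' = 'test-id',
  'consumer.expiration-time' = '3d'
) */;
```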
Consume results of the INSERT OVERWRITE statement
By default, changes resulting from the INSERT OVERWRITE statement are not consumed by downstream operators in streaming mode. If you want to consume the changes, use SQL hints to specify 'streaming-read-overwrite' = 'true'.
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;
Use a batch deployment
By default, the source operator of a batch deployment reads the latest snapshot file of the Paimon table to produce the most recent state data.
Batch Time Travel
To query the state of a Paimon table at a specific point in time, use SQL hints to specify the scan.timestamp-millis parameter. The parameter specifies the number of milliseconds that have elapsed since 1970-01-01 00:00:00 UTC.
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
To query the state of a Paimon table when a specific snapshot file is generated, use SQL hints to specify the scan.snapshot-id parameter. The parameter specifies the ID of the snapshot file that you want to use.
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
Query incremental changes between two snapshots
To query the incremental changes to a Paimon table between two specific snapshots, use SQL hints to specify the incremental-between parameter. For example, you can execute the following SQL statement to query the data that changed between snapshot 12 and snapshot 20.
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
By default, DELETE changes are discarded in a batch deployment. If you want to consume the DELETE changes in a batch deployment, query the audit log table provided by Paimon. Example: SELECT * FROM `t$audit_log` /*+ OPTIONS('incremental-between' = '12,20') */;
Adjust source parallelism
By default, Paimon automatically derives the parallelism of the source operator based on information such as the number of partitions and buckets. You can use SQL hints to manually change the parallelism of the source operator.
| Parameter | Data type | Default value | Remarks |
| --- | --- | --- | --- |
| scan.parallelism | Integer | N/A | The parallelism of the source operator. |
| scan.infer-parallelism | Boolean | true | Specifies whether to automatically derive the parallelism of the source operator. |
| scan.infer-parallelism.max | Integer | 1024 | The maximum derived parallelism of the source operator. |
In the following example, the parallelism of the source operator is set to 10.
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;
Function as a dimension table
You can use a Paimon table as a dimension table. For information about how to write the JOIN clause, see Syntax.
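As a hedged sketch (the table and column names are hypothetical), a Paimon table can be joined as a dimension table by using the standard Flink lookup join syntax:

```sql
-- Enrich a stream of orders with customer attributes looked up from a
-- Paimon dimension table. o.proc_time is a processing-time attribute.
SELECT o.order_id, c.customer_name
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```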
References
When you perform write or read operations in a Paimon table, you can use SQL hints to temporarily modify the table parameters. For more information, see Manage Paimon tables.
For information about the features of primary key tables and append-only tables, see Primary key tables and append-only tables.
For information about how to optimize primary key tables and append scalable tables, see Performance optimization.
Consumers use snapshot files to access a Paimon table. If snapshot files are retained for only a short period of time or the consumption efficiency is low, the snapshot files that are being used may be deleted. As a result, the "File xxx not found, Possible causes" error message appears. For information about how to resolve the issue, see What do I do if the "File xxx not found, Possible causes" message appears during a read operation?