We recommend that you use the Paimon connector together with Apache Paimon catalogs. This topic describes how to use the Paimon connector.
Background information
Apache Paimon is a data lake storage platform that allows you to process data in streaming and batch modes. Apache Paimon supports high-throughput data writing and low-latency data queries, and is compatible with common computing engines of Alibaba Cloud E-MapReduce (EMR), such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to efficiently deploy your data lake storage service on Apsara File Storage for HDFS (HDFS) or Object Storage Service (OSS), and connect it to the preceding computing engines to perform data lake analytics. For more information, visit Apache Paimon.
| Item | Description |
| --- | --- |
| Supported type | Source table, dimension table, sink table, and data ingestion sink |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API and data ingestion YAML API |
| Data update or deletion in a sink table | Supported |
Features
Apache Paimon provides the following features:
Low-cost lightweight data lake storage service based on HDFS or OSS.
Read and write operations on large-scale datasets in streaming and batch modes.
Batch queries and online analytical processing (OLAP) queries within minutes or even seconds.
Consumption and generation of incremental data. Apache Paimon can be used as storage for a traditional offline data warehouse and a streaming data warehouse.
Data pre-aggregation to reduce storage costs and downstream computing workloads.
Data backtracking of historical versions.
Efficient data filtering.
Table schema changes.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Paimon connector.
The following table lists the compatibility between different versions of Apache Paimon and VVR.

| Apache Paimon version | VVR version |
| --- | --- |
| 0.9 | 8.0.7 and 8.0.8 |
| 0.8 | 8.0.6 |
| 0.7 | 8.0.5 |
| 0.6 | 8.0.4 |
| 0.6 | 8.0.3 |
SQL
The Paimon connector can be used in an SQL draft to read data from and write data to a Paimon table.
Syntax
If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to specify the connector parameter. The following sample code shows the syntax for creating an Apache Paimon table in an Apache Paimon catalog:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
    id BIGINT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    ...
);
Note If you have created an Apache Paimon table in the Apache Paimon catalog, you can directly use the table without the need to recreate it.
If you want to create a temporary Apache Paimon table in a catalog that is not an Apache Paimon catalog, you must specify the connector parameter and the storage path of the Apache Paimon table. The following sample code shows the syntax for creating a temporary Apache Paimon table in such a catalog:
CREATE TEMPORARY TABLE paimon_table (
    id BIGINT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'paimon',
    'path' = '<path-to-paimon-table-files>',
    -- If no Apache Paimon table file exists in the specified path, a table file is automatically created.
    'auto-create' = 'true',
    ...
);
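After the table is created, you can write data to the table and query the table by using standard INSERT and SELECT statements. The following minimal sketch assumes that a source table named datagen_source has already been declared; the name is a placeholder:

-- Write data to the Apache Paimon table.
INSERT INTO paimon_table SELECT id, data FROM datagen_source;

-- Read data from the Apache Paimon table.
SELECT * FROM paimon_table;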
Parameters for the WITH clause
| Parameter | Description | Type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | String | No | N/A | This parameter is required only when you create a temporary Apache Paimon table. Set the value to paimon. |
| path | The storage path of the table. | String | No | N/A | This parameter is required only when you create a temporary Apache Paimon table. |
| auto-create | Specifies whether to automatically create an Apache Paimon table file if no Apache Paimon table file exists in the specified path when you create a temporary Apache Paimon table. | Boolean | No | false | Valid values: false: if no Apache Paimon table file exists in the specified path, an error is returned; true: if no Apache Paimon table file exists in the specified path, the system automatically creates one. |
| bucket | The number of buckets in each partition. | Integer | No | 1 | Data that is written to the Apache Paimon table is distributed to buckets based on the columns that are specified by the bucket-key parameter. Note: We recommend that the data in each bucket be less than 5 GB in size. |
| bucket-key | The bucket key columns. | String | No | N/A | The columns based on which the data written to the Apache Paimon table is distributed to different buckets. Separate column names with commas (,). Note: If this parameter is not specified, data is distributed based on the values in the primary key columns. |
| changelog-producer | The incremental data generation mechanism. | String | No | none | Apache Paimon can generate complete incremental data for any input data stream to facilitate downstream data consumption. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values: none, input, lookup, and full-compaction. For more information about how to select an incremental data generation mechanism, see the Incremental data generation mechanism section of this topic. |
| full-compaction.delta-commits | The maximum interval at which full compaction is performed. | Integer | No | N/A | A full compaction is triggered each time the number of snapshot commits reaches the value of this parameter. |
| lookup.cache-max-memory-size | The memory cache size of the Apache Paimon dimension table. | String | No | 256 MB | The value of this parameter determines the cache sizes of both the dimension table and the lookup changelog producer. |
| merge-engine | The mechanism for merging data that has the same primary key. | String | No | deduplicate | Valid values: deduplicate, partial-update, and aggregation. For more information, see the Data merging mechanism section of this topic. |
| partial-update.ignore-delete | Specifies whether to ignore delete messages. | Boolean | No | false | Valid values: true: delete messages are ignored; false: an error is returned when a delete message is received. Note: This parameter takes effect only when the merge-engine parameter is set to partial-update. |
| ignore-delete | Specifies whether to ignore delete messages. | Boolean | No | false | Its valid values are the same as those of the partial-update.ignore-delete parameter. Unlike partial-update.ignore-delete, this parameter is not limited to a specific merge engine. |
| partition.default-name | The default name of the partition. | String | No | __DEFAULT_PARTITION__ | If the value of a partition key column is null or an empty string, the value of this parameter is used as the partition name. |
| partition.expiration-check-interval | The interval at which the system checks for partition expiration. | String | No | 1h | For more information, see How do I configure automatic partition expiration? |
| partition.expiration-time | The validity period of a partition. | String | No | N/A | If the period of time for which a partition exists exceeds the value of this parameter, the partition expires. By default, a partition never expires. The period of time for which a partition exists is calculated based on the partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-formatter | The pattern that is used to convert a time string into a timestamp. | String | No | N/A | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-pattern | The pattern that is used to convert a partition value into a time string. | String | No | N/A | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. For more information, see How do I configure automatic partition expiration? |
| scan.bounded.watermark | The end condition for bounded streaming mode. If the watermark of data in the Apache Paimon source table exceeds the value of this parameter, the generation of data in the source table ends. | Long | No | N/A | N/A |
| scan.mode | The consumer offset of the Apache Paimon source table. | String | No | default | For more information, see How do I specify the consumer offset for an Apache Paimon source table? |
| scan.snapshot-id | The ID of the snapshot from which the Apache Paimon source table starts to consume data. | Integer | No | N/A | For more information, see How do I specify the consumer offset for an Apache Paimon source table? |
| scan.timestamp-millis | The point in time from which the Apache Paimon source table starts to consume data. Unit: milliseconds. | Integer | No | N/A | For more information, see How do I specify the consumer offset for an Apache Paimon source table? |
| snapshot.num-retained.max | The maximum number of the latest snapshots that can be retained. | Integer | No | 2147483647 | Snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met and the condition specified by the snapshot.num-retained.min parameter is met. |
| snapshot.num-retained.min | The minimum number of the latest snapshots that can be retained. | Integer | No | 10 | N/A |
| snapshot.time-retained | The duration for which snapshots are retained. | String | No | 1h | Snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met and the condition specified by the snapshot.num-retained.min parameter is met. |
| write-mode | The write mode of the Apache Paimon table. | String | No | change-log | Valid values: change-log and append-only. For more information, see the Write modes section of this topic. |
| scan.infer-parallelism | Specifies whether to automatically infer the degree of parallelism of the Apache Paimon source table. | Boolean | No | false | Valid values: true: the degree of parallelism is automatically inferred; false: the degree of parallelism specified by the scan.parallelism parameter or the deployment configuration is used. |
| scan.parallelism | The degree of parallelism of the Apache Paimon source table. | Integer | No | N/A | Note: This parameter does not take effect if the resource mode of the deployment is Expert mode. |
| sink.parallelism | The degree of parallelism of the Apache Paimon sink table. | Integer | No | N/A | Note: This parameter does not take effect if the resource mode of the deployment is Expert mode. |
| sink.clustering.by-columns | The clustering columns that are used when data is written to the Apache Paimon sink table. | String | No | N/A | For an Apache Paimon append-only table without primary keys, you can specify this parameter in a batch deployment to enable the clustering feature for data writing. After this feature is enabled, data with similar values in the specified columns is stored together, which improves the query speed of the table. Separate multiple column names with commas (,), for example, 'col1,col2'. For more information about the clustering feature, see Clustering. |
| sink.delete-strategy | The verification strategy for the system to handle retraction (delete and update-before) messages. | Enum | No | NONE | The default value NONE indicates that no verification is performed on retraction messages. |
For more information about the configuration items, see Configuration.
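To make the preceding parameters concrete, the following sketch creates a partitioned Apache Paimon table that combines several commonly used options. The catalog, database, table, and column names are placeholders, and the option values are illustrative only:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.orders (
    order_id BIGINT,
    dt STRING,
    amount DOUBLE,
    PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',                  -- Four buckets per partition.
    'changelog-producer' = 'input',  -- Pass complete CDC input through as incremental data.
    'merge-engine' = 'deduplicate',  -- Keep only the latest record for each primary key.
    'snapshot.time-retained' = '2h'  -- Retain snapshots for two hours.
);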
Features
Data freshness and consistency assurance
An Apache Paimon sink table uses the two-phase commit protocol (2PC) to commit the written data each time a checkpoint is generated in a Flink deployment. Therefore, data freshness is determined by the checkpoint interval of the Flink deployment. A maximum of two snapshots are generated each time data is committed.
If two Flink deployments write data to the same Apache Paimon table at the same time but write to different buckets, serializable consistency is ensured. If the deployments write data to the same bucket, only snapshot isolation is ensured. In this case, the table may contain a mixture of the results of the two deployments, but no data is lost.
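Because data freshness is determined by the checkpoint interval, shortening that interval increases freshness at the cost of more frequent commits. The following sketch uses the standard Flink configuration option execution.checkpointing.interval; in Realtime Compute for Apache Flink, this option is usually set in the deployment configuration instead of in SQL, and datagen_source is a placeholder source table:

SET 'execution.checkpointing.interval' = '1min';

-- Data written by the following statement becomes visible to downstream
-- consumers after each successful checkpoint, that is, about once per minute.
INSERT INTO paimon_table SELECT id, data FROM datagen_source;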
Data merging mechanism
When an Apache Paimon sink table receives multiple data records that have the same primary key, it merges the data records into one data record to ensure the uniqueness of the primary key. You can use the merge-engine parameter to specify the data merging mechanism. The following table describes the supported data merging mechanisms.
| Data merging mechanism | Description |
| --- | --- |
| deduplicate | deduplicate is the default value of the merge-engine parameter. If multiple data records have the same primary key, the Apache Paimon sink table retains only the latest data record and discards the other data records. Note: If the latest data record is a delete message, all data records that have the same primary key are discarded. |
| partial-update | The partial-update mechanism allows you to use multiple messages to update data and finally obtain complete data. New data that has the same primary key as the existing data overwrites the existing data. However, columns whose values are NULL do not overwrite existing data. For example, assume that the Apache Paimon sink table receives the data records <1, 23.0, 10, NULL>, <1, NULL, NULL, 'This is a book'>, and <1, 25.2, NULL, NULL> in sequence. If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>. Note: If downstream consumers need to read the table in streaming mode, set the changelog-producer parameter to lookup or full-compaction so that complete incremental data is generated. |
| aggregation | In specific scenarios, you may require only the aggregated values. The aggregation mechanism aggregates data that has the same primary key based on the specified aggregate functions. You must use the fields.<field-name>.aggregate-function option to specify an aggregate function for each column that is not part of the primary key. For example, if 'fields.price.aggregate-function' = 'max' and 'fields.sales.aggregate-function' = 'sum' are specified, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. For the mappings between the supported aggregate functions and data types, see Configuration. |
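The following sketch shows how the aggregation mechanism described in the preceding table might be declared in a DDL statement. The table and column names are placeholders that match the price and sales example above:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.product_sales (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    -- Aggregate the price column with max and the sales column with sum.
    'fields.price.aggregate-function' = 'max',
    'fields.sales.aggregate-function' = 'sum'
);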
Incremental data generation mechanism
The incremental data generation mechanism is specified by the changelog-producer parameter. Apache Paimon can generate complete incremental data for any input data stream. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. The following table describes all incremental data generation mechanisms. For more information, see Configuration.
| Incremental data generation mechanism | Description |
| --- | --- |
| none | none is the default value of the changelog-producer parameter. If you use this mechanism, the Apache Paimon table does not store complete incremental data. Downstream consumers can obtain only the latest value of each data record, not the previous value that an UPDATE_BEFORE record would carry. For example, if a downstream consumer wants to calculate the sum of a column and obtains only the latest value 5, the consumer cannot determine how to update the sum. If the original value is 4, the sum should be increased by 1. If the original value is 6, the sum should be decreased by 1. This type of consumer is sensitive to UPDATE_BEFORE data. For this type of consumer, we recommend that you do not set the changelog-producer parameter to none. However, the other incremental data generation mechanisms cause performance overhead. Note: If your downstream consumer is a database that is not sensitive to UPDATE_BEFORE data, you can set the changelog-producer parameter to none. We recommend that you specify this parameter based on your business requirements. |
| input | If you set the changelog-producer parameter to input, the Apache Paimon sink table writes the input records to changelog files as incremental data for downstream consumption. Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete. |
| lookup | If you set the changelog-producer parameter to lookup, the Apache Paimon sink table generates complete incremental data before each checkpoint is complete by performing point queries on the data. The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources. We recommend that you use the lookup mechanism in scenarios in which the requirement for the freshness of incremental data is high, for example, when incremental data within minutes is required. |
| full-compaction | If you set the changelog-producer parameter to full-compaction, the Apache Paimon table generates complete incremental data each time a full compaction is performed. You can use the full-compaction.delta-commits parameter to control the full compaction interval. Compared with the lookup mechanism, the full-compaction mechanism is less efficient in generating incremental data. However, it introduces no additional computation because the incremental data is a by-product of the full compaction process. Therefore, fewer resources are consumed. We recommend that you use the full-compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high, for example, when incremental data within hours is acceptable. |
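As an illustration, the following sketch enables the lookup mechanism for a table in an Apache Paimon catalog; the catalog, database, and column names are placeholders:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
    id BIGINT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- Generate complete incremental data before each checkpoint completes.
    'changelog-producer' = 'lookup'
);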
Write modes
The following table describes the write modes supported by Apache Paimon tables.
| Write mode | Description |
| --- | --- |
| change-log | change-log is the default write mode of Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. You can also use the data merging mechanism and the incremental data generation mechanism in this mode. |
| append-only | In append-only mode, the Apache Paimon table allows only data insertion and does not support operations based on the primary key. The append-only mode is more efficient than the change-log mode. In append-only mode, an Apache Paimon table can be used as a substitute for a message queue in scenarios where the requirement for data freshness is not high, for example, when data within hours is acceptable. For more information about the append-only mode, see Configuration. |
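The following sketch creates a temporary append-only table. No primary key is declared because append-only mode does not support primary key operations; the path is a placeholder:

CREATE TEMPORARY TABLE paimon_log_table (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'paimon',
    'path' = '<path-to-paimon-table-files>',
    -- Allow only insertions for higher write efficiency.
    'write-mode' = 'append-only'
);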
CTAS- and CDAS-based data synchronization
You can execute the CREATE TABLE AS (CTAS) and CREATE DATABASE AS (CDAS) statements to synchronize data and schema changes from a single table or an entire database to Paimon tables in real time. For more information, see the Manage an Apache Paimon table section of the Manage Apache Paimon catalogs topic.
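As a hedged sketch, a CTAS statement that synchronizes a single MySQL table to an Apache Paimon catalog might look like the following. The catalog, database, and table names are placeholders, and a MySQL catalog is assumed to have been created beforehand:

-- Synchronize the data and schema changes of the MySQL orders table
-- to an Apache Paimon table with the same schema.
CREATE TABLE IF NOT EXISTS `<your-paimon-catalog>`.`<your-db>`.`orders`
AS TABLE `<your-mysql-catalog>`.`<your-db>`.`orders`;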
Data ingestion
You can use the Paimon connector in a YAML draft to write data to a Paimon table.
Syntax
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
Parameters
| Parameter | Description | Required | Data type | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| type | The connector type. | Yes | STRING | N/A | Set the value to paimon. |
| name | The name of the sink. | No | STRING | N/A | N/A |
| catalog.properties.metastore | The metastore type of the Paimon catalog. | No | STRING | filesystem | Valid values: filesystem and hive. |
| catalog.properties.* | The options that are passed to the Paimon catalog. | No | STRING | N/A | For more information, see Manage Apache Paimon catalogs. |
| table.properties.* | The options that are passed to the Paimon table. | No | STRING | N/A | For more information, see Paimon table options. |
| catalog.properties.warehouse | The warehouse root path of the catalog. | No | STRING | N/A | This parameter takes effect only when catalog.properties.metastore is set to filesystem. |
| commit.user | The username that is used to commit data files. | No | STRING | N/A | Note: We recommend that you use a unique username for each deployment so that you can quickly identify the problematic deployment when a commit conflict occurs. |
| partition.key | The partition keys of each partitioned table. | No | STRING | N/A | Separate tables with semicolons (;) and partition key columns within a table with commas (,). For example, testdb.table1:id1,id2;testdb.table2:name. |
Sample code
Use the Paimon connector to ingest data into a Paimon table:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse