
Realtime Compute for Apache Flink: Paimon connector

Last Updated: Nov 05, 2024

We recommend that you use the Paimon connector together with Apache Paimon catalogs. This topic describes how to use the Paimon connector.

Background information

Apache Paimon is a data lake storage format that supports both streaming and batch data processing. It provides high-throughput data writing and low-latency data queries, and is compatible with common computing engines of Alibaba Cloud E-MapReduce (EMR), such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to efficiently deploy a data lake storage service on Apsara File Storage for HDFS (HDFS) or Object Storage Service (OSS), and connect the preceding computing engines to it to perform data lake analytics. For more information, visit Apache Paimon.

| Item | Description |
| --- | --- |
| Supported type | Source table, dimension table, sink table, and data ingestion sink |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API and data ingestion YAML API |
| Data update or deletion in a sink table | Supported |

Features

Apache Paimon provides the following features:

  • Low-cost lightweight data lake storage service based on HDFS or OSS.

  • Read and write operations on large-scale datasets in streaming and batch modes.

  • Batch queries and online analytical processing (OLAP) queries within minutes or even seconds.

  • Consumption and generation of incremental data. Apache Paimon can be used as storage for a traditional offline data warehouse and a streaming data warehouse.

  • Data pre-aggregation to reduce storage costs and downstream computing workloads.

  • Data backtracking of historical versions.

  • Efficient data filtering.

  • Table schema changes.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Paimon connector.

  • The following table lists the compatibility between Apache Paimon versions and VVR versions.

    | Apache Paimon version | VVR version |
    | --- | --- |
    | 0.9 | 8.0.7 and 8.0.8 |
    | 0.8 | 8.0.6 |
    | 0.7 | 8.0.5 |
    | 0.6 | 8.0.4 |
    | 0.6 | 8.0.3 |

SQL

The Paimon connector can be used in an SQL draft to read data from and write data to a Paimon table.

Syntax

  • If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to specify the connector parameter. The following sample code shows the syntax for creating an Apache Paimon table in an Apache Paimon catalog:

    CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    Note

    If you have already created an Apache Paimon table in the Apache Paimon catalog, you can use the table directly without recreating it.

  • If you want to create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, you must specify the connector parameter and the storage path of the Apache Paimon table. The following sample code shows the syntax for creating an Apache Paimon table in such a catalog:

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create'='true', -- If no Apache Paimon table file exists in the specified path, a file is automatically created. 
      ...
    );
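After the table is defined, you read and write it by using standard SQL statements. The following minimal sketch writes test data to the temporary table defined above; the datagen_source table is an assumption used only for illustration:

-- Hypothetical upstream table that generates test data.
CREATE TEMPORARY TABLE datagen_source (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'datagen'
);

-- Write the generated data to the Apache Paimon table.
INSERT INTO paimon_table SELECT id, data FROM datagen_source;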

Parameters for the WITH clause

For each parameter, the following list shows the type, whether it is required, the default value, the description, and the remarks.

connector

Type: String. Required: No. Default value: N/A.

The type of the table.

  • If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to specify this parameter.

  • If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set the value to paimon.

path

Type: String. Required: No. Default value: N/A.

The storage path of the table.

  • If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to specify this parameter.

  • If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set this parameter to the HDFS or OSS directory in which you want to store the table.

auto-create

Type: Boolean. Required: No. Default value: false.

Specifies whether to automatically create an Apache Paimon table file if no table file exists in the specified path when you create a temporary Apache Paimon table. Valid values:

  • false (default): If no Apache Paimon table file exists in the specified path, an error is returned.

  • true: If no Apache Paimon table file exists in the specified path, the system automatically creates one.

bucket

Type: Integer. Required: No. Default value: 1.

The number of buckets in each partition. Data that is written to the Apache Paimon table is distributed to the buckets based on the columns that are specified by the bucket-key parameter.

Note

We recommend that the data in each bucket be less than 5 GB in size.

bucket-key

Type: String. Required: No. Default value: N/A.

The columns based on which the data written to the Apache Paimon table is distributed to different buckets. Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' indicates that data is distributed to buckets based on the order_id and cust_id columns.

Note

  • If you do not specify this parameter, data is distributed based on the primary key.

  • If no primary key is specified for the Apache Paimon table, data is distributed based on the values of all columns.

changelog-producer

Type: String. Required: No. Default value: none.

The incremental data generation mechanism. Apache Paimon can generate complete incremental data for any input data stream to facilitate downstream data consumption: each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values:

  • none (default): No incremental data is generated. The downstream consumer can still read data from the Apache Paimon table in streaming mode, but the incremental data that it reads contains only UPDATE_AFTER data and no UPDATE_BEFORE data.

  • input: The input data streams are written to an incremental data file as incremental data in dual-write mode.

  • full-compaction: Complete incremental data is generated each time full compaction is performed.

  • lookup: Complete incremental data is generated before each snapshot is committed.

For more information about how to select an incremental data generation mechanism, see the Incremental data generation mechanism section of this topic.

full-compaction.delta-commits

Type: Integer. Required: No. Default value: N/A.

The maximum interval at which full compaction is performed. A full compaction is guaranteed to be triggered when the number of snapshot commits since the last full compaction reaches the value of this parameter.

lookup.cache-max-memory-size

Type: String. Required: No. Default value: 256 MB.

The memory cache size of the Apache Paimon dimension table. The value of this parameter determines the cache sizes of both the dimension table and the lookup changelog producer.

merge-engine

Type: String. Required: No. Default value: deduplicate.

The mechanism for merging data records that have the same primary key. Valid values:

  • deduplicate: Only the latest data record is retained.

  • partial-update: Existing data that has the same primary key as the latest data is overwritten by the latest data in the non-null columns. Data in other columns remains unchanged.

  • aggregation: An aggregate function is specified to perform pre-aggregation.

For more information about the data merging mechanism, see the Data merging mechanism section of this topic.

partial-update.ignore-delete

Type: Boolean. Required: No. Default value: false.

Specifies whether to ignore delete messages. Valid values:

  • true: Delete messages are ignored.

  • false: Delete messages are not ignored. In this case, you must specify how the sink handles delete messages by configuring the sequence.field parameter or other parameters. Otherwise, errors such as IllegalStateException or IllegalArgumentException occur.

Note

  • In Realtime Compute for Apache Flink that uses VVR 8.0.6 or earlier, this parameter takes effect only when merge-engine = partial-update is configured.

  • In Realtime Compute for Apache Flink that uses VVR 8.0.7 or later, this parameter also applies to non-partial-update scenarios, where it is functionally equivalent to the ignore-delete parameter. In this case, we recommend that you use ignore-delete instead.

  • Whether delete messages need to be ignored depends on the actual scenario. Configure this parameter based on your business requirements.

ignore-delete

Type: Boolean. Required: No. Default value: false.

Specifies whether to ignore delete messages. The valid values are the same as those of the partial-update.ignore-delete parameter.

Note

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.7 or later supports this parameter.

  • This parameter is functionally equivalent to the partial-update.ignore-delete parameter. We recommend that you use ignore-delete instead of partial-update.ignore-delete, and do not configure both parameters at the same time.

partition.default-name

Type: String. Required: No. Default value: __DEFAULT_PARTITION__.

The default name of the partition. If the value of a partition key column is null or an empty string, the value of this parameter is used as the partition name.

partition.expiration-check-interval

Type: String. Required: No. Default value: 1h.

The interval at which the system checks for partition expiration. For more information, see How do I configure automatic partition expiration?.

partition.expiration-time

Type: String. Required: No. Default value: N/A.

The validity period of a partition. If the period of time for which a partition exists exceeds the value of this parameter, the partition expires. By default, a partition never expires. The period for which a partition exists is calculated based on the value of the partition. For more information, see How do I configure automatic partition expiration?.

partition.timestamp-formatter

Type: String. Required: No. Default value: N/A.

The pattern that is used to convert a time string into a timestamp. This parameter helps determine how the period of time for which a partition exists is extracted from the partition value. For more information, see How do I configure automatic partition expiration?.

partition.timestamp-pattern

Type: String. Required: No. Default value: N/A.

The pattern that is used to convert a partition value into a time string. This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. For more information, see How do I configure automatic partition expiration?.
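For example, the following sketch combines the partition expiration options to expire day-level partitions after seven days. The table schema and the exact option values are illustrative assumptions:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.orders_by_day (
  order_id BIGINT,
  dt STRING,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'partition.expiration-time' = '7 d',            -- Partitions that exist for more than 7 days expire.
  'partition.expiration-check-interval' = '1h',   -- Check for expired partitions every hour.
  'partition.timestamp-formatter' = 'yyyyMMdd'    -- Parse a partition value such as 20240101 into a timestamp.
);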

scan.bounded.watermark

Type: Long. Required: No. Default value: N/A.

The end condition for bounded streaming mode. If the watermark of data in the Apache Paimon source table exceeds the value of this parameter, the Apache Paimon source table stops generating data.

scan.mode

Type: String. Required: No. Default value: default.

The consumer offset of the Apache Paimon source table. For more information, see How do I specify the consumer offset for an Apache Paimon source table?.

scan.snapshot-id

Type: Integer. Required: No. Default value: N/A.

The ID of the snapshot from which the Apache Paimon source table starts to consume data. For more information, see How do I specify the consumer offset for an Apache Paimon source table?.

scan.timestamp-millis

Type: Integer. Required: No. Default value: N/A.

The point in time from which the Apache Paimon source table starts to consume data. For more information, see How do I specify the consumer offset for an Apache Paimon source table?.

snapshot.num-retained.max

Type: Integer. Required: No. Default value: 2147483647.

The maximum number of the latest snapshots that can be retained. Snapshot expiration is triggered only if the condition specified by snapshot.num-retained.max or snapshot.time-retained is met and the condition specified by snapshot.num-retained.min is met.

snapshot.num-retained.min

Type: Integer. Required: No. Default value: 10.

The minimum number of the latest snapshots that can be retained.

snapshot.time-retained

Type: String. Required: No. Default value: 1h.

The duration for which snapshots can be retained. Snapshot expiration is triggered only if the condition specified by snapshot.num-retained.max or snapshot.time-retained is met and the condition specified by snapshot.num-retained.min is met.

write-mode

Type: String. Required: No. Default value: change-log.

The write mode of the Apache Paimon table. Valid values:

  • change-log: Data is inserted into, deleted from, and updated in the Apache Paimon table based on the primary key.

  • append-only: The Apache Paimon table allows only data insertion and does not support operations based on the primary key. This mode is more efficient than the change-log mode.

For more information about write modes, see the Write mode section of this topic.

scan.infer-parallelism

Type: Boolean. Required: No. Default value: false.

Specifies whether to automatically infer the degree of parallelism of the Apache Paimon source table. Valid values:

  • true: The degree of parallelism of the Apache Paimon source table is automatically inferred based on the number of buckets.

  • false: The default degree of parallelism that is configured on Ververica Platform (VVP) is used. If the resource configuration is in expert mode, the configured degree of parallelism is used.

scan.parallelism

Type: Integer. Required: No. Default value: N/A.

The degree of parallelism of the Apache Paimon source table.

Note

This parameter does not take effect if the resource configuration is in expert mode. To check the mode, navigate to Configuration > Resources.

sink.parallelism

Type: Integer. Required: No. Default value: N/A.

The degree of parallelism of the Apache Paimon sink table.

Note

This parameter does not take effect if the resource configuration is in expert mode. To check the mode, navigate to Configuration > Resources.

sink.clustering.by-columns

Type: String. Required: No. Default value: N/A.

The clustering columns that are used when data is written to the Apache Paimon sink table. For an Apache Paimon append-only table without primary keys, you can specify this parameter in a batch deployment to enable the clustering feature for data writing. After this feature is enabled, written data is clustered by the specified columns, which improves the query speed of the table.

Separate multiple column names with commas (,). Example: 'col1,col2'.

For more information about the clustering feature, see Clustering.

sink.delete-strategy

Type: Enum. Required: No. Default value: NONE.

The verification strategy that the system uses when it handles retraction (delete and update-before) messages. Valid values:

  • NONE (default): No verification is performed.

  • IGNORE_DELETE: The sink operator should ignore retraction messages.

  • NON_PK_FIELD_TO_NULL: The sink operator should ignore update_before messages. When the sink operator receives a delete message, it should retain the corresponding primary key values and set the values in the non-primary-key columns to null.

    This value is suitable for partial-update scenarios in which data is written to a single table from multiple sinks.

  • DELETE_ROW_ON_PK: The sink operator should ignore update_before messages and delete the data records that correspond to delete messages.

  • CHANGELOG_STANDARD: The sink operator should delete the data records that correspond to the received update_before and delete messages.

Note

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

  • You can configure parameters such as ignore-delete and merge-engine to manage how the Paimon sink processes retraction messages. This parameter is used to check whether the actual retraction behavior aligns with your expectation. If the actual retraction behavior deviates from the expectation, the Paimon sink prevents the problematic operations and reports an error that guides you to adjust parameters such as ignore-delete and merge-engine to correct the retraction behavior.

Note

For more information about the configuration items, see Configuration.

Features

Data freshness and consistency assurance

An Apache Paimon sink table uses the two-phase commit protocol (2PC) to commit the written data each time a checkpoint is generated in a Flink deployment. Therefore, data freshness depends on the checkpoint interval of the Flink deployment. Each commit generates a maximum of two snapshots.

If two Flink deployments write data to an Apache Paimon table at the same time but write data to different buckets, serializable consistency is ensured. If the Flink deployments write data to the same bucket, only snapshot isolation is ensured. In this case, the table may contain the results of both deployments, but no data is lost.
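Because data freshness tracks the checkpoint interval, you tune freshness by adjusting the standard Flink checkpoint option. The following one-line sketch uses a SET statement; depending on your environment, you may need to configure this option in the deployment configuration instead:

SET 'execution.checkpointing.interval' = '1min';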

Data merging mechanism

When an Apache Paimon sink table receives multiple data records that have the same primary key, the Apache Paimon sink table merges the data records into one data record to ensure the uniqueness of the primary key. You can specify the merge-engine parameter to specify the data merging mechanism. The following table describes the different data merging mechanisms.

deduplicate

deduplicate is the default value of the merge-engine parameter. If multiple data records have the same primary key, the Apache Paimon sink table retains only the latest data record and discards the other data records.

Note

If the latest data record is a delete message, all the data records that have the same primary key are discarded.

partial-update

The partial-update mechanism allows you to update data through multiple messages and finally obtain complete data. New data that has the same primary key as the existing data overwrites the existing data in the non-null columns; columns whose new values are NULL do not overwrite the existing data. A table definition sketch is provided after the following example and note.

For example, the Apache Paimon sink table receives the following data records in sequence:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>.

Note
  • If you want the Apache Paimon sink table to read the result that is obtained by using the partial-update mechanism in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction.

  • When you use the partial-update mechanism, delete messages cannot be processed. You can set the partial-update.ignore-delete parameter to true to ignore delete messages.
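The following sketch shows a table definition that uses the partial-update mechanism together with the settings described in the preceding note. The table and column names are illustrative assumptions:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.partial_update_table (
  id BIGINT,
  price DOUBLE,
  stock BIGINT,
  remark STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'changelog-producer' = 'lookup',           -- Required if the merged result is read in streaming mode.
  'partial-update.ignore-delete' = 'true'    -- partial-update cannot process delete messages; ignore them.
);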

aggregation

In specific scenarios, you may require only the aggregated values. The aggregation mechanism can aggregate data that has the same primary key based on the specified aggregate function. You must use fields.<field-name>.aggregate-function to specify an aggregate function for each column that is not the primary key. If you do not specify an aggregate function for a column that is not the primary key, the column uses the last_non_null_value aggregate function by default. The following sample code provides an example on how to create an Apache Paimon table by using the aggregation mechanism:

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

In this example, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. Mappings between the supported aggregate functions and data types:

  • sum: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE

  • min and max: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ

  • last_value and last_non_null_value: all data types

  • listagg: STRING

  • bool_and and bool_or: BOOLEAN

Note
  • Only the sum function supports data retraction and deletion. If you want specific columns to ignore retraction and deletion messages, you can specify 'fields.${field_name}.ignore-retract'='true'.

  • If you want the Apache Paimon sink table to read the aggregation result in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction.
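For example, the following variant of the preceding table lets the sales column ignore retraction and deletion messages. This is a sketch based on the fields.<field-name>.ignore-retract option described above:

CREATE TABLE MyTableIgnoreRetract (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum',
  'fields.sales.ignore-retract' = 'true'  -- The sales column ignores retraction and deletion messages.
);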

Incremental data generation mechanism

The incremental data generation mechanism is specified by the changelog-producer parameter. Apache Paimon can generate complete incremental data for any input data stream. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. The following table describes all incremental data generation mechanisms. For more information, see Configuration.

None

none is the default value of the changelog-producer parameter. If you use the default value, the downstream consumer can obtain only the latest state of the data records that have the same primary key, not the complete incremental data. The consumer can view the latest data and determine whether existing data is deleted, but cannot learn more about the deleted data.

For example, if the downstream consumer wants to calculate the sum of a column and obtains only the latest value 5, it cannot determine how to update the sum: if the original value is 4, the sum should increase by 1; if the original value is 6, the sum should decrease by 1. Consumers of this type are sensitive to UPDATE_BEFORE data. For such consumers, we recommend that you do not set the changelog-producer parameter to none. Note, however, that the other incremental data generation mechanisms may cause performance loss.

Note

If your downstream consumer is a database that is not sensitive to UPDATE_BEFORE data, you can set the changelog-producer parameter to none. We recommend that you specify this parameter based on your business requirements.

Input

If you set the changelog-producer parameter to input, the Apache Paimon sink table writes input data streams to an incremental data file as incremental data in dual-write mode.

Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete.

Lookup

If you set the changelog-producer parameter to lookup, the Apache Paimon sink table uses a point query mechanism that is similar to a dimension table lookup to generate the complete incremental data that corresponds to a snapshot before the snapshot is committed. This mechanism allows the Apache Paimon sink table to generate complete incremental data regardless of whether the input incremental data is complete.

The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources.

We recommend that you use the lookup mechanism in scenarios in which the requirement for the freshness of incremental data is high. For example, incremental data within minutes is required.

Full Compaction

If you set the changelog-producer parameter to full-compaction, the Apache Paimon sink table generates complete incremental data each time full compaction is performed. The incremental data generation mechanism allows the Apache Paimon sink table to generate complete incremental data regardless of whether the input incremental data is complete. The time interval at which full compaction is performed is specified by the full-compaction.delta-commits parameter.

Compared with the lookup mechanism, the full compaction mechanism is less efficient in generating incremental data. However, it piggybacks on the existing full compaction process and does not introduce additional computation. Therefore, fewer resources are consumed.

We recommend that you use the full compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high. For example, incremental data within hours is required.
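For example, the following sketch chooses the full compaction mechanism for a deployment whose freshness requirement is at the hour level. The table name and the full-compaction.delta-commits value are illustrative assumptions:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.dwd_orders (
  order_id BIGINT,
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',   -- Generate complete incremental data at each full compaction.
  'full-compaction.delta-commits' = '4'       -- Trigger a full compaction at least every 4 snapshot commits.
);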

Write mode

The following table describes the write modes supported by Apache Paimon tables.

Change-log

change-log is the default write mode of Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. In change-log mode, you can also use the data merging mechanism and the incremental data generation mechanism.

Append-only

In append-only mode, the Apache Paimon table allows only data insertion and does not support operations based on the primary key. The append-only mode is more efficient than the change-log mode. In append-only mode, an Apache Paimon table can be used as a substitute for a message queue in scenarios where the data freshness requirement is not high, for example, when data freshness within hours is acceptable. A sample table definition is provided after the following list.

For more information about the append-only mode, see Configuration. When you use the append-only mode, take note of the following points:

  • We recommend that you specify the bucket-key parameter based on your business requirements. Otherwise, data of the Apache Paimon table is distributed to buckets based on the values of all columns. This results in low computing efficiency.

  • The append-only mode ensures the data generation order based on the following rules:

    1. If two data records come from different partitions and the scan.plan-sort-partition parameter is specified, data that has the smaller partition value is preferentially generated. Otherwise, data from the partition that is created earlier is preferentially generated.

    2. If two data records come from the same bucket in the same partition, the data that is written earlier is preferentially generated.

    3. If two data records come from different buckets in the same partition, the data generation order cannot be ensured because data in different buckets is processed in different parallel subtasks.
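The following sketch creates a temporary append-only table for insert-only log data, as mentioned in the preceding list. The schema and the bucket settings are illustrative assumptions:

CREATE TEMPORARY TABLE log_events (
  log_time TIMESTAMP(3),
  `level` STRING,
  message STRING
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true',
  'write-mode' = 'append-only',   -- No primary key: the table accepts only insertions.
  'bucket' = '4',
  'bucket-key' = 'level'          -- Recommended: avoid distributing data based on the values of all columns.
);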

CTAS- and CDAS-based data synchronization

You can execute the CREATE TABLE AS (CTAS) and CREATE DATABASE AS (CDAS) statements to synchronize data and schema changes from a single table or an entire database to a Paimon table in real time. For more information, see the Manage an Apache Paimon table section of the Manage Apache Paimon catalogs topic.
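The following minimal CTAS sketch synchronizes a MySQL table into a Paimon catalog in real time. It assumes that a MySQL catalog named <your-mysql-catalog> already exists; all names are illustrative:

CREATE TABLE IF NOT EXISTS `<your-paimon-catalog>`.`<your-db>`.orders
AS TABLE `<your-mysql-catalog>`.`<your-db>`.orders;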

Data ingestion

You can use the Paimon connector in a YAML draft to write data to a Paimon table.

Syntax

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

Parameters

| Parameter | Description | Required | Data type | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| type | The connector type. | Yes | STRING | N/A | Set the value to paimon. |
| name | The name of the sink. | No | STRING | N/A | N/A |
| catalog.properties.metastore | The metastore type of the Paimon catalog. | No | STRING | filesystem | Valid values: filesystem and dlf-paimon. |
| catalog.properties.* | Passes the options of the Paimon catalog to the pipeline. | No | STRING | N/A | For more information, see Manage Apache Paimon catalogs. |
| table.properties.* | Passes the options of the Paimon table to the pipeline. | No | STRING | N/A | For more information, see Paimon table options. |
| catalog.properties.warehouse | The warehouse root path of the catalog. | No | STRING | N/A | This parameter takes effect only when catalog.properties.metastore is set to filesystem. |
| commit.user | The username used to commit data files. | No | STRING | N/A | We recommend that you use a unique username for each deployment so that you can quickly identify the problematic deployment if a commit conflict occurs. |
| partition.key | The partition keys of each partitioned table. | No | STRING | N/A | Separate tables with semicolons (;), separate fields with commas (,), and separate a table from its fields with a colon (:). Example: testdb.table1:id1,id2;testdb.table2:name. |

Sample code

Use the Paimon connector to ingest data into a Paimon table:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

FAQ