Apache Paimon (Paimon) supports two types of tables: primary key tables and append-only tables. This topic describes the features of each type.
Primary key tables
A primary key table in Paimon has one or more primary keys that must be specified during table creation.
Syntax
The following sample code shows how to create a primary key table. In this example, the partition key is set to dt, the primary keys to dt, shop_id, and user_id, and the number of buckets to 4 for the table.
CREATE TABLE T (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '4'
);
Primary keys are used to identify each data record in a primary key table. If two data records have the same primary keys, the data records are merged into one based on the merge engine configuration.
Bucket mode
A bucket is the smallest unit for read and write operations in a Paimon table. Unpartitioned tables or partitions in a partitioned table are subdivided into buckets. This allows for parallel reading and writing, thereby increasing efficiency. The following table describes the supported bucket modes.
Mode | Definition | Usage notes |
Dynamic bucket mode (default) | If you leave the bucket parameter unspecified or set it to -1, the dynamic bucket mode is used. | For details about how data is updated and assigned to buckets in this mode, see the "Data updates in dynamic bucket mode" and "Bucket assignment" sections of this topic. |
Fixed bucket mode | If you set the bucket parameter to a value greater than 0, the fixed bucket mode is used. You can change the number of buckets in this mode. For more information, see the "Change the number of buckets in fixed bucket mode" section of this topic. | In this mode, make sure that the primary keys include all partition keys to prevent cross-partition updates based on primary keys. |
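For example, the following statement creates a table that uses the dynamic bucket mode because the bucket parameter is not specified. The definition is a minimal sketch based on the example table above.
-- Dynamic bucket mode: no bucket parameter is specified in the WITH clause,
-- so Paimon assigns buckets dynamically and creates new buckets as data grows.
CREATE TABLE T_dynamic (
    dt STRING,
    shop_id BIGINT,
    user_id BIGINT,
    num_orders INT,
    total_amount INT,
    PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);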
Data updates in dynamic bucket mode
Type | Usage notes |
Cross-partition updates | In a primary key table that uses the dynamic bucket mode, cross-partition updates occur when the primary keys do not include all partition keys. In this case, Paimon cannot determine the bucket and partition of a data record based only on the primary keys, so it uses RocksDB to maintain a mapping of primary keys to their partitions and buckets. If the table contains a large amount of data, performance may decline significantly compared with the fixed bucket mode, and deployment initialization may take longer because the mapping must be loaded into RocksDB. The results of cross-partition updates vary based on the merge engine configuration. |
Intra-partition updates | In a primary key table that uses the dynamic bucket mode, intra-partition updates occur when the primary keys include all partition keys. In this case, Paimon can determine the partition of a data record based on the primary keys, but not the corresponding bucket, so it creates an index to maintain a mapping between primary keys and buckets. Every 100 million mapping records consume 1 GB of heap memory, and only the partition to which data is being written consumes heap memory. The dynamic bucket mode requires additional heap memory but does not cause significant performance loss compared with the fixed bucket mode. |
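For example, in the following minimal sketch, the primary key does not include the partition key dt, so writes to this table are cross-partition updates and Paimon maintains the primary key-to-partition mapping in RocksDB. The table and column names are illustrative.
-- Cross-partition updates: the primary key (user_id) does not include the
-- partition key (dt), so Paimon must track in RocksDB which partition and
-- bucket currently hold each primary key.
CREATE TABLE user_profile (
    dt STRING,
    user_id BIGINT,
    city STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) PARTITIONED BY (dt);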
Bucket assignment
Mode | Description |
Dynamic bucket mode | Data is first written into existing buckets. If the number of buckets is insufficient, a new bucket is automatically created. You can configure this mode by using the dynamic-bucket.target-row-num parameter (the maximum number of records that each bucket holds before a new bucket is created) and the dynamic-bucket.initial-buckets parameter (the initial number of buckets) in the WITH clause. |
Fixed bucket mode | By default, Paimon assigns a data record to a bucket based on the hash value calculated from the primary keys of the data record. To use a different method, configure the bucket-key parameter in the WITH clause, for example, 'bucket-key' = 'shop_id'. The specified columns must be a subset of the primary keys. Separate multiple columns with commas. |
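The following minimal sketch tunes the dynamic bucket mode by using the parameters described above. The column definitions and parameter values are illustrative.
-- Dynamic bucket mode: start with 4 buckets and create a new bucket once
-- existing buckets each hold roughly 5 million records.
CREATE TABLE T_dynamic_tuned (
    dt STRING,
    shop_id BIGINT,
    user_id BIGINT,
    total_amount INT,
    PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'dynamic-bucket.initial-buckets' = '4',
    'dynamic-bucket.target-row-num' = '5000000'
);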
Change the number of buckets in fixed bucket mode
The number of buckets determines the parallelism of read and write operations. An excessively small number of buckets results in a large amount of data in each bucket, which affects performance. An excessively large number of buckets results in a large number of small files. We recommend that you keep the total size of data in each bucket at approximately 2 GB and no more than 5 GB. To change the number of buckets for a table that uses the fixed bucket mode, perform the following steps:
Pause all deployments that write data to or consume data from the table.
Create a script and execute the following SQL statement to configure the bucket parameter:
ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
Reorganize data in the partitions that you want to use.
Unpartitioned table: Create a Blank Batch Draft, paste the following SQL statement into the editor, and then click Deploy and Start to run a batch deployment.
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
Partitioned table: Create a Blank Batch Draft, paste the following SQL statement into the editor, and then click Deploy and Start to run a batch deployment.
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (<partition-spec>) SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE <partition-condition>;
Replace <partition-spec> and <partition-condition> with the partition that you want to reorganize. For example, the following statement reorganizes the partition in which the dt field is 20240312 and the hh field is 08:
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (dt = '20240312', hh = '08') SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE dt = '20240312' AND hh = '08';
After the batch deployment finishes as expected, resume the deployments that write data to or consume data from the table.
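For example, the following minimal sketch shows the ALTER TABLE and INSERT OVERWRITE steps for a hypothetical table named my_catalog.my_db.orders whose dt = 20240312 partition is rescaled to 8 buckets. The catalog, database, table, and partition names are assumptions for illustration.
-- Update the bucket setting in the table metadata.
ALTER TABLE `my_catalog`.`my_db`.`orders` SET ('bucket' = '8');
-- Rewrite the affected partition in a batch deployment so that the existing
-- data is redistributed across the new number of buckets.
INSERT OVERWRITE `my_catalog`.`my_db`.`orders` PARTITION (dt = '20240312') SELECT * FROM `my_catalog`.`my_db`.`orders` WHERE dt = '20240312';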
Changelog producer
To allow downstream consumption in streaming mode, a primary key table needs to generate a complete changelog for INSERT, DELETE, and UPDATE operations. The changelog is similar to the binlog in a database. To configure the method used to generate the changelog, specify the changelog-producer parameter in the WITH clause. The following table describes the valid values.
Valid value | Description | Scenarios |
none | The primary key table does not generate a changelog. | Scenarios that do not involve data consumption in streaming mode. |
input | The primary key table passes the input records to downstream consumers. | Scenarios in which the input data stream contains a complete changelog, such as the binlog of a database. This option is most efficient because no additional computation is required. |
lookup | The primary key table generates a complete changelog by looking up the results of small file compaction. Small file compaction is triggered at each checkpoint of the Flink deployment. | This option is applicable to any type of input data stream. Compared with the full-compaction option, this option has a lower latency but consumes more resources. We recommend this option if your business requires minute-level latency. |
full-compaction | The primary key table generates a complete changelog each time a full compaction of small files is performed. | This option is applicable to any type of input data stream. Compared with the lookup option, this option has a higher latency but consumes fewer resources, because it leverages the full compaction process and avoids additional computation. We recommend this option if your business can accommodate a latency of up to several hours. To ensure data freshness, specify 'full-compaction.delta-commits' = '<num>' in the WITH clause so that a full compaction is triggered every <num> checkpoints. |
By default, Paimon generates a changelog record even if the updated data record is the same as the previous one. To prevent such redundant records, specify 'changelog-producer.row-deduplicate' = 'true' in the WITH clause. This configuration takes effect only if you set the changelog-producer parameter to lookup or full-compaction. We recommend that you add this configuration only if the changelog is likely to contain a large number of unnecessary records, because comparing the values before and after each update requires additional computation.
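For example, the following minimal sketch enables the lookup changelog producer together with row deduplication. The table and column names are illustrative.
-- Generate a complete changelog by lookup at each checkpoint, and skip
-- changelog records whose values did not actually change.
CREATE TABLE uv_per_shop (
    dt STRING,
    shop_id BIGINT,
    uv BIGINT,
    PRIMARY KEY (dt, shop_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'changelog-producer' = 'lookup',
    'changelog-producer.row-deduplicate' = 'true'
);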
Merge engine
Description
If a Paimon table receives multiple data records that have the same primary keys, the data records are merged based on the merge-engine parameter that you specify in the WITH clause. Valid values:
deduplicate (default)
first-row
aggregation
partial-update
For more information, see Merge Engine.
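For example, the following minimal sketch uses the partial-update merge engine so that different upstream sources can fill in different columns of the same primary key. The table and column names are illustrative.
-- partial-update: records with the same primary key are merged column by
-- column, and NULL input values do not overwrite existing non-NULL values.
CREATE TABLE order_wide (
    order_id BIGINT,
    buyer_name STRING,
    logistics_status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'merge-engine' = 'partial-update'
);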
Out-of-order data handling
By default, Paimon determines the merge order based on the input order: the record that arrives last is merged last. If the input stream contains out-of-order data records, specify 'sequence.field' = '<column-name>' in the WITH clause. This way, data records that have the same primary keys are merged in ascending order of the values in the column specified by <column-name>. The sequence.field parameter supports the following data types: TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, and TIMESTAMP_LTZ.
If you use MySQL as the input and specify the op_t metadata column as the sequence field, the value of the sequence field is the same for a pair of UPDATE_BEFORE and UPDATE_AFTER changes. To distinguish the two changes, specify 'sequence.auto-padding' = 'row-kind-flag' in the WITH clause. This ensures that Paimon processes the UPDATE_BEFORE change before the UPDATE_AFTER change.
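For example, the following minimal sketch merges out-of-order records in ascending order of the op_t column and pads the sequence field with the row kind flag. The table and column names are illustrative.
-- Records with the same primary key are merged by op_t instead of by
-- arrival order; row-kind padding breaks ties between UPDATE_BEFORE and
-- UPDATE_AFTER changes that share the same op_t value.
CREATE TABLE user_status (
    user_id BIGINT,
    status STRING,
    op_t TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'sequence.field' = 'op_t',
    'sequence.auto-padding' = 'row-kind-flag'
);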
Append-only tables (without primary keys)
An append-only table in Paimon has no primary keys. This type of table allows only INSERT operations in streaming mode and is suitable for scenarios that do not require streaming updates, such as log data synchronization.
Syntax
The following sample code shows how to create an append-only table. In this example, the partition key is set to dt for the table.
CREATE TABLE T (
dt STRING,
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) PARTITIONED BY (dt) WITH (
'bucket' = '-1'
);
Subcategories
Subcategory | Definition | Usage notes |
Append scalable table | If you specify 'bucket' = '-1' in the WITH clause, an append scalable table is created. | This type of table is the counterpart to Hive tables and is suitable for scenarios in which data can be consumed in an order different from the write order. Append scalable tables use the following methods to increase write efficiency: eliminate the shuffle of input records, support data sorting, provide flexible parallelism configuration, support direct conversion from Hive tables, and support completely asynchronous file compaction. |
Append queue table | If you set the bucket parameter to a value greater than 0 in the WITH clause, an append queue table is created. The value of the bucket parameter determines the number of buckets and therefore the maximum parallelism of read and write operations. | This type of table is the counterpart to a message queue service that has a latency of several minutes. The number of buckets for an append queue table is equivalent to the number of partitions in a Kafka topic or the number of shards in an ApsaraMQ for MQTT instance. |
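For example, the following minimal sketches create one table of each subcategory. The table and column names are illustrative.
-- Append scalable table: bucket is set to -1, so the bucket concept is
-- ignored and data can be consumed in any order.
CREATE TABLE log_archive (
    dt STRING,
    line STRING
) PARTITIONED BY (dt) WITH (
    'bucket' = '-1'
);
-- Append queue table: bucket is set to a positive number, so each bucket
-- preserves the write order, similar to a partition of a message queue.
CREATE TABLE click_queue (
    dt STRING,
    user_id BIGINT,
    url STRING
) PARTITIONED BY (dt) WITH (
    'bucket' = '8'
);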
Bucket assignment
Table type | Description |
Append scalable table | Data is written to a single partition in parallel. The bucket concept is ignored and the order of data is not maintained. Data is directly pushed to the writer without hash partitioning, so this type of table provides high write performance. Take note that if the upstream operator and the writer have the same parallelism, data skew may occur. |
Append queue table | By default, Paimon assigns a data record to a bucket based on the hash value calculated from all columns of the data record. To use a different method, configure the bucket-key parameter in the WITH clause. For example, if you specify 'bucket-key' = 'c1,c2', Paimon assigns data records to buckets based on the values of the c1 and c2 columns. Note: We recommend that you specify the bucket-key parameter, because calculating the hash value from all columns requires additional computation. |
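For example, the following minimal sketch distributes the records of an append queue table into buckets based on the user_id column only. The table and column names are illustrative.
-- Hash only user_id to choose the bucket, instead of hashing all columns
-- of each record.
CREATE TABLE click_queue_by_user (
    dt STRING,
    user_id BIGINT,
    url STRING
) PARTITIONED BY (dt) WITH (
    'bucket' = '8',
    'bucket-key' = 'user_id'
);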
Data consumption order
Table type | Description |
Append scalable table | This type of table is suitable for scenarios in which data can be consumed in an order different from the write order. |
Append queue table | This type of table ensures that records in each bucket are consumed in the order in which they were written. |
References
For information about how to create a Paimon catalog and a Paimon table, see Manage Apache Paimon catalogs.
For information about how to optimize the performance of primary key tables in Paimon, see Performance optimization.