
Realtime Compute for Apache Flink: Primary key tables and append-only tables

Last Updated: Jul 18, 2024

Apache Paimon (Paimon) supports two types of tables: primary key tables and append-only tables. This topic describes the features of primary key tables and append-only tables.

Primary key tables

A primary key table in Paimon has one or more primary keys that must be specified during table creation.

Syntax

The following sample code shows how to create a primary key table. In this example, the partition key is set to dt, the primary keys to dt, shop_id, and user_id, and the number of buckets to 4 for the table.

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

Primary keys are used to identify each data record in a primary key table. If two data records have the same primary keys, the data records are merged into one based on the merge engine configuration.

Bucket mode

A bucket is the smallest unit for read and write operations in a Paimon table. An unpartitioned table, or each partition of a partitioned table, is subdivided into buckets. This allows for parallel reading and writing, thereby increasing efficiency. Paimon supports the following bucket modes.

Dynamic bucket mode (default)

If you leave the bucket parameter empty or specify 'bucket' = '-1' in the WITH clause when you create a primary key table, the dynamic bucket mode is used.

  • In this mode, primary key tables do not support concurrent writing by multiple Flink deployments.

  • In this mode, primary key tables support cross-partition updates based on primary keys.

Fixed bucket mode

If you specify 'bucket' = '<num>' in the WITH clause when you create a primary key table, the fixed bucket mode is used. If the table is not partitioned, <num> specifies the number of buckets used for the entire table. If the table is partitioned, <num> specifies the number of buckets used for each partition. The value of <num> must be an integer greater than 0.

You can change the number of buckets in this mode. For more information, see the "Change the number of buckets in fixed bucket mode" section of this topic.

In this mode, make sure that the primary keys include all partition keys to prevent cross-partition updates based on primary keys.

Data updates in dynamic bucket mode

The dynamic bucket mode handles the following two types of data updates.

Cross-partition updates

In a primary key table that uses the dynamic bucket mode, cross-partition updates occur when the primary keys do not include all partition keys. In such cases, Paimon cannot determine the bucket and partition of a data record based only on the primary keys. Therefore, Paimon uses RocksDB to maintain a mapping of primary keys to their respective partitions and buckets. If the table contains a large amount of data, performance may decline significantly compared with the fixed bucket mode. Deployment initialization may also take longer because the mapping needs to be loaded into RocksDB. The results of cross-partition updates vary based on the merge engine configuration. The following merge engines are supported:

  • deduplicate: deletes the existing data record and inserts a new data record to the specified partition.

  • aggregation or partial-update: updates the existing data record in the current partition.

  • first-row: retains the existing data record and discards the new data record.

Intra-partition updates

In a primary key table that uses the dynamic bucket mode, intra-partition updates occur when the primary keys include all partition keys. In such cases, Paimon can determine the partition of a data record based on the primary keys, but cannot determine the corresponding bucket. Therefore, Paimon creates an index to maintain a mapping between primary keys and buckets.

Every 100 million mapping records use 1 GB of heap memory. Only the partition to which data is being written consumes heap memory.

The dynamic bucket mode requires additional heap memory but does not cause significant performance loss compared with the fixed bucket mode.

Bucket assignment

Buckets are assigned as follows in each bucket mode.

Dynamic bucket mode

Data is first written into existing buckets. If the number of buckets is insufficient, a new bucket is automatically created. You can use the following parameters in the WITH clause to configure this mode:

  • dynamic-bucket.target-row-num: the maximum number of data records that can be stored in a bucket. Default value: 2000000.

  • dynamic-bucket.initial-buckets: the initial number of buckets. If you leave this parameter empty, the parallelism of the writer operator is used.
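For example, the following statement is a minimal sketch (the table name and values are illustrative) that creates a dynamic bucket table allowing up to 5 million records per bucket and starting with 8 buckets.

CREATE TABLE T_dynamic (
  k BIGINT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'bucket' = '-1', -- dynamic bucket mode; omitting the bucket parameter has the same effect
  'dynamic-bucket.target-row-num' = '5000000', -- at most 5 million records in each bucket
  'dynamic-bucket.initial-buckets' = '8' -- start with 8 buckets
);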

Fixed bucket mode

By default, Paimon assigns a data record to a bucket based on the hash value calculated for the primary keys of the data record.

To use a different method, configure the bucket-key parameter in the WITH clause when you create a primary key table. Separate multiple columns with commas (,). Make sure that the columns specified in the bucket-key parameter are included in the primary keys. For example, if you specify 'bucket-key' = 'c1,c2', Paimon determines the bucket of a data record based on the values in the c1 and c2 columns of the data record.
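For illustration (the table and column names are assumptions), the following statement creates a fixed bucket table in which buckets are assigned based on the c1 and c2 columns rather than on all primary key columns.

CREATE TABLE T_fixed (
  c1 BIGINT,
  c2 STRING,
  c3 INT,
  PRIMARY KEY (c1, c2, c3) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'bucket-key' = 'c1,c2' -- c1 and c2 are included in the primary keys, as required
);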

Change the number of buckets in fixed bucket mode

The number of buckets determines the parallelism of read and write operations. An excessively small number of buckets results in a large amount of data in each bucket, which affects performance. An excessively large number of buckets results in a large number of small files. We recommend that you keep the total size of data in each bucket at about 2 GB and no more than 5 GB. For example, a partition that holds about 40 GB of data works well with approximately 20 buckets. To change the number of buckets for a table that uses the fixed bucket mode, perform the following steps:

  1. Pause all deployments that write data to or consume data from the table.

  2. Create a script and execute the following SQL statement to configure the bucket parameter:

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. Reorganize the existing data so that it is redistributed into the new buckets.

    • Unpartitioned table: Create a Blank Batch Draft, paste the following SQL statements in the editor, and then click Deploy and Start to run a batch deployment.

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
    • Partitioned table: Create a Blank Batch Draft, paste the following SQL statements in the editor, and then click Deploy and Start to run a batch deployment.

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      Replace <partition-spec> and <partition-condition> with the partition that you want to reorganize. For example, dt = 20240312, hh = 08 specifies a partition in which the dt field is 20240312 and the hh field is 08. The following statements show the complete example:

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. If the batch deployment runs as expected, resume the deployments that write data to or consume data from the table.

Changelog producer

To allow downstream consumption in streaming mode, a primary key table needs to generate a complete changelog for INSERT, DELETE, and UPDATE operations. The changelog is similar to the binlog in a database. To configure the method used to generate the changelog, specify the changelog-producer parameter in the WITH clause. The following values are supported.

none

The primary key table does not generate a changelog. Use this value in scenarios that do not involve data consumption in streaming mode.

input

The primary key table passes the input records through as the changelog for downstream consumers. Use this value when the input data stream already contains a complete changelog, such as the binlog of a database.

This option is the most efficient because no additional computation is required.

lookup

The primary key table generates a complete changelog by looking up the results of small file compaction. Small file compaction is triggered at each checkpoint of the Flink deployment.

This option is applicable to any type of input data stream.

Compared with the full-compaction option, this option has a lower latency but consumes more resources. We recommend this option if your business requires minute-level latency.

full-compaction

The primary key table generates a complete changelog each time a full compaction of small files is performed.

This option is applicable to any type of input data stream.

Compared with the lookup option, this option has a higher latency but consumes fewer resources. It leverages the full compaction process to avoid additional computation. We recommend this option if your business can accommodate a latency of up to several hours.

To ensure data freshness, specify 'full-compaction.delta-commits' = '<num>' in the WITH clause so that Paimon performs a full compaction of small files every <num> Flink checkpoints. Take note that full compaction is resource-intensive. We recommend that you set the full compaction interval to a value between 30 minutes and 1 hour.

Note

By default, Paimon generates a changelog record even if the updated data record is the same as the previous one. To prevent this issue, specify 'changelog-producer.row-deduplicate' = 'true' in the WITH clause. This configuration is valid only if you set the changelog-producer parameter to lookup or full-compaction. We recommend that you add this configuration only if a large number of unnecessary records may be generated in the changelog. This is because the comparison of values before and after the update requires additional computation.
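The following WITH clause is a sketch that combines the settings described above: it uses the full-compaction changelog producer, triggers a full compaction every 20 checkpoints, and suppresses changelog records for unchanged values. The table name and the value 20 are illustrative; choose a value that matches your checkpoint interval.

CREATE TABLE T_changelog (
  k BIGINT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '20', -- perform a full compaction every 20 Flink checkpoints
  'changelog-producer.row-deduplicate' = 'true' -- do not emit changelog records for unchanged data
);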

Merge engine


If a Paimon table receives multiple data records that have the same primary keys, the data records are merged based on the merge-engine parameter you specify in the WITH clause. Valid values:

deduplicate (default)

If you specify 'merge-engine' = 'deduplicate', Paimon retains only the most recent data record and discards other data records that have the same primary keys. If the most recent data record is a DELETE record, all data records that have the same primary keys are deleted. In this example, the following Data Definition Language (DDL) statement is used to create a Paimon table.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine'='deduplicate' -- This configuration is optional because deduplicate is the default value.
);
  • If +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), and +I(1, 8.0, 'cherry') are written to a table in sequence, the result of the SELECT * FROM T WHERE k = 1 query is (1, 8.0, 'cherry').

  • If +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), and -D(1, 4.0, 'banana') are written to a table in sequence, no data is retrieved for the SELECT * FROM T WHERE k = 1 query.

first-row

If you specify 'merge-engine' = 'first-row', Paimon retains only the first data record in cases where multiple records have the same primary keys. Compared with the deduplicate engine, the first-row engine only generates INSERT-type changelogs, which increases the efficiency of changelog production.

Note
  • To allow for data consumption in streaming mode, set the changelog-producer parameter to lookup.

  • The first-row engine cannot process DELETE and UPDATE_BEFORE changes. To ignore them, specify 'first-row.ignore-delete' = 'true'.

  • The first-row engine does not support sequence fields.

In this example, the following DDL statement is used to create a Paimon table.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

If +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), and +I(1, 8.0, 'cherry') are written to the table in sequence, the result of the SELECT * FROM T WHERE k = 1 query is (1, 2.0, 'apple').
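If the input stream may contain DELETE or UPDATE_BEFORE changes that you want to skip, the table in this example could instead be declared as follows. This is a sketch based on the first-row.ignore-delete parameter described in the preceding note.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row',
  'first-row.ignore-delete' = 'true' -- ignore DELETE and UPDATE_BEFORE changes, which the first-row engine cannot process
);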

aggregation

Paimon aggregates each column in data records that have the same primary keys based on the specified aggregate function. Use the fields.<field-name>.aggregate-function parameter to specify an aggregate function for each column that is not part of the primary key. If you do not specify an aggregate function for a column, the last_non_null_value aggregate function is used for that column.

Note

To allow for data consumption in streaming mode, set the changelog-producer parameter to lookup or full-compaction.

In the following example, the price column is aggregated by using the max function, and the sales column is aggregated by using the sum function.

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

If +I(1, 23.0, 15) and +I(1, 30.2, 20) are written to the table in sequence, the result of the SELECT * FROM T WHERE product_id = 1 query is (1, 30.2, 35).

The following list describes the supported aggregate functions and data types:

  • sum: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.

  • product: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.

  • count: supports INTEGER and BIGINT.

  • max or min: supports CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

  • first_value or last_value: supports all data types, including null.

  • first_not_null_value and last_non_null_value: supports all data types.

  • listagg: supports STRING.

  • bool_and and bool_or: supports BOOLEAN.

Note

In the preceding aggregate functions, only sum, product, and count support retraction changes (UPDATE_BEFORE and DELETE changes). To ignore retraction changes for a column, specify 'fields.<field-name>.ignore-retract' = 'true' and replace <field-name> with the name of the column.
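For example, the aggregation table shown above could be extended as follows (a sketch) so that retraction changes on the sales column are ignored.

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum',
  'fields.sales.ignore-retract' = 'true' -- ignore UPDATE_BEFORE and DELETE changes for the sales column
);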

partial-update

If you specify 'merge-engine' = 'partial-update', you can gradually update the columns in the existing data record by using the most recent values from records that have the same primary keys. Null values do not overwrite existing values.

Note
  • To allow for data consumption in streaming mode, set the changelog-producer parameter to lookup or full-compaction.

  • The partial-update engine cannot process DELETE and UPDATE_BEFORE changes. To ignore these changes, specify 'partial-update.ignore-delete' = 'true'.

In this example, the following DDL statement is used to create a Paimon table.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

If +I(1, 23.0, 10, NULL), +I(1, NULL, NULL, 'This is a book'), and +I(1, 25.2, NULL, NULL) are written to the table in sequence, the result of the SELECT * FROM T WHERE k = 1 query is (1, 25.2, 10, 'This is a book').

You can also specify parameters related to sequence groups and aggregate functions in the WITH clause.

  • Use sequence groups to specify the merge order

    If you join columns from different tables to create a wide table, you can use sequence groups to specify the partial-update order of different columns to handle out-of-order data records.

    In the following example, columns a and b are updated in ascending order of the values in column g_1, and columns c and d are updated in ascending order of the values in column g_2.

    CREATE TABLE T (
      k INT,
      a STRING,
      b STRING,
      g_1 INT,
      c STRING,
      d STRING,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
    );
  • Use sequence groups and aggregate functions together

    You can also specify fields.<field-name>.aggregate-function in the WITH clause to apply aggregate functions to the column specified by <field-name>. The column specified by <field-name> must belong to a sequence group. You can use all aggregate functions supported by the aggregation engine.

    In the following example, columns a and b are updated in ascending order of the values in column g_1. The final result contains the most recent non-null value in column a and the maximum value in column b. Columns c and d are updated in ascending order of the values in column g_2. The final result contains the most recent non-null value in column c and the sum of the values in column d.

    CREATE TABLE T (
      k INT,
      a STRING,
      b INT,
      g_1 INT,
      c STRING,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.b.aggregate-function' = 'max',
      'fields.g_2.sequence-group' = 'c,d',
      'fields.d.aggregate-function' = 'sum'
    );

For more information, see Merge Engine.

Out-of-order data handling

By default, Paimon determines the merge order based on the input order: the record that arrives last is merged last. If the input stream contains out-of-order data records, specify 'sequence.field' = '<column-name>' in the WITH clause. This way, data records that have the same primary keys are merged in ascending order of the values in the column specified by <column-name>. The sequence.field parameter supports the following data types: TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, and TIMESTAMP_LTZ.

Note

If you use MySQL as the input and specify the op_t metadata column as the sequence field, the sequence field value is the same for a pair of UPDATE_BEFORE and UPDATE_AFTER changes. To prevent this issue, specify 'sequence.auto-padding' = 'row-kind-flag' in the WITH clause. This ensures that Paimon processes the UPDATE_BEFORE change before the UPDATE_AFTER change.
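The following sketch assumes an order_update_time column that records when each change occurred. It is used as the sequence field, and the auto-padding setting handles pairs of UPDATE_BEFORE and UPDATE_AFTER changes that carry the same sequence value.

CREATE TABLE T_seq (
  order_id BIGINT,
  order_update_time TIMESTAMP(3),
  amount INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'sequence.field' = 'order_update_time', -- merge records with the same primary key in ascending order of this column
  'sequence.auto-padding' = 'row-kind-flag' -- process UPDATE_BEFORE before UPDATE_AFTER when sequence values are equal
);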

Append-only tables (without primary keys)

An append-only table in Paimon has no primary keys. This type of table allows only INSERT operations in streaming mode and is suitable for scenarios that do not require streaming updates, such as log data synchronization.

Syntax

The following sample code shows how to create an append-only table. In this example, the partition key is set to dt for the table.

CREATE TABLE T (
  dt STRING,
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

Subcategories

Append-only tables are divided into the following subcategories.

Append scalable table

If you specify 'bucket' = '-1' in the WITH clause when you create an append-only table, the table is an append scalable table.

This type of table is the counterpart to Hive tables and is suitable for scenarios in which data can be consumed in an order different from the writing order. Append scalable tables increase write efficiency in the following ways: they eliminate the shuffle of input records, support data sorting, provide flexible parallelism configuration, support direct conversion from Hive tables, and support fully asynchronous file compaction.

Append queue table

If you specify 'bucket' = '<num>' in the WITH clause when you create an append-only table, the table is an append queue table.

The value of <num> must be an integer greater than 0, which specifies the number of buckets used for an unpartitioned table, or the number of buckets used for each partition in a partitioned table.

This type of table is the counterpart to a message queue service that has a latency of several minutes. The number of buckets for an append queue table is equivalent to the number of partitions in a Kafka topic or the number of shards in an ApsaraMQ for MQTT instance.

Bucket assignment

Buckets are assigned as follows for each type of append-only table.

Append scalable table

Data is written to a single partition in parallel. The bucket concept is ignored, and the order of data is not maintained. Data is directly pushed to the writer without hash partitioning, so this type of table provides high write performance. Take note that if the upstream operator and the writer have the same parallelism, data skew may occur.

Append queue table

By default, Paimon assigns a data record to a bucket based on the values in all columns of the data record. To use a different method, configure the bucket-key parameter in the WITH clause when you create an append queue table. Separate multiple columns with commas (,).

For example, if you specify 'bucket-key' = 'c1,c2', Paimon determines the bucket of a data record based on the values in the c1 and c2 columns of the data record.

Note

We recommend that you specify the bucket-key parameter. This reduces the amount of computation during bucket assignment and improves write efficiency.
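For example (the table and column names are illustrative), the following statement creates an append queue table with four buckets per partition, and bucket assignment is based only on the item_id column.

CREATE TABLE log_t (
  dt STRING,
  item_id BIGINT,
  behavior STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '4', -- a positive bucket count makes this an append queue table
  'bucket-key' = 'item_id' -- assign buckets based on item_id instead of all columns
);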

Data consumption order

The data consumption order depends on the table type.

Append scalable table

This type of table is suitable for scenarios in which data can be consumed in an order different from the writing order.

Append queue table

This type of table ensures that records in each bucket are consumed in the order they were written.

  • For two records from different partitions:

    • If you specify 'scan.plan-sort-partition' = 'true', the record that has a smaller partition value is consumed first (see the example after this list).

    • Otherwise, the record that has an earlier partition creation time is consumed first.

  • For two records from the same partition and the same bucket, the record written first is consumed first.

  • For two records from the same partition but different buckets, the consumption order is not guaranteed because different buckets may be concurrently processed by different Flink deployments.
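As a hedged example, you might also apply the scan.plan-sort-partition option per query by using a Flink SQL dynamic table options hint, so that partitions with smaller values are consumed first. The statement assumes the illustrative log_t table from the previous example.

SELECT * FROM log_t /*+ OPTIONS('scan.plan-sort-partition' = 'true') */;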

References

For information about how to create a Paimon catalog and a Paimon table, see Manage Apache Paimon catalogs.