
Realtime Compute for Apache Flink: Manage Apache Paimon catalogs

Last Updated: Nov 01, 2024

After you configure an Apache Paimon catalog, you can directly access the Apache Paimon tables in that catalog, which are stored in Alibaba Cloud Object Storage Service (OSS), from Realtime Compute for Apache Flink. This topic describes how to create, view, and delete an Apache Paimon catalog and how to manage Apache Paimon databases and tables in the development console of Realtime Compute for Apache Flink.

Background information

Apache Paimon catalogs can be used to efficiently manage all Apache Paimon tables in the same catalog. Apache Paimon catalogs can also work seamlessly with other Alibaba Cloud services. The following table describes the supported metadata storage types.

| Metadata storage type | Description | Other Alibaba Cloud services that can access Apache Paimon tables in the catalog |
| --- | --- | --- |
| filesystem | Stores metadata only in a specific OSS path. | Compute engines such as Spark, Hive, and Trino in E-MapReduce (EMR). For more information, see Paimon. |
| dlf | Stores metadata in a specific OSS path and additionally synchronizes the metadata to Alibaba Cloud Data Lake Formation (DLF). | |
| maxcompute | Stores metadata in a specific OSS path and additionally creates, modifies, or deletes an external table in the specified MaxCompute project when you create, modify, or delete an Apache Paimon table. This facilitates querying the data of an Apache Paimon table in MaxCompute. | MaxCompute. For more information, see Apache Paimon external tables. |
| sync | Combines the features of Apache Paimon DLF catalogs and Apache Paimon MaxCompute catalogs, which helps you connect to Hologres and MaxCompute by using the same catalog. | |

Note

In Apache Paimon Sync catalogs, the metadata of Apache Paimon DLF catalogs is synchronized to Apache Paimon MaxCompute catalogs. To ensure metadata consistency, do not manually modify or delete Apache Paimon external tables in MaxCompute.

Usage notes

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later allows you to create and configure Apache Paimon catalogs and Apache Paimon tables.

  • OSS is used to store files related to Apache Paimon tables. The files include data files and metadata files. Make sure that you have activated OSS and that the storage class of the associated OSS bucket is Standard. For more information, see Get started by using the OSS console and Overview of storage classes.

    Important

    While you can use the OSS bucket specified when you create your Realtime Compute for Apache Flink workspace, we recommend that you create and use a separate OSS bucket in the same region. This improves data isolation and minimizes the risk of misoperations.

  • The OSS bucket that you specify when you create an Apache Paimon catalog must reside in the same region as the MaxCompute project. The AccessKey pair that you specify when you create the Apache Paimon catalog must belong to an account that has the read and write permissions on the OSS bucket, MaxCompute project, and DLF catalog.

  • After you create or delete a catalog, database, or table by using SQL statements, you can click the refresh icon to refresh the Catalogs page.

  • The following table lists the compatibility between different versions of Apache Paimon and VVR.

    | Apache Paimon version | VVR version |
    | --- | --- |
    | 0.9 | 8.0.7 and 8.0.8 |
    | 0.8 | 8.0.6 |
    | 0.7 | 8.0.5 |
    | 0.6 | 8.0.4 |
    | 0.6 | 8.0.3 |

Create an Apache Paimon catalog

You can use SQL statements to create Apache Paimon catalogs of all the preceding metadata storage types. However, only Apache Paimon catalogs of the filesystem and dlf metadata storage types can be created on the console. This section describes how to create Apache Paimon catalogs of each metadata storage type and the related configuration parameters.

Create an Apache Paimon Filesystem catalog

Create an Apache Paimon Filesystem catalog on the console

  1. Go to the Catalog List page.

    1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click Catalogs.

  2. Click Create Catalog. On the Built-in Catalog tab of the wizard that appears, choose Apache Paimon and click Next.

  3. In the Configure Catalog step, configure the parameters.

Create an Apache Paimon Filesystem catalog through SQL

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following table describes the configuration parameters.

  • Common parameters

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | my-catalog | The name of the Apache Paimon catalog. | Yes | Enter a custom name. |
    | type | The type of the catalog. | Yes | Set the value to paimon. |
    | metastore | The metadata storage type. | Yes | Valid values: filesystem (creates an Apache Paimon Filesystem catalog), dlf (creates an Apache Paimon DLF catalog), maxcompute (creates an Apache Paimon MaxCompute catalog), and sync (creates an Apache Paimon Sync catalog). |

  • OSS

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | warehouse | The data warehouse directory in OSS. | Yes | Format: oss://<bucket>/<object>. bucket is the name of the OSS bucket that you created, and object is the path in which your data is stored. You can view both values in the OSS console. |
    | fs.oss.endpoint | The endpoint of OSS. | Yes | If the OSS bucket resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. These parameters are also required if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace or if the bucket belongs to another Alibaba Cloud account. For more information about how to obtain the required information, see Regions, endpoints, and open ports and Create an AccessKey pair. |
    | fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |
    | fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |
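
For reference, a filled-in version of the preceding statement might look like the following sketch. The bucket name, warehouse path, endpoint, and AccessKey placeholders are examples only; replace them with your own values.

CREATE CATALOG `my-paimon-fs-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  -- Example bucket and path; replace with your own OSS bucket and warehouse directory.
  'warehouse' = 'oss://my-bucket/paimon-warehouse',
  -- Example VPC (internal) endpoint for an OSS bucket in the cn-hangzhou region.
  'fs.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',
  'fs.oss.accessKeyId' = '<your-access-key-id>',
  'fs.oss.accessKeySecret' = '<your-access-key-secret>'
);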

Create an Apache Paimon DLF catalog

Important

The creation methods and configuration parameters of Apache Paimon DLF 2.0 catalogs differ significantly from those of Apache Paimon DLF 1.0 catalogs. Make sure that you follow the instructions specific to your DLF version.

DLF 2.0

Create an Apache Paimon DLF catalog on the console

  1. Go to the Catalog List page.

    1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click Catalogs.

  2. Click Create Catalog. On the Built-in Catalog tab of the wizard that appears, choose Apache Paimon and click Next.

  3. In the Configure Catalog step, configure the parameters.

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | metastore | The metadata storage type. | Yes | Set the value to dlf to create an Apache Paimon DLF catalog. |
    | catalog name | The name of the catalog. | Yes | Choose an existing DLF catalog from the drop-down list. |

Create an Apache Paimon DLF catalog through SQL

Execute the following SQL statement on the SQL Editor page. For more information, see Scripts.

CREATE CATALOG `my-catalog` WITH (
  'metastore' = 'dlf-paimon',
  'type' = 'paimon',
  'dlf.endpoint' = '<dlf.endpoint>',
  'dlf.region' = '<dlf.region>',
  'dlf.catalog.instance.id' = '<dlf.catalog.instance.id>'
);

The following table describes the parameters.

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| my-catalog | The name of the Apache Paimon catalog. | Yes | Set the value to a custom name. |
| metastore | The metadata storage type. | Yes | Set the value to dlf-paimon. |
| type | The type of the catalog. | Yes | Set the value to paimon. |
| dlf.endpoint | The endpoint of DLF. | Yes | Cross-service endpoint: dlfnext-share.[regionId].aliyuncs.com. VPC endpoint: dlfnext-vpc.[regionId].aliyuncs.com. Public endpoint: dlfnext.[regionId].aliyuncs.com. Example: dlfnext-vpc.cn-hangzhou.aliyuncs.com. For more information, see Supported regions and endpoints. |
| dlf.region | The region of DLF. | Yes | Example: cn-hangzhou. Make sure that the region matches the endpoint that you specify in the dlf.endpoint parameter. For more information, see Supported regions and endpoints. |
| dlf.catalog.instance.id | The ID of the DLF data directory. | Yes | You can view the ID of the data directory in the DLF console. For more information, see Data catalog. |
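
For reference, a filled-in DLF 2.0 example might look like the following sketch. The region and endpoint format come from the table above; the data directory ID is a placeholder that you would replace with the value copied from the DLF console.

CREATE CATALOG `my-paimon-dlf-catalog` WITH (
  'metastore' = 'dlf-paimon',
  'type' = 'paimon',
  -- The endpoint and region must match; cn-hangzhou is used as an example here.
  'dlf.endpoint' = 'dlfnext-vpc.cn-hangzhou.aliyuncs.com',
  'dlf.region' = 'cn-hangzhou',
  -- Replace with the actual data directory ID from the DLF console.
  'dlf.catalog.instance.id' = '<your-dlf-catalog-id>'
);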

DLF 1.0

Create an Apache Paimon DLF catalog on the console

  1. Go to the Catalog List page.

    1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click Catalogs.

  2. Click Create Catalog. On the Built-in Catalog tab of the wizard that appears, choose Apache Paimon and click Next.

  3. In the Configure Catalog step, configure the parameters.

Create an Apache Paimon DLF catalog through SQL

Execute the following SQL statement as instructed in Scripts.

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'dlf',
  'warehouse' = '<warehouse>',
  'dlf.catalog.id' = '<dlf.catalog.id>',
  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
  'dlf.catalog.region' = '<dlf.catalog.region>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following table describes the parameters.

  • Common parameters

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | my-catalog | The name of the Apache Paimon catalog. | Yes | Enter a custom name. |
    | type | The type of the catalog. | Yes | Set the value to paimon. |
    | metastore | The metadata storage type. | Yes | Set the value to dlf. |

  • OSS

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | warehouse | The data warehouse directory in OSS. | Yes | Format: oss://<bucket>/<object>. bucket is the name of your OSS bucket, and object is the path in which your data is stored. You can view both values in the OSS console. |
    | fs.oss.endpoint | The endpoint of OSS. | Yes | If the OSS bucket resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. If you want to store Apache Paimon tables in OSS-HDFS, set this parameter in the cn-<region>.oss-dls.aliyuncs.com format. Example: cn-hangzhou.oss-dls.aliyuncs.com. |
    | fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | For more information about how to obtain the required information, see Regions, endpoints, and open ports and Create an AccessKey pair. |
    | fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |

  • DLF

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | dlf.catalog.id | The ID of the DLF data directory. | Yes | You can view the ID of the data directory in the DLF console. |
    | dlf.catalog.accessKeyId | The AccessKey ID that is used to access the DLF service. | Yes | For more information about how to obtain an AccessKey ID, see Create an AccessKey pair. |
    | dlf.catalog.accessKeySecret | The AccessKey secret that is used to access the DLF service. | Yes | For more information about how to obtain an AccessKey secret, see Create an AccessKey pair. |
    | dlf.catalog.endpoint | The endpoint of DLF. | Yes | If DLF resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. For more information, see Supported regions and endpoints. |
    | dlf.catalog.region | The region of DLF. | Yes | Make sure that the value matches the endpoint that is specified by the dlf.catalog.endpoint parameter. For more information, see Supported regions and endpoints. |

Create an Apache Paimon MaxCompute catalog

  • Prerequisites

    The Apache Paimon plug-in file is uploaded to your MaxCompute project by using one of the following methods:

    • Use the MaxCompute client (odpscmd) to access the MaxCompute project, and run the ADD JAR <path_to_paimon_maxcompute_connector.jar>; command to upload the Apache Paimon plug-in file to the MaxCompute project.

    • Create a resource in the DataWorks console to upload the Apache Paimon plug-in file to the MaxCompute project. For more information, see Create and use MaxCompute resources.

  • SQL statements:

    CREATE CATALOG `my-catalog` WITH (
      'type' = 'paimon',
      'metastore' = 'maxcompute',
      'warehouse' = '<warehouse>',
      'maxcompute.endpoint' = '<maxcompute.endpoint>',
      'maxcompute.project' = '<maxcompute.project>',
      'maxcompute.accessid' = '<maxcompute.accessid>',
      'maxcompute.accesskey' = '<maxcompute.accesskey>',
      'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );
    Note

    When you create an Apache Paimon table in an Apache Paimon MaxCompute catalog, an Apache Paimon external table is automatically created in the MaxCompute project. To query the external table in MaxCompute, execute the following SET statements first and then run your SELECT statement in MaxCompute (see the sketch after the parameter tables below). For more information, see the "Step 4: Read data from the Apache Paimon external table on the MaxCompute client (odpscmd) or by using a tool that can execute MaxCompute SQL statements" section of the Apache Paimon external tables topic.

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;
  • The following table describes the parameters in the SQL statements.

    • Common parameters

      | Parameter | Description | Required | Remarks |
      | --- | --- | --- | --- |
      | my-catalog | The name of the Apache Paimon catalog. | Yes | Enter a custom name. |
      | type | The type of the catalog. | Yes | Set the value to paimon. |
      | metastore | The metadata storage type. | Yes | Valid values: filesystem (creates an Apache Paimon Filesystem catalog), dlf (creates an Apache Paimon DLF catalog), maxcompute (creates an Apache Paimon MaxCompute catalog), and sync (creates an Apache Paimon Sync catalog). |

    • OSS

      | Parameter | Description | Required | Remarks |
      | --- | --- | --- | --- |
      | warehouse | The data warehouse directory in OSS. | Yes | Format: oss://<bucket>/<object>. bucket is the name of the OSS bucket that you created, and object is the path in which your data is stored. You can view both values in the OSS console. |
      | fs.oss.endpoint | The endpoint of OSS. | Yes | If the OSS bucket resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. These parameters are also required if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace or if the bucket belongs to another Alibaba Cloud account. For more information about how to obtain the required information, see Regions, endpoints, and open ports and Create an AccessKey pair. |
      | fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |
      | fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |

    • MaxCompute

      | Parameter | Description | Required | Remarks |
      | --- | --- | --- | --- |
      | maxcompute.endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
      | maxcompute.project | The name of the MaxCompute project. | Yes | MaxCompute projects with the schema feature enabled are not supported. |
      | maxcompute.accessid | The AccessKey ID of the Alibaba Cloud account that has permissions on MaxCompute. | Yes | For more information about how to obtain the AccessKey ID, see Create an AccessKey pair. |
      | maxcompute.accesskey | The AccessKey secret of the Alibaba Cloud account that has permissions on MaxCompute. | Yes | For more information about how to obtain the AccessKey secret, see Create an AccessKey pair. |
      | maxcompute.oss.endpoint | The endpoint that is used to access OSS from MaxCompute. | No | If you do not specify this parameter, the value of the fs.oss.endpoint parameter is used by default. We recommend that you set this parameter to a VPC endpoint because the OSS bucket resides in the same region as the MaxCompute project. For more information, see Regions, endpoints, and open ports. |
      | maxcompute.life-cycle | The lifecycle of the MaxCompute external table. | No | Unit: days. |
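
As a reference for the note above, the query sequence on the MaxCompute side might look like the following sketch. The SET statements are the ones given in this topic; the project and table names are hypothetical placeholders.

-- Run on the MaxCompute client (odpscmd) or another tool that can execute MaxCompute SQL.
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
-- my_project.my_tbl is a hypothetical external table that was created automatically
-- when the corresponding Apache Paimon table was created in this catalog.
SELECT * FROM my_project.my_tbl LIMIT 10;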

Create an Apache Paimon Sync catalog

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'sync',
  'source' = 'dlf',
  'target' = 'maxcompute',
  'warehouse' = '<warehouse>',
  'dlf.catalog.id' = '<dlf.catalog.id>',
  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
  'dlf.catalog.region' = '<dlf.catalog.region>',
  'maxcompute.endpoint' = '<maxcompute.endpoint>',
  'maxcompute.project' = '<maxcompute.project>',
  'maxcompute.accessid' = '<maxcompute.accessid>',
  'maxcompute.accesskey' = '<maxcompute.accesskey>',
  'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following table describes the parameters in the SQL statements.

  • Common parameters

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | my-catalog | The name of the Apache Paimon catalog. | Yes | Enter a custom name. |
    | type | The type of the catalog. | Yes | Set the value to paimon. |
    | metastore | The metadata storage type. | Yes | Valid values: filesystem (creates an Apache Paimon Filesystem catalog), dlf (creates an Apache Paimon DLF catalog), maxcompute (creates an Apache Paimon MaxCompute catalog), and sync (creates an Apache Paimon Sync catalog). |

  • sync

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | source | The storage service from which metadata is synchronized. | Yes | Set the value to dlf. |
    | target | The storage service to which metadata is synchronized. | Yes | Set the value to maxcompute. |

  • OSS

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | warehouse | The data warehouse directory in OSS. | Yes | Format: oss://<bucket>/<object>. bucket is the name of the OSS bucket that you created, and object is the path in which your data is stored. You can view both values in the OSS console. |
    | fs.oss.endpoint | The endpoint of OSS. | Yes | If the OSS bucket resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. These parameters are also required if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace or if the bucket belongs to another Alibaba Cloud account. For more information about how to obtain the required information, see Regions, endpoints, and open ports and Create an AccessKey pair. |
    | fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |
    | fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Yes | |

  • DLF

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | dlf.catalog.id | The ID of the DLF data directory. | Yes | You can view the ID of the data directory in the DLF console. |
    | dlf.catalog.accessKeyId | The AccessKey ID that is used to access the DLF service. | Yes | For more information about how to obtain an AccessKey ID, see Create an AccessKey pair. |
    | dlf.catalog.accessKeySecret | The AccessKey secret that is used to access the DLF service. | Yes | For more information about how to obtain an AccessKey secret, see Create an AccessKey pair. |
    | dlf.catalog.endpoint | The endpoint of DLF. | Yes | If DLF resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint. For more information, see Supported regions and endpoints. |
    | dlf.catalog.region | The region in which DLF is deployed. | Yes | Make sure that the value matches the endpoint that is specified by the dlf.catalog.endpoint parameter. For more information, see Supported regions and endpoints. |

  • maxcompute

    | Parameter | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | maxcompute.endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
    | maxcompute.project | The name of the MaxCompute project. | Yes | MaxCompute projects with the schema feature enabled are not supported. |
    | maxcompute.accessid | The AccessKey ID of the Alibaba Cloud account that has permissions on MaxCompute. | Yes | For more information about how to obtain the AccessKey ID, see Create an AccessKey pair. |
    | maxcompute.accesskey | The AccessKey secret of the Alibaba Cloud account that has permissions on MaxCompute. | Yes | For more information about how to obtain the AccessKey secret, see Create an AccessKey pair. |
    | maxcompute.oss.endpoint | The endpoint that is used to access OSS from MaxCompute. | No | If you do not specify this parameter, the value of the fs.oss.endpoint parameter is used by default. We recommend that you set this parameter to a VPC endpoint because the OSS bucket resides in the same region as the MaxCompute project. For more information, see Regions, endpoints, and open ports. |
    | maxcompute.life-cycle | The lifecycle of the MaxCompute external table. | No | Unit: days. |

Manage an Apache Paimon database

You can manage an Apache Paimon database by executing the following commands on the SQL Editor page. For more information, see Scripts.

  • Create a database

    After you create an Apache Paimon catalog, a database named default is automatically created in the catalog.

    -- Replace my-catalog with the name of the actual Apache Paimon catalog. 
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with a custom database name. 
    CREATE DATABASE `my_db`;
  • Drop a database

    Important

    You cannot drop the default database from an Apache Paimon DLF catalog, an Apache Paimon MaxCompute catalog, or an Apache Paimon Sync catalog. You can only drop the default database from an Apache Paimon Filesystem catalog.

    -- Replace my-catalog with the name of the actual Apache Paimon catalog. 
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with the name of the database that you want to drop. 
    DROP DATABASE `my_db`; -- Drop an empty database. 
    DROP DATABASE `my_db` CASCADE; -- Drop the database and all the associated tables.
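
To verify the result, you can list the databases and tables in the catalog by using standard Flink SQL statements, as shown in the following sketch. Replace the names with your own.

-- Replace my-catalog and my_db with the actual catalog and database names.
USE CATALOG `my-catalog`;
SHOW DATABASES;
USE `my_db`;
SHOW TABLES;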

Manage an Apache Paimon table

Create an Apache Paimon table

Note

After you configure an Apache Paimon catalog, you can reference tables in the catalog in a draft. When you use a table in the Apache Paimon catalog as a source table, sink table, or dimension table, you do not need to define the table schema through DDL. To create or reference an Apache Paimon table in an SQL statement, you can use a fully qualified table name in the ${Paimon-catalog-name}.${Paimon-db-name}.${Paimon-table-name} format. You can also set the default catalog and database first by executing USE CATALOG ${Paimon-catalog-name} and USE ${Paimon-db-name}, and then reference the table as ${Paimon-table-name} in subsequent SQL statements. Both styles are shown in the example below.
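
For example, the following two queries reference the same table in equivalent ways. The catalog, database, and table names are the placeholders used elsewhere in this topic.

-- Option 1: reference the table by its fully qualified name.
SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;

-- Option 2: set the default catalog and database first, then use the simple table name.
USE CATALOG `my-catalog`;
USE `my_db`;
SELECT * FROM `my_tbl`;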

  • Create an Apache Paimon table by using the CREATE TABLE statement

    You can create an Apache Paimon table by executing the following statements on the SQL Editor page. For more information, see Scripts.

    In the my_db database of a catalog named my-catalog, create a partitioned table in which the partition key is dt, the primary key columns are dt, shop_id, and user_id, and the number of buckets is fixed to 4. Sample code:

    -- Replace my-catalog with the name of the actual Apache Paimon catalog. 
    -- Replace my_db with the name of the actual database. 
    -- You can replace my_tbl with a custom name. 
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
      'bucket' = '4'
    );

    For more information about the parameters and usage of Apache Paimon tables, see Apache Paimon connector and Primary key tables and append-only tables.

  • Create an Apache Paimon table by using the CREATE TABLE AS (CTAS) or CREATE DATABASE AS (CDAS) statement.

    The CTAS and CDAS statements automatically synchronize data and changes to the table schema, facilitating synchronization from data sources such as MySQL and Kafka to an Apache Paimon catalog.

    Because the CTAS and CDAS statements synchronize data, you must publish a draft as a deployment and start the deployment. For more information, see Develop an SQL draft and Start a deployment.

    Note
    • When you create an Apache Paimon table by using the CTAS or CDAS statement, you cannot specify 'bucket' = '-1' to create Apache Paimon primary key tables or Apache Paimon append-only tables (non-primary key tables) that use dynamic bucketing.

    • The CTAS and CDAS statements allow you to configure properties for a physical table in the WITH clause as you create the table. When the deployment starts, these properties take effect on the target tables to which you want to synchronize data. For more information about the table properties supported by the statements, see Apache Paimon connector.

    • Create a table and synchronize data to the table

      In the following sample code, an Apache Paimon table (for example, `my-catalog`.`my_db`.`web_sales` after you replace the placeholders) is automatically created based on the schema of the mysql.tpcds.web_sales table, and data in the mysql.tpcds.web_sales table is synchronized to the Apache Paimon table. The number of buckets for the Apache Paimon table is set to 4, and the input-based incremental data generation mechanism is used.

      CREATE TABLE IF NOT EXISTS `<catalog name>`.`<db name>`.`<table name>`
      WITH (
        'bucket' = '4',
        'changelog-producer' = 'input'
      ) AS TABLE mysql.tpcds.web_sales;
    • Create a database and synchronize data in all tables to the database

      In the following sample code, Apache Paimon tables are automatically created in the target database (for example, `my-catalog`.`my_db` after you replace the placeholders) based on the schema of each table in the mysql.tpcds database, and data of all tables in the mysql.tpcds database is synchronized to the Apache Paimon tables. The input-based incremental data generation mechanism is used.

      CREATE DATABASE IF NOT EXISTS `<catalog name>`.`<db name>`
      WITH (
        'changelog-producer' = 'input'
      ) AS DATABASE mysql.tpcds INCLUDING ALL TABLES;
    • Synchronize column type changes

      The CTAS or CDAS statement allows you to add columns and change the data types of specific columns when creating Apache Paimon tables. You can determine whether to use the type normalization mode based on your business requirements.

      • Non-type normalization mode (default mode)

        The column types of Apache Paimon tables created using the CTAS or CDAS statement are the same as the column types of their source tables. The rules of changing column types are as follows:

        • Integer data types TINYINT, SMALLINT, INT, and BIGINT can be changed to integer data types with the same or higher precision. The precision of TINYINT is the lowest and the precision of BIGINT is the highest.

        • Floating-point data types FLOAT and DOUBLE can be changed to floating-point data types with the same or higher precision. The precision of FLOAT is the lowest and the precision of DOUBLE is the highest.

        • String data types CHAR, VARCHAR, and STRING can be changed to string data types with the same or higher precision.

      • Type normalization mode

        When you create Apache Paimon tables by using the CTAS or CDAS statement, you can specify 'enableTypeNormalization' = 'true' in the WITH clause to enable type normalization mode. In this mode, a data type change in the source table does not cause a deployment failure as long as the old and new data types can be normalized, that is, converted into the same data type (see the sketch after this section). The type normalization rules are as follows:

        • TINYINT, SMALLINT, INT, and BIGINT are normalized into BIGINT.

        • FLOAT and DOUBLE are normalized into DOUBLE.

        • CHAR, VARCHAR, and STRING are normalized into STRING.

        • Other data types are not normalized.

        For example,

        • if you change SMALLINT to INT, the deployment runs as expected because both are normalized to BIGINT;

        • changing FLOAT to BIGINT causes an exception because FLOAT and BIGINT cannot be normalized into the same data type.

        Apache Paimon stores data in its normalized type. For example, SMALLINT and INT from a MySQL table become BIGINT in an Apache Paimon table.
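
The following is a minimal sketch of enabling type normalization mode in a CTAS statement, reusing the MySQL source from the earlier examples. Replace the placeholders with your actual catalog, database, and table names.

CREATE TABLE IF NOT EXISTS `<catalog name>`.`<db name>`.`<table name>`
WITH (
  'changelog-producer' = 'input',
  -- Enable type normalization so that compatible source type changes,
  -- such as SMALLINT to INT, do not fail the deployment.
  'enableTypeNormalization' = 'true'
) AS TABLE mysql.tpcds.web_sales;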

Modify the schema of an Apache Paimon table

You can modify the schema of an Apache Paimon table by executing the following statements on the SQL Editor page. For more information, see Scripts.


Add or modify a table parameter

Set the value of the write-buffer-size parameter to 256 MB and the value of the write-buffer-spillable parameter to true.

ALTER TABLE my_table SET (
  'write-buffer-size' = '256 MB',
  'write-buffer-spillable' = 'true'
);

Temporarily modify a table parameter

You can use a hint to temporarily modify a table parameter. The modification takes effect only for the current SQL draft.

  • Temporarily set the write-buffer-size parameter to 256 MB and the write-buffer-spillable parameter to true when inserting data into the my_table table.

    INSERT INTO my_table /*+ OPTIONS('write-buffer-size' = '256 MB', 'write-buffer-spillable' = 'true') */
    SELECT ...;
  • Temporarily set the scan.mode parameter to latest and the scan.parallelism parameter to 10 when data is consumed from the my_table table.

    SELECT * FROM my_table /*+ OPTIONS('scan.mode' = 'latest', 'scan.parallelism' = '10') */;

Rename a table

Rename the my_table table to my_table_new.

ALTER TABLE my_table RENAME TO my_table_new;
Important

The rename operation in OSS is not atomic. Therefore, if you use OSS to store Apache Paimon table files, exercise caution when you rename an Apache Paimon table. We recommend that you use OSS-HDFS to ensure the atomicity of operations on table files.

Add a column

  • Add the c1 column of the INT data type and the c2 column of the STRING data type to the end of the my_table table.

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);
  • Add the c2 column of the STRING data type after the c1 column in the my_table table.

    ALTER TABLE my_table ADD c2 STRING AFTER c1;
  • Add the c1 column of the INT data type to the beginning of the my_table table.

    ALTER TABLE my_table ADD c1 INT FIRST;

Rename a column

Rename a column named c0 in my_table to c1.

ALTER TABLE my_table RENAME c0 TO c1;

Drop a column

Drop the c1 and c2 columns from my_table.

ALTER TABLE my_table DROP (c1, c2);

Drop a partition

Drop the dt=20240108,hh=08 and dt=20240109,hh=07 partitions from the my_table table.

ALTER TABLE my_table DROP PARTITION (`dt` = '20240108', `hh` = '08'), PARTITION (`dt` = '20240109', `hh` = '07');

Change the comment of a column

Change the comment of the buy_count column in the my_table table to 'this is buy count'.

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

Change the order of a column

  • Move the col_a column of the DOUBLE data type to the beginning of the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • Move the col_a column of the DOUBLE data type to the position after the col_b column in the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

Change the data type of a column

Change the data type of the col_a column in the my_table table to DOUBLE.

ALTER TABLE my_table MODIFY col_a DOUBLE;

The following table describes the changes of column types that are supported by Apache Paimon tables. 〇 indicates that the change is supported while an empty cell indicates that the change is not supported.

[Table image: matrix of supported source-to-target column type changes]

Drop an Apache Paimon table

You can drop an Apache Paimon table by executing the following statements on the SQL Editor page. For more information, see Scripts.

-- Replace my-catalog with the name of the actual Apache Paimon catalog. 
-- Replace my_db with the name of the database that you want to use. 
-- Replace my_tbl with the name of the Apache Paimon table that you created. 
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

If the message The following statement has been executed successfully! appears, the Apache Paimon table is dropped.

View or drop an Apache Paimon catalog

  1. On the Realtime Compute for Apache Flink console, find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, click Catalogs. The list of Apache Paimon catalogs appears.

    • View an Apache Paimon catalog: In the Catalog List section, find the catalog that you want to manage and view its Name and Type. To view the databases and tables in the catalog, click View in the Actions column.

    • Drop an Apache Paimon catalog: In the Catalog List section, find the catalog that you want to manage and click Delete in the Actions column.

      Note

      Deleting an Apache Paimon catalog removes only the catalog information from the Catalogs page in the Realtime Compute for Apache Flink workspace. The data files of the Apache Paimon tables remain. You can re-create the catalog by executing an SQL statement and then use the Apache Paimon tables in the catalog again.

      You can also drop an Apache Paimon catalog by executing the DROP CATALOG <catalog name>; command on the SQL Editor page. For more information, see Scripts.

References

  • After you create an Apache Paimon table, you can consume data from or write data to the table. For more information, see Write data to and consume data from a Paimon table.

  • If the built-in catalogs of Realtime Compute for Apache Flink cannot meet your business requirements, you can use custom catalogs. For more information, see Manage custom catalogs.

  • For more information about common optimization methods for Apache Paimon primary key tables and append scalable tables in different scenarios, see Performance optimization.