
Realtime Compute for Apache Flink: Manage Apache Paimon catalogs

Updated: Jun 24, 2024

After you configure an Apache Paimon catalog, you can directly access the Apache Paimon tables in the catalog in Alibaba Cloud Object Storage Service (OSS) from Realtime Compute for Apache Flink. This topic describes how to create, view, and delete an Apache Paimon catalog and manage Apache Paimon databases and tables in the development console of Realtime Compute for Apache Flink.

Background information

Apache Paimon catalogs can be used to efficiently manage all Apache Paimon tables in the same directory. Apache Paimon catalogs can also be used by other Alibaba Cloud services. The supported metadata storage types are described below. You can select a metadata storage type based on your business requirements.

  • filesystem

    Description: Stores metadata only in a specific path in OSS.

    Other Alibaba Cloud services that can access the Apache Paimon tables in the catalog: Compute engines such as Spark, Hive, and Trino in E-MapReduce (EMR). For more information, see Apache Paimon of EMR.

  • dlf

    Description: Stores metadata in a specific path in OSS and synchronizes the metadata to Alibaba Cloud Data Lake Formation (DLF).

  • maxcompute

    Description: Stores metadata in a specific path in OSS and creates, modifies, or deletes an external table in a specified MaxCompute project when you create, modify, or delete an Apache Paimon table. This helps you query data of an Apache Paimon table in MaxCompute.

    Other Alibaba Cloud services that can access the Apache Paimon tables in the catalog: MaxCompute. For more information, see Apache Paimon external tables.

  • sync

    Description: Combines the features of Apache Paimon DLF catalogs and Apache Paimon MaxCompute catalogs. This helps you connect to Hologres and MaxCompute by using the same catalog.

    Note: In Apache Paimon Sync catalogs, the metadata of Apache Paimon DLF catalogs is synchronized to Apache Paimon MaxCompute catalogs. To ensure metadata consistency, do not manually modify or delete Apache Paimon external tables in MaxCompute.

Precautions

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later allows you to create and configure Apache Paimon catalogs and Apache Paimon tables.

  • OSS is used to store files related to Apache Paimon tables, including data files and metadata files. Make sure that you have activated OSS and that the storage class of the OSS bucket is Standard. For more information, see Get started by using the OSS console and Overview.

    Important

    You can also use the OSS bucket that you specify when you activate the Realtime Compute for Apache Flink service. However, to keep data separate and prevent misoperations, we recommend that you create and use a dedicated OSS bucket that resides in the same region as Realtime Compute for Apache Flink.

  • The OSS bucket that you specify when you create an Apache Paimon catalog must reside in the same region as the MaxCompute project. The AccessKey pair that you specify when you create the Apache Paimon catalog must belong to an account that has read and write permissions on the OSS bucket, MaxCompute project, and DLF directory.

  • After you create or delete a catalog, database, or table by using SQL statements, you can click the refresh icon to refresh the Catalogs page.

Create an Apache Paimon catalog

You can create an Apache Paimon catalog by using an SQL statement for all the preceding metadata storage types, whereas only the filesystem and dlf metadata storage types allow you to create a catalog on the UI. The parameters for creating a catalog by using an SQL statement are largely the same as those for creating a catalog on the UI. This section describes how to create an Apache Paimon catalog by using an SQL statement and explains the required parameters.

  1. In the script editor, enter the SQL statement that is used to create an Apache Paimon catalog and change the values of the required parameters.

    For more information, see the sample statements and parameter descriptions for each metadata storage type in the following sections.

  2. Select the SQL statement that is used to create the catalog and click Run in the upper-left corner of the script editor.

  3. View the catalog that you created in the Catalogs pane on the left side of the Catalog List page.
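
    You can also verify the catalog in SQL. The following is a minimal sketch that assumes the catalog is named my-catalog:

    -- List all catalogs and confirm that the new catalog appears.
    SHOW CATALOGS;

    -- Switch to the catalog and list its databases.
    USE CATALOG `my-catalog`;
    SHOW DATABASES;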

Create an Apache Paimon Filesystem catalog

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following parameters are used in the sample code.

  • Common parameters

    • my-catalog (required): The name of the Apache Paimon catalog. Enter a custom name.

    • type (required): The type of the catalog. Set the value to paimon.

    • metastore (required): The metadata storage type. Valid values:

      • filesystem: Set this parameter to filesystem when you create an Apache Paimon Filesystem catalog.

      • dlf: Set this parameter to dlf when you create an Apache Paimon DLF catalog.

      • maxcompute: Set this parameter to maxcompute when you create an Apache Paimon MaxCompute catalog.

      • sync: Set this parameter to sync when you create an Apache Paimon Sync catalog.

  • OSS parameters

    • warehouse (required): The data warehouse directory in OSS, in the oss://<bucket>/<object> format. In the path, bucket indicates the name of the OSS bucket that you created, and object indicates the path in which your data is stored. You can view the names of your bucket and object in the OSS console.

    • fs.oss.endpoint (required): The endpoint of OSS. If the OSS bucket resides in the same region as Realtime Compute for Apache Flink, use the virtual private cloud (VPC) endpoint of OSS. Otherwise, use the public endpoint. If you want to store Apache Paimon tables in OSS-HDFS, the value of this parameter must be in the cn-<region>.oss-dls.aliyuncs.com format. Example: cn-hangzhou.oss-dls.aliyuncs.com.

    • fs.oss.accessKeyId (required): The AccessKey ID of the Alibaba Cloud account or Resource Access Management (RAM) user that has read and write permissions on OSS. For more information about how to obtain the required information, see Regions and endpoints and Create an AccessKey pair.

    • fs.oss.accessKeySecret (required): The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

Create an Apache Paimon DLF catalog

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'dlf',
  'warehouse' = '<warehouse>',
  'dlf.catalog.id' = '<dlf.catalog.id>',
  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
  'dlf.catalog.region' = '<dlf.catalog.region>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following parameters are used in the sample code.

  • Common parameters

    • my-catalog (required): The name of the Apache Paimon catalog. Enter a custom name.

    • type (required): The type of the catalog. Set the value to paimon.

    • metastore (required): The metadata storage type. Valid values:

      • filesystem: Set this parameter to filesystem when you create an Apache Paimon Filesystem catalog.

      • dlf: Set this parameter to dlf when you create an Apache Paimon DLF catalog.

      • maxcompute: Set this parameter to maxcompute when you create an Apache Paimon MaxCompute catalog.

      • sync: Set this parameter to sync when you create an Apache Paimon Sync catalog.

  • OSS parameters

    • warehouse (required): The data warehouse directory in OSS, in the oss://<bucket>/<object> format. In the path, bucket indicates the name of the OSS bucket that you created, and object indicates the path in which your data is stored. You can view the names of your bucket and object in the OSS console.

    • fs.oss.endpoint (required): The endpoint of OSS. If the OSS bucket resides in the same region as Realtime Compute for Apache Flink, use the virtual private cloud (VPC) endpoint of OSS. Otherwise, use the public endpoint. If you want to store Apache Paimon tables in OSS-HDFS, the value of this parameter must be in the cn-<region>.oss-dls.aliyuncs.com format. Example: cn-hangzhou.oss-dls.aliyuncs.com.

    • fs.oss.accessKeyId (required): The AccessKey ID of the Alibaba Cloud account or Resource Access Management (RAM) user that has read and write permissions on OSS. For more information about how to obtain the required information, see Regions and endpoints and Create an AccessKey pair.

    • fs.oss.accessKeySecret (required): The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

  • DLF parameters

    • dlf.catalog.id (required): The ID of the DLF data directory. You can view the ID of the data directory in the DLF console.

    • dlf.catalog.accessKeyId (required): The AccessKey ID that is used to access the DLF service. For more information about how to obtain your AccessKey ID, see Create an AccessKey pair.

    • dlf.catalog.accessKeySecret (required): The AccessKey secret that is used to access the DLF service. For more information about how to obtain your AccessKey secret, see Create an AccessKey pair.

    • dlf.catalog.endpoint (required): The endpoint of the DLF service. If DLF resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint of DLF. Otherwise, use the public endpoint. For more information, see Supported regions and endpoints.

    • dlf.catalog.region (required): The region in which the DLF service resides. Make sure that the value of this parameter matches the endpoint that is specified by the dlf.catalog.endpoint parameter. For more information, see Supported regions and endpoints.

Create an Apache Paimon MaxCompute catalog

  • Prerequisites

    The paimon_maxcompute_connector.jar file is uploaded to your MaxCompute project by using one of the following methods:

    • Use the MaxCompute client (odpscmd) to access the MaxCompute project, and run the ADD JAR <path_to_paimon_maxcompute_connector.jar>; command to upload the Apache Paimon plug-in file to the MaxCompute project.

    • Create a resource in the DataWorks console to upload the Apache Paimon plug-in file to the MaxCompute project. For more information, see Create and use MaxCompute resources.

  • SQL statement

    CREATE CATALOG `my-catalog` WITH (
      'type' = 'paimon',
      'metastore' = 'maxcompute',
      'warehouse' = '<warehouse>',
      'maxcompute.endpoint' = '<maxcompute.endpoint>',
      'maxcompute.project' = '<maxcompute.project>',
      'maxcompute.accessid' = '<maxcompute.accessid>',
      'maxcompute.accesskey' = '<maxcompute.accesskey>',
      'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );
    Note

    When you create an Apache Paimon table in an Apache Paimon MaxCompute catalog, an Apache Paimon external table is automatically created in the MaxCompute project. To query the Apache Paimon external table in MaxCompute, execute the following SET statements on the MaxCompute client (odpscmd) or by using another tool that can execute MaxCompute SQL statements, and then execute the SELECT statement. For more information, see Step 4: Read data from the Apache Paimon external table.

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;
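
    For example, the following is a minimal sketch of such a query, where test_tbl is a hypothetical Apache Paimon external table in the MaxCompute project:

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;
    -- Query the hypothetical Apache Paimon external table.
    SELECT * FROM test_tbl LIMIT 10;
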
  • Parameters

    • Common parameters

      • my-catalog (required): The name of the Apache Paimon catalog. Enter a custom name.

      • type (required): The type of the catalog. Set the value to paimon.

      • metastore (required): The metadata storage type. Valid values:

        • filesystem: Set this parameter to filesystem when you create an Apache Paimon Filesystem catalog.

        • dlf: Set this parameter to dlf when you create an Apache Paimon DLF catalog.

        • maxcompute: Set this parameter to maxcompute when you create an Apache Paimon MaxCompute catalog.

        • sync: Set this parameter to sync when you create an Apache Paimon Sync catalog.

    • OSS parameters

      • warehouse (required): The data warehouse directory in OSS, in the oss://<bucket>/<object> format. In the path, bucket indicates the name of the OSS bucket that you created, and object indicates the path in which your data is stored. You can view the names of your bucket and object in the OSS console.

      • fs.oss.endpoint (required): The endpoint of OSS. If the OSS bucket resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint of OSS. Otherwise, use the public endpoint. These OSS parameters are required if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace or if an OSS bucket within another Alibaba Cloud account is used. For more information about how to obtain the required information, see Regions and endpoints and Create an AccessKey pair.

      • fs.oss.accessKeyId (required): The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

      • fs.oss.accessKeySecret (required): The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

    • MaxCompute parameters

      • maxcompute.endpoint (required): The endpoint of the MaxCompute service. For more information, see Endpoints.

      • maxcompute.project (required): The name of the MaxCompute project. MaxCompute projects for which the schema feature is enabled are not supported.

      • maxcompute.accessid (required): The AccessKey ID of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey ID, see Create an AccessKey pair.

      • maxcompute.accesskey (required): The AccessKey secret of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey secret, see Create an AccessKey pair.

      • maxcompute.oss.endpoint (optional): The endpoint that is used to access OSS from MaxCompute. If you do not configure this parameter, the value of the fs.oss.endpoint parameter is used by default. Because the OSS bucket resides in the same region as the MaxCompute project, we recommend that you set this parameter to an internal endpoint of OSS. For more information, see Regions and endpoints.

      • maxcompute.life-cycle (optional): The lifecycle of the MaxCompute external table. Unit: days.

Create an Apache Paimon Sync catalog

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'sync',
  'source' = 'dlf',
  'target' = 'maxcompute',
  'warehouse' = '<warehouse>',
  'dlf.catalog.id' = '<dlf.catalog.id>',
  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
  'dlf.catalog.region' = '<dlf.catalog.region>',
  'maxcompute.endpoint' = '<maxcompute.endpoint>',
  'maxcompute.project' = '<maxcompute.project>',
  'maxcompute.accessid' = '<maxcompute.accessid>',
  'maxcompute.accesskey' = '<maxcompute.accesskey>',
  'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

The following parameters are used in the sample code.

  • Common parameters

    • my-catalog (required): The name of the Apache Paimon catalog. Enter a custom name.

    • type (required): The type of the catalog. Set the value to paimon.

    • metastore (required): The metadata storage type. Valid values:

      • filesystem: Set this parameter to filesystem when you create an Apache Paimon Filesystem catalog.

      • dlf: Set this parameter to dlf when you create an Apache Paimon DLF catalog.

      • maxcompute: Set this parameter to maxcompute when you create an Apache Paimon MaxCompute catalog.

      • sync: Set this parameter to sync when you create an Apache Paimon Sync catalog.

  • Parameters only for Apache Paimon Sync catalogs

    • source (required): The storage service from which metadata is synchronized. Set the value to dlf.

    • target (required): The storage service to which metadata is synchronized. Set the value to maxcompute.

  • OSS parameters

    • warehouse (required): The data warehouse directory in OSS, in the oss://<bucket>/<object> format. In the path, bucket indicates the name of the OSS bucket that you created, and object indicates the path in which your data is stored. You can view the names of your bucket and object in the OSS console.

    • fs.oss.endpoint (required): The endpoint of OSS. If the OSS bucket resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint of OSS. Otherwise, use the public endpoint. These OSS parameters are required if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace or if an OSS bucket within another Alibaba Cloud account is used. For more information about how to obtain the required information, see Regions and endpoints and Create an AccessKey pair.

    • fs.oss.accessKeyId (required): The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

    • fs.oss.accessKeySecret (required): The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

  • DLF parameters

    • dlf.catalog.id (required): The ID of the DLF data directory. You can view the ID of the data directory in the DLF console.

    • dlf.catalog.accessKeyId (required): The AccessKey ID that is used to access the DLF service. For more information about how to obtain your AccessKey ID, see Create an AccessKey pair.

    • dlf.catalog.accessKeySecret (required): The AccessKey secret that is used to access the DLF service. For more information about how to obtain your AccessKey secret, see Create an AccessKey pair.

    • dlf.catalog.endpoint (required): The endpoint of the DLF service. If DLF resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint of DLF. Otherwise, use the public endpoint. For more information, see Supported regions and endpoints.

    • dlf.catalog.region (required): The region in which the DLF service resides. Make sure that the value of this parameter matches the endpoint that is specified by the dlf.catalog.endpoint parameter. For more information, see Supported regions and endpoints.

  • MaxCompute parameters

    • maxcompute.endpoint (required): The endpoint of the MaxCompute service. For more information, see Endpoints.

    • maxcompute.project (required): The name of the MaxCompute project. MaxCompute projects for which the schema feature is enabled are not supported.

    • maxcompute.accessid (required): The AccessKey ID of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey ID, see Create an AccessKey pair.

    • maxcompute.accesskey (required): The AccessKey secret of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey secret, see Create an AccessKey pair.

    • maxcompute.oss.endpoint (optional): The endpoint that is used to access OSS from MaxCompute. If you do not configure this parameter, the value of the fs.oss.endpoint parameter is used by default. Because the OSS bucket resides in the same region as the MaxCompute project, we recommend that you set this parameter to an internal endpoint of OSS. For more information, see Regions and endpoints.

    • maxcompute.life-cycle (optional): The lifecycle of the MaxCompute external table. Unit: days.

Manage an Apache Paimon database

In the script editor, enter and select the following code, and click Run in the upper-left corner of the script editor.

  • Create a database

    After you create an Apache Paimon catalog, a database named default is automatically created in the catalog.

    -- Replace my-catalog with the name of the Apache Paimon catalog that you created. 
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with a custom database name. 
    CREATE DATABASE `my_db`;
  • Delete a database

    Important

    You cannot delete the default database from an Apache Paimon DLF catalog, an Apache Paimon MaxCompute catalog, or an Apache Paimon Sync catalog. You can delete the default database from an Apache Paimon Filesystem catalog.

    -- Replace my-catalog with the name of the Apache Paimon catalog that you created. 
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with the name of the database that you want to delete. 
    DROP DATABASE `my_db`; -- Delete the database only if it does not contain any tables. 
    DROP DATABASE `my_db` CASCADE; -- Delete the database and all tables in the database.
    

Manage an Apache Paimon table

Create an Apache Paimon table

Note

After you configure an Apache Paimon catalog, you can reference tables in the Apache Paimon catalog in a deployment. When you use a table in the Apache Paimon catalog as a source table, result table, or dimension table, you do not need to declare the DDL statement of the table. In an SQL statement, you can reference the table directly by its fully qualified name in the ${Paimon-catalog-name}.${Paimon-db-name}.${Paimon-table-name} format. You can also first declare the catalog and database in use by executing USE CATALOG ${Paimon-catalog-name} and USE ${Paimon-db-name}, and then reference the table by its name in the ${Paimon-table-name} format in subsequent SQL statements.
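
For example, the following minimal sketch shows both ways to reference an Apache Paimon table in a query. The names my-catalog, my_db, and my_tbl are placeholders:

-- Reference the table by its fully qualified name.
SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;

-- Declare the catalog and database first, and then reference the table by name only.
USE CATALOG `my-catalog`;
USE `my_db`;
SELECT * FROM `my_tbl`;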

  • Create an Apache Paimon table by using the CREATE TABLE statement

    In the script editor, enter and select the following code, and click Run in the upper-left corner of the script editor.

    In the my_db database of a catalog named my-catalog, create a partitioned table in which the partition key is dt, the primary key columns are dt, shop_id, and user_id, and the number of buckets is fixed to 4. Sample code:

    -- Replace my-catalog with the name of the Apache Paimon catalog that you created. 
    -- Replace my_db with the name of the database that you want to use. 
    -- You can replace my_tbl with a custom name. 
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
      'bucket' = '4'
    );

    For more information about the parameters and usage of Apache Paimon tables, see Apache Paimon connector and Primary key tables and append-only tables.

  • Create an Apache Paimon table by using the CREATE TABLE AS or CREATE DATABASE AS statement

    The CREATE TABLE AS and CREATE DATABASE AS statements automatically synchronize data as well as table schema changes. You can use the CREATE TABLE AS or CREATE DATABASE AS statement to synchronize tables from data sources such as MySQL and Kafka to an Apache Paimon catalog in an efficient manner.

    To synchronize data by using the CREATE TABLE AS or CREATE DATABASE AS statement, you must publish a draft as a deployment and start the deployment. For more information, see Develop an SQL draft and Start a deployment.

    Note
    • When you create an Apache Paimon table by using the CREATE TABLE AS or CREATE DATABASE AS statement, you cannot set 'bucket' = '-1' to use dynamic bucketing for Apache Paimon primary key tables or Apache Paimon append-only tables (non-primary key tables). A sketch of configuring dynamic bucketing on a regular CREATE TABLE statement follows this note.

    • The CREATE TABLE AS and CREATE DATABASE AS statements allow you to configure physical table properties in the WITH clause when you create a table. When the deployment starts, these properties take effect on the result tables to which data is synchronized. For more information about the table properties supported by the statements, see Apache Paimon connector.
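
    By contrast, you can configure dynamic bucketing on a regular CREATE TABLE statement. The following is a minimal sketch that uses placeholder names:

      -- Placeholder names: replace my-catalog, my_db, and my_dyn_tbl with your own names.
      CREATE TABLE `my-catalog`.`my_db`.`my_dyn_tbl` (
        id BIGINT,
        name STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        -- 'bucket' = '-1' enables dynamic bucketing.
        'bucket' = '-1'
      );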

    • Create a table and synchronize data to the table

      In the following sample code, the Apache Paimon table `my-catalog`.`my_db`.`web_sales` is automatically created based on the schema of the mysql.tpcds.web_sales table. Data in the mysql.tpcds.web_sales table is synchronized to the Apache Paimon table. The number of buckets for the Apache Paimon table is set to 4, and the input-based change data capture (CDC) mechanism is used.

      CREATE TABLE IF NOT EXISTS `my-catalog`.`my_db`.`web_sales`
      WITH (
        'bucket' = '4',
        'changelog-producer' = 'input'
      ) AS TABLE mysql.tpcds.web_sales;
    • Create a database and synchronize data in all tables to the database

      In the following sample code, Apache Paimon tables are automatically created in the `my-catalog`.`my_db` database based on the schema of each table in the mysql.tpcds database. Data of all tables in the mysql.tpcds database is synchronized to the Apache Paimon tables. The input-based CDC mechanism is used.

      CREATE DATABASE IF NOT EXISTS `my-catalog`.`my_db`
      WITH (
        'changelog-producer' = 'input'
      ) AS DATABASE mysql.tpcds INCLUDING ALL TABLES;
    • Synchronize data type changes of columns

      Apache Paimon tables created by using the CREATE TABLE AS or CREATE DATABASE AS statement support schema change synchronization, such as adding columns and changing the data types of specific columns. You can determine whether to use the type normalization mode based on your business requirements.

      • Non-type normalization mode (default mode)

        The column types of Apache Paimon tables created by using the CREATE TABLE AS or CREATE DATABASE AS statement are the same as the column types of their source tables. The following rules for changing column types are supported:

        • Integer data types TINYINT, SMALLINT, INT, and BIGINT can be changed to integer data types with the same or higher precision. The precision of TINYINT is the lowest and the precision of BIGINT is the highest.

        • Floating-point data types FLOAT and DOUBLE can be changed to floating-point data types with the same or higher precision. The precision of FLOAT is the lowest and the precision of DOUBLE is the highest.

        • String data types CHAR, VARCHAR, and STRING can be changed to string data types with the same or higher precision.

      • Type normalization mode

        When you create Apache Paimon tables by using the CREATE TABLE AS or CREATE DATABASE AS statement, you can configure 'enableTypeNormalization' = 'true' in the WITH clause to enable the type normalization mode (see the sketch after this list). In type normalization mode, a data type change in the source table does not cause a deployment failure as long as the data types before and after the change can be converted into the same data type based on the following type normalization rules:

        • The TINYINT, SMALLINT, INT, and BIGINT data types are normalized into the BIGINT data type.

        • The FLOAT and DOUBLE data types are normalized into the DOUBLE data type.

        • The CHAR, VARCHAR, and STRING data types are normalized into the STRING data type.

        • Other data types are not normalized.

        Examples when the type normalization mode is used:

        • The SMALLINT and INT data types are normalized into the BIGINT data type. If you change the SMALLINT data type to the INT data type, the data type is changed successfully. Therefore, the deployment runs as expected.

        • The FLOAT data type is normalized into the DOUBLE data type, whereas the BIGINT data type remains BIGINT. Therefore, if you change the FLOAT data type to the BIGINT data type, a data type incompatibility error is returned.

        The types of data that is stored in Apache Paimon tables are normalized. For example, data in columns of the SMALLINT and INT types in MySQL is stored as data of the BIGINT type in Apache Paimon tables.
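
        The following is a minimal sketch of enabling the type normalization mode in a CREATE TABLE AS statement. The catalog, database, and table names are placeholders:

        CREATE TABLE IF NOT EXISTS `my-catalog`.`my_db`.`my_sync_tbl`
        WITH (
          'enableTypeNormalization' = 'true'
        ) AS TABLE mysql.tpcds.web_sales;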

Modify the schema of an Apache Paimon table

To modify the schema of an Apache Paimon table, enter the code for the desired operation in the script editor, select the code, and then click Run in the upper-left corner of the script editor. The following operations are supported.

Add or modify a table parameter

Change the value of the write-buffer-size parameter of a table to 256 MB and the value of the write-buffer-spillable parameter to true.

ALTER TABLE my_table SET (
  'write-buffer-size' = '256 MB',
  'write-buffer-spillable' = 'true'
);

Temporarily modify a table parameter

You can append an SQL hint to the name of a table to temporarily modify table parameters when you write data to or read data from the table. The temporarily modified parameters take effect only for the current SQL draft.

  • Temporarily set the write-buffer-size parameter to 256 MB and the write-buffer-spillable parameter to true when data is written to the my_table table.

    INSERT INTO my_table /*+ OPTIONS('write-buffer-size' = '256 MB', 'write-buffer-spillable' = 'true') */
    SELECT ...;
  • Temporarily set the scan.mode parameter to latest and the scan.parallelism parameter to 10 when data is consumed from the my_table table.

    SELECT * FROM my_table /*+ OPTIONS('scan.mode' = 'latest', 'scan.parallelism' = '10') */;

Rename a table

Change the name of a table from my_table to my_table_new.

ALTER TABLE my_table RENAME TO my_table_new;
Important

The rename operation in OSS is not atomic. Therefore, if you use OSS to store Apache Paimon table files, exercise caution when you rename an Apache Paimon table. We recommend that you use the OSS-HDFS service to ensure the atomicity of operations on table files.

Add a column

  • Add the c1 column of the INT data type and the c2 column of the STRING data type to the end of the my_table table.

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);
  • Add the c2 column of the STRING data type after the c1 column in the my_table table.

    ALTER TABLE my_table ADD c2 STRING AFTER c1;
  • Add the c1 column of the INT data type to the beginning of the my_table table.

    ALTER TABLE my_table ADD c1 INT FIRST;

Rename a column

Rename the c0 column in the my_table table to c1.

ALTER TABLE my_table RENAME c0 TO c1;

Drop a column

Drop the c1 and c2 columns from the my_table table.

ALTER TABLE my_table DROP (c1, c2);

Drop a partition

Drop the dt=20240108,hh=08 and dt=20240109,hh=07 partitions from the my_table table.

ALTER TABLE my_table DROP PARTITION (`dt` = '20240108', `hh` = '08'), PARTITION (`dt` = '20240109', `hh` = '07');

Change the comment of a column

Change the comment of the buy_count column in the my_table table to this is buy count.

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

Change the order of a column

  • Move the col_a column of the DOUBLE data type to the beginning of the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • Move the col_a column of the DOUBLE data type to the position after the col_b column in the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

Change the data type of a column

Change the data type of the col_a column in the my_table table to DOUBLE.

ALTER TABLE my_table MODIFY col_a DOUBLE;

The following matrix describes the column type changes that are supported by Apache Paimon tables. 〇 indicates that the change is supported. An empty cell indicates that the change is not supported.

[Figure: matrix of supported column type changes between Apache Paimon data types]

Drop an Apache Paimon table

Enter the following SQL statement in the script editor, select the code, and then click Run in the upper-left corner of the script editor.

-- Replace my-catalog with the name of the Apache Paimon catalog that you created. 
-- Replace my_db with the name of the database that you want to use. 
-- Replace my_tbl with the name of the Apache Paimon table that you created. 
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

If the "The following statement has been executed successfully!" message appears, the Apache Paimon table is dropped.

View or delete an Apache Paimon catalog

  1. In the Realtime Compute for Apache Flink console, find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, click Catalogs. On the Catalogs page, view or delete an Apache Paimon catalog.

    • View an Apache Paimon catalog: On the Catalog List page, find the desired catalog and view the Name and Type columns of the catalog. To view the databases and tables in the catalog, click View in the Actions column.

    • Delete an Apache Paimon catalog: On the Catalog List page, find the catalog that you want to delete and click Delete in the Actions column.

      Note

      After the Apache Paimon catalog is deleted, only the catalog information on the Catalogs page in the Realtime Compute for Apache Flink namespace is deleted. The data files of the Apache Paimon tables are not deleted. After the Apache Paimon catalog is deleted, you can re-create the Apache Paimon catalog by executing an SQL statement. Then, you can reuse the Apache Paimon tables in the catalog.

      You can also enter DROP CATALOG <catalog name>; in the script editor, select the code, and then click Run in the upper-left corner of the script editor.

References

  • If the built-in catalogs of Realtime Compute for Apache Flink cannot meet your business requirements, you can use custom catalogs. For more information, see Manage custom catalogs.