
MaxCompute: Create an Apache Paimon external table by using Realtime Compute for Apache Flink

Last Updated: May 16, 2024

MaxCompute allows you to create an Apache Paimon external table and establish a mapping between the external table and the directory of an Apache Paimon table that is stored in Object Storage Service (OSS). This way, you can use the Apache Paimon external table in MaxCompute to access data in the Apache Paimon table that is stored in OSS. This topic describes how to create an Apache Paimon external table by using Realtime Compute for Apache Flink and how to query data by using the Apache Paimon external table in MaxCompute.

Background information

Apache Paimon is a lake storage format that unifies streaming and batch processing and supports high-throughput writes and low-latency queries. Alibaba Cloud Realtime Compute for Apache Flink and common E-MapReduce compute engines such as Spark, Hive, and Trino are seamlessly integrated with Apache Paimon. Apache Paimon helps you quickly build your own data lake storage service on OSS and connect the service to MaxCompute to implement data lake analytics. For more information about Apache Paimon, see Apache Paimon.

Prerequisites

  • The Alibaba Cloud account that you use to perform operations has the CreateTable permission to create MaxCompute tables. For more information about table permissions, see MaxCompute permissions.

  • A MaxCompute project is created. For more information, see Create a MaxCompute project.

  • OSS is activated. A bucket and a file directory are created. For more information, see Create a bucket.

    Note

    MaxCompute is deployed only in specific regions. To prevent cross-region data connectivity issues, we recommend that you use a bucket that resides in the same region as your MaxCompute project.

  • Fully managed Flink is activated. For more information, see Activate Realtime Compute for Apache Flink.

Precautions

  • MaxCompute can only read data from Apache Paimon external tables. It cannot write data to these tables or automatically synchronize their schema changes.

  • Apache Paimon does not support MaxCompute projects for which the schema feature is enabled.

  • Apache Paimon external tables do not support the clustering attribute.

  • Apache Paimon external tables do not support features such as querying or backtracking to historical versions of data.

Step 1: Upload the Apache Paimon plug-in to your MaxCompute project

You can use one of the following methods to upload the Apache Paimon plug-in to the MaxCompute project.

Use the MaxCompute client (odpscmd)

Access the MaxCompute project on the MaxCompute client (odpscmd) and run the following command to upload the paimon_maxcompute_connector.jar package to the MaxCompute project:

ADD JAR <path_to_paimon_maxcompute_connector.jar>;
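
For example, if the paimon_maxcompute_connector.jar package is stored in the /tmp directory of the machine on which odpscmd runs (a hypothetical path; replace it with the actual location of the package), the command looks like the following. You can then run LIST RESOURCES; to confirm that the package is registered in the project:

-- Register the connector JAR as a project resource. The path is illustrative.
ADD JAR /tmp/paimon_maxcompute_connector.jar;
-- Optional: confirm that the JAR appears among the project resources.
LIST RESOURCES;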

Use the DataWorks console

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the desired workspace and choose Shortcuts > Data Development in the Actions column.

  2. On the DataStudio page, click Create and choose Create Resource > JAR.

  3. In the Create Resource dialog box, configure the parameters, upload the paimon_maxcompute_connector.jar package, and then click Create. For more information about how to create a resource, see Step 1: Create a resource or upload an existing resource.

  4. After the resource is created, click the commit icon on the toolbar of the resource configuration tab to commit the resource to the development environment.

Step 2: Create an Apache Paimon external table by using Realtime Compute for Apache Flink

The best practice in this topic is based on Realtime Compute for Apache Flink, which writes Apache Paimon data files to OSS. You create an Apache Paimon catalog in the Realtime Compute for Apache Flink console and then create an Apache Paimon table in that catalog. Because the catalog uses MaxCompute as its metastore, the table is also registered in your MaxCompute project. MaxCompute can then use this table as an external table to read the Apache Paimon data that is stored in OSS.

  1. Log on to the Realtime Compute for Apache Flink console and create a script. For more information about how to create a script, see Create a script.

  2. In the script editing section of the Scripts tab, enter the catalog code and parameter values, select the code, and then click Run.

    CREATE CATALOG `<catalog name>` WITH (
      'type' = 'paimon',
      'metastore' = 'maxcompute',
      'warehouse' = '<warehouse>',
      'maxcompute.endpoint' = '<maxcompute.endpoint>',
      'maxcompute.project' = '<maxcompute.project>',
      'maxcompute.accessid' = '<maxcompute.accessid>',
      'maxcompute.accesskey' = '<maxcompute.accesskey>',
      'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    The following list describes the parameters in the code.

    • catalog name (required): The name of the Apache Paimon catalog. The name can contain only letters. In this topic, the catalog name is catalogname.

    • type (required): The type of the catalog. Set the value to paimon.

    • metastore (required): The type of the metadata storage. Set the value to maxcompute.

    • warehouse (required): The data warehouse directory in OSS, in the oss://<bucket>/<object> format, where bucket is the name of the OSS bucket that you created and object is the path in which your data is stored. You can view the bucket name and object name in the OSS console.

    • maxcompute.endpoint (required): The endpoint of the MaxCompute service. Configure this parameter based on the region and network connection type that you selected when you created the MaxCompute project. For more information about the endpoints that correspond to different regions and network types, see Endpoints.

    • maxcompute.project (required): The name of the MaxCompute project. MaxCompute projects for which the schema feature is enabled are not supported.

    • maxcompute.accessid (required): The AccessKey ID of the Alibaba Cloud account or RAM user that has permissions on MaxCompute. You can obtain the AccessKey ID on the AccessKey Pair page.

    • maxcompute.accesskey (required): The AccessKey secret that corresponds to the AccessKey ID.

    • maxcompute.oss.endpoint (optional): The endpoint that MaxCompute uses to access OSS. If you do not configure this parameter, the value of the fs.oss.endpoint parameter is used by default.

      Important: Because the OSS bucket resides in the same region as the MaxCompute project, we recommend that you set the maxcompute.oss.endpoint parameter to an internal endpoint. For more information about the OSS endpoints of different network types in each region, see Regions and endpoints.

    • fs.oss.endpoint (optional): The endpoint of OSS. This parameter is required if the OSS bucket specified by the warehouse parameter is not in the same region as the Realtime Compute for Apache Flink workspace, or if the bucket belongs to another Alibaba Cloud account. Configure the endpoint based on the region and network connection type that you selected when you created the OSS bucket. For more information, see Regions and endpoints.

    • fs.oss.accessKeyId (optional): The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. This parameter is required under the same conditions as fs.oss.endpoint. You can obtain the AccessKey ID on the AccessKey Pair page.

    • fs.oss.accessKeySecret (optional): The AccessKey secret that corresponds to the AccessKey ID. This parameter is required under the same conditions as fs.oss.endpoint.
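
    For reference, the following statement shows the catalog DDL with sample values filled in. All values are hypothetical placeholders (a bucket named my-bucket in the China (Hangzhou) region and a project named my_project); replace them with your own values and credentials:

    CREATE CATALOG `catalogname` WITH (
      'type' = 'paimon',
      'metastore' = 'maxcompute',
      -- Hypothetical OSS warehouse directory; replace with your bucket and path.
      'warehouse' = 'oss://my-bucket/warehouse',
      -- Public MaxCompute endpoint for the China (Hangzhou) region; use the endpoint of your region.
      'maxcompute.endpoint' = 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
      'maxcompute.project' = 'my_project',
      'maxcompute.accessid' = '<yourAccessKeyId>',
      'maxcompute.accesskey' = '<yourAccessKeySecret>',
      -- Internal OSS endpoint for the same region, as recommended above.
      -- maxcompute.oss.endpoint is omitted, so it defaults to this value.
      'fs.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',
      'fs.oss.accessKeyId' = '<yourAccessKeyId>',
      'fs.oss.accessKeySecret' = '<yourAccessKeySecret>'
    );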

  3. Create an Apache Paimon table.

    1. Create a table named test_tbl.

      In the script editing section of the Scripts tab, execute the following statement and wait until a message indicating that the execution is complete is displayed on the Results tab. In this example, a table named test_tbl is created.

      CREATE TABLE `catalogname`.`default`.test_tbl (
        dt STRING,
        id BIGINT,
        data STRING,
        PRIMARY KEY (dt, id) NOT ENFORCED
      ) PARTITIONED BY (dt);

    2. Write data to the table test_tbl.

      On the Drafts tab of the SQL Editor page, create an SQL draft that contains the following statements. Then, deploy the draft. For more information about how to create and deploy an SQL draft, see Develop an SQL draft.

      -- In this example, the execution.checkpointing.interval parameter is set to 10s. This increases the speed of committing data. 
      SET 'execution.checkpointing.interval' = '10s';
      
      INSERT INTO `catalogname`.`default`.test_tbl VALUES ('2023-04-21', 1, 'AAA'), ('2023-04-21', 2, 'BBB'), ('2023-04-22', 1, 'CCC'), ('2023-04-22', 2, 'DDD');

      Note

      • The Apache Paimon result table commits data each time checkpointing is complete.

      • In the production environment, the checkpointing interval and the minimum pause between checkpoints vary based on your business requirements for latency. In most cases, they are set to 1 to 10 minutes, as shown in the sketch after this note.

      • The engine version of the SQL draft must be vvr-8.0.5-flink-1.17 or later.
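
      The following settings are a minimal sketch of a production-style checkpointing configuration. The values are illustrative, and execution.checkpointing.min-pause is a standard Flink configuration option rather than a connector-specific one:

      -- Commit Apache Paimon data roughly once per minute (value is illustrative).
      SET 'execution.checkpointing.interval' = '1min';
      -- Keep at least one minute between the end of one checkpoint and the start of the next.
      SET 'execution.checkpointing.min-pause' = '1min';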

Step 3: Read data from the Apache Paimon external table by using MaxCompute

  1. Run the following commands on the MaxCompute client (odpscmd) or by using another tool that can execute MaxCompute SQL statements. These flags enable Hive-compatible execution so that MaxCompute can read the Apache Paimon external table:

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;

  2. Run the following command to query data from the Apache Paimon external table test_tbl:

    SELECT * FROM test_tbl WHERE dt = '2023-04-21';

    The following result is returned:

    +------------+------------+------------+
    | id         | data       | dt         |
    +------------+------------+------------+
    | 1          | AAA        | 2023-04-21 |
    | 2          | BBB        | 2023-04-21 |
    +------------+------------+------------+
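
    To inspect the schema of the external table and the extended properties that map it to the Apache Paimon directory in OSS, you can describe the table. This is an optional check, and the exact output depends on your project:

    -- Show columns, partition keys, and extended properties of the external table.
    DESC EXTENDED test_tbl;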