MaxCompute allows you to create an Apache Paimon external table and establish a mapping between the external table and the directory of an Apache Paimon table that is stored in Object Storage Service (OSS). This way, you can use the Apache Paimon external table in MaxCompute to access data in the Apache Paimon table that is stored in OSS. This topic describes how to create an Apache Paimon external table by using Realtime Compute for Apache Flink and how to query data by using the Apache Paimon external table in MaxCompute.
Background information
Apache Paimon is a lake storage format for unified streaming and batch processing that supports high-throughput writes and low-latency queries. Alibaba Cloud Realtime Compute for Apache Flink and common compute engines of E-MapReduce, such as Spark, Hive, and Trino, are seamlessly integrated with Apache Paimon. Apache Paimon helps you quickly build your own data lake storage service on OSS and connect the service to MaxCompute to implement data lake analytics. For more information about Apache Paimon, see Apache Paimon.
Prerequisites
The Alibaba Cloud account that you use to perform operations has the CreateTable permission to create MaxCompute tables. For more information about table permissions, see MaxCompute permissions.
A MaxCompute project is created. For more information, see Create a MaxCompute project.
OSS is activated. A bucket and a file directory are created. For more information, see Create a bucket.
Note: MaxCompute is deployed only in specific regions. To prevent cross-region data connectivity issues, we recommend that you use a bucket in the same region as your MaxCompute project.
Fully managed Flink is activated. For more information, see Activate Realtime Compute for Apache Flink.
Precautions
MaxCompute can only read data from Apache Paimon external tables. It cannot write data to Apache Paimon external tables or automatically synchronize their schema changes.
Apache Paimon does not support MaxCompute projects for which the schema feature is enabled.
Apache Paimon external tables do not support the clustering attribute.
Apache Paimon external tables do not support features such as querying and backtracking data of historical versions.
Step 1: Upload the Apache Paimon plug-in to your MaxCompute project
You can use one of the following methods to upload the Apache Paimon plug-in to the MaxCompute project.
Use the MaxCompute client (odpscmd)
Access the MaxCompute project on the MaxCompute client (odpscmd) and run the following command to upload the paimon_maxcompute_connector.jar package to the MaxCompute project:
ADD JAR <path_to_paimon_maxcompute_connector.jar>;
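As an illustration, the upload can be followed by a quick check that the resource is registered. This is a minimal sketch: the local path is hypothetical, and the optional -f flag, which overwrites an existing resource of the same name, is included on the assumption that you may be re-uploading the package.

```sql
-- Hypothetical local path; replace it with the actual location of the JAR package.
ADD JAR /home/admin/paimon_maxcompute_connector.jar -f;

-- List the resources in the current project to confirm that the JAR package was added.
LIST RESOURCES;
```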
Use the DataWorks console
Log on to the DataWorks console. In the left-side navigation pane, click Workspace. On the Workspaces page, find the desired workspace and choose Shortcuts > Data Development in the Actions column.
On the DataStudio page, click Create and choose Create Resource > JAR.
In the Create Resource dialog box, configure the parameters, upload the paimon_maxcompute_connector.jar package, and then click Create. For more information about how to create a resource, see Step 1: Create a resource or upload an existing resource.
After the resource is created, click the icon on the toolbar on the configuration tab of the resource to commit the resource to the development environment.
Step 2: Create an Apache Paimon external table by using Realtime Compute for Apache Flink
The procedure in this topic is based on Realtime Compute for Apache Flink, which writes Apache Paimon files to OSS. You create an Apache Paimon catalog in the Realtime Compute for Apache Flink console, and then create an Apache Paimon table in that catalog that MaxCompute can use to read the Apache Paimon files in OSS. MaxCompute then uses the Apache Paimon table as an external table to read the Apache Paimon data that is stored in OSS.
Log on to the Realtime Compute for Apache Flink console and create a script. For more information about how to create a script, see Create a script.
In the script editing section of the Scripts tab, enter the catalog code and parameter values, select the code, and then click Run.
CREATE CATALOG `<catalog name>` WITH (
  'type' = 'paimon',
  'metastore' = 'maxcompute',
  'warehouse' = '<warehouse>',
  'maxcompute.endpoint' = '<maxcompute.endpoint>',
  'maxcompute.project' = '<maxcompute.project>',
  'maxcompute.accessid' = '<maxcompute.accessid>',
  'maxcompute.accesskey' = '<maxcompute.accesskey>',
  'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);
The following table describes the parameters in the code.
| Parameter | Required | Description |
| --- | --- | --- |
| catalog name | Yes | The name of the Apache Paimon catalog. The name can contain only letters. In this topic, the catalog name is catalogname. |
| type | Yes | The type of the catalog. Set the value to paimon. |
| metastore | Yes | The type of the metadata storage. Set the value to maxcompute. |
| warehouse | Yes | The data warehouse directory in OSS. The value of this parameter is in the oss://<bucket>/<object> format. bucket: the name of the OSS bucket that you created. object: the path in which your data is stored. You can view the bucket name and object name in the OSS console. |
| maxcompute.endpoint | Yes | The endpoint of the MaxCompute service. You must configure this parameter based on the region and network connection type that you select when you create the MaxCompute project. For more information about the endpoints that correspond to different regions and network types, see Endpoints. |
| maxcompute.project | Yes | The name of the MaxCompute project. MaxCompute projects for which the schema feature is enabled are not supported. |
| maxcompute.accessid | Yes | The AccessKey ID of the Alibaba Cloud account or RAM user that has the permissions on MaxCompute. You can obtain the AccessKey ID on the AccessKey Pair page. |
| maxcompute.accesskey | Yes | The AccessKey secret that corresponds to the AccessKey ID. |
| maxcompute.oss.endpoint | No | The OSS endpoint that MaxCompute accesses. If you do not configure this parameter, the value of the fs.oss.endpoint parameter is used by default. Important: If the OSS bucket resides in the same region as the MaxCompute project, we recommend that you set the maxcompute.oss.endpoint parameter to an internal endpoint. For more information about the OSS endpoints of different network types in each region, see Regions and endpoints. |
| fs.oss.endpoint | No | The endpoint of OSS. This parameter is required if the OSS bucket specified by the warehouse parameter is not in the same region as the Realtime Compute for Apache Flink workspace or an OSS bucket within another Alibaba Cloud account is used. Note: You must configure the endpoint based on the region and network connection method that you select when you create the OSS bucket. For more information about the endpoints that correspond to different regions and network types, see Regions and endpoints. |
| fs.oss.accessKeyId | No | The AccessKey ID of the Alibaba Cloud account or RAM user that has the read and write permissions on OSS. This parameter is required if the OSS bucket specified by the warehouse parameter is not in the same region as the Realtime Compute for Apache Flink workspace or an OSS bucket within another Alibaba Cloud account is used. You can obtain the AccessKey ID on the AccessKey Pair page. |
| fs.oss.accessKeySecret | No | The AccessKey secret that corresponds to the AccessKey ID. This parameter is required if the OSS bucket specified by the warehouse parameter is not in the same region as the Realtime Compute for Apache Flink workspace or an OSS bucket within another Alibaba Cloud account is used. |
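To make the parameter descriptions concrete, the following sketch shows what a filled-in statement might look like. It assumes that the OSS bucket, the MaxCompute project, and the Realtime Compute for Apache Flink workspace all belong to the same Alibaba Cloud account and reside in the China (Hangzhou) region, so the optional fs.oss.* parameters are omitted. The bucket name my-paimon-bucket, the path warehouse, and the project name my_mc_project are hypothetical. Check the endpoint values against the Endpoints and Regions and endpoints topics for your own region and network type.

```sql
-- Illustrative values only; the bucket, path, and project names are hypothetical.
CREATE CATALOG `catalogname` WITH (
  'type' = 'paimon',
  'metastore' = 'maxcompute',
  -- oss://<bucket>/<object>
  'warehouse' = 'oss://my-paimon-bucket/warehouse',
  -- Assumed region: China (Hangzhou). Use the endpoint that matches your region and network type.
  'maxcompute.endpoint' = 'https://service.cn-hangzhou.maxcompute.aliyun.com/api',
  'maxcompute.project' = 'my_mc_project',
  -- Keep the placeholders until you substitute your own AccessKey pair.
  'maxcompute.accessid' = '<maxcompute.accessid>',
  'maxcompute.accesskey' = '<maxcompute.accesskey>',
  -- Internal OSS endpoint, as recommended when the bucket and the project are in the same region.
  'maxcompute.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com'
);
```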
Create an Apache Paimon table.
Create a table named test_tbl.
In the script editing section of the Scripts tab, execute the following statement and wait until a message indicating that the execution is complete is displayed on the Results tab.
CREATE TABLE `catalogname`.`default`.test_tbl (
  dt STRING,
  id BIGINT,
  data STRING,
  PRIMARY KEY (dt, id) NOT ENFORCED
) PARTITIONED BY (dt);
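If you want to confirm the table definition before you write data, a standard Flink SQL DESCRIBE statement can be used. This is an optional check and a sketch only: it assumes that the script editor of your Realtime Compute for Apache Flink version accepts DESCRIBE statements.

```sql
-- Optional check: show the columns and key information of the new table.
DESCRIBE `catalogname`.`default`.test_tbl;
```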
Write data to the table test_tbl.
On the Drafts tab of the SQL Editor page, create an SQL draft that contains the following statements. Then, deploy the draft. For more information about how to create and deploy an SQL draft, see Develop an SQL draft.
-- In this example, the execution.checkpointing.interval parameter is set to 10s. This increases the speed of committing data.
SET 'execution.checkpointing.interval' = '10s';

INSERT INTO `catalogname`.`default`.test_tbl VALUES
  ('2023-04-21', 1, 'AAA'),
  ('2023-04-21', 2, 'BBB'),
  ('2023-04-22', 1, 'CCC'),
  ('2023-04-22', 2, 'DDD');
Note:
The Apache Paimon result table commits data each time a checkpoint is complete.
In production environments, the checkpointing interval and the minimum interval between checkpoints depend on your business requirements for latency. In most cases, they are set to values from 1 to 10 minutes.
The engine version of the SQL draft must be vvr-8.0.5-flink-1.17 or later.
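For comparison with the 10-second demo setting, a production-style draft might begin as follows. This is a sketch, not a recommendation: the 3-minute values are arbitrary picks from the 1-to-10-minute range mentioned in the note, and execution.checkpointing.min-pause is the open source Flink option for the minimum pause between checkpoints, which this topic does not name explicitly.

```sql
-- Illustrative values; tune both settings to your latency requirements.
SET 'execution.checkpointing.interval' = '3min';
-- Minimum pause between the end of one checkpoint and the start of the next.
SET 'execution.checkpointing.min-pause' = '3min';
```

Because data is committed only when a checkpoint is complete, a longer interval means that newly written rows take longer to become visible to MaxCompute.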
Step 3: Read data from the Apache Paimon external table by using MaxCompute
Run the following commands on the MaxCompute client (odpscmd) or by using another tool that can execute MaxCompute SQL statements:
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
Run the following command to query data from the Apache Paimon external table test_tbl:
SELECT * FROM test_tbl WHERE dt = '2023-04-21';
The following result is returned:
+------------+------------+------------+
| id         | data       | dt         |
+------------+------------+------------+
| 1          | AAA        | 2023-04-21 |
| 2          | BBB        | 2023-04-21 |
+------------+------------+------------+
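The other partition can be read in the same way. The following sketch repeats the two session settings so that it can be run on its own, and adds a LIMIT clause on the assumption that your project requires one together with ORDER BY.

```sql
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

-- Read the second partition that was written in Step 2.
SELECT id, data
FROM test_tbl
WHERE dt = '2023-04-22'
ORDER BY id
LIMIT 10;
```

If all four rows from Step 2 have been committed, this query should return the rows (1, CCC) and (2, DDD).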