Apache Paimon is a data lake storage format that supports streaming and batch data processing, high-throughput data writing, and low-latency data queries. This topic describes how to use an Apache Paimon catalog and the MySQL connector in the Realtime Compute for Apache Flink console to import order data and table schema changes from ApsaraDB RDS into an Apache Paimon table, and then run simple analytics on that table.
Background information
Apache Paimon is a data lake storage format that supports streaming and batch data processing, high-throughput data writing, and low-latency data queries. Alibaba Cloud Realtime Compute for Apache Flink, as well as common compute engines such as Spark, Hive, and Trino on the open source big data platform E-MapReduce (EMR), can be integrated with Apache Paimon. You can use Apache Paimon to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS), and then connect a compute engine to Apache Paimon for data lake analytics.
Prerequisites
If you want to use a RAM user or RAM role to access the development console of Realtime Compute for Apache Flink, make sure that the RAM user or RAM role has the required permissions. For more information, see Permission management.
A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Step 1: Prepare a data source
Create an ApsaraDB RDS for MySQL instance and configure databases.
Note: We recommend that you create an ApsaraDB RDS for MySQL instance in the same virtual private cloud (VPC) as the Realtime Compute for Apache Flink workspace. If the ApsaraDB RDS for MySQL instance and the Realtime Compute for Apache Flink workspace do not reside in the same VPC, you must establish a connection between them. For more information, see Network connectivity.
Create a database named orders and create a privileged account or a standard account that has the read and write permissions on the orders database.
Log on to the desired ApsaraDB RDS for MySQL instance and create tables orders_1 and orders_2 in the orders database.
CREATE TABLE `orders_1` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);

CREATE TABLE `orders_2` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);
Insert the following test data:
INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
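To confirm that the test data is in place before you configure synchronization, you can run a quick check in the same database. Each table should contain 10 rows:

SELECT COUNT(*) FROM `orders_1`; -- Expected: 10
SELECT COUNT(*) FROM `orders_2`; -- Expected: 10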
Step 2: Create a catalog
Go to the Catalogs page.
Log on to the Realtime Compute for Apache Flink console.
Find the workspace that you want to manage and click Console in the Actions column.
Click Catalogs.
Create an Apache Paimon catalog.
On the page that appears, click Create Catalog. On the Built-in Catalog tab of the Create Catalog dialog box, click Apache Paimon and click Next.
Configure the parameters in the Configure Catalog step.
| Parameter | Description | Remarks |
| --- | --- | --- |
| catalog name | The name of the Apache Paimon catalog. | In this example, enter paimon-catalog. |
| metastore | The metadata storage type of the Apache Paimon table. Valid values: filesystem (metadata is stored only in OSS buckets) and dlf (metadata is stored in OSS buckets and also synchronized to Alibaba Cloud Data Lake Formation (DLF)). | In this example, filesystem is selected. |
| warehouse | The storage root directory of the Apache Paimon catalog. This is an OSS directory in the oss://<bucket>/<object> format, where bucket is the name of an OSS bucket that you created and object is the path in which your data is stored. You can select the OSS bucket that was used when you activated Realtime Compute for Apache Flink, or another OSS bucket in the same region that belongs to the same Alibaba Cloud account. | You can view the bucket name and object path in the OSS console. |
| fs.oss.endpoint | The endpoint of OSS. | If the OSS bucket resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint of OSS. Otherwise, use the public endpoint. For more information about how to obtain the endpoint, see Regions and endpoints. |
| fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | If you do not have an AccessKey ID, create an AccessKey pair. For more information, see Create an AccessKey pair. |
| fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | In this example, the AccessKey secret is referenced as a variable to prevent secret leaks caused by plaintext storage. For more information, see Namespace variables. |
Click Confirm.
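Alternatively, you can create the catalog by executing a CREATE CATALOG statement. The following is a minimal sketch that mirrors the console configuration above; the warehouse path, endpoint, and variable names are placeholders that you must replace with your own values:

CREATE CATALOG `paimon-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  -- Placeholder OSS directory; use your own bucket and path.
  'warehouse' = 'oss://<yourBucketName>/warehouse',
  -- Placeholder endpoint; use the OSS endpoint of your region.
  'fs.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',
  -- Assumed namespace variables; see Namespace variables.
  'fs.oss.accessKeyId' = '${secret_values.ak_id}',
  'fs.oss.accessKeySecret' = '${secret_values.ak_secret}'
);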
Create a MySQL catalog.
On the Catalog List page, click Create Catalog. On the Built-in Catalog tab of the Create Catalog dialog box, click MySQL and click Next.
Configure the parameters in the Configure Catalog step.
| Parameter | Description | Remarks |
| --- | --- | --- |
| catalog name | The name of the MySQL catalog. | In this example, enter mysql-catalog. |
| hostname | The IP address or hostname that is used to access the MySQL database. | In this example, the internal endpoint of the ApsaraDB RDS for MySQL instance is used. |
| port | The port that is used to access the MySQL database. | Default value: 3306. |
| default-database | The name of the default MySQL database. | In this example, the orders database that you created in Step 1: Prepare a data source is used. |
| username | The username that is used to access the MySQL database. | Enter the username of the account that you created in Step 1. |
| password | The password that is used to access the MySQL database. | In this example, the password is referenced as a variable to prevent leaks caused by plaintext storage. For more information, see Namespace variables. |
Click Confirm.
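Similarly, the MySQL catalog can be created with a CREATE CATALOG statement. This sketch uses placeholder values for the hostname, username, and password variable; replace them with your own:

CREATE CATALOG `mysql-catalog` WITH (
  'type' = 'mysql',
  -- Placeholder internal endpoint of your ApsaraDB RDS for MySQL instance.
  'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'default-database' = 'orders',
  'username' = '<yourUsername>',
  -- Assumed namespace variable; see Namespace variables.
  'password' = '${secret_values.mysql_password}'
);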
Step 3: Create a Realtime Compute for Apache Flink draft
In the left-side navigation pane, choose Development > ETL. In the upper-left corner of the SQL Editor page, click New. On the SQL Scripts tab of the New Draft dialog box, select Blank Stream Draft and click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. |
| Location | The folder in which the code file of the draft is saved. You can also click the icon to the right of an existing folder to create a subfolder. |
| Engine Version | The engine version of Flink that is used by the deployment. We recommend that you use an engine version that has the RECOMMENDED label. Versions with this label provide higher reliability and performance. For more information about engine versions, see Release notes and Engine version. |
Click Create.
Enter the following statements to capture the changes of related tables in the orders database in real time and synchronize the changes to the Apache Paimon table.
-- Capture changes to the MySQL tables whose names match the regular expression orders_\d+
-- and synchronize them to the orders table in the default database of Apache Paimon.
CREATE TABLE IF NOT EXISTS `paimon-catalog`.`default`.`orders`
AS TABLE `mysql-catalog`.`orders`.`orders_\d+`;
For more information about how to use the CREATE TABLE AS statement, see CREATE TABLE AS statement.
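The CREATE TABLE AS statement above merges the two shard tables into a single Paimon table. If you instead want to synchronize every table in the source database to its own Paimon table, Realtime Compute for Apache Flink also provides the CREATE DATABASE AS (CDAS) statement. The following is a minimal sketch, assuming a target Paimon database named orders_db:

-- Synchronize all tables in the MySQL orders database to the Paimon database orders_db.
CREATE DATABASE IF NOT EXISTS `paimon-catalog`.`orders_db`
AS DATABASE `mysql-catalog`.`orders` INCLUDING ALL TABLES;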
Optional. In the upper-right corner of the SQL Editor page, click Validate to check whether the SQL statements of the Realtime Compute for Apache Flink draft contain syntax errors.
In the upper-right corner of the SQL Editor page, click Deploy. Then, click OK.
In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the deployment that you want to manage to go to the deployment details page. On the Configuration tab of the deployment details page, click Edit in the upper-right corner of the Parameters section.
In this example, to quickly obtain the result of the deployment, change the values of the Checkpointing Interval and Min Interval Between Checkpoints parameters to 10s. Then, click Save.
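These two console parameters correspond to the standard Flink configuration keys execution.checkpointing.interval and execution.checkpointing.min-pause. As a sketch of the same settings, you could also add the following lines to the Other Configuration field:

execution.checkpointing.interval: 10s
execution.checkpointing.min-pause: 10s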
In the upper-right corner of the Configuration tab, click Start. In the Start Job panel, select Initial Mode and click Start.
Query data in the Apache Paimon table.
In the left-side navigation pane, choose Development > Scripts. Copy the following code to the script editor on the Scripts tab, select the code, and then click Run to the left of the code.
select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
Step 4: Update the schema of the MySQL table
This section demonstrates how to synchronize the changes of the MySQL table schema to the Apache Paimon table.
Log on to the ApsaraDB RDS console.
Log on to the orders database, enter the following SQL statements, and then click Execute to add a quantity column to both tables and populate it with data.
ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT;
ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT;
UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
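Before you check the Paimon side, you can verify the change in MySQL. The expected counts follow from the UPDATE filters: orderkey < 5 matches 4 rows in orders_1, and orderkey > 15 matches 5 rows in orders_2.

DESCRIBE `orders_1`; -- The quantity column appears as the last column.
SELECT COUNT(*) FROM `orders_1` WHERE `quantity` IS NOT NULL; -- Expected: 4
SELECT COUNT(*) FROM `orders_2` WHERE `quantity` IS NOT NULL; -- Expected: 5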
In the left-side navigation pane of Realtime Compute for Apache Flink, choose Development > Scripts. Copy the following code to the script editor on the Scripts tab, select the code, and then click Run to the left of the code.
select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;
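If the schema change has been synchronized, the query returns the nine rows whose quantity column is 100: orderkey 1 to 4 from orders_1 and orderkey 16 to 20 from orders_2.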
References
The Apache Paimon connector can be used with Apache Paimon catalogs. For more information about how to use the Apache Paimon connector, see Apache Paimon connector.
For more information about how to use Apache Paimon catalogs, see Manage Apache Paimon catalogs.
You can build a streaming data lakehouse by using Realtime Compute for Apache Flink. For more information, see Build a streaming data lakehouse by using Realtime Compute for Apache Flink, Apache Paimon, and StarRocks.