
Realtime Compute for Apache Flink:Getting started with real-time data ingestion into data lakes based on Apache Paimon

Last Updated: Nov 12, 2024

Apache Paimon is a unified lake storage that supports streaming and batch data processing, high-throughput data writes, and low-latency data queries. This topic describes how to use an Apache Paimon catalog and the MySQL connector in the console of fully managed Flink to ingest order data and table schema changes from ApsaraDB RDS for MySQL into an Apache Paimon table, and how to run simple analytics on the data in the Apache Paimon table.

Background information

Apache Paimon is a unified lake storage that supports streaming and batch data processing, high-throughput data writes, and low-latency data queries. Apache Paimon can be integrated with Alibaba Cloud Realtime Compute for Apache Flink and with common compute engines of the open source big data platform E-MapReduce (EMR), such as Spark, Hive, and Trino. You can use Apache Paimon to quickly build a data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS), and then connect a compute engine to Apache Paimon for data lake analytics.

Prerequisites

  • If you want to use a RAM user or RAM role to access the console of fully managed Flink, make sure that the RAM user or RAM role has the required permissions. For more information, see Permission management.

  • A workspace of fully managed Flink is created. For more information, see Activate fully managed Flink.

Step 1: Prepare a data source

  1. Create an ApsaraDB RDS for MySQL instance.

    Note

    The ApsaraDB RDS for MySQL instance and the workspace of fully managed Flink must reside in the same virtual private cloud (VPC). If the ApsaraDB RDS for MySQL instance and the workspace of fully managed Flink do not reside in the same VPC, you must establish a connection between them. For more information, see Network connectivity.

  2. Create a database and an account.

    Create a database named orders and create a privileged account or a standard account that has the read and write permissions on the orders database.
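    If you prefer to run SQL statements instead of using the ApsaraDB RDS console for this step, the following is a minimal sketch. The account name flink_user and the password are placeholders; replace them with your own values.

    -- Create the database and a hypothetical account with read and write permissions on it.
    CREATE DATABASE `orders`;
    CREATE USER 'flink_user'@'%' IDENTIFIED BY 'your_password';
    GRANT ALL PRIVILEGES ON `orders`.* TO 'flink_user'@'%';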

  3. Log on to the desired ApsaraDB RDS for MySQL instance by using Database Management Service (DMS) and create the tables orders_1 and orders_2 in the orders database. The two tables share the same schema so that their changes can later be merged into a single Apache Paimon table.

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  4. Insert the following test data:

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
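
    To confirm that the test data is in place, you can run a quick check. Each of the following statements should return 10:

    SELECT COUNT(*) FROM `orders_1`;
    SELECT COUNT(*) FROM `orders_2`;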

Step 2: Create a catalog

  1. Go to the Catalogs page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Catalogs.

  2. Create an Apache Paimon catalog.

    1. On the Catalog List page, click Create Catalog. On the Built-in Catalog tab of the Create Catalog dialog box, click Apache Paimon and click Next.

    2. Configure the parameters in the Configure Catalog step.


      • catalog name: The name of the Apache Paimon catalog. Enter a custom name.

      • metastore: The metadata storage type of the Apache Paimon table. Valid values:

        • filesystem: Metadata is stored only in OSS buckets.

        • dlf: Metadata is stored in OSS buckets and is also synchronized to Alibaba Cloud Data Lake Formation (DLF).

        In this topic, filesystem is selected.

      • warehouse: The storage root directory of the Apache Paimon catalog. This directory is an OSS directory. You can select the OSS bucket that is used when you activate Realtime Compute for Apache Flink, or another OSS bucket in the same region within your Alibaba Cloud account. The value of this parameter is in the oss://<bucket>/<object> format, where bucket is the name of the OSS bucket that you created and object is the path in which your data is stored. You can view the bucket and object names in the OSS console.

      • fs.oss.endpoint: The endpoint of OSS. If the OSS bucket resides in the same region as your Realtime Compute for Apache Flink workspace, use the VPC endpoint. Otherwise, use the public endpoint.

      • fs.oss.accessKeyId: The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

      • fs.oss.accessKeySecret: The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS.

      The fs.oss.endpoint, fs.oss.accessKeyId, and fs.oss.accessKeySecret parameters are required only if the OSS bucket specified by the warehouse parameter does not reside in the same region as the Realtime Compute for Apache Flink workspace, or if the bucket belongs to a different Alibaba Cloud account. For more information about how to obtain this information, see Regions, endpoints, and open ports and Create an AccessKey pair.
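
      For reference, the configuration above corresponds to the following CREATE CATALOG statement in open source Apache Paimon's Flink SQL syntax. This is a sketch only and is not required in this tutorial; the warehouse path, endpoint, and AccessKey values are placeholders.

      -- Create an Apache Paimon catalog named test backed by an OSS warehouse.
      CREATE CATALOG `test` WITH (
          'type' = 'paimon',
          'metastore' = 'filesystem',
          'warehouse' = 'oss://<bucket>/<object>',
          'fs.oss.endpoint' = '<oss-endpoint>',
          'fs.oss.accessKeyId' = '<your-accesskey-id>',
          'fs.oss.accessKeySecret' = '<your-accesskey-secret>'
      );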

    3. Click Confirm.

Step 3: Create a fully managed Flink draft

  1. In the left-side navigation pane, choose Development > ETL. In the upper-left corner of the SQL Editor page, click New.

  2. On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft and click Next.

  3. In the New Draft dialog box, configure the parameters of the draft. The following list describes the parameters.

    • Name: The name of the draft that you want to create. The draft name must be unique in the current project.

    • Location: The folder in which the code file of the draft is saved. You can also click the Create Folder icon to the right of an existing folder to create a subfolder.

    • Engine Version: The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version.

  4. Click Create.

  5. Enter the following statements to capture the changes of related tables in the orders database in real time and synchronize the changes to the Apache Paimon table.

    -- Use the Apache Paimon catalog that you created.
    USE CATALOG `test`;
    
    -- Create a temporary MySQL table to capture the changes of the MySQL table that matches the regular expression orders_\d+.
    CREATE TEMPORARY TABLE mysql_orders (
        orderkey BIGINT,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        `comment` VARCHAR(100),
        PRIMARY KEY (orderkey) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql',
        'hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'your_username',
        'password' = '${secret_values.mysql_pw}',
        'database-name' = 'orders',
        'table-name' = 'orders_\d+',
        'server-time-zone' = 'Asia/Shanghai'
    );
    
    -- Synchronize the changes of the MySQL table to the Apache Paimon table.
    CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;

    The following list describes the parameters in the preceding code. You can modify the parameters based on your business requirements. For more information about the parameters of the MySQL connector, see MySQL connector.

    • connector: The type of the connector. In this example, this parameter is set to mysql.

    • hostname: The IP address or hostname that is used to access the MySQL database. In this example, the internal endpoint of the ApsaraDB RDS for MySQL instance is used.

    • username: The username that is used to access the MySQL database.

    • password: The password that is used to access the MySQL database. In this example, a key named mysql_pw is used to protect the password. For more information, see Manage variables.

    • database-name: The name of the MySQL database that you want to access. In this example, the name of the database that you created in Step 1: Prepare a data source is used.

    • table-name: The name of the MySQL table. To read data from multiple tables, you can set this parameter to a regular expression.

    • port: The port that is used to access the MySQL database.

    The server-time-zone parameter specifies the session time zone of the MySQL server, for example, Asia/Shanghai.

  6. Optional. In the upper-right corner of the SQL Editor page, click Validate to check whether the SQL statements of the fully managed Flink draft contain syntax errors.

  7. In the upper-right corner of the SQL Editor page, click Deploy. Then, click OK.

  8. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the desired deployment in the Name column.

  9. On the Configuration tab of the Deployments page, click Edit in the upper-right corner of the Parameters section.

    In this example, to quickly obtain the result of the deployment, change the values of the Checkpointing Interval and Min Interval Between Checkpoints parameters to 10s. Then, click Save.
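
    As a sketch, assuming the console's Checkpointing Interval and Min Interval Between Checkpoints fields map to the standard Flink configuration keys execution.checkpointing.interval and execution.checkpointing.min-pause, the same settings expressed as Flink SQL SET statements would be:

    -- Trigger a checkpoint every 10 seconds, with at least 10 seconds between checkpoints.
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';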


  10. In the upper-right corner of the Configuration tab, click Start. In the Start Job panel, select Initial Mode and click Start.


  11. Query data in the Apache Paimon table.

    1. On the Scripts tab of the Development > Scripts page, copy the following code to the script editor of the draft, select the code, and then click the Run button that appears to the left of the code.

      SELECT custkey, SUM(total_price) FROM `test`.`default`.`orders` GROUP BY custkey;
    2. After you view the result, click the stop icon on the left side of the query result to stop debugging.

Step 4: Update the schema of the MySQL table

This section demonstrates how to synchronize the changes of the MySQL table schema to the Apache Paimon table.

  1. Log on to the ApsaraDB RDS console.

  2. Log on to the orders database, enter the following SQL statements, and then click Execute to add a quantity column to both tables and populate it with data.

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
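
    Optionally, verify the change on the MySQL side before querying the Apache Paimon table. A minimal check:

    SELECT `orderkey`, `quantity` FROM `orders_1` WHERE `quantity` IS NOT NULL;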
  3. On the Scripts tab of the Development > Scripts page in the Realtime Compute for Apache Flink console, copy the following code to the script editor of the draft, select the code, and then click the Run button that appears to the left of the code.

    SELECT * FROM `test`.`default`.`orders` WHERE `quantity` IS NOT NULL;

    After you view the result, click the stop icon on the left side of the query result to stop debugging.
