Realtime Compute for Apache Flink: Getting started with a YAML deployment for data ingestion

Last Updated: Oct 31, 2024

Realtime Compute for Apache Flink allows you to create YAML deployments for data ingestion from a source to a sink based on Change Data Capture (CDC) connectors for Apache Flink. This topic describes how to develop a YAML draft of Realtime Compute for Apache Flink to synchronize all data from a MySQL database to StarRocks.

Prerequisites

  • A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.

  • Upstream and downstream storage instances are created.

    Note

    The ApsaraDB RDS for MySQL instance and the EMR Serverless StarRocks instance must reside in the same virtual private cloud (VPC) as the Realtime Compute for Apache Flink workspace. If they do not, you must establish network connections between the instances and the workspace. In addition, you must configure an IP address whitelist for the ApsaraDB RDS for MySQL instance. For more information, see the linked Console operations topics.

Background information

In this example, the ApsaraDB RDS for MySQL instance has a database named order_dw_mysql that contains three business tables: orders, orders_pay, and product_catalog. To develop a YAML draft that synchronizes data from these tables to the order_dw_sr database in StarRocks, perform the following steps:

  1. Step 1: Prepare test data of the ApsaraDB RDS for MySQL instance

  2. Step 2: Develop a YAML draft

  3. Step 3: Start the YAML deployment for the draft

  4. Step 4: View the synchronization result in StarRocks

Step 1: Prepare test data of the ApsaraDB RDS for MySQL instance

  1. Create a database in the ApsaraDB RDS for MySQL instance and an account for the database.

    Create a database named order_dw_mysql and a standard account that has read and write permissions on the database. For more information, see Create accounts and databases and Manage databases.

  2. Log on to the ApsaraDB RDS for MySQL instance from the Data Management (DMS) console.

  3. On the SQL Console tab, enter the following statements and click Execute to create three business tables in the database and insert data into the business tables:

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
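
Before you move on, it can help to confirm that the test data was written as intended. The following is a minimal sketch that assumes the SQL Console session is still connected to the order_dw_mysql database. The binlog check at the end is an extra sanity check based on the general requirement that MySQL CDC reads row-based binary logs; it is not part of the original procedure.

```sql
-- Count the rows in each business table created above.
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM orders
UNION ALL
SELECT 'orders_pay', COUNT(*) FROM orders_pay
UNION ALL
SELECT 'product_catalog', COUNT(*) FROM product_catalog;
-- With the sample data above: orders = 7, orders_pay = 7, product_catalog = 5.

-- Optional: check the binary log format. CDC-based readers generally expect ROW.
SHOW VARIABLES LIKE 'binlog_format';
```

If a count differs from the values in the comment, rerun the corresponding INSERT statement before you develop the draft.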

Step 2: Develop a YAML draft

  1. Log on to the management console of Realtime Compute for Apache Flink.

  2. Find the workspace that you want to manage and click Console in the Actions column. In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose Development > Data Ingestion.

  3. In the upper-left corner of the Drafts page, click New. In the New Draft dialog box, click Data Ingestion from MySQL to Starrocks and click Next.

  4. On the page that appears, configure the Name and Location parameters, select a value from the Engine Version drop-down list, and then click OK.

  5. Configure the code of the YAML draft.

    The following code provides an example of how to synchronize all tables from the order_dw_mysql database in ApsaraDB RDS for MySQL to the order_dw_sr database in StarRocks.

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    The following table describes the configuration information required for the ApsaraDB RDS for MySQL and StarRocks instances in this example. For more information about other parameters used for data ingestion, see MySQL connector and StarRocks connector.

    | Category | Parameter | Description | Example |
    |----------|-----------|-------------|---------|
    | source | hostname | The IP address or hostname that is used to access the ApsaraDB RDS for MySQL database. We recommend that you enter the VPC endpoint of the database. | rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com |
    | source | port | The port number that is used to access the ApsaraDB RDS for MySQL database. | 3306 |
    | source | username | The username that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the username of the account that you created in Step 1: Prepare test data of the ApsaraDB RDS for MySQL instance. Note: In this example, variables are used to avoid the security risks of plaintext credentials. For more information, see Manage variables. | ${secret_values.mysqlusername} |
    | source | password | The password that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the password of the account that you created in Step 1. The note for username also applies. | ${secret_values.mysqlpassword} |
    | source | tables | The names of the ApsaraDB RDS for MySQL tables. You can use regular expressions to read data from multiple tables. In this example, all tables and data in the order_dw_mysql database are synchronized. | order_dw_mysql.\.* |
    | source | server-id | The numeric ID that is allocated to a database client. This example specifies a range of IDs. | 5405-5415 |
    | sink | jdbc-url | The Java Database Connectivity (JDBC) URL that is used to connect to the database. The URL contains the IP address and the JDBC port of a frontend (FE) node, in the jdbc:mysql://ip:port format. To view the internal endpoint and the query port of the instance, go to the FE Details section on the Instance Details tab of the EMR Serverless StarRocks instance in the EMR console. | jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 |
    | sink | load-url | The HTTP service URL that is used to connect to the FE node. To view the internal endpoint and the HTTP port of the instance, go to the FE Details section on the Instance Details tab of the EMR Serverless StarRocks instance in the EMR console. | fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 |
    | sink | username | The username that is used to access the StarRocks database. Set this parameter to the username that you specified when you created the EMR Serverless StarRocks instance. Note: In this example, variables are used to avoid the security risks of plaintext credentials. For more information, see Manage variables. | ${secret_values.starrocksusername} |
    | sink | password | The password that is used to access the StarRocks database. Set this parameter to the password that you specified when you created the EMR Serverless StarRocks instance. The note for username also applies. | ${secret_values.starrockspassword} |
    | route | source-table | The source tables to which the route applies. | order_dw_mysql.\.* |
    | route | sink-table | The destination to which data from the matched source tables is routed. | order_dw_sr.<> |
    | route | replace-symbol | The string in sink-table that is replaced by the name of each matched source table when pattern matching is used. | <> |

  6. Click Deploy.

Step 3: Start the YAML deployment for the draft

  1. In the upper-right corner of the Data Ingestion page, click Deploy. In the dialog box that appears, configure the parameters and click Confirm.

  2. In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose O&M > Deployments. On the Deployments page, find the desired YAML deployment and click Start in the Actions column.

  3. In the Start Job panel, configure the parameters and click Start.

    In this example, Initial Mode is selected. For more information about the parameters, see Start a deployment. After you start the deployment, you can view its status and details on the Deployments page.
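
To check that the running deployment also picks up ongoing changes, and not only the rows that existed at startup, you could modify the source data after the deployment enters the RUNNING state. The statements below are a sketch to run against the order_dw_mysql database. The order ID 100008 and the state value 2 are made-up test values and are not part of the original data set.

```sql
-- Insert one additional test order (hypothetical values).
INSERT INTO orders VALUES
(100008, 'user_001', 12345, 3, 3000.03, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1);

-- Change an existing row so that an update event is produced.
UPDATE orders SET state = 2 WHERE order_id = 100001;
```

If synchronization works as expected, the new row and the changed state should appear in the orders table of the order_dw_sr database in StarRocks shortly afterward.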

Step 4: View the synchronization result in StarRocks

After the YAML deployment enters the RUNNING state, you can view the data synchronization status in StarRocks.

  1. Use EMR StarRocks Manager to access the EMR Serverless StarRocks instance.

  2. In the left-side navigation pane of the page that appears, click SQL Editor. On the Databases tab, click the image icon.

    A database named order_dw_sr appears in default_catalog.

  3. On the Queries tab, click + File. In the Create a file dialog box, configure the parameter to create a query script. On the query script editing page, enter the following SQL statements and click Run.

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;

  4. View the synchronization result below the statements.

    The StarRocks database now contains tables that have the same names as the tables in the ApsaraDB RDS for MySQL database, together with the data of those tables.
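
In addition to inspecting the rows, a row-count comparison gives a quick completeness check. The query below is a sketch that assumes the catalog and database names used in this example (default_catalog and order_dw_sr) and that only the sample data from Step 1 has been written to MySQL.

```sql
-- Compare these counts with the source tables in order_dw_mysql.
SELECT 'orders' AS table_name, COUNT(*) AS row_count
FROM default_catalog.order_dw_sr.orders
UNION ALL
SELECT 'orders_pay', COUNT(*)
FROM default_catalog.order_dw_sr.orders_pay
UNION ALL
SELECT 'product_catalog', COUNT(*)
FROM default_catalog.order_dw_sr.product_catalog;
-- With only the Step 1 sample data: orders = 7, orders_pay = 7, product_catalog = 5.
```

Counts that are lower than the source usually mean that the initial synchronization is still in progress, so rerun the query after a short wait.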

