
E-MapReduce: Use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster

Last Updated: Aug 08, 2024

The CREATE TABLE AS (CTAS) statement synchronizes data and table schema changes from one table to another. The CREATE DATABASE AS statement synchronizes the data and schemas of an entire database or of multiple tables in a database. This topic describes how to use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from a transaction processing (TP) database, an ApsaraDB RDS for MySQL instance, to an analytical processing (AP) database, an E-MapReduce (EMR) StarRocks cluster.

Background information

You can use the CTAS statement to automatically create tables in StarRocks that have the same schemas as the tables in a MySQL database, and to synchronize their data. Schema changes in an upstream table are also synchronized to the downstream table in real time. This saves you from manually creating tables in the destination storage and manually applying the schema changes of the source table.

When you execute the CTAS statement, Flink performs the following operations:

  1. Check whether the destination table exists in the destination storage.

    • If the destination table does not exist, the corresponding destination table is created in the destination storage based on the catalog to which the destination table belongs. The destination table has the same schema as the source table.

    • If the destination table exists, Flink skips the table creation step. If the schema of the existing destination table is different from the schema of the source table, an error message is returned.

  2. Commit and run a data synchronization job. Flink synchronizes data and changes in the schema of the source table to the destination table.
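Before you run a CTAS statement against a destination table that already exists, you can compare the two schemas yourself. The following statements are a sketch that assumes the test_cdc database from Step 1 and the destination table runoob_tbl1 from the simple-mode example in Step 3. Replace the names with your own.

```sql
-- Run on the ApsaraDB RDS for MySQL instance: list the columns of the source table.
DESC test_cdc.runoob_tbl;

-- Run on the StarRocks cluster: list the columns of the existing destination table.
-- If the columns differ from those of the source table, the CTAS job returns an error.
DESC test_cdc.runoob_tbl1;
```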

Under the schema change synchronization policy, the CTAS statement synchronizes both the data and the schema changes of the source table to the destination table in real time.

Schema changes include table creation and any schema changes made after a table is created.

  • The following schema changes are supported:

    • Add a nullable column: The statement automatically adds the corresponding column to the end of the schema of the destination table and synchronizes data to the added column.

    • Delete a nullable column: The statement does not delete the column from the destination table. Instead, it fills the column with null values for data that is synchronized after the deletion.

    • Rename a column: A rename is treated as adding a column and deleting a column. After a column is renamed in the source table, the column that uses the new name is added to the end of the destination table, and the column that uses the original name is filled with null values.

      For example, if the name of the col_a column in the source table is changed to col_b, the col_b column is added to the end of the destination table and the col_a column is filled with null values.

  • The following schema changes are not supported:

    • Change the data type or nullability of a column.

      For example, a column is changed from the VARCHAR type to the BIGINT type, or from NOT NULL to NULLABLE.

    • Change constraints, such as the primary key or index.

    • Add or delete a non-nullable column.

    • Change the length of a column, for example from VARCHAR(40) to VARCHAR(100).
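The following MySQL statements illustrate the preceding categories on the runoob_tbl table from Step 1. They are an illustration only: remark, note, and flag are hypothetical column names, and running the statements changes the source table. The unsupported statements are commented out because running them would require you to delete the destination table and restart the job.

```sql
-- Illustration only: these statements modify the source table runoob_tbl.
-- remark, note, and flag are hypothetical column names.
-- The CTAS statement detects each change only after data in the source table
-- changes, so insert or update a row after each statement to observe it.
USE test_cdc;

-- Supported: add a nullable column. It is appended to the end of the destination table.
ALTER TABLE runoob_tbl ADD COLUMN `remark` VARCHAR(50) NULL;

-- Supported: rename a column (MySQL 5.7 renames columns with CHANGE COLUMN).
-- The destination table gets a new `note` column at the end; `remark` is filled with NULL.
ALTER TABLE runoob_tbl CHANGE COLUMN `remark` `note` VARCHAR(50) NULL;

-- Supported: delete a nullable column. The destination column is kept and filled with NULL.
ALTER TABLE runoob_tbl DROP COLUMN `note`;

-- Not supported: change the length of a column (VARCHAR(40) to VARCHAR(80)).
-- ALTER TABLE runoob_tbl MODIFY COLUMN `runoob_author` VARCHAR(80) NOT NULL;

-- Not supported: add a non-nullable column.
-- ALTER TABLE runoob_tbl ADD COLUMN `flag` INT NOT NULL DEFAULT 0;
```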

Note
  • If the source table undergoes any of the preceding unsupported changes, you must delete the destination table and restart the job that executes the CTAS statement. The destination table is then re-created and the historical data is resynchronized to it.

  • The CTAS statement does not parse DDL statements. Instead, it compares the schemas of the data records received before and after a change. Therefore, if you delete a column and then add it again, and no data changes between the two DDL statements, the CTAS statement considers that no schema change occurred. Similarly, adding a column to the source table does not by itself trigger schema change synchronization. The CTAS statement detects the schema change only when data in the source table changes, and then synchronizes the schema change to the destination table.

  • For more information about the field types supported by the CTAS statement, see Continuously load data from Apache Flink.

  • When you use the CTAS statement to merge multiple MySQL tables, the system automatically adds the _db_name and _table_name columns to the destination table to record the source database and table of each row. To ensure that the schema of the destination table meets your expectations, define the order of your own columns starting from the third column.
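If the source table undergoes an unsupported change, the recovery described in the first note starts with deleting the destination table. The following is a sketch that assumes the simple-mode destination table runoob_tbl1 from Step 3. Replace it with the name of your own destination table.

```sql
-- Run on the StarRocks cluster (mysql -h127.0.0.1 -P 9030 -uroot).
-- Deleting the destination table deletes all data that was synchronized to it.
DROP TABLE IF EXISTS test_cdc.runoob_tbl1;

-- Then restart the deployment that runs the CTAS statement in the console of
-- fully managed Flink. CREATE TABLE IF NOT EXISTS re-creates the table, and the
-- historical data is resynchronized.
```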

Prerequisites

Note

In this topic, MySQL 5.7, a StarRocks cluster of EMR 3.39.1, and fully managed Flink of vvr-6.0.3-flink-1.15 are used.

Limits

  • The fully managed Flink workspace, StarRocks cluster, and ApsaraDB RDS for MySQL instance that you create must be in the same virtual private cloud (VPC).

  • The engine version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.

  • Internet access must be enabled for the StarRocks cluster.

  • Fully managed Flink must be of vvr-6.0.3-flink-1.15 or later.
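You can check the engine version of the ApsaraDB RDS for MySQL instance with the following statements. The binlog_format check is an assumption added as a sanity check, not a limit stated in this topic: the MySQL CDC connector reads changes from the binary log, which is expected to use the ROW format.

```sql
-- Run on the ApsaraDB RDS for MySQL instance.
-- The engine version must be 5.7 or later.
SELECT VERSION();

-- Assumption: the MySQL CDC connector expects row-based binary logging (ROW).
SHOW VARIABLES LIKE 'binlog_format';
```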

Step 1: Prepare test data

  1. Create a database and an account for testing. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.

    After you create the database and account, grant the read and write permissions to the test account.

    Note

    In this topic, the name of the database is test_cdc and the username of the account is test.

  2. Use the created test account to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  3. Run the following statements on the ApsaraDB RDS for MySQL database to create a table and insert a test row:

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22',3);
  4. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  5. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  6. Run the following statements to create a database and a user, and grant permissions to the user:

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;
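While you are connected to the StarRocks cluster, you can look up the values needed for the endpoint, jdbc-url, and load-url parameters used later, and confirm the grants of the test user. This is a sketch; the exact output columns of SHOW FRONTENDS may vary across StarRocks versions.

```sql
-- Run on the StarRocks cluster as root.
-- The IP, QueryPort, and HttpPort columns give the frontend address, the query port
-- (used in endpoint and jdbc-url), and the HTTP port (used in load-url).
SHOW FRONTENDS;

-- Confirm the privileges granted to the test user.
SHOW GRANTS FOR 'test'@'%';
```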

Step 2: Create catalogs by using an SQL client in the console of fully managed Flink

On the Draft Editor page in the console of fully managed Flink, create a MySQL catalog and a StarRocks catalog. For more information, see Getting started for a Flink SQL deployment.

Note

The parameter values are for reference only. Configure the parameters based on your business requirements.

  • MySQL catalog

    • Sample code

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • Parameters

      • type: The type of the catalog. Set the value to mysql.

      • hostname: The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console, such as rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com.

      • port: The port number of the ApsaraDB RDS for MySQL database. Default value: 3306.

      • username: The username that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the username of the account that is created in Step 1: Prepare test data. In this example, test is used.

      • password: The password that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the password of the account that is created in Step 1: Prepare test data.

      • default-database: The name of the default ApsaraDB RDS for MySQL database. Set this parameter to the name of the database that is created in Step 1: Prepare test data. In this example, test_cdc is used.

  • StarRocks catalog

    • Sample code

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = '172.16.**.**:9030',
        'username' = 'test',
        'password' = '123456',
        'dbname' = 'test_cdc'
      );
    • Parameters

      • type: The type of the catalog. Set the value to starrocks.

      • endpoint: The internal IP address and query port of a frontend (FE) node of the StarRocks cluster, such as 172.16.**.**:9030. The default query port is 9030.

      • username: The username that is used to access the StarRocks database. Set this parameter to the username of the StarRocks account that is created in Step 1: Prepare test data. In this example, test is used.

      • password: The password that is used to access the StarRocks database. Set this parameter to the password of the StarRocks account that is created in Step 1: Prepare test data.

      • dbname: The name of the StarRocks database. Set this parameter to the name of the database that is created in Step 1: Prepare test data. In this example, test_cdc is used.
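To check that both catalogs are registered before you write the CTAS statement, you can run standard Flink SQL catalog statements such as the following. This sketch assumes that your SQL client can run statements interactively; the catalog and database names are the ones used in this topic.

```sql
-- List the registered catalogs; mysql and sr are expected.
SHOW CATALOGS;

-- Browse the MySQL catalog; runoob_tbl is expected in test_cdc.
USE CATALOG mysql;
USE test_cdc;
SHOW TABLES;

-- Browse the StarRocks catalog; test_cdc is expected.
USE CATALOG sr;
SHOW DATABASES;
```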

Step 3: Create and publish a draft

  1. On the Draft Editor page in the console of fully managed Flink, write a CREATE TABLE AS statement.

    You can use one of the following methods to execute the CREATE TABLE AS statement:

    • At-least-once semantics: Data is written to the StarRocks database at the interval that is specified by the sink.buffer-flush.interval-ms parameter. The advantage is that data becomes visible after a short delay and less memory is occupied. The disadvantage is that data may be written more than once if a failure occurs.

      /*
            At-least-once semantics
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl_sr with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Exactly-once semantics: You must specify the interval at which checkpoints are scheduled. The advantage is that data is neither lost nor duplicated when errors occur. The disadvantage is that data becomes visible in StarRocks only after a checkpoint is complete, so the checkpoint interval determines the data latency. For more information, see Checkpointing.

      /*
            Exactly-once semantics 
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '123456',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Simple mode: The advantage is that you do not need to know the fields of the table in the ApsaraDB RDS for MySQL database, because the destination table is created with the same schema as the source table. This mode is easy to use. The disadvantage is that you cannot create partitions. For tables that need to be partitioned, use normal mode.

      /*
            The preceding two examples use normal mode. This example uses simple mode.
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl1',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

    Table 1. Parameters in the WITH clause

    • starrocks.create.table.properties (required): The clauses that follow the column definitions in the StarRocks CREATE TABLE statement, such as the engine, key, and buckets settings in the sample code.

    • database-name (required): The name of the StarRocks database. In this example, test_cdc is used.

    • jdbc-url (required): The Java Database Connectivity (JDBC) URL that is used to connect to StarRocks and run queries in StarRocks. Example: jdbc:mysql://172.16.**.**:9030, where 172.16.**.** is the internal IP address of the StarRocks cluster.

    • load-url (required): The internal IP address and HTTP port of the frontend, in the format of <Internal IP address of the StarRocks cluster>:<HTTP port>. The sample code uses port 18030. Select a port based on the version of your cluster:

      • 18030: clusters of EMR V5.9.0 or a later minor version, and clusters of EMR V3.43.0 or a later minor version.

      • 8030: clusters of EMR V5.8.0 or EMR V3.42.0, and clusters of a minor version earlier than EMR V5.8.0 or EMR V3.42.0.

      For more information, see Access the UI and ports.

    • sink.semantic (optional): The delivery semantics of the sink. Default value: at-least-once. Set this parameter to exactly-once to ensure data consistency.

    • starrocks.create.table.mode (optional): The table creation mode. Valid values:

      • normal: the default value. You must specify the complete configuration, such as engine, key, and buckets, in the starrocks.create.table.properties parameter.

      • simple: The engine is set to olap, and the table is created with the same primary key as the ApsaraDB RDS for MySQL table. Data is distributed by hash on all primary key columns, and no partitions are created. You must specify buckets in the starrocks.create.table.properties parameter. Other settings, such as properties, are optional.

    Note
    • If you use a Flink version earlier than vvr-6.0.5-flink-1.15, you must add 'sink.use.new-api'='false' to the WITH clause.

    • For more information about other configurations, see Continuously load data from Apache Flink.

    Table 2. Parameters in the OPTIONS clause

    • connector: The type of the connector. Set the value to mysql-cdc.

    • hostname: The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console, such as rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.

    • port: The port number of the ApsaraDB RDS for MySQL database. Default value: 3306.

    • username: The username that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the username of the account that is created in Step 1: Prepare test data. In this example, test is used.

    • password: The password that is used to access the ApsaraDB RDS for MySQL database. Set this parameter to the password of the account that is created in Step 1: Prepare test data.

    • table-name: The name of the source table in the ApsaraDB RDS for MySQL database. Set this parameter to the name of the table that is created in Step 1: Prepare test data. In this example, runoob_tbl is used.

    • database-name: The name of the source ApsaraDB RDS for MySQL database. Set this parameter to the name of the database that is created in Step 1: Prepare test data. In this example, test_cdc is used.

  2. On the Advanced tab of the Draft Editor page, set Engine Version to vvr-6.0.3-flink-1.15 or later.

  3. In the upper-right corner of the Draft Editor page, click Publish.

  4. On the Deployments page in the console of fully managed Flink, find the deployment that you want to start, and click Start in the Actions column.
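After the deployment starts, you can confirm that the destination table was created. Which table appears depends on the example that you published: runoob_tbl_sr (at-least-once), runoob_tbl (exactly-once), or runoob_tbl1 (simple mode). The following is a sketch that you run on the StarRocks cluster.

```sql
-- Run on the StarRocks cluster.
SHOW TABLES FROM test_cdc;

-- Inspect the generated definition, for example of the simple-mode table.
-- In simple mode, the key and the hash distribution are expected to follow the MySQL primary key.
SHOW CREATE TABLE test_cdc.runoob_tbl1;
```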

Step 4: Verify data synchronization in different scenarios

Query data

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
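  3. On the CLI of the StarRocks cluster, execute the following statements to query the table data. In this example, runoob_tbl1 is the destination table created in simple mode. If you published another example, query runoob_tbl_sr or runoob_tbl instead.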

    use test_cdc;
    select * from runoob_tbl1;

    The following output is returned. This indicates that data is synchronized from the ApsaraDB RDS for MySQL instance to the StarRocks cluster.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+
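To check that no rows are missing, you can compare row counts between the source and destination tables. This is a sketch; if you used the exactly-once example, the StarRocks count is expected to catch up only after the next checkpoint is complete.

```sql
-- Run on the ApsaraDB RDS for MySQL instance.
SELECT COUNT(*) FROM test_cdc.runoob_tbl;

-- Run on the StarRocks cluster (runoob_tbl1 is the simple-mode destination table).
SELECT COUNT(*) FROM test_cdc.runoob_tbl1;
```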

Synchronize inserted data

  1. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statement to insert data:

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1);
  2. On the CLI of the StarRocks cluster, execute the following statement to query the table data:

    select * from runoob_tbl1;

    The following output is returned. This indicates that the data is inserted.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Synchronize updated data

  1. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statement to update specific data:

    UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;
  2. On the CLI of the StarRocks cluster, execute the following statement to query the table data:

    select * from runoob_tbl1;

    The following output is returned. This indicates that the updated data is synchronized.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Synchronize deleted data

  1. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statement to delete specific data:

    DELETE FROM runoob_tbl WHERE runoob_id = 1;
  2. On the CLI of the StarRocks cluster, execute the following statement to query the table data:

    select * from runoob_tbl1;

    The following output is returned. This indicates that the deleted data is synchronized.

    +-----------+--------------+---------------+-----------------+---------+ 
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | 
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 | 
    +-----------+--------------+---------------+-----------------+---------+

Add a nullable column

  1. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statement to add a nullable column:

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. Execute the following statement to insert data into the table:

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2);
  3. On the CLI of the StarRocks cluster, execute the following statement to query the table data:

    select * from runoob_tbl1;

    The following output is returned. This indicates that the schema is changed and the nullable column is added.

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
    +-----------+--------------+---------------+-----------------+---------+----------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
    |        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+
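You can also check the destination schema directly. Based on the documented behavior for nullable columns, add_col2 is expected to appear as the last column of runoob_tbl1. The following is a sketch that you run on the StarRocks cluster.

```sql
-- Run on the StarRocks cluster.
-- add_col2 is expected as the last column, and NULL values are allowed.
DESC test_cdc.runoob_tbl1;
```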

CREATE DATABASE AS

The CREATE DATABASE AS statement is syntactic sugar for the CREATE TABLE AS statement. You can use it to synchronize the data of an entire ApsaraDB RDS for MySQL database. During data synchronization, a Flink deployment is generated in which the source is the ApsaraDB RDS for MySQL database and the destinations are tables in a StarRocks database. You can also use the INCLUDING TABLE clause to synchronize only specific tables in the database.

As with the CREATE TABLE AS statement, you must create a MySQL catalog and a StarRocks catalog before you execute the CREATE DATABASE AS statement. Sample statement:

use CATALOG sr;

CREATE DATABASE IF NOT EXISTS sr_db with (
  'starrocks.create.table.properties'=' buckets 8',
  'starrocks.create.table.mode'='simple',
  'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
  'load-url'='172.16.**.**:18030',
  'username'='test',
  'password' = '123456',
  'sink.buffer-flush.interval-ms' = '5000' ,
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table 'tbl1','tbl2','tbl3'
/*+ OPTIONS (   'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',   
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;
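After the CREATE DATABASE AS deployment starts, you can check which tables were created in the StarRocks database sr_db. This sketch assumes that the StarRocks account you use can access sr_db. The test user created in Step 1 is granted privileges only on test_cdc, so you may need additional grants.

```sql
-- Run on the StarRocks cluster.
-- Only the tables selected by the INCLUDING TABLE clause are expected.
SHOW TABLES FROM sr_db;
```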