
E-MapReduce:Use CTAS and CDAS in Realtime Compute for Flink to synchronize data from MySQL to StarRocks

Last Updated:Mar 26, 2026

The CREATE TABLE AS (CTAS) statement automatically creates a matching table in StarRocks and continuously synchronizes data and schema changes from a MySQL source. The CREATE DATABASE AS (CDAS) statement extends this to an entire database. This topic shows how to use both statements in Realtime Compute for Apache Flink to move transaction processing (TP) data from ApsaraDB RDS for MySQL into an E-MapReduce (EMR) StarRocks cluster for analytical processing (AP).

Architecture overview

Component Role
ApsaraDB RDS for MySQL Source — origin of change data capture (CDC) events and schema changes
Realtime Compute for Apache Flink Processing engine — reads MySQL CDC and writes to StarRocks
EMR StarRocks cluster Sink — analytical destination for synchronized data

How it works

When you run a CTAS statement, Flink performs two operations in sequence:

  1. Table creation check — Flink checks whether the destination table exists in StarRocks.

    • If it does not exist, Flink creates the table with the same schema as the source, based on the destination catalog.

    • If it already exists, Flink skips creation. If the existing schema differs from the source schema, an error is returned.

  2. Data synchronization — Flink starts a continuous job that synchronizes both data and schema changes from the source table to the destination table.
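The table-creation check in step 1 can be modeled as a small decision function. This is a minimal sketch, not Flink's implementation. It assumes a schema is an ordered list of (column, type) pairs and compares them literally, whereas the real check works through the MySQL-to-StarRocks type mapping. The names `ctas_table_check` and `SchemaMismatchError` are hypothetical.

```python
from typing import List, Optional, Tuple

# Hypothetical model: a schema is an ordered list of (column_name, type_name) pairs.
Schema = List[Tuple[str, str]]


class SchemaMismatchError(Exception):
    """Raised when an existing destination table does not match the source."""


def ctas_table_check(source: Schema, destination: Optional[Schema]) -> str:
    """Return the action CTAS takes before it starts synchronizing data.

    destination is None when the table does not exist in StarRocks yet.
    """
    if destination is None:
        # Missing table: create it with the same schema as the source.
        return "create"
    if destination != source:
        # Existing table with a different schema: report an error.
        raise SchemaMismatchError("destination schema differs from source schema")
    # Existing table with a matching schema: skip creation.
    return "skip"


source = [("runoob_id", "INT"), ("runoob_title", "VARCHAR(100)")]
print(ctas_table_check(source, None))          # create
print(ctas_table_check(source, list(source)))  # skip
```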

Schema change behavior

CTAS uses a fixed policy for propagating schema changes. The following tables summarize what is and is not propagated automatically.

Supported schema changes

Change type Behavior in StarRocks
Add a nullable column Column is appended to the end of the destination table; incoming data fills the column
Delete a nullable column Column is retained in StarRocks; rows synchronized after the change contain NULL in that column
Rename a column The new-name column is appended to the end; the old-name column is filled with NULL values. For example, renaming col_a to col_b adds col_b at the end and sets col_a to NULL.
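The three supported cases follow one rule: the destination only gains columns, and any destination column missing from an incoming row is written as NULL. The sketch below models that rule with plain column-name lists and dicts, using `None` for NULL. It is an illustration under those assumptions; `evolve_destination` and `project_row` are hypothetical names, not Flink APIs.

```python
from typing import Dict, List, Optional


def evolve_destination(dest_cols: List[str], source_cols: List[str]) -> List[str]:
    """Append source columns the destination lacks; never drop a column."""
    evolved = list(dest_cols)
    for col in source_cols:
        if col not in evolved:
            # Added or renamed-to columns go to the end of the table.
            evolved.append(col)
    # Columns deleted or renamed away in the source stay in the destination.
    return evolved


def project_row(dest_cols: List[str], source_row: Dict[str, object]) -> Dict[str, Optional[object]]:
    """Map a source row onto the destination; absent columns become NULL (None)."""
    return {col: source_row.get(col) for col in dest_cols}


# Rename col_a to col_b in the source table.
dest = evolve_destination(["id", "col_a"], ["id", "col_b"])
print(dest)                                        # ['id', 'col_a', 'col_b']
print(project_row(dest, {"id": 1, "col_b": "x"}))  # {'id': 1, 'col_a': None, 'col_b': 'x'}
```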

Unsupported schema changes

If any of the following changes occur in the source table, delete the destination table in StarRocks and restart the CTAS job. Flink recreates the table and resynchronizes all historical data.

  • Change a column's data type (for example, VARCHAR to BIGINT, or NOT NULL to NULLABLE)

  • Change constraints such as the primary key or index

  • Add or delete a non-nullable column

  • Adjust field lengths in DDL (Data Definition Language) statements
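Taken together, the supported and unsupported lists amount to a lookup from change type to required action. The following sketch encodes them as two sets. The change labels such as `"change_nullability"` are hypothetical names chosen for this example, not identifiers used by Flink or StarRocks.

```python
# Hypothetical labels for the schema-change cases listed in this topic.
SUPPORTED = {
    "add_nullable_column",
    "delete_nullable_column",
    "rename_column",
}
REQUIRES_REBUILD = {
    "change_data_type",
    "change_nullability",
    "change_primary_key_or_index",
    "add_non_nullable_column",
    "delete_non_nullable_column",
    "change_field_length",
}


def ctas_action(change: str) -> str:
    """Return what to do in StarRocks after a given source schema change."""
    if change in SUPPORTED:
        return "propagated by CTAS"
    if change in REQUIRES_REBUILD:
        # Drop the destination table and restart the job; Flink recreates the
        # table and resynchronizes historical data.
        return "drop destination table and restart CTAS job"
    raise ValueError(f"unknown change: {change}")


print(ctas_action("rename_column"))     # propagated by CTAS
print(ctas_action("change_data_type"))  # drop destination table and restart CTAS job
```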

CTAS detects schema changes by comparing the schemas of consecutive data records; it does not parse DDL statements. As a result:

  • If a column is deleted and then re-added with no data changes in between, CTAS treats this as no schema change.

  • A schema change is propagated only after new data arrives in the source table following the change.
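Both consequences follow from comparing records rather than DDL. The sketch below is a simplified model, not the connector's code. It assumes each record is a dict and that a record's "schema" is just its ordered tuple of column names. The function name `detect_schema_changes` is hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Columns = Tuple[str, ...]


def detect_schema_changes(records: Iterable[Dict[str, object]]) -> List[Tuple[Columns, Columns]]:
    """Return (previous, current) column tuples wherever consecutive records differ."""
    changes: List[Tuple[Columns, Columns]] = []
    previous: Optional[Columns] = None
    for record in records:
        current = tuple(record)  # column names in record order
        if previous is not None and current != previous:
            changes.append((previous, current))
        previous = current
    return changes


# add_col is dropped and re-added with no rows written in between, so the
# records on either side have the same columns: no change is detected.
before = {"runoob_id": 18, "add_col": 3}
after = {"runoob_id": 1, "add_col": 1}
print(detect_schema_changes([before, after]))  # []

# A column added by ALTER TABLE is detected only when a row carrying it arrives.
new_row = {"runoob_id": 2, "add_col": 1, "add_col2": 2}
print(detect_schema_changes([before, new_row]))
# [(('runoob_id', 'add_col'), ('runoob_id', 'add_col', 'add_col2'))]
```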
For field type mappings, see Continuously load data from Apache Flink.
When CTAS merges multiple MySQL tables into one destination table, Flink automatically adds _db_name and _table_name as the first two columns of the destination table to record the source of each row. If you define the column order yourself, start your own columns from the third position.
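The resulting row layout for merged tables can be pictured as follows. This is a minimal sketch of the column order only, assuming rows are Python dicts; `tag_source` is a hypothetical helper and does not reflect how Flink builds rows internally.

```python
from typing import Dict


def tag_source(db_name: str, table_name: str, row: Dict[str, object]) -> Dict[str, object]:
    """Prefix a row with its source database and table names."""
    tagged: Dict[str, object] = {"_db_name": db_name, "_table_name": table_name}
    tagged.update(row)  # user-defined columns start at the third position
    return tagged


row = tag_source("test_cdc", "runoob_tbl", {"runoob_id": 18, "runoob_title": "first"})
print(list(row))  # ['_db_name', '_table_name', 'runoob_id', 'runoob_title']
```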

Prerequisites

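Before you begin, make sure that you have created a fully managed Flink workspace, an EMR StarRocks cluster, and an ApsaraDB RDS for MySQL instance.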

The examples in this topic use MySQL 5.7, EMR 3.39.1, and fully managed Flink vvr-6.0.3-flink-1.15.

Limitations

  • The Flink workspace, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be in the same virtual private cloud (VPC).

  • The ApsaraDB RDS for MySQL engine version must be 5.7 or later.

  • Internet access must be enabled for the StarRocks cluster.

  • Fully managed Flink must be vvr-6.0.3-flink-1.15 or later.

Step 1: Prepare test data

  1. Create a database and an account on the ApsaraDB RDS for MySQL instance. See Create databases and accounts for an ApsaraDB RDS for MySQL instance. Grant the test account read and write permissions.

    This topic uses a database named test_cdc and an account named test.
  2. Log in to the ApsaraDB RDS for MySQL instance using the test account. See Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  3. Create a test table and insert a row:

    USE test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl` (
      `runoob_id`      INT UNSIGNED AUTO_INCREMENT,
      `runoob_title`   VARCHAR(100) NOT NULL,
      `runoob_author`  VARCHAR(40)  NOT NULL,
      `submission_date` DATE,
      `add_col`        INT DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    INSERT INTO test_cdc.`runoob_tbl`
      (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
    VALUES (18, 'first', 'tom', '2022-06-22', 3);
  4. Log in to the StarRocks cluster over SSH. See Log on to a cluster.

  5. Connect to StarRocks:

    mysql -h127.0.0.1 -P 9030 -uroot
  6. Create the destination database, then create a user and grant it permission to create tables in that database:

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED BY '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;

Step 2: Create catalogs in the Flink SQL editor

On the Draft Editor page in the fully managed Flink console, create one catalog for MySQL and one for StarRocks. See Getting started with a Flink SQL deployment.

The parameter values below are examples. Adjust them to match your environment.

MySQL catalog

CREATE CATALOG mysql WITH (
  'type'             = 'mysql',
  'hostname'         = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'             = '3306',
  'username'         = 'test',
  'password'         = '123456',
  'default-database' = 'test_cdc'
);
Parameter Description
type Catalog type. Set to mysql.
hostname Internal endpoint of the ApsaraDB RDS for MySQL instance. Copy it from the Database Connection page in the ApsaraDB RDS console (for example, rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com).
port Port of the MySQL database. Default: 3306.
username Username created in Step 1: Prepare test data. In this example: test.
password Password for the username created in Step 1: Prepare test data.
default-database Name of the database created in Step 1: Prepare test data. In this example: test_cdc.

StarRocks catalog

CREATE CATALOG sr WITH (
  'type'     = 'starrocks',
  'endpoint' = '172.16.**.**:9030',
  'username' = 'test',
  'password' = '123456',
  'dbname'   = 'test_cdc'
);
Parameter Description
type Catalog type. Set to starrocks.
endpoint IP address and port of the StarRocks frontend (for example, 172.16.**.**:9030).
username Username created in Step 1: Prepare test data. In this example: test.
password Password for the username created in Step 1: Prepare test data.
dbname StarRocks database name. In this example: test_cdc.

Step 3: Write and publish a CTAS deployment

On the Draft Editor page, write a CTAS statement. The following examples cover at-least-once delivery, exactly-once delivery, and simple table-creation mode. Choose the one that matches your consistency and setup requirements.

At-least-once semantics (recommended for low-latency scenarios)

Data is written at a configurable flush interval. Memory usage is lower, but duplicate writes may occur on failure.

/* At-least-once semantics */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl_sr',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

Exactly-once semantics (recommended for data-critical scenarios)

No data is lost or duplicated on failure. Data becomes visible in StarRocks only when a checkpoint completes, so visibility latency depends on the checkpoint interval.

/* Exactly-once semantics */

SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode'     = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout'  = '10 min';

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.semantic'                     = 'exactly-once',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

For checkpoint configuration options, see Checkpointing.
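The difference between the two delivery modes shows up when a job fails between checkpoints. The simulation below is a simplified sketch, not the StarRocks connector's logic. It assumes at-least-once flushes each record as soon as it is processed, that exactly-once discards uncheckpointed data, and that after a restart the source replays from the last checkpoint. Because the destination in these examples is a Primary Key table, replayed rows overwrite the same keys, which the final `dict` comparison illustrates. The function names are hypothetical.

```python
from typing import List, Tuple

Record = Tuple[int, str]  # (primary key, value)


def at_least_once_writes(records: List[Record], checkpoint_at: int, fail_at: int) -> List[Record]:
    """Writes StarRocks receives if the job fails after flushing records[:fail_at].

    Simplification: every record is flushed as soon as it is processed. After
    the restart, the source replays from the last checkpoint (checkpoint_at).
    """
    assert 0 <= checkpoint_at <= fail_at <= len(records)
    return records[:fail_at] + records[checkpoint_at:]


def exactly_once_writes(records: List[Record], checkpoint_at: int, fail_at: int) -> List[Record]:
    """Writes that become visible when uncheckpointed data is discarded on failure."""
    assert 0 <= checkpoint_at <= fail_at <= len(records)
    # records[checkpoint_at:fail_at] were pending and are dropped; processing
    # resumes at checkpoint_at and is committed by later checkpoints.
    return records[:checkpoint_at] + records[checkpoint_at:]


records = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
alo = at_least_once_writes(records, checkpoint_at=1, fail_at=3)
eo = exactly_once_writes(records, checkpoint_at=1, fail_at=3)
print(len(alo), len(eo))  # 6 4
# Upserting by primary key yields the same final table in both cases.
print(dict(alo) == dict(eo) == dict(records))  # True
```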

Simple mode (recommended for quick setup)

Flink infers the table definition from the MySQL schema — no need to specify engine, key, or distribution manually. Partitioned tables are not supported in simple mode; create partitions using normal mode instead.

/* Simple mode */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl1',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;
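According to the description above, simple mode uses engine=olap, the MySQL primary key, and hash distribution over all primary key columns, leaving only the bucket count to you. The sketch below approximates the properties string that rule implies, so you can compare it with the normal-mode value used earlier. It is an assumption-based illustration: the DDL that Flink actually generates may differ, and `simple_mode_properties` is a hypothetical name.

```python
from typing import List


def simple_mode_properties(primary_key: List[str], buckets: int) -> str:
    """Approximate the table properties that simple mode derives."""
    if not primary_key:
        # Assumption of this sketch: simple mode needs a MySQL primary key.
        raise ValueError("source table has no primary key")
    keys = ", ".join(primary_key)
    return f"engine = olap primary key({keys}) distributed by hash({keys}) buckets {buckets}"


print(simple_mode_properties(["runoob_id"], 8))
# engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8
```

For runoob_tbl, the result matches the starrocks.create.table.properties value written by hand in the at-least-once and exactly-once examples.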

WITH clause parameters

Parameter Required Description
starrocks.create.table.properties Yes Suffix definitions for the StarRocks CREATE TABLE statement, excluding field definitions — for example, engine, key, and buckets.
database-name Yes Name of the StarRocks database.
jdbc-url Yes Java Database Connectivity (JDBC) URL for StarRocks queries, for example jdbc:mysql://172.16.**.**:9030, where 172.16.**.** is the internal IP address of the StarRocks cluster.
load-url Yes Internal IP address and HTTP port of the StarRocks frontend, for example 172.16.**.**:18030. Choose the port based on your EMR cluster version: 18030 for EMR V5.9.0 or a later minor version and EMR V3.43.0 or a later minor version; 8030 for EMR V5.8.0, EMR V3.42.0, or an earlier version. See Access the UI and ports.
sink.semantic No Delivery semantics: at-least-once (default) or exactly-once.
starrocks.create.table.mode No normal (default): specify engine, key, and buckets in starrocks.create.table.properties. simple: Flink sets engine=olap, uses the MySQL primary key, and distributes by hash across all primary key columns. Only buckets is required in starrocks.create.table.properties. Partitions are not created.
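The port rule in the load-url row can be expressed as a version comparison. This sketch assumes EMR versions are dotted integers such as "3.39.1" and covers only the V3.x and V5.x lines named above; `starrocks_http_port` is a hypothetical helper.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as '5.9.0' into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def starrocks_http_port(emr_version: str) -> int:
    """Return the frontend HTTP port to use in load-url for an EMR release."""
    v = parse_version(emr_version)
    if v[0] == 5:
        return 18030 if v >= (5, 9, 0) else 8030
    if v[0] == 3:
        return 18030 if v >= (3, 43, 0) else 8030
    raise ValueError(f"rule not covered here for EMR {emr_version}")


print(starrocks_http_port("5.9.0"))   # 18030
print(starrocks_http_port("3.39.1"))  # 8030
```

For example, EMR V3.39.1, the version listed in the Prerequisites section, falls under the 8030 rule.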
If your Flink version is earlier than vvr-6.0.5-flink-1.15, add 'sink.use.new-api' = 'false' to the WITH clause. For additional sink parameters, see Continuously load data from Apache Flink.
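If you generate WITH clauses from a script, the version check above can be applied automatically. This is a minimal sketch that assumes engine names follow the "vvr-X.Y.Z-flink-A.B" pattern used in this topic; `vvr_version` and `with_compat_options` are hypothetical helpers, not part of any Flink API.

```python
import re
from typing import Dict, Tuple


def vvr_version(engine: str) -> Tuple[int, int, int]:
    """Extract (X, Y, Z) from an engine name such as 'vvr-6.0.3-flink-1.15'."""
    match = re.fullmatch(r"vvr-(\d+)\.(\d+)\.(\d+)-flink-[\d.]+", engine)
    if match is None:
        raise ValueError(f"unrecognized engine version: {engine}")
    major, minor, patch = (int(g) for g in match.groups())
    return major, minor, patch


def with_compat_options(options: Dict[str, str], engine: str) -> Dict[str, str]:
    """Return a copy of the WITH options, adding the flag for pre-6.0.5 engines."""
    result = dict(options)
    if vvr_version(engine) < (6, 0, 5):
        result["sink.use.new-api"] = "false"
    return result


print(with_compat_options({}, "vvr-6.0.3-flink-1.15"))  # {'sink.use.new-api': 'false'}
print(with_compat_options({}, "vvr-6.0.5-flink-1.15"))  # {}
```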

OPTIONS clause parameters

The OPTIONS clause configures the MySQL CDC source.

Parameter Description
connector Connector type. Set to mysql-cdc.
hostname Internal endpoint of the ApsaraDB RDS for MySQL instance (for example, rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com).
port MySQL port. Default: 3306.
username Username for ApsaraDB RDS for MySQL access. Use the account created in Step 1: Prepare test data.
password Password for ApsaraDB RDS for MySQL access.
table-name Source table name. In this example: runoob_tbl.
database-name Source database name. In this example: test_cdc.
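The same OPTIONS hint is repeated in every example, so generating it from one set of values can help keep deployments consistent. The sketch below renders a hint string from a Python dict. It relies on the Flink SQL convention that a single quote inside a string literal is escaped by doubling it; `render_options_hint` is a hypothetical helper, and it does not escape keys.

```python
from typing import Dict


def render_options_hint(options: Dict[str, str]) -> str:
    """Render a /*+ OPTIONS (...) */ hint; single quotes in values are doubled."""
    body = ",\n".join(
        "  '{}' = '{}'".format(key, value.replace("'", "''"))
        for key, value in options.items()
    )
    return "/*+ OPTIONS (\n" + body + "\n) */"


print(render_options_hint({
    "connector": "mysql-cdc",
    "hostname": "rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com",
    "port": "3306",
    "username": "test",
    "password": "123456",
    "database-name": "test_cdc",
    "table-name": "runoob_tbl",
}))
```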

Publish and start the deployment

  1. On the Advanced tab of the Draft Editor page, set Engine Version to vvr-6.0.3-flink-1.15 or later.

  2. In the upper-right corner of the Draft Editor page, click Publish.

  3. On the Deployments page, find the new deployment and click Start in the Actions column.

Step 4: Verify synchronization

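After the job starts, run the following scenarios to confirm that both data changes and schema changes propagate to StarRocks in real time. The queries use runoob_tbl1, the table created by the simple-mode example; if you published a different example, query runoob_tbl_sr or runoob_tbl instead.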

Connect to StarRocks before running the verification queries:

mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;

Verify initial data

Query the StarRocks table to confirm the seed row was synchronized:

SELECT * FROM runoob_tbl1;

Expected output:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verify INSERT

On the SQL Console tab of the ApsaraDB RDS for MySQL instance, insert a row:

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);

Query StarRocks to confirm both rows appear:

SELECT * FROM runoob_tbl1;

Expected output:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verify UPDATE

On the MySQL SQL Console, update a row:

UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;

Query StarRocks to confirm the change:

SELECT * FROM runoob_tbl1;

Expected output:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verify DELETE

On the MySQL SQL Console, delete a row:

DELETE FROM runoob_tbl WHERE runoob_id = 1;

Query StarRocks to confirm the row is removed:

SELECT * FROM runoob_tbl1;

Expected output:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
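The expected outputs above follow from applying each change event to a table keyed by runoob_id. The sketch below replays the seed row and the INSERT, UPDATE, and DELETE steps against a Python dict to reproduce the final state. It is a simplified model of Primary Key table behavior, not StarRocks code; `apply_events` and the event tuple format are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (runoob_title, runoob_author, submission_date, add_col)
Row = Tuple[str, str, str, int]
Event = Tuple[str, int, Optional[Row]]  # (operation, runoob_id, row or None)


def apply_events(events: List[Event]) -> Dict[int, Row]:
    """Apply CDC events to a table keyed by primary key."""
    table: Dict[int, Row] = {}
    for op, key, row in events:
        if op in ("insert", "update"):
            table[key] = row  # upsert: a later value for the same key wins
        elif op == "delete":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


events: List[Event] = [
    ("insert", 18, ("first", "tom", "2022-06-22", 3)),  # seed row from Step 1
    ("insert", 1, ("second", "tom2", "2022-06-23", 1)),
    ("update", 18, ("new", "tom", "2022-06-22", 3)),
    ("delete", 1, None),
]
print(apply_events(events))  # {18: ('new', 'tom', '2022-06-22', 3)}
```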

Verify schema change: add a nullable column

On the MySQL SQL Console, add a nullable column and insert a row with a value in that column:

ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);

Query StarRocks to confirm the new column appears and existing rows show NULL:

SELECT * FROM runoob_tbl1;

Expected output:

+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
|        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
|         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
+-----------+--------------+---------------+-----------------+---------+----------+

The add_col2 column was automatically added to StarRocks when the first data row containing it arrived.

CDAS: synchronize an entire database

The CREATE DATABASE AS (CDAS) statement is syntactic sugar for CTAS — it creates one Flink deployment that synchronizes all selected tables from a MySQL database to StarRocks at once. Use the INCLUDING TABLE clause to select specific tables by name.

Create the MySQL and StarRocks catalogs first (same as Step 2), switch to the StarRocks catalog, and then run the CDAS statement:

USE CATALOG sr;

CREATE DATABASE IF NOT EXISTS sr_db WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc'
) */;

The INCLUDING TABLE clause accepts a comma-separated list of table names. Omit it to synchronize all tables in the source database.
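The table selection described above can be sketched as a simple filter. This model assumes exact, case-sensitive name matching and keeps the source order; `select_tables` is a hypothetical helper, and the table names are the placeholders from the CDAS example.

```python
from typing import List, Optional


def select_tables(source_tables: List[str], including: Optional[List[str]] = None) -> List[str]:
    """Return the source tables a CDAS job would synchronize."""
    if including is None:
        # No INCLUDING TABLE clause: every table in the source database.
        return list(source_tables)
    wanted = set(including)
    return [t for t in source_tables if t in wanted]


tables = ["tbl1", "tbl2", "tbl3", "tbl4"]
print(select_tables(tables, ["tbl1", "tbl2", "tbl3"]))  # ['tbl1', 'tbl2', 'tbl3']
print(select_tables(tables))                            # ['tbl1', 'tbl2', 'tbl3', 'tbl4']
```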

What's next