The CREATE TABLE AS (CTAS) statement automatically creates a matching table in StarRocks and continuously synchronizes data and schema changes from a MySQL source. The CREATE DATABASE AS (CDAS) statement extends this to an entire database. This topic shows how to use both statements in Realtime Compute for Apache Flink to move transaction processing (TP) data from ApsaraDB RDS for MySQL into an E-MapReduce (EMR) StarRocks cluster for analytical processing (AP).
Architecture overview
| Component | Role |
|---|---|
| ApsaraDB RDS for MySQL | Source — origin of change data capture (CDC) events and schema changes |
| Realtime Compute for Apache Flink | Processing engine — reads MySQL CDC and writes to StarRocks |
| EMR StarRocks cluster | Sink — analytical destination for synchronized data |
How it works
When you run a CTAS statement, Flink performs two operations in sequence:
- Table creation check: Flink checks whether the destination table exists in StarRocks.
  - If it does not exist, Flink creates the table with the same schema as the source, based on the destination catalog.
  - If it already exists, Flink skips creation. If the existing schema differs from the source schema, an error is returned.
- Data synchronization: Flink starts a continuous job that synchronizes both data and schema changes from the source table to the destination table.
Schema change behavior
CTAS uses a fixed policy for propagating schema changes. The following tables summarize what is and is not propagated automatically.
Supported schema changes
| Change type | Behavior in StarRocks |
|---|---|
| Add a nullable column | Column is appended to the end of the destination table; incoming data fills the column |
| Delete a nullable column | Column is retained in StarRocks but filled with NULL values |
| Rename a column | The new-name column is appended to the end; the old-name column is filled with NULL values. For example, renaming col_a to col_b adds col_b at the end and sets col_a to NULL. |
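As an illustration, the three supported changes correspond to MySQL statements like the ones below. This is a sketch against a hypothetical source table `demo_ddl`; the table and its columns are assumptions and are not part of this tutorial. The comments restate the behavior listed in the table above.

```sql
-- Hypothetical source table, used only for illustration.
CREATE TABLE IF NOT EXISTS demo_ddl (
  id   INT PRIMARY KEY,
  note VARCHAR(64) NULL
);

-- Add a nullable column: appended to the end of the StarRocks table.
ALTER TABLE demo_ddl ADD COLUMN extra INT NULL;

-- Rename a column: StarRocks gains `remark` at the end, and `note` is filled with NULL.
-- CHANGE COLUMN is used here because RENAME COLUMN requires MySQL 8.0.
ALTER TABLE demo_ddl CHANGE COLUMN note remark VARCHAR(64) NULL;

-- Delete a nullable column: `extra` stays in StarRocks and is filled with NULL.
ALTER TABLE demo_ddl DROP COLUMN extra;
```

None of these changes reaches StarRocks until a row is written to the source table after the DDL statement.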
Unsupported schema changes
If any of the following changes occur in the source table, delete the destination table in StarRocks and restart the CTAS job. Flink recreates the table and resynchronizes all historical data.
- Change a column's data type (for example, `VARCHAR` to `BIGINT`, or `NOT NULL` to `NULLABLE`)
- Change constraints such as the primary key or index
- Add or delete a non-nullable column
- Adjust field lengths in DDL (Data Definition Language) statements
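On the StarRocks side, the recovery procedure might look like the following. It is a minimal sketch that assumes the destination table is `test_cdc.runoob_tbl1` (the table queried in the verification steps of this topic). Stopping the deployment before dropping the table is also an assumption, made here to avoid writes to a table that is being removed.

```sql
-- Run in the StarRocks MySQL client after stopping the CTAS deployment (assumed order).
DROP TABLE IF EXISTS test_cdc.runoob_tbl1;

-- Confirm that the table is gone before restarting the deployment.
SHOW TABLES FROM test_cdc;
```

After the deployment is restarted, Flink recreates the table and resynchronizes the historical data, as described above.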
CTAS detects schema changes by comparing the schema of consecutive data records — it does not parse DDL statement types. As a result:
- If a column is deleted and then re-added with no data changes in between, CTAS treats this as no schema change.
- A schema change is only propagated when new data arrives in the source table after the change.
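The two consequences above can be reproduced with a sequence like the following on the MySQL source. This is a sketch against a hypothetical table `demo_detect`; the table and column names are assumptions, and the comments describe the behavior stated above rather than observed output.

```sql
-- Hypothetical source table, used only for illustration.
CREATE TABLE IF NOT EXISTS demo_detect (
  id   INT PRIMARY KEY,
  note VARCHAR(64) NULL
);
INSERT INTO demo_detect (id, note) VALUES (1, 'a');

-- Case 1: drop and re-add a column with no data change in between.
ALTER TABLE demo_detect DROP COLUMN note;
ALTER TABLE demo_detect ADD COLUMN note VARCHAR(64) NULL;
-- The next record has the same schema as the previous one, so no change is detected.
INSERT INTO demo_detect (id, note) VALUES (2, 'b');

-- Case 2: a new column is propagated only once data arrives.
ALTER TABLE demo_detect ADD COLUMN extra INT NULL;             -- nothing reaches StarRocks yet
INSERT INTO demo_detect (id, note, extra) VALUES (3, 'c', 7);  -- first record with the new schema
```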
For field type mappings, see Continuously load data from Apache Flink.
When CTAS merges multiple MySQL tables, Flink automatically prepends `_db_name` and `_table_name` columns to the destination table to track the source. Define your own column order starting from the third column.
Prerequisites
Before you begin, ensure that you have:
- A fully managed Flink workspace running vvr-6.0.3-flink-1.15 or later. See Activate fully managed Flink and Getting started with a Flink SQL deployment.
- An EMR StarRocks cluster. See Create a StarRocks cluster.
- An ApsaraDB RDS for MySQL instance running MySQL 5.7 or later. See Create an ApsaraDB RDS for MySQL instance.
The examples in this topic use MySQL 5.7, EMR 3.39.1, and fully managed Flink vvr-6.0.3-flink-1.15.
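Before creating the catalogs, it may help to confirm that the source instance meets these requirements. The queries below are a sketch to run on the MySQL instance. The binary log checks are an addition based on how MySQL CDC connectors work in general (they read the binary log in row format) and are not stated in this topic.

```sql
SELECT VERSION();                      -- expect 5.7 or later
SHOW VARIABLES LIKE 'log_bin';         -- binary logging should be ON for CDC
SHOW VARIABLES LIKE 'binlog_format';   -- CDC connectors generally expect ROW
```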
Limitations
- The Flink workspace, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be in the same virtual private cloud (VPC).
- The ApsaraDB RDS for MySQL engine version must be 5.7 or later.
- Internet access must be enabled for the StarRocks cluster.
- Fully managed Flink must be vvr-6.0.3-flink-1.15 or later.
Step 1: Prepare test data
- Create a database and an account on the ApsaraDB RDS for MySQL instance. See Create databases and accounts for an ApsaraDB RDS for MySQL instance. Grant the test account read and write permissions.
  This topic uses a database named `test_cdc` and an account named `test`.
- Log in to the ApsaraDB RDS for MySQL instance using the test account. See Use DMS to log on to an ApsaraDB RDS for MySQL instance.
- Create a test table and insert a row:
    USE test_cdc;
    CREATE TABLE IF NOT EXISTS `runoob_tbl` (
      `runoob_id` INT UNSIGNED AUTO_INCREMENT,
      `runoob_title` VARCHAR(100) NOT NULL,
      `runoob_author` VARCHAR(40) NOT NULL,
      `submission_date` DATE,
      `add_col` INT DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
    VALUES (18, 'first', 'tom', '2022-06-22', 3);
- Log in to the StarRocks cluster over SSH. See Log on to a cluster.
- Connect to StarRocks:
    mysql -h127.0.0.1 -P 9030 -uroot
- Create the destination database and a user, then grant the permissions needed for this tutorial:
    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED BY '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;
Step 2: Create catalogs in the Flink SQL editor
On the Draft Editor page in the fully managed Flink console, create one catalog for MySQL and one for StarRocks. See Getting started with a Flink SQL deployment.
The parameter values below are examples. Adjust them to match your environment.
MySQL catalog
CREATE CATALOG mysql WITH (
'type' = 'mysql',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'default-database' = 'test_cdc'
);
| Parameter | Description |
|---|---|
| `type` | Catalog type. Set to `mysql`. |
| `hostname` | Internal endpoint of the ApsaraDB RDS for MySQL instance. Copy it from the Database Connection page in the ApsaraDB RDS console (for example, `rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com`). |
| `port` | Port of the MySQL database. Default: 3306. |
| `username` | Username created in Step 1: Prepare test data. In this example: `test`. |
| `password` | Password for the username created in Step 1: Prepare test data. |
| `default-database` | Name of the database created in Step 1: Prepare test data. In this example: `test_cdc`. |
StarRocks catalog
CREATE CATALOG sr WITH (
'type' = 'starrocks',
'endpoint' = '172.16.**.**:9030',
'username' = 'test',
'password' = '123456',
'dbname' = 'test_cdc'
);
| Parameter | Description |
|---|---|
| `type` | Catalog type. Set to `starrocks`. |
| `endpoint` | IP address and port of the StarRocks frontend (for example, `172.16.**.**:9030`). |
| `username` | Username created in Step 1: Prepare test data. In this example: `test`. |
| `password` | Password for the username created in Step 1: Prepare test data. |
| `dbname` | StarRocks database name. In this example: `test_cdc`. |
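Once both catalogs exist, a few standard Flink SQL statements can confirm that they resolve the expected objects. This is a sketch; whether the Draft Editor lets you execute these statements interactively is an assumption here and may depend on your console version.

```sql
SHOW CATALOGS;          -- should list mysql and sr

USE CATALOG mysql;
SHOW DATABASES;         -- should include test_cdc
USE test_cdc;
SHOW TABLES;            -- should include runoob_tbl
DESCRIBE runoob_tbl;    -- column names and types as Flink sees them

USE CATALOG sr;
SHOW DATABASES;         -- should include test_cdc
```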
Step 3: Write and publish a CTAS deployment
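On the Draft Editor page, write a CTAS statement. Three example configurations follow: two differ in delivery semantics (at-least-once and exactly-once), and the third uses simple table-creation mode. Choose based on your consistency requirements and on how much of the table definition you want to specify yourself.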
At-least-once semantics (recommended for low-latency scenarios)
Data is written at a configurable flush interval. Memory usage is lower, but duplicate writes may occur on failure.
/* At-least-once semantics */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl_sr',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;
Exactly-once semantics (recommended for data-critical scenarios)
No data loss or duplication on failure. Data visibility depends on the checkpoint interval.
/* Exactly-once semantics */
SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10 min';
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl',
'username' = 'test',
'password' = '123456',
'sink.semantic' = 'exactly-once',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;
For checkpoint configuration options, see Checkpointing.
Simple mode (recommended for quick setup)
Flink infers the table definition from the MySQL schema — no need to specify engine, key, or distribution manually. Partitioned tables are not supported in simple mode; create partitions using normal mode instead.
/* Simple mode */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl1',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;
WITH clause parameters
| Parameter | Required | Description |
|---|---|---|
| `starrocks.create.table.properties` | Yes | Suffix definitions for the StarRocks CREATE TABLE statement, excluding field definitions: for example, engine, key, and buckets. |
| `database-name` | Yes | Name of the StarRocks database. |
| `jdbc-url` | Yes | Java Database Connectivity (JDBC) URL for StarRocks queries, for example `jdbc:mysql://172.16.**.**:9030`, where `172.16.**.**` is the internal IP address of the StarRocks cluster. |
| `load-url` | Yes | Internal IP address and HTTP port of the StarRocks frontend. Select the port based on your EMR cluster version: 18030 for EMR V5.9.0 or later (minor versions) and EMR V3.43.0 or later (minor versions); 8030 for EMR V5.8.0, EMR V3.42.0, or earlier. See Access the UI and ports. |
| `sink.semantic` | No | Delivery semantics: `at-least-once` (default) or `exactly-once`. |
| `starrocks.create.table.mode` | No | `normal` (default): specify engine, key, and buckets in `starrocks.create.table.properties`. `simple`: Flink sets engine=olap, uses the MySQL primary key, and distributes by hash across all primary key columns. Only buckets is required in `starrocks.create.table.properties`. Partitions are not created. |
If your Flink version is earlier than vvr-6.0.5-flink-1.15, add 'sink.use.new-api' = 'false' to the WITH clause. For additional sink parameters, see Continuously load data from Apache Flink.
OPTIONS clause parameters
The OPTIONS clause configures the MySQL CDC source.
| Parameter | Description |
|---|---|
| `connector` | Connector type. Set to `mysql-cdc`. |
| `hostname` | Internal endpoint of the ApsaraDB RDS for MySQL instance (for example, `rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com`). |
| `port` | MySQL port. Default: 3306. |
| `username` | Username for ApsaraDB RDS for MySQL access. Use the account created in Step 1: Prepare test data. |
| `password` | Password for ApsaraDB RDS for MySQL access. |
| `table-name` | Source table name. In this example: `runoob_tbl`. |
| `database-name` | Source database name. In this example: `test_cdc`. |
Publish and start the deployment
- On the Advanced tab of the Draft Editor page, set Engine Version to `vvr-6.0.3-flink-1.15` or later.
- In the upper-right corner of the Draft Editor page, click Publish.
- On the Deployments page, find the new deployment and click Start in the Actions column.
Step 4: Verify synchronization
After the job starts, run the following scenarios to confirm that data changes and schema changes both propagate to StarRocks in real time.
Connect to StarRocks before running the verification queries:
mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;
Verify initial data
Query the StarRocks table to confirm the seed row was synchronized:
SELECT * FROM runoob_tbl1;
Expected output:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | first | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+
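To see the table definition that Flink generated, you can also inspect the destination table in StarRocks. This is a sketch that assumes the simple-mode deployment and the `runoob_tbl1` table queried above.

```sql
-- Run in the StarRocks MySQL client.
SHOW CREATE TABLE test_cdc.runoob_tbl1;
```

For simple mode, the output should reflect the OLAP engine, the MySQL primary key, and hash distribution on the primary key columns, as described for `starrocks.create.table.mode`.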
Verify INSERT
On the SQL Console tab of the ApsaraDB RDS for MySQL instance, insert a row:
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);
Query StarRocks to confirm both rows appear:
SELECT * FROM runoob_tbl1;
Expected output:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | first | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+
Verify UPDATE
On the MySQL SQL Console, update a row:
UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;
Query StarRocks to confirm the change:
SELECT * FROM runoob_tbl1;
Expected output:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+
Verify DELETE
On the MySQL SQL Console, delete a row:
DELETE FROM runoob_tbl WHERE runoob_id = 1;
Query StarRocks to confirm the row is removed:
SELECT * FROM runoob_tbl1;
Expected output:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+
Verify schema change: add a nullable column
On the MySQL SQL Console, add a nullable column and insert a row with a value in that column:
ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);
Query StarRocks to confirm the new column appears and existing rows show NULL:
SELECT * FROM runoob_tbl1;
Expected output:
+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
| 18 | new | tom | 2022-06-22 | 3 | NULL |
| 1 | second | tom2 | 2022-06-23 | 1 | 2 |
+-----------+--------------+---------------+-----------------+---------+----------+
The add_col2 column was automatically added to StarRocks when the first data row containing it arrived.
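The column list itself can be checked as well. A short sketch, again assuming the destination table is `test_cdc.runoob_tbl1`:

```sql
-- Run in the StarRocks MySQL client.
DESC test_cdc.runoob_tbl1;   -- add_col2 should be listed as the last column

-- Rows that existed before the change hold NULL in the new column.
SELECT runoob_id, add_col2
FROM test_cdc.runoob_tbl1
WHERE add_col2 IS NULL;
```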
CDAS: synchronize an entire database
The CREATE DATABASE AS (CDAS) statement is syntactic sugar for CTAS — it creates one Flink deployment that synchronizes all selected tables from a MySQL database to StarRocks at once. Use the INCLUDING TABLE clause to select specific tables by name.
Create the MySQL and StarRocks catalogs first (same as Step 2), then run:
CREATE DATABASE IF NOT EXISTS sr_db WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc'
) */;
The INCLUDING TABLE clause accepts a comma-separated list of table names. Omit it to synchronize all tables in the source database.
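After the CDAS deployment starts, the result can be checked from the StarRocks side. This is a sketch that assumes the destination database in StarRocks takes the name given in the statement (`sr_db`) and that the source tables `tbl1`, `tbl2`, and `tbl3` exist in `test_cdc`.

```sql
-- Run in the StarRocks MySQL client.
SHOW TABLES FROM sr_db;            -- should list tbl1, tbl2, and tbl3

-- Compare with SELECT COUNT(*) FROM test_cdc.tbl1 on the MySQL source.
SELECT COUNT(*) FROM sr_db.tbl1;
```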
What's next
- Continuously load data from Apache Flink: full parameter reference for the StarRocks Flink connector
- Checkpointing: configure checkpoint intervals and timeout for exactly-once semantics
- Access the UI and ports: confirm the correct `load-url` port for your EMR cluster version