Realtime Compute for Apache Flink allows you to create YAML deployments for data ingestion from a source to a sink based on Change Data Capture (CDC) connectors for Apache Flink. This topic describes how to develop a YAML draft of Realtime Compute for Apache Flink to synchronize all data from a MySQL database to StarRocks.
Prerequisites
A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Upstream and downstream storage instances are created.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
An E-MapReduce (EMR) Serverless StarRocks instance is created.
NoteThe ApsaraDB RDS for MySQL instance and the EMR Serverless StarRocks instance must reside in the same virtual private cloud (VPC) as the Realtime Compute for Apache Flink workspace. If the instances and the workspace do not reside in the same VPC, network connections must be established between the instances and the workspace. In addition, an IP address whitelist must be configured for the ApsaraDB RDS for MySQL instance. For more information, see Console operations, Console operations, and Console operations.
Background information
For example, the ApsaraDB RDS for MySQL instance has a database named order_dw_mysql. In the order_dw_mysql database, three business tables named orders, orders_pay, and product_catalog are created. If you want to develop a YAML draft to synchronize data from the business tables in the order_dw_mysql database to the order_dw_sr database in StarRocks, you can perform the following steps:
Step 1: Prepare test data of the ApsaraDB RDS for MySQL instance
Create a database in the ApsaraDB RDS for MySQL instance and an account for the database.
Create a database named order_dw_mysql and a standard account that has read and write permissions on the database. For more information, see Create accounts and databases and Manage databases.
Log on to the ApsaraDB RDS for MySQL instance from the Data Management (DMS) console.
For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
On the SQL Console tab, enter the following statements and click Execute to create three business tables in the database and insert data into the business tables:
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Prepare data. INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
Step 2: Develop a YAML draft
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column. In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose
.In the upper-left corner of the Drafts page, click New. In the New Draft dialog box, click Data Ingestion from MySQL to Starrocks and click Next.
On the page that appears, configure the Name and Location parameters, select a value from the Engine Version drop-down list, and then click OK.
Configure the code of the YAML draft.
The following code provides an example on how to synchronize all tables from the order_dw_mysql database in ApsaraDB RDS for MySQL to the order_dw_sr database in StarRocks.
source: type: mysql hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: order_dw_mysql.\.* server-id: 5405-5415 sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 username: ${secret_values.starrocksusername} password: ${secret_values.starrockspassword} table.create.properties.replication_num: 1 route: - source-table: order_dw_mysql.\.* sink-table: order_dw_sr.<> replace-symbol: <> description: route all tables in source_db to sink_db pipeline: name: Sync MySQL Database to StarRocks
The following table describes the configuration information required for the ApsaraDB RDS for MySQL and StarRocks instances in this example. For more information about other parameters used for data ingestion, see MySQL connector and StarRocks connector.
Category
Parameter
Description
Example
source
hostname
The IP address or hostname that is used to access the ApsaraDB RDS for MySQL database.
We recommend that you enter the VPC endpoint of the database.
rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
port
The port number that is used to access the ApsaraDB RDS for MySQL database.
3306
username
The username and password that are used to access the ApsaraDB RDS for MySQL database. Set the parameters to the username and password of the account that is created in Step 1: Prepare test data of the ApsaraDB RDS for MySQL instance.
NoteIn this example, variables are used to prevent security risks that are caused by plaintext password information. For more information, see Manage variables.
${secret_values.mysqlusername}
password
${secret_values.mysqlpassword}
tables
The names of the ApsaraDB RDS for MySQL tables. You can use regular expressions to read data from multiple tables.
In this example, all tables and data in the order_dw_mysql database are synchronized.
order_dw_mysql.\.*
server-id
The numeric ID that is allocated to a database client.
5405-5415
sink
jdbc-url
The Java Database Connectivity (JDBC) URL that is used to connect to the database.
The JDBC URL contains a specific IP address and the JDBC port of a frontend (FE). Specify a value for this parameter in the
jdbc:mysql://ip:port
format.You can go to the Instance Details tab of the EMR Serverless StarRocks instance in the EMR console to view the internal endpoint and the query port of the instance in the FE Details section.
jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
load-url
The HTTP service URL that is used to connect to the FE node.
You can go to the Instance Details tab of the EMR Serverless StarRocks instance in the EMR console to view the internal endpoint and HTTP port of the instance in the FE Details section.
fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
username
The username and password that are used to access the StarRocks database.
Set the parameters to the username and password that you used when you create the EMR Serverless StarRocks instance.
NoteIn this example, variables are used to prevent security risks that are caused by plaintext password information. For more information, see Manage variables.
${secret_values.starrocksusername}
password
${secret_values.starrockspassword}
route
source-table
Specifies the effective source tables.
order_dw_mysql.\.*
sink-table
Specifies the destination location for data routing.
order_dw_sr.<>
replace-symbol
The string to be replaced by the name of a source table when the pattern matching feature is used.
<>
Click Deploy.
Step 3: Start the YAML deployment for the draft
In the upper-right corner of the Data Ingestion page, click Deploy. In the dialog box that appears, configure the parameters and click Confirm.
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose
. On the Deployments page, find the desired YAML deployment and click Start in the Actions column.In the Start Job panel, configure the parameters and click Start.
In this example, Initial Mode is selected. For more information about the parameters, see Start a deployment. You can view the status of and information about the deployment on the Deployments page after you start the deployment.
Step 4: View the synchronization result in StarRocks
After the YAML deployment enters the RUNNING state, you can view the data synchronization status in StarRocks.
Use EMR StarRocks Manager to access the EMR Serverless StarRocks instance.
In the left-side navigation pane of the page that appears, click SQL Editor. On the Databases tab, click the icon.
A database named order_dw_sr appears in default_catalog.
On the Queries tab, click + File. In the Create a file dialog box, configure the parameter to create a query script. On the query script editing page, enter the following SQL statements and click Run.
SELECT * FROM default_catalog.order_dw_sr.orders order by order_id; SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id; SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
View the synchronization result below the statements.
Tables that have the same names as the tables in the ApsaraDB RDS for MySQL database and data of the tables already exist in the StarRocks database.
References
For more information about how to develop a YAML draft, see Develop a YAML draft (public preview).