Realtime Compute for Apache Flink allows you to create YAML deployments for data ingestion from a source to a sink based on Change Data Capture (CDC) connectors for Apache Flink. This topic describes how to develop a YAML draft in Realtime Compute for Apache Flink.
Background information
CDC connectors for Apache Flink are integrated with Data Ingestion in Realtime Compute for Apache Flink. Compared with draft development based on the CREATE DATABASE AS and CREATE TABLE AS statements, developing YAML drafts on the Data Ingestion page simplifies complex extract, transform, load (ETL) processes and lets you manage ETL processes based on Flink computing logic. YAML deployments support synchronization of all data from a database, single-table synchronization, merging and synchronization of multiple tables in a sharded database, synchronization of new tables, synchronization of table schema changes, and synchronization of custom computed columns. YAML deployments also support ETL processing, WHERE condition-based filtering, column pruning, and computed column addition. These capabilities simplify the data integration process and improve the efficiency and reliability of data integration.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.9 or later supports YAML deployments.
Note: VVR 8.0.9 corresponds to CDC 3.0 for Apache Flink. For more information about version mappings between CDC connectors for Apache Flink and VVR, see Version mappings between CDC connectors for Apache Flink and VVR.
You can use a YAML deployment to synchronize data from only one source to only one sink. If you want to read data from multiple sources or write data to multiple sinks, you must create multiple YAML deployments.
You cannot deploy YAML deployments in session clusters.
Connectors supported for data ingestion
The following table describes the connectors that are supported as sources and sinks for data ingestion.
If you require additional upstream or downstream storage, you can submit a ticket or join the DingTalk group to provide feedback. This helps Realtime Compute for Apache Flink support more upstream and downstream storage systems to meet your business requirements.
Procedure
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, go to the Drafts page for data ingestion.
In the upper-left corner of the Drafts page, click New. In the New Draft dialog box, click Blank Draft and click Next.
You can also develop a YAML draft from a template to synchronize data. In the New Draft dialog box, click Data Ingestion from MySQL to StarRocks, Data Ingestion from MySQL to Paimon, or Data Ingestion from MySQL to Hologres.
On the page that appears, configure the Name and Location parameters, select a value from the Engine Version drop-down list, and then click OK.
Configure the code of the YAML draft.
# Required.
source:
  # The connector type of the source.
  type: <Replace the value with your connector type of the source>
  # Configure the parameters for the source. For more information about the parameters, see the documentation of the corresponding connector.
  ...

# Required.
sink:
  # The connector type of the sink.
  type: <Replace the value with your connector type of the sink>
  # Configure the parameters for the sink. For more information about the parameters, see the documentation of the corresponding connector.
  ...

# Optional.
transform:
  # Configure a data transformation rule for the flink_test.customers table.
  - source-table: flink_test.customers
    # Configure the mappings. You can specify the columns that you want to synchronize and perform data transformation.
    projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
    # Configure the filter condition. Only the data records whose ID is greater than 10 are synchronized.
    filter: id > 10
    # Configure a description for the data transformation rule.
    description: append calculated columns based on source table

# Optional.
route:
  # Configure the routing rule to specify the mapping between a source table and a destination table.
  - source-table: flink_test.customers
    sink-table: db.customers_o
    # Configure a description for the routing rule.
    description: sync customers table
  - source-table: flink_test.customers_suffix
    sink-table: db.customers_s
    # Configure a description for the routing rule.
    description: sync customers_suffix table

# Optional.
pipeline:
  # The name of the draft.
  name: MySQL to Hologres Pipeline
The following describes each code block.

source (required)
The start of the pipeline. The CDC connectors for Apache Flink capture change data from the source.
Note: Only the MySQL connector can be used in the source. For more information about the parameters that you must configure for the MySQL connector, see MySQL connector.
You can use variables to mask sensitive information. For more information, see Manage variables and keys.
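For reference, a minimal source block for MySQL might look like the following sketch, adapted from the complete example later in this topic. The hostname is a placeholder, and the username and password are read from variables:

source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  # Synchronize all tables in the app_db database.
  tables: app_db.\.*
  server-id: 5400-5404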
sink (required)
The end of the pipeline. The CDC connectors for Apache Flink write the change data that is captured from the source to the sink.
Note: For the connectors that can be used in the sink, see Connectors supported for data ingestion. For more information about the parameters that you must configure for a connector used in the sink, see the documentation of the corresponding connector.
You can use variables to mask sensitive information. For more information, see Manage variables and keys.
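For reference, a minimal sink block for Hologres might look like the following sketch, also adapted from the complete example later in this topic. The endpoint and database name are placeholders:

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}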
pipeline (optional)
This module defines the basic configuration of the YAML draft, such as the pipeline name.
transform (optional)
This module defines data transformation rules that process data in a Realtime Compute for Apache Flink pipeline. ETL processing, WHERE condition-based filtering, column pruning, and computed column addition are supported. If the change data captured by the CDC connectors for Apache Flink needs to be converted to adapt to specific downstream storage, you can use the transform feature.
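For example, the following sketch keeps only the rows whose id is greater than 10 and appends a computed column. The table and column names are illustrative and follow the template shown earlier:

transform:
  - source-table: flink_test.customers
    # Keep selected columns and add an uppercase copy of the username column.
    projection: id, username, UPPER(username) as username_upper, age
    # Synchronize only the rows whose id is greater than 10.
    filter: id > 10
    description: filter rows and append a computed column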
route (optional)
This module defines routing rules that map source tables to destination tables. If you do not configure this module, all data in a database or a single table is synchronized. In some cases, the captured change data needs to be synchronized to different destination tables based on specific rules. The route feature allows you to flexibly specify mappings between the upstream and downstream storage to synchronize data to different destinations.
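For example, the following sketch merges sharded tables into a single destination table. It assumes that the source-table option of a routing rule accepts the same regular-expression table patterns as the source; the database and table names are illustrative:

route:
  # Merge all tables that match the pattern into one destination table.
  - source-table: app_db.customers_\.*
    sink-table: ods.customers
    description: merge sharded customers tables into one destination table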
The following sample code shows how to synchronize all tables from the app_db database in MySQL to a Hologres database:
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres
Optional. Click Validate to check the syntax, network connectivity, and access permissions of the draft.
References
After you develop a YAML draft, you must deploy the draft. For more information, see Create a deployment.
You can develop a YAML draft to synchronize all data from a MySQL database to StarRocks.