Realtime Compute for Apache Flink allows you to create YAML deployments for data ingestion from a source to a sink based on Change Data Capture (CDC) connectors for Apache Flink. This topic describes how to develop a YAML draft of Realtime Compute for Apache Flink.
Background information
CDC connectors for Apache Flink are integrated with Data Ingestion in Realtime Compute for Apache Flink. Compared with draft development that uses the CREATE DATABASE AS and CREATE TABLE AS statements, developing YAML drafts on the Data Ingestion page simplifies complex extract, transform, load (ETL) processes and lets you manage ETL pipelines based on Flink computing logic. YAML deployments support synchronization of all data from a database, single-table synchronization, merging and synchronization of multiple tables in a sharded database, synchronization of new tables, synchronization of table schema changes, and synchronization of custom computed columns. YAML deployments also support ETL processing, WHERE condition-based filtering, column pruning, and computed column addition. YAML deployments simplify the data integration process and improve its efficiency and reliability.
Benefits of YAML deployments for data ingestion
In Realtime Compute for Apache Flink, you can create YAML deployments for data ingestion, SQL deployments, or DataStream deployments to synchronize data. The following sections describe the benefits of YAML deployments for data ingestion compared with SQL deployments and DataStream deployments.
YAML deployment for data ingestion vs SQL deployment
YAML deployments for data ingestion and SQL deployments can synchronize data of different types. Take note of the following items:
SQL deployments synchronize data of the RowData type. YAML deployments for data ingestion synchronize data of the DataChangeEvent and SchemaChangeEvent types. Each data record of the RowData type in SQL deployments has its own change type. The change types of data records include INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), and DELETE (-D).
YAML deployments for data ingestion use the SchemaChangeEvent type to synchronize schema changes, such as table creation, column addition, and table data clearing. YAML deployments for data ingestion also use the DataChangeEvent type to synchronize data changes, including INSERT messages, UPDATE messages (UPDATE_BEFORE or UPDATE_AFTER messages), DELETE messages, and UPDATE messages that combine UPDATE_BEFORE and UPDATE_AFTER messages. This allows you to write raw change data to the sink.
The following table describes the benefits of YAML deployments for data ingestion compared with SQL deployments.
YAML deployment for data ingestion | SQL deployment |
Automatically identifies schemas and supports synchronization of all data from a database. | Requires you to manually write the CREATE TABLE and INSERT statements. |
Supports multiple schema change synchronization policies. | Does not support synchronization of schema changes. |
Supports synchronization of original changelogs. | Destroys the structure of original changelogs. |
Reads data from and writes data to multiple tables. | Reads data from and writes data to a single table. |
Compared with deployments that use the CREATE TABLE AS or CREATE DATABASE AS statement, YAML deployments are more powerful and support the following features:
Immediately synchronize schema changes of upstream tables without the need to wait for new data to be written.
Synchronize original changelogs and do not split UPDATE messages.
Synchronize more types of schema changes, such as changes made by using the TRUNCATE TABLE and DROP TABLE statements.
Establish mappings between tables and allow you to flexibly specify the names of sink tables.
Support flexible schema evolution and allow you to configure the schema evolution behavior (see the sketch after this list).
Support WHERE condition-based filtering.
Support column pruning.
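For example, the schema evolution behavior can be configured in the pipeline module. The following is a minimal sketch based on the open source Flink CDC pipeline syntax; the schema.change.behavior option and its values are assumptions here, so verify them against the data ingestion development references for your engine version:

```yaml
# Minimal sketch of configuring schema evolution for a YAML deployment.
# The schema.change.behavior option and the value "lenient" follow the open source
# Flink CDC pipeline syntax and are assumptions; other documented values include
# evolve, try_evolve, ignore, and exception.
pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: lenient
```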
YAML deployment for data ingestion vs DataStream deployment
The following table describes the benefits of YAML deployments for data ingestion compared with DataStream deployments.
YAML deployment for data ingestion | DataStream deployment |
Is designed for all levels of users, not just experts. | Requires you to be familiar with Java and distributed systems. |
Allows you to perform data development in an efficient manner by hiding underlying details. | Requires you to be familiar with the framework of Realtime Compute for Apache Flink. |
Is easy to understand based on the YAML syntax. | Requires you to understand Maven dependencies. |
Is easy to reuse. | Makes existing code difficult to reuse. |
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.9 or later supports YAML deployments.
You can use a YAML deployment to synchronize data from only one source to only one sink. If you want to read data from multiple sources or write data to multiple sinks, you must create multiple YAML deployments (see the sketch after this list).
You cannot deploy YAML deployments in session clusters.
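For example, to write data from one MySQL database to both Hologres and another sink, create two YAML deployments, one per sink. The following is a minimal sketch; the connection parameters are omitted and the placeholder values are illustrative:

```yaml
# Draft 1: MySQL -> Hologres (one source and one sink per deployment).
source:
  type: mysql
  tables: app_db.\.*
  # ... connection parameters ...
sink:
  type: hologres
  # ... connection parameters ...
pipeline:
  name: Sync app_db to Hologres
```

A second deployment that writes the same MySQL source data to another sink, such as Paimon, would be created as a separate draft with the same structure.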
Connectors supported for data ingestion
The following table describes the connectors that are supported as sources and sinks for data ingestion.
If you require additional upstream or downstream storage systems, you can submit a ticket or join the DingTalk group to provide feedback. This helps Realtime Compute for Apache Flink better meet your business requirements.
Connector | Source | Sink |
× | √ | |
× | √ | |
MySQL (Note: The MySQL connector supports ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases.) | √ | × |
× | √ | |
× | √ | |
× | √ | |
× | √ |
Procedure
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, go to the Data Ingestion page.
In the upper-left corner of the Drafts page, click New. In the New Draft dialog box, click Blank Draft and click Next.
You can also develop a YAML draft from a template. In the New Draft dialog box, click Data Ingestion from MySQL to StarRocks, Data Ingestion from MySQL to Paimon, or Data Ingestion from MySQL to Hologres to synchronize data based on the corresponding template.
On the page that appears, configure the Name and Location parameters, select a value from the Engine Version drop-down list, and then click OK.
Configure the code of the YAML draft.
```yaml
# Required.
source:
  # The connector type of the source.
  type: <Replace the value with your connector type of the source>
  # Configure the parameters for the source. For more information about the parameters, see the documentation of the corresponding connector.
  ...

# Required.
sink:
  # The connector type of the sink.
  type: <Replace the value with your connector type of the sink>
  # Configure the parameters for the sink. For more information about the parameters, see the documentation of the corresponding connector.
  ...

# Optional.
transform:
  # Configure a data transformation rule for the flink_test.customers table.
  - source-table: flink_test.customers
    # Configure the mappings. You can specify the columns that you want to synchronize and perform data transformation.
    projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
    # Configure the filter condition. Only the data records whose ID is greater than 10 are synchronized.
    filter: id > 10
    # Configure a description for the data transformation rule.
    description: append calculated columns based on source table

# Optional.
route:
  # Configure the routing rule to specify the mapping between a source table and a destination table.
  - source-table: flink_test.customers
    sink-table: db.customers_o
    # Configure a description for the routing rule.
    description: sync customers table
  - source-table: flink_test.customers_suffix
    sink-table: db.customers_s
    # Configure a description for the routing rule.
    description: sync customers_suffix table

# Optional.
pipeline:
  # The name of the draft.
  name: MySQL to Hologres Pipeline
```
The following table describes the modules in the code.
Required | Module | Description |
Required | source | The start of the pipeline. The CDC connectors for Apache Flink capture change data from the source. Note: Only the MySQL connector can be used in the source. For more information about the parameters that you must configure for the MySQL connector, see MySQL connector. You can use variables to mask sensitive information. For more information, see Manage variables and keys. |
Required | sink | The end of the pipeline. The CDC connectors for Apache Flink synchronize the change data that is captured from the source to the sink. Note: For the connectors that can be used in the sink, see Connectors supported for data ingestion. For more information about the parameters that you must configure for a connector used in the sink, see the documentation of the corresponding connector. You can use variables to mask sensitive information. For more information, see Manage variables and keys. |
Optional | pipeline | Defines the basic configuration of the YAML draft, such as the pipeline name. |
Optional | transform | Defines data transformation rules that process data in the Realtime Compute for Apache Flink pipeline. This module supports ETL processing, WHERE condition-based filtering, column pruning, and computed column addition. Use this module if the change data captured by the CDC connectors for Apache Flink must be converted to fit specific downstream storage. |
Optional | route | Defines the mappings between the upstream and downstream storage. In specific cases, the captured change data must be synchronized to different sinks based on specific rules, and this module allows you to flexibly specify those mappings (see the sketch after this table). If you do not configure this module, all data in a database or a single table is synchronized. |
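For example, the route module can merge sharded source tables into a single destination table. The following sketch assumes sharded tables whose names start with customers in the app_db database, and assumes that source-table accepts a pattern in the same way as the tables option of the source; the table names are illustrative:

```yaml
# Sketch: route change data from sharded source tables to one destination table.
# The table names are illustrative, and the pattern in source-table is an assumption
# based on the open source Flink CDC route syntax; adjust it to your actual shard names.
route:
  - source-table: app_db.customers\.*
    sink-table: ods_db.customers_all
    description: merge sharded customers tables into one table
```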
For more information about the syntax and parameters of each module, see Data ingestion development references.
The following sample code shows how to synchronize all tables in the app_db database in MySQL to a Hologres database:
```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres
```
Optional. Click Validate.
You can check the syntax, network connectivity, and access permissions.
References
After you develop a YAML draft, you must deploy the draft. For more information, see Create a deployment.
You can develop a YAML draft to synchronize all data from a MySQL database to StarRocks. For more information, see Getting started with a YAML deployment for data ingestion.