This topic describes the development references for the source, sink, transform, route, and pipeline modules related to data ingestion.
Supported connectors
Connector | Source | Sink |
Kafka connector | √ Note: The Kafka connector can be used in the source only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.10 or later. | √ |
Upsert Kafka connector | × | √ |
MySQL connector | √ Note: The MySQL connector supports the ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. | × |
Hologres connector | × | √ |
Apache Paimon connector | × | √ |
StarRocks connector | × | √ |
Print connector | × | √ |
source
The source module defines the data ingestion source. Only the Kafka connector and MySQL connector can be used in the source.
Syntax
source:
type: mysql
name: mysql source
xxx: ...
For more information about the parameters that you must configure when you use a connector, see the "Data ingestion" section of the topic for the corresponding connector.
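For example, the following sketch shows a MySQL source that uses the connection parameters that also appear in the sample code later in this topic. The ${...} placeholders stand for your own values.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604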
sink
The sink module defines the data ingestion sink. The following connectors are supported: Kafka connector, Upsert Kafka connector, Hologres connector, Apache Paimon connector, StarRocks connector, and Print connector.
Syntax
sink:
type: hologres
name: hologres sink
xxx: ...
For more information about the parameters that you must configure when you use a connector, see the "Data ingestion" section of the topic for the corresponding connector.
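For example, the following sketch shows a sink that only prints the ingested change events. It uses the values sink configuration that also appears in the sample code later in this topic.
sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true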
transform
You can configure rules in the transform module of a YAML deployment to implement features such as data projection, calculation, and filtering in a source table.
Syntax
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...
Parameters
Parameter | Description | Required | Remarks |
source-table | The effective source tables. | Yes | Regular expressions are supported. |
projection | The projection rule used to retain specific upstream columns in the source tables. | No | The syntax used for the rule is similar to the syntax of the SQL SELECT statement. If you do not configure this parameter, no columns are appended or deleted. Note: In Realtime Compute for Apache Flink that uses VVR 8.0.9, you must manually enter an asterisk (\*) in the projection rule if you want to synchronize all upstream columns and schema changes to the sink. |
filter | The row filtering rule. | No | The syntax used for the rule is similar to the syntax of the SQL WHERE clause. If you do not configure this parameter, no rows are filtered. |
primary-keys | The primary key columns in the table schema after data transformation. | No | If you do not configure this parameter, the primary key columns of the original table schema are retained. Separate multiple primary key columns with commas (,). |
partition-keys | The partition key columns in the table schema after data transformation. | No | If you do not configure this parameter, the partition key columns of the original table schema are retained. Separate multiple partition key columns with commas (,). |
table-options | The additional configuration information that you want to write to the sink. | No | You can configure options such as the comments and the number of buckets for an Apache Paimon sink. |
description | The description of the transform module. | No | N/A. |
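The following sketch combines several of these parameters. The table and column names (db.orders, id, and dt) are placeholders, and the bucket setting in table-options assumes an Apache Paimon sink; the exact option syntax depends on the sink connector.
transform:
  - source-table: db.orders
    projection: \*, id + 1 AS inc_id
    filter: id > 0
    primary-keys: id
    partition-keys: dt
    table-options: bucket=16
    description: project, filter, and redefine keys for db.orders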
Computed columns
You can use the <Expression> AS <ColName> syntax in the projection rule to define computed columns. The expression calculates data records in the source and fills the related values in the computed columns.
The expression of a computed column cannot reference the value of another computed column, even if the referenced column appears before the current computed column in the projection rule. For example, a, b AS c, c AS d is invalid in the projection rule.
For example, with the following transform rule, the data record [+I, id = 1] from the source table db.tbl is converted into the data row [+I, id = 1, inc_id = 2] and synchronized to the sink.
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_id
Wildcard characters
If you want to synchronize all columns in the source table, including new columns appended to the source table, unchanged to the sink, you can use an asterisk (*) as a wildcard character in a projection rule.
If you do not use an asterisk (*) as a wildcard character in a projection rule, a fixed schema is generated that is always the same as the schema written in the projection rule.
For example, *, 'extras' AS extras indicates that an extra column is appended to the end of the source table schema and that schema changes of the source table are continuously synchronized to the sink.
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras
Metadata columns
When you create a projection rule, you can use the following predefined metadata columns as common data columns.
Do not define a common data column that has the same name as a metadata column.
Metadata column name | Data type | Description |
__namespace_name__ | STRING | The namespace of the source table in which the data change occurs. |
__schema_name__ | STRING | The schema of the source table in which the data change occurs. |
__table_name__ | STRING | The source table in which the data change occurs. |
__data_event_type__ | STRING | The change type of the data change record. Note: The Update Before and Update After events for a data change are always combined into one Change Data Capture (CDC) event. |
For example, you can use predefined metadata columns as common data columns to write the fully qualified name of the source table to the computed columns and synchronize the fully qualified name to the sink.
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier
The following table describes the mappings of the names of namespaces, schemas, and tables to objects in different connectors.
Database type | Namespace name | Schema name | Table name |
Java Database Connectivity (JDBC) | Catalog | Schema | Table |
Debezium | Catalog | Schema | Table |
MySQL | Database | - | Table |
Postgres | Database | Schema | Table |
Oracle | - | Schema | Table |
Microsoft SQL Server | Database | Schema | Table |
StarRocks | Database | - | Table |
Doris | Database | - | Table |
Precautions
After you modify the statements of the transform module, the deployment cannot be restored from a specific state. You must start the deployment without states.
In most cases, the statements used to specify the projection and filtering rules do not need to be enclosed in single quotation marks (') or double quotation marks (").
transform:
  - projection: a, b, c
# The preceding statement is equivalent to the following statement:
  - projection: "a, b, c"
However, if the first character of a projection expression is a special character, such as an asterisk (*) or a single quotation mark ('), the entire expression may not be parsed as a valid YAML string literal. In this case, you must manually enclose the entire expression in single quotation marks (') or double quotation marks ("), or use a backslash (\) to escape the expression.
transform:
  - projection: *, 42    # An invalid YAML string.
  - projection: '*, 42'  # OK
  - projection: \*, 42   # OK
In Realtime Compute for Apache Flink that uses VVR 8.0.9, you cannot synchronize schema changes from the source tables to the sink if you do not configure a projection rule when you define a statement block in the transform module. The following sample code shows an example on how to configure a projection rule in the transform module:
transform:
  - source-table: db.\.*
    projection: \* # Synchronize all upstream columns and schema changes from the source tables to the sink.
route
You can define a statement block that contains several routing rules in the route module of a YAML deployment. The statement block can be used to describe a complex routing topology from a source to a sink.
Syntax
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2
Parameters
Parameter | Description | Required | Remarks |
source-table | The effective source tables. | Yes | Regular expressions are supported. |
sink-table | The destination location for data routing. | Yes | N/A. |
replace-symbol | The string to be replaced by the name of a source table when the pattern matching feature is used. | No | For example, if you set the replace-symbol parameter to <>, the <> string in the sink-table value is replaced by the name of the source table. |
description | The description of the route module. | No | N/A. |
Use the route module
One-to-one routing
Route data from the source table mydb.web_order to the sink table mydb.ods_web_order.
route:
- source-table: mydb.web_order
sink-table: mydb.ods_web_order
description: sync table to one destination table with given prefix ods_
Merging of multiple tables in a sharded database
Merge all tables in the source database mydb into the sink table mydb.merged.
route:
- source-table: mydb.\.*
sink-table: mydb.merged
description: sync sharding tables to one destination table
Multiple routing rules
You can use a hyphen (-) as a YAML list symbol in a route statement block to define multiple rules. The rules take effect at the same time.
route:
- source-table: mydb.orders
sink-table: ods_db.ods_orders
description: sync orders table to orders
- source-table: mydb.shipments
sink-table: ods_db.ods_shipments
description: sync shipments table to ods_shipments
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
Pattern matching
Synchronize all tables in the source database source_db to the sink database sink_db in a one-to-one mapping. The table names before and after synchronization are the same.
route:
- source-table: source_db.\.*
sink-table: sink_db.<>
replace-symbol: <>
description: route all tables in source_db to sink_db
The special string <> specified by the replace-symbol parameter is replaced by the source table names. In this case, one-to-one mappings between the source tables and the sink tables are implemented.
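You can also combine the pattern matching feature with a fixed prefix or suffix. In the following sketch, every table in the source database source_db is routed to a table in ods_db whose name carries the ods_ prefix. The database names are placeholders.
route:
  - source-table: source_db.\.*
    sink-table: ods_db.ods_<>
    replace-symbol: <>
    description: route all tables in source_db to ods_db with the prefix ods_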
Data distribution
To distribute data from the same table to multiple sink tables, you only need to define multiple routing rules. For example, data in the table mydb.orders is distributed to the sink_db and backup_sink_db databases.
route:
- source-table: mydb.orders
sink-table: sink_db.orders
- source-table: mydb.orders
sink-table: backup_sink_db.orders
Precautions
After you modify the statements of the route module, the deployment cannot be restored from a specific state. You must start the deployment without states.
pipeline
You can configure the overall settings of a YAML deployment for data ingestion in the pipeline module.
Syntax
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT
Parameters
Parameter | Description | Required | Data type | Default value | Remarks |
name | The name of the YAML deployment. | No | STRING | Flink CDC Pipeline Job | N/A. |
schema.change.behavior | The schema change processing method. | No | STRING | LENIENT | Valid values: LENIENT, EXCEPTION, EVOLVE, TRY_EVOLVE, and IGNORE. For more information, see the "Schema change processing methods" section of this topic. |
Schema change processing methods
A YAML deployment allows you to synchronize schema changes from a source to a sink. Schema changes include the operations to create tables, add columns, rename columns, change column types, delete columns, and delete tables. Some schema changes may not be supported by a sink. You can configure the schema.change.behavior parameter to specify how to process schema changes in a sink.
Methods for processing schema changes
Method | Description |
LENIENT (default value) | A YAML deployment for data ingestion converts schema changes into changes that can be processed by the sink and synchronizes the converted schema changes to the sink. |
EXCEPTION | Schema changes are not synchronized to the sink. You can use this method if the sink cannot process schema changes. When a schema change event is synchronized to the sink, the YAML deployment for data ingestion reports an error. |
EVOLVE | A YAML deployment for data ingestion applies all schema changes to the sink. If the schema changes fail to be applied in the sink, the YAML deployment for data ingestion reports an error and is restarted. |
TRY_EVOLVE | A YAML deployment for data ingestion attempts to apply schema changes to the sink. If the sink cannot process the schema changes, the YAML deployment is not restarted but attempts to process the schema changes by transforming data. Warning When you use this method, specific columns from the source may be lost or truncated if schema changes fail to be applied in the sink. |
IGNORE | No schema changes are applied to the sink. You can use this method if the sink is not ready for schema changes and needs to continue receiving data from the columns that are not changed. |
Management of schema changes received by the sink
In specific cases, you do not need to synchronize all schema changes to the sink. For example, you are allowed to add columns to a sink but not allowed to drop columns from the sink in some scenarios. This prevents existing data from being dropped.
You can configure the include.schema.changes and exclude.schema.changes parameters in the sink module to manage the schema changes that you want to synchronize to the sink.
Parameter | Description | Required | Data type | Default value | Remarks |
include.schema.changes | Schema changes that can be applied in the sink. | No | List<String> | No default value | By default, all schema changes can be applied in the sink. |
exclude.schema.changes | Schema changes that cannot be applied in the sink. | No | List<String> | No default value | This parameter takes precedence over the include.schema.changes parameter. |
The following table describes the schema change events.
Event | Description |
add.column | Add a column. |
alter.column.type | Change the data type of a column. |
create.table | Create a table. |
drop.column | Drop a column. |
drop.table | Drop a table. |
rename.column | Change the name of a column. |
truncate.table | Clear data from a table. |
Partial match is supported for schema change events. For example, if you pass the keyword drop, both the drop.column and drop.table events are matched.
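For example, the following sketch uses the drop keyword in the sink module to exclude both drop events at the same time. The values sink is the same one used in the sample code below.
sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  exclude.schema.changes: [drop] # Matches both the drop.column and drop.table events.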
Sample code
Example 1: Use the EVOLVE method to process schema changes.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE
Example 2: Match the table creation event and column-related events and exclude the column deletion event.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # Match the create.table, add.column, alter.column.type, rename.column, and drop.column events.
exclude.schema.changes: [drop.column] # Exclude the drop.column event.
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE
Functions
Built-in functions
Flink CDC YAML provides a wide range of built-in functions that can be directly used in the projection and filter expressions in the transform module.
Comparison functions
Unless otherwise stated, the following built-in functions return NULL if the input parameters contain null values.
Function | Description |
value1 = value2 | Returns TRUE if value1 is equal to value2. Returns FALSE if value1 is not equal to value2. |
value1 <> value2 | Returns TRUE if value1 is not equal to value2. Returns FALSE if value1 is equal to value2. |
value1 > value2 | Returns TRUE if value1 is greater than value2. Returns FALSE if value1 is less than or equal to value2. |
value1 >= value2 | Returns TRUE if value1 is greater than or equal to value2. Returns FALSE if value1 is less than value2. |
value1 < value2 | Returns TRUE if value1 is less than value2. Returns FALSE if value1 is greater than or equal to value2. |
value1 <= value2 | Returns TRUE if value1 is less than or equal to value2. Returns FALSE if value1 is greater than value2. |
value IS NULL | Returns TRUE if the value is NULL. Returns FALSE if the value is not NULL. |
value IS NOT NULL | Returns TRUE if the value is not NULL. Returns FALSE if the value is NULL. |
value1 BETWEEN value2 AND value3 | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. Returns FALSE if value1 is less than value2 or greater than value3. |
value1 NOT BETWEEN value2 AND value3 | Returns TRUE if value1 is less than value2 or greater than value3. Returns FALSE if value1 is greater than or equal to value2 and less than or equal to value3. |
string1 LIKE string2 | Returns TRUE if string1 matches string2. Returns FALSE if string1 does not match string2. |
string1 NOT LIKE string2 | Returns TRUE if string1 does not match string2. Returns FALSE if string1 matches string2. |
value1 IN (value2 [, value3]* ) | Returns TRUE if value1 exists in the list of values [value2, value3, …]. Returns FALSE if value1 does not exist in the list of values [value2, value3, …]. |
value1 NOT IN (value2 [, value3]* ) | Returns TRUE if value1 does not exist in the list of values [value2, value3, …]. Returns FALSE if value1 exists in the list of values [value2, value3, …]. |
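For example, comparison functions can be combined in a filter rule. The table and column names in the following sketch (db.orders, amount, and status) are placeholders.
transform:
  - source-table: db.orders
    projection: \*
    filter: amount BETWEEN 10 AND 100 AND status NOT IN ('canceled', 'refunded')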
Logical functions
Function | Description |
boolean1 OR boolean2 | Returns TRUE if boolean1 or boolean2 is TRUE. |
boolean1 AND boolean2 | Returns TRUE if boolean1 and boolean2 are TRUE. |
NOT boolean | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
boolean IS FALSE | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
boolean IS NOT FALSE | Returns TRUE if boolean is TRUE. Returns FALSE if boolean is FALSE. |
boolean IS TRUE | Returns TRUE if boolean is TRUE. Returns FALSE if boolean is FALSE. |
boolean IS NOT TRUE | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
Arithmetic functions
Function | Description |
numeric1 + numeric2 | Returns the sum of numeric1 and numeric2. |
numeric1 - numeric2 | Returns the difference between numeric1 and numeric2. |
numeric1 * numeric2 | Returns the product of numeric1 multiplied by numeric2. |
numeric1 / numeric2 | Returns the quotient of numeric1 divided by numeric2. |
numeric1 % numeric2 | Returns the remainder of numeric1 divided by numeric2. |
ABS(numeric) | Returns the absolute value of the numeric value. |
CEIL(numeric) | Returns the smallest integer that is greater than or equal to numeric. |
FLOOR(numeric) | Returns the largest integer that is less than or equal to numeric. |
ROUND(numeric, int) | Returns a value rounded to n decimal places for the numeric value. |
UUID() | Returns a Universally Unique Identifier (UUID) string, such as 3d3c68f7-f608-473f-b60c-b0c44ad4cc4e, based on RFC 4122 Type 4. |
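For example, arithmetic functions can be used in computed columns. The table and column names in the following sketch (db.orders, price, and quantity) are placeholders.
transform:
  - source-table: db.orders
    projection: \*, ROUND(price * quantity, 2) AS total_amount, UUID() AS row_uuid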
String functions
Function | Description |
string1 || string2 | Returns the concatenation of string1 and string2. Important Do not confuse the operator || with the logical disjunction operator OR. |
CHAR_LENGTH(string) | Returns the number of characters in a string. |
UPPER(string) | Returns a string in uppercase letters. |
LOWER(string) | Returns a string in lowercase letters. |
TRIM(string1) | Removes the spaces on both sides of a string. |
REGEXP_REPLACE(string1, string2, string3) | Replaces all substrings in string1 that match string2 with string3. For example, the result of REGEXP_REPLACE('foobar', 'oo|ar', '') is fb. |
SUBSTRING(string FROM integer1 [ FOR integer2 ]) | Returns a substring that starts from the position specified by integer1 in a string and ends at the position specified by integer2. Note: If you do not specify FOR integer2, the substring from the position specified by integer1 to the end of the string is returned. |
CONCAT(string1, string2, ...) | Concatenates multiple strings and returns a new string. For example, the result of CONCAT('AA', 'BB', 'CC') is AABBCC. |
Time functions
Function | Description |
LOCALTIME | Returns the current time in the local time zone. The return value is of the TIME type. |
LOCALTIMESTAMP | Returns the current timestamp in the local time zone. The return value is of the TIMESTAMP type. |
CURRENT_TIME | Returns the current time in the local time zone. This function is equivalent to the LOCALTIME function. |
CURRENT_DATE | Returns the current date in the local time zone. |
CURRENT_TIMESTAMP | Returns the current timestamp in the local time zone. The return value is of the TIMESTAMP_LTZ type. |
NOW() | Returns the current timestamp in the local time zone. This function is equivalent to the CURRENT_TIMESTAMP function. |
DATE_FORMAT(timestamp, string) | Converts a timestamp into a string in the specified format. Note The formatted string is compatible with the SimpleDateFormat format in Java. |
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | Returns the time interval between timepoint1 and timepoint2. The timepointunit parameter can be set to SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
TO_DATE(string1[, string2]) | Converts string1 in the format of string2 into a value of the DATE type. Note: If string2 is not specified, the yyyy-MM-dd format is used by default. |
TO_TIMESTAMP(string1[, string2]) | Converts string1 in the format of string2 into a value of the TIMESTAMP type without time zone information. Note: If string2 is not specified, the yyyy-MM-dd HH:mm:ss format is used by default. |
If you use multiple time functions in the projection and filter expressions, the YAML deployment ensures that the time points obtained by each sub-expression are the same. For example, t1, t2, and t3 obtained by the NOW() AS t1, NOW() AS t2, NOW() AS t3 expression always correspond to the same timestamp, regardless of the time and order in which they are calculated.
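For example, the following sketch stamps each record with a processing date and timestamp that are derived from the same point in time. The table name db.orders is a placeholder.
transform:
  - source-table: db.orders
    projection: \*, DATE_FORMAT(NOW(), 'yyyy-MM-dd') AS etl_date, NOW() AS etl_time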
Conditional functions
Function | Description |
CASE value WHEN value1_1 [, value1_2]* THEN result1 (WHEN value2_1 [, value2_2]* THEN result2)* (ELSE result_z) END | Checks whether value is equal to the values given by the WHEN clauses in sequence and returns the result of the first matched WHEN clause. Returns the value specified in the ELSE clause if no WHEN clause meets the condition. Returns NULL if no WHEN clause meets the condition and no value is specified in the ELSE clause. |
CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Checks the conditions specified by the WHEN clauses in sequence and returns the result of the first matched WHEN clause. Returns the value specified in the ELSE clause if no WHEN clause meets the condition. Returns NULL if no WHEN clause meets the condition and no value is specified in the ELSE clause. |
COALESCE(value1 [, value2]*) | Returns the first non-NULL value from [value1, value2, …]. Returns NULL if all values from [value1, value2, …] are NULL. |
IF(condition, true_value, false_value) | Returns true_value if the condition is met, and returns false_value if the condition is not met. |
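For example, conditional and string functions can be combined in projection and filter rules. The table and column names in the following sketch (db.orders, name, and amount) are placeholders.
transform:
  - source-table: db.orders
    projection: \*, UPPER(name) AS name_upper, IF(amount > 100, 'high', 'low') AS amount_level
    filter: COALESCE(amount, 0) > 0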
UDFs
Flink CDC YAML allows you to create user-defined functions (UDFs) in Java. You can call UDFs in the same manner that you call built-in functions.
Define a UDF
You can use a Java class that meets the following requirements as the implementation class of a UDF supported by Flink CDC YAML:
Implements the org.apache.flink.cdc.common.udf.UserDefinedFunction interface.
Has a public parameterless constructor.
Contains at least one public method named eval.
You can apply the @Override annotation to the following methods in a UDF class to implement finer-grained semantic control:
Override the getReturnType method to manually specify the data type of the value returned by the eval method.
Override the open and close methods to insert lifecycle logic.
The following sample code shows a UDF that returns the value of the input integer parameter increased by 1.
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
        // The return type of the eval() method cannot be inferred automatically.
        // Override the getReturnType method to explicitly specify the data type of the output value.
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}
Register a UDF
Add the following definition to the pipeline
module of Flink CDC YAML to register a UDF:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
The JAR package that corresponds to the classpath must be uploaded as an external dependency.
The name of the UDF can be freely specified and does not need to match the name of the UDF class.
Use a UDF
After you register a UDF, you can directly call the UDF in the projection and filter statement blocks in the same manner that you call built-in functions. Sample code:
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100
Compatibility of scalar functions
UDFs that inherit from scalar functions of Realtime Compute for Apache Flink can also be directly registered and used as UDFs of Flink CDC YAML. Take note of the following limits when you use this type of function:
Scalar functions that contain parameters are not supported.
The TypeInformation interface of Apache Flink is ignored.
The open and close lifecycle hooks are not called.
References
For more information about how to develop a YAML draft for data ingestion, see Develop a YAML draft for data ingestion (public preview).