Realtime Compute for Apache Flink: Data ingestion development references

Last Updated: Dec 12, 2024

This topic describes the development references for the source, sink, transform, route, and pipeline modules related to data ingestion.
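
The five modules are written as top-level keys of one YAML draft. The following minimal sketch shows how they can fit together. It reuses only option names that appear in the samples of this topic. The database, table, and deployment names are hypothetical placeholders, and the values sink is used only because the sample code in this topic uses it.

```yaml
# Minimal sketch of a data ingestion YAML draft.
# All database, table, and job names below are hypothetical.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mydb.\.*

sink:
  type: values          # Sink type reused from the sample code in this topic.
  name: Values Sink

transform:
  - source-table: mydb.orders
    projection: \*, 'extras' AS extras
    filter: id > 0

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders

pipeline:
  name: mysql to values job
  schema.change.behavior: LENIENT
```

Only the source and sink modules carry connector-specific options. The transform, route, and pipeline modules use the parameters described in the corresponding sections of this topic.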

Supported connectors

| Connector | Source | Sink |
| --- | --- | --- |
| Kafka connector | √ | √ |
| Hologres connector | × | √ |
| MySQL connector | √ | × |
| Upsert Kafka connector | × | √ |
| Print connector | × | √ |
| StarRocks connector | × | √ |
| Apache Paimon connector | × | √ |

Note

  • The Kafka connector can be used in the source only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.10 or later.

  • The MySQL connector supports ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases.

source

The source module defines the data ingestion source. Only the Kafka connector and MySQL connector can be used in the source.

Syntax

source:
  type: mysql
  name: mysql source
  xxx: ...

For more information about the parameters that you must configure when you use a connector, see the "Data ingestion" section of the topic for the corresponding connector.

sink

The sink module defines the data ingestion sink. The following connectors are supported: Kafka connector, Upsert Kafka connector, Hologres connector, Apache Paimon connector, StarRocks connector, and Print connector.

Syntax

sink:
  type: hologres
  name: hologres sink
  xxx: ...

For more information about the parameters that you must configure when you use a connector, see the "Data ingestion" section of the topic for the corresponding connector.

transform

You can configure rules in the transform module of a YAML deployment to implement features such as data projection, calculation, and filtering in a source table.

Syntax

transform:
  - source-table: db.tbl1
    projection: ...
    filter: ...
  - source-table: db.tbl2
    projection: ...
    filter: ...

Parameters

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| source-table | The source tables to which the rule applies. | Yes | Regular expressions are supported. |
| projection | The projection rule used to retain specific upstream columns in the source tables. | No | The syntax used for the rule is similar to the syntax of the SQL SELECT statement. If you do not configure this parameter, no columns are appended or deleted. Note: In Realtime Compute for Apache Flink that uses VVR 8.0.9, you must manually enter projection: \* in the transform module to configure the projection rule if you want to synchronize schema changes from the source tables to the sink. For more information, see Precautions. |
| filter | The row filtering rule. | No | The syntax used for the rule is similar to the syntax of the SQL WHERE clause. If you do not configure this parameter, no rows are filtered. |
| primary-keys | The primary key columns in the table schema after data transformation. | No | If you do not configure this parameter, the primary key columns of the original table schema are retained. Separate multiple primary key columns with commas (,). |
| partition-keys | The partition key columns in the table schema after data transformation. | No | If you do not configure this parameter, the partition key columns of the original table schema are retained. Separate multiple partition key columns with commas (,). |
| table-options | The additional configuration information that you want to write to the sink. | No | You can configure options such as the comments and the number of buckets for an Apache Paimon sink. |
| description | The description of the transform module. | No | N/A |
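
The parameters described above can be combined in a single rule. The following sketch is illustrative only: the table mydb.web_order and its columns are hypothetical, and the key=value form of table-options is an assumption based on the open source Flink CDC transform syntax, so check it against the documentation of your sink connector.

```yaml
transform:
  - source-table: mydb.web_order
    # Keep three columns and normalize the product name.
    projection: id, order_id, UPPER(product_name) AS product_name
    # Drop rows that do not meet both conditions.
    filter: id > 10 AND order_id > 100
    # Schema metadata of the transformed table.
    primary-keys: id
    partition-keys: product_name
    # Assumed key=value form; the available keys depend on the sink.
    table-options: comment=web order
    description: project, filter, and re-key web_order
```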

Computed columns

You can use the <Expression> AS <ColName> syntax in the projection rule to define computed columns. The expression calculates data records in the source and fills the related values in the computed columns.

Warning

The expression of a computed column cannot reference the value of another computed column even if the referenced column appears before the current computed column. For example, a, b AS c, c AS d is invalid in the projection rule.

For example, when the data record [+I, id = 1] from the source table db.tbl is processed, the data record is converted into the data row [+I, id = 1, inc_id = 2] and synchronized to the sink.

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id
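
Because a computed column cannot reference another computed column, one possible workaround is to repeat the underlying expression. The following sketch uses the hypothetical columns a and b from the warning above.

```yaml
transform:
  - source-table: db.tbl
    # Invalid: d would reference the computed column c.
    # projection: a, b AS c, c AS d
    # Valid: d is computed from the source column b directly.
    projection: a, b AS c, b AS d
```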

Wildcard characters

To synchronize all columns of the source table to the sink unchanged, including columns that are later appended to the source table, use an asterisk (*) as a wildcard character in the projection rule.

Note

If you do not use an asterisk (*) as a wildcard character in a projection rule, a fixed schema is generated and is always the same as the schema written in the projection rule.

For example, *, 'extras' AS extras indicates that an extra column is appended to the end of the source table schema and the schema changes of the source table are continuously synchronized to the sink.

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Metadata columns

When you create a projection rule, you can reference the following predefined metadata columns in the same way as regular data columns.

Important

Do not define a regular data column that has the same name as a metadata column.

| Metadata column name | Data type | Description |
| --- | --- | --- |
| __namespace_name__ | STRING | The namespace of the source table in which the data change occurs. |
| __schema_name__ | STRING | The schema of the source table in which the data change occurs. |
| __table_name__ | STRING | The source table in which the data change occurs. |
| __data_event_type__ | STRING | The change type of the data change record, including +I, -U, +U, and -D. |

Important

The Update Before and Update After events for a data change are always combined into one Change Data Capture (CDC) event. Therefore, the __data_event_type__ column of an update event has two values: -U and +U. Do not use __data_event_type__ as a primary key.

For example, you can concatenate the predefined metadata columns into a computed column that holds the fully qualified name of the source table and synchronize that column to the sink. The '.' separators keep the three parts of the name distinguishable.

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier
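
A metadata column can also be exposed on its own. The following sketch appends the change type of each record as a regular column. The column name op_type is a hypothetical choice.

```yaml
transform:
  - source-table: db.tbl
    # op_type receives one of +I, -U, +U, or -D for each change record.
    projection: \*, __data_event_type__ AS op_type
```

As noted above, an update produces both -U and +U, so a column such as op_type is not suitable as a primary key.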

The following table describes the mappings of the names of namespaces, schemas, and tables to objects in different connectors.

| Database type | Namespace name | Schema name | Table name |
| --- | --- | --- | --- |
| Java Database Connectivity (JDBC) | Catalog | Schema | Table |
| Debezium | Catalog | Schema | Table |
| MySQL | Database | - | Table |
| Postgres | Database | Schema | Table |
| Oracle | - | Schema | Table |
| Microsoft SQL Server | Database | Schema | Table |
| StarRocks | Database | - | Table |
| Doris | Database | - | Table |

Precautions

  • After you modify the statements of the transform module, the deployment cannot be restored from a specific state. You must start the deployment without states.

  • In most cases, the statements used to specify the projection and filtering rules do not need to be enclosed in single quotation marks (') or double quotation marks (").

    transform:
      - projection: a, b, c
        # The preceding statement is equivalent to the following statement:
      - projection: "a, b, c"

    However, if the first character of a projection expression is a special character, such as an asterisk (*) or a single quotation mark ('), the entire expression may not be parsed as a valid YAML string literal. In this case, you must manually enclose the entire expression in single quotation marks (') or double quotation marks ("), or use a backslash (\) to escape the expression.

    transform:
      - projection: *, 42      # An invalid YAML string.
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK

  • In Realtime Compute for Apache Flink that uses VVR 8.0.9, schema changes are not synchronized from the source tables to the sink if a statement block in the transform module does not contain a projection rule. The following sample code shows how to configure a projection rule in the transform module:

    transform:
      - source-table: db.\.*
        projection: \*        # Synchronize all upstream columns and schema changes from the source tables to the sink.

route

You can define a statement block that contains several routing rules in the route module of a YAML deployment. The statement block describes the topology from the source to the sink, including complex topologies such as table merging and data distribution.

Syntax

route:
  - source-table: db.tbl1
    sink-table: sinkdb.tbl1
  - source-table: db.tbl2
    sink-table: sinkdb.tbl2

Parameters

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| source-table | The source tables to which the rule applies. | Yes | Regular expressions are supported. |
| sink-table | The destination location for data routing. | Yes | N/A |
| replace-symbol | The placeholder string in the sink-table value that is replaced by the name of each matched source table when the pattern matching feature is used. | No | For example, if you set the replace-symbol parameter to <>, you can set the sink-table parameter to sinkdb.<>. In this case, data of the source table table1 is written to the sinkdb.table1 table. |
| description | The description of the route module. | No | N/A |

Use the route module

One-to-one routing

Route data from the source table mydb.web_order to the sink table mydb.ods_web_order.

route:
  - source-table: mydb.web_order
    sink-table: mydb.ods_web_order
    description: sync table to one destination table with given prefix ods_

Merging of multiple tables in a sharded database

Merge all tables in the source database mydb into the sink table mydb.merged.

route:
  - source-table: mydb.\.*
    sink-table: mydb.merged
    description: sync sharding tables to one destination table

Multiple routing rules

You can use a hyphen (-) as a YAML list symbol in a route statement block to define multiple rules. The rules take effect at the same time.

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: sync orders table to ods_orders
  - source-table: mydb.shipments
    sink-table: ods_db.ods_shipments
    description: sync shipments table to ods_shipments
  - source-table: mydb.products
    sink-table: ods_db.ods_products
    description: sync products table to ods_products

Pattern matching

Synchronize all tables in the source database source_db to the sink database sink_db in a one-to-one mapping mode. Each sink table keeps the name of its source table.

route:
  - source-table: source_db.\.*
    sink-table: sink_db.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

The placeholder string <> specified by the replace-symbol parameter is replaced by the name of each matched source table. This way, each source table is mapped to a sink table that has the same name.
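
The placeholder can presumably be combined with fixed text in the sink table name. The following sketch assumes that replace-symbol is substituted wherever it appears in sink-table, which this topic does not state explicitly. The ods_ prefix and the database names are hypothetical.

```yaml
route:
  - source-table: source_db.\.*
    # Assumption: <> may be embedded in a longer table name.
    # Under that assumption, source_db.orders is routed to sink_db.ods_orders.
    sink-table: sink_db.ods_<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db with the prefix ods_
```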

Data distribution

To distribute data from the same table to multiple sink tables, you only need to define multiple routing rules for that table. For example, data in the table mydb.orders is distributed to the sink_db and backup_sink_db databases.

route:
  - source-table: mydb.orders
    sink-table: sink_db.orders
  - source-table: mydb.orders
    sink-table: backup_sink_db.orders

Precautions

After you modify the statements of the route module, the deployment cannot be restored from a specific state. You must start the deployment without states.

pipeline

You can configure the overall settings of a YAML deployment for data ingestion in the pipeline module.

Syntax

pipeline:
  name: CDC YAML job
  schema.change.behavior: LENIENT

Parameters

| Parameter | Description | Required | Data type | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| name | The name of the YAML deployment. | No | STRING | Flink CDC Pipeline Job | N/A |
| schema.change.behavior | The schema change processing method. | No | STRING | LENIENT | Valid values: LENIENT (default value), EXCEPTION, EVOLVE, TRY_EVOLVE, and IGNORE. For more information, see Schema change configuration. |

Schema change processing methods

A YAML deployment allows you to synchronize schema changes from a source to a sink. Schema changes include the operations to create tables, add columns, rename columns, change column types, delete columns, and delete tables. Some schema changes may not be supported by a sink. You can configure the schema.change.behavior parameter to specify how to process schema changes in a sink.

Methods for processing schema changes

Method

Description

LENIENT (default value)

A YAML deployment for data ingestion can convert schema changes into changes that can be processed by the sink and synchronize the schema changes to the sink. Take note of the following rules:

  • Schema changes performed by using the DROP TABLE and TRUNCATE TABLE operations are not synchronized to the sink.

  • If you rename a column, a column type change event and a column addition event are synchronized to the sink. The original column is not dropped, but its column property is changed to nullable. A column that has a new name is added and its column property is changed to nullable.

  • If you drop a column, a column type change event is synchronized to the sink. The column property is changed to nullable.

  • If you add a column, a column addition event is synchronized to the sink. The column property is changed to nullable.

EXCEPTION

Schema changes are not synchronized to the sink.

You can use this method if the sink cannot process schema changes. When a schema change event is synchronized to the sink, the YAML deployment for data ingestion reports an error.

EVOLVE

A YAML deployment for data ingestion applies all schema changes to the sink.

If the schema changes fail to be applied in the sink, the YAML deployment for data ingestion reports an error and is restarted.

TRY_EVOLVE

A YAML deployment for data ingestion attempts to apply schema changes to the sink. If the sink cannot process the schema changes, the YAML deployment is not restarted but attempts to process the schema changes by transforming data.

Warning

When you use this method, specific columns from the source may be lost or truncated if schema changes fail to be applied in the sink.

IGNORE

No schema changes are applied to the sink.

You can use this method if the sink is not ready for schema changes and needs to continue receiving data from the columns that are not changed.

Management of schema changes received by the sink

In specific cases, you do not need to synchronize all schema changes to the sink. For example, you are allowed to add columns to a sink but not allowed to drop columns from the sink in some scenarios. This prevents existing data from being dropped.

You can configure the include.schema.changes and exclude.schema.changes parameters in the sink module to manage the schema changes that you want to synchronize to the sink.

| Parameter | Description | Required | Data type | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| include.schema.changes | Schema changes that can be applied in the sink. | No | List<String> | No default value | By default, all schema changes can be applied in the sink. |
| exclude.schema.changes | Schema changes that cannot be applied in the sink. | No | List<String> | No default value | This parameter takes precedence over the include.schema.changes parameter. |

The following table describes the schema change events.

| Event | Description |
| --- | --- |
| add.column | Add a column. |
| alter.column.type | Change the data type of a column. |
| create.table | Create a table. |
| drop.column | Drop a column. |
| drop.table | Drop a table. |
| rename.column | Change the name of a column. |
| truncate.table | Clear data. |

Note

Partial match is supported for schema changes. For example, if you pass the keyword drop, the drop.column and drop.table events are matched at the same time.
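
The partial match described in the note can be sketched as the following sink fragment. The sink type and name are reused from the sample code in this topic. Only the exclude.schema.changes line is the point of the example.

```yaml
sink:
  type: values
  name: Values Sink
  # The keyword drop partially matches both drop.column and drop.table,
  # so neither event is applied in the sink.
  exclude.schema.changes: [drop]
```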

Sample code

  • Example 1: Use the EVOLVE method to process schema changes.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE

  • Example 2: Match the table creation event and column-related events and exclude the column deletion event.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # Match the create.table, add.column, alter.column.type, rename.column, and drop.column events.
  exclude.schema.changes: [drop.column] # Exclude the drop.column event.
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE

Functions

Built-in functions

Flink CDC YAML provides a wide range of built-in functions that can be directly used in the projection and filter expressions in the transform module.

Comparison functions

Note

Unless otherwise stated, the following built-in functions return NULL if the input parameters contain null values.

| Function | Description |
| --- | --- |
| value1 = value2 | Returns TRUE if value1 is equal to value2. Returns FALSE if value1 is not equal to value2. |
| value1 <> value2 | Returns TRUE if value1 is not equal to value2. Returns FALSE if value1 is equal to value2. |
| value1 > value2 | Returns TRUE if value1 is greater than value2. Returns FALSE if value1 is less than or equal to value2. |
| value1 >= value2 | Returns TRUE if value1 is greater than or equal to value2. Returns FALSE if value1 is less than value2. |
| value1 < value2 | Returns TRUE if value1 is less than value2. Returns FALSE if value1 is greater than or equal to value2. |
| value1 <= value2 | Returns TRUE if value1 is less than or equal to value2. Returns FALSE if value1 is greater than value2. |
| value IS NULL | Returns TRUE if the value is NULL. Returns FALSE if the value is not NULL. |
| value IS NOT NULL | Returns TRUE if the value is not NULL. Returns FALSE if the value is NULL. |
| value1 BETWEEN value2 AND value3 | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. Returns FALSE if value1 is less than value2 or greater than value3. |
| value1 NOT BETWEEN value2 AND value3 | Returns TRUE if value1 is less than value2 or greater than value3. Returns FALSE if value1 is greater than or equal to value2 and less than or equal to value3. |
| string1 LIKE string2 | Returns TRUE if string1 matches the pattern string2. Returns FALSE if string1 does not match the pattern string2. |
| string1 NOT LIKE string2 | Returns TRUE if string1 does not match the pattern string2. Returns FALSE if string1 matches the pattern string2. |
| value1 IN (value2 [, value3]* ) | Returns TRUE if value1 exists in the list of values [value2, value3, …]. Returns FALSE if value1 does not exist in the list of values [value2, value3, …]. |
| value1 NOT IN (value2 [, value3]* ) | Returns TRUE if value1 does not exist in the list of values [value2, value3, …]. Returns FALSE if value1 exists in the list of values [value2, value3, …]. |

Logical functions

| Function | Description |
| --- | --- |
| boolean1 OR boolean2 | Returns TRUE if boolean1 or boolean2 is TRUE. |
| boolean1 AND boolean2 | Returns TRUE if boolean1 and boolean2 are TRUE. |
| NOT boolean | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
| boolean IS FALSE | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
| boolean IS NOT FALSE | Returns TRUE if boolean is TRUE. Returns FALSE if boolean is FALSE. |
| boolean IS TRUE | Returns TRUE if boolean is TRUE. Returns FALSE if boolean is FALSE. |
| boolean IS NOT TRUE | Returns FALSE if boolean is TRUE. Returns TRUE if boolean is FALSE. |
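
Comparison and logical functions are typically combined in a filter rule. The following sketch is illustrative only: the table db.orders and the columns amount, status, and note are hypothetical. The expression is enclosed in double quotation marks so that the YAML parser treats it as one string.

```yaml
transform:
  - source-table: db.orders
    # Keep rows with a positive amount that are not cancelled and whose
    # note is either missing or does not start with "test".
    filter: "amount > 0 AND status <> 'cancelled' AND (note IS NULL OR note NOT LIKE 'test%')"
```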

Arithmetic functions

| Function | Description |
| --- | --- |
| numeric1 + numeric2 | Returns the sum of numeric1 and numeric2. |
| numeric1 - numeric2 | Returns the difference between numeric1 and numeric2. |
| numeric1 * numeric2 | Returns the product of numeric1 multiplied by numeric2. |
| numeric1 / numeric2 | Returns the quotient of numeric1 divided by numeric2. |
| numeric1 % numeric2 | Returns the remainder of numeric1 divided by numeric2. |
| ABS(numeric) | Returns the absolute value of numeric. |
| CEIL(numeric) | Returns the smallest integer that is greater than or equal to numeric. |
| FLOOR(numeric) | Returns the largest integer that is less than or equal to numeric. |
| ROUND(numeric, int) | Returns numeric rounded to int decimal places. |
| UUID() | Returns a Universally Unique Identifier (UUID) string, such as 3d3c68f7-f608-473f-b60c-b0c44ad4cc4e, based on RFC 4122 Type 4. |
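
Arithmetic functions are mostly used to define computed columns. In the following sketch, the table db.orders and the columns price, quantity, and balance are hypothetical.

```yaml
transform:
  - source-table: db.orders
    # total, price_rounded, balance_abs, and row_uuid are computed columns.
    projection: id, price * quantity AS total, ROUND(price, 2) AS price_rounded, ABS(balance) AS balance_abs, UUID() AS row_uuid
```

The asterisk in price * quantity does not need to be escaped because it is not the first character of the expression.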

String functions

Function

Description

string1 || string2

Returns the concatenation of string1 and string2.

Important

Do not confuse the operator || with the logical disjunction operator OR.

CHAR_LENGTH(string)

Returns the number of characters in a string.

UPPER(string)

Returns a string in uppercase letters.

LOWER(string)

Returns a string in lowercase letters.

TRIM(string1)

Removes the leading and trailing spaces from string1.

REGEXP_REPLACE(string1, string2, string3)

Replaces all substrings in string1 that match string2 with string3.

For example, the result of REGEXP_REPLACE('foobar', 'oo|ar', '__') is f__b__.

SUBSTRING(string FROM integer1 [ FOR integer2 ])

Returns the substring of string that starts at the position specified by integer1 and has the length specified by integer2.

Note

By default, if you do not specify FOR integer2, a substring that ends at the end of the string is returned.

CONCAT(string1, string2, …)

Concatenates two or more strings and returns a new string.

For example, the result of CONCAT('AA', 'BB', 'CC') is AABBCC.
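
The string functions can be combined in one projection rule. In the following sketch, the table db.users and the columns name, first_name, last_name, and phone are hypothetical. The expression is enclosed in double quotation marks so that the single quotation marks inside it are passed through unchanged.

```yaml
transform:
  - source-table: db.users
    # Each expression after id defines one computed column.
    projection: "id, UPPER(TRIM(name)) AS name_upper, SUBSTRING(name FROM 1 FOR 3) AS name_prefix, first_name || ' ' || last_name AS full_name, REGEXP_REPLACE(phone, '[^0-9]', '') AS phone_digits"
```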

Time functions

| Function | Description |
| --- | --- |
| LOCALTIME | Returns the current time in the local time zone. The return value is of the TIME(0) type. |
| LOCALTIMESTAMP | Returns the current timestamp in the local time zone. The return value is of the TIMESTAMP(3) type. |
| CURRENT_TIME | Returns the current time in the local time zone. This function is equivalent to the LOCALTIME function. |
| CURRENT_DATE | Returns the current date in the local time zone. |
| CURRENT_TIMESTAMP | Returns the current timestamp in the local time zone. The return value is of the TIMESTAMP_LTZ(3) type. |
| NOW() | Returns the current timestamp in the local time zone. This function is equivalent to the CURRENT_TIMESTAMP function. |
| DATE_FORMAT(timestamp, string) | Converts a timestamp into a string in the specified format. Note: The format string is compatible with the SimpleDateFormat format in Java. |
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | Returns the time interval between timepoint1 and timepoint2. The timepointunit parameter can be set to SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | Converts string1 in the format of string2 into a value of the DATE type. Note: If string2 is not specified, the yyyy-MM-dd format is used by default. |
| TO_TIMESTAMP(string1[, string2]) | Converts string1 in the format of string2 into a value of the TIMESTAMP type without information about the time zone. Note: If string2 is not specified, the yyyy-MM-dd HH:mm:ss format is used by default. |

Note

If you use the projection and filter expressions for calculation, the YAML deployment ensures that the time points obtained by each sub-expression are the same when multiple time functions are used. For example, t1, t2, and t3 obtained by the NOW() AS t1, NOW() AS t2, NOW() AS t3 expression must correspond to the same timestamp, regardless of the time and order in which they are calculated.
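
The following sketch shows several time functions in one projection rule. The table db.events and the columns created_at, updated_at, and created_str are hypothetical, and created_str is assumed to hold strings in the yyyy-MM-dd HH:mm:ss format.

```yaml
transform:
  - source-table: db.events
    # ingested_at and checked_at resolve to the same timestamp for a record,
    # as described in the preceding note.
    projection: "id, NOW() AS ingested_at, NOW() AS checked_at, DATE_FORMAT(created_at, 'yyyy-MM-dd') AS created_day, TO_TIMESTAMP(created_str, 'yyyy-MM-dd HH:mm:ss') AS created_ts, TIMESTAMPDIFF(HOUR, created_at, updated_at) AS hours_to_update"
```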

Conditional functions

| Function | Description |
| --- | --- |
| CASE value WHEN value1_1 [, value1_2]* THEN result1 (WHEN value2_1 [, value2_2 ]* THEN result2)* (ELSE result_z) END | Checks in sequence whether value is equal to a value given by a WHEN clause and returns the result of the first matched WHEN clause. Returns the value specified in the ELSE clause if no WHEN clause matches. Returns NULL if no WHEN clause matches and no ELSE clause is specified. |
| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Evaluates the conditions of the WHEN clauses in sequence and returns the result of the first WHEN clause whose condition is met. Returns the value specified in the ELSE clause if no condition is met. Returns NULL if no condition is met and no ELSE clause is specified. |
| COALESCE(value1 [, value2]*) | Returns the first non-NULL value from [value1, value2, …]. Returns NULL if all values from [value1, value2, …] are NULL. |
| IF(condition, true_value, false_value) | Returns true_value if the condition is met, and returns false_value if the condition is not met. |
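
Conditional functions are convenient for deriving default values and categories. In the following sketch, the table db.orders and the columns nickname, name, amount, and status are hypothetical.

```yaml
transform:
  - source-table: db.orders
    # display_name falls back to name when nickname is NULL.
    # status_code maps two known statuses and defaults to 0.
    projection: "id, COALESCE(nickname, name) AS display_name, IF(amount >= 100, 'large', 'small') AS size_class, CASE status WHEN 'paid' THEN 1 WHEN 'refunded' THEN -1 ELSE 0 END AS status_code"
```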

UDFs

Flink CDC YAML allows you to create user-defined functions (UDFs) in Java. You can call UDFs in the same manner that you call built-in functions.

Define a UDF

You can use a Java class that meets the following requirements as the implementation class of a UDF supported by Flink CDC YAML:

  • Implements org.apache.flink.cdc.common.udf.UserDefinedFunction.

  • Has a public parameterless constructor.

  • Contains at least one public method named eval.

You can override the following methods in a UDF class to implement finer-grained semantic control:

  • Override the getReturnType method to manually specify the data type of the output value of the eval method.

  • Override the open and close methods to add lifecycle hooks.

The following sample code defines a UDF that returns the input integer increased by 1.

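import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    // Returns the input integer increased by 1.
    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        // eval() is declared to return Object, so its output type cannot be inferred.
        // Use the getReturnType method to explicitly specify the data type of the output value.
        return DataTypes.INT();
    }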
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

Register a UDF

Add the following definition to the pipeline module of Flink CDC YAML to register a UDF:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass

Note

  • The JAR package that contains the class specified by classpath must be uploaded as an external dependency.

  • You can specify any name for a UDF. The name does not need to match the name of the UDF class.

Use a UDF

After you register a UDF, you can directly call the UDF in the projection and filter statement blocks in the same manner that you call built-in functions. Sample code:

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Compatibility of scalar functions

UDFs that extend the scalar function class of Realtime Compute for Apache Flink can also be directly registered and used as UDFs of Flink CDC YAML. Take note of the following limits when you use this type of function:

  • Scalar functions whose constructors take parameters are not supported.

  • The TypeInformation interface of Apache Flink is ignored.

  • The open and close lifecycle hooks are not called.

References

For more information about how to develop a YAML draft for data ingestion, see Develop a YAML draft for data ingestion (public preview).