Realtime Compute for Apache Flink:Develop a YAML draft for data ingestion (public preview)

Last Updated: Dec 06, 2024

Realtime Compute for Apache Flink allows you to create YAML deployments for data ingestion from a source to a sink based on Change Data Capture (CDC) connectors for Apache Flink. This topic describes how to develop a YAML draft of Realtime Compute for Apache Flink.

Background information

CDC connectors for Apache Flink are integrated with the Data Ingestion feature of Realtime Compute for Apache Flink. Compared with draft development by using the CREATE DATABASE AS and CREATE TABLE AS statements, YAML drafts that you develop on the Data Ingestion page simplify complex extract, transform, load (ETL) processes and allow you to manage these processes based on Flink computing logic. YAML deployments support synchronization of all data from a database, single-table synchronization, merging and synchronization of multiple tables in a sharded database, synchronization of newly added tables, synchronization of table schema changes, and synchronization of custom computed columns. YAML deployments also support ETL processing, WHERE condition-based filtering, column pruning, and computed column addition. This simplifies the data integration process and improves its efficiency and reliability.

Benefits of YAML deployments for data ingestion

In Realtime Compute for Apache Flink, you can synchronize data by creating YAML deployments for data ingestion, SQL deployments, or DataStream deployments. The following sections describe the benefits of YAML deployments for data ingestion compared with SQL deployments and DataStream deployments.

YAML deployment for data ingestion vs SQL deployment

YAML deployments for data ingestion and SQL deployments can synchronize data of different types. Take note of the following items:

  • SQL deployments synchronize data of the RowData type, whereas YAML deployments for data ingestion synchronize data of the DataChangeEvent and SchemaChangeEvent types. In SQL deployments, each RowData record carries its own change type: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), or DELETE (-D).

  • YAML deployments for data ingestion use the SchemaChangeEvent type to synchronize schema changes, such as table creation, column addition, and table data clearing. They use the DataChangeEvent type to synchronize data changes, including INSERT messages, DELETE messages, and UPDATE messages that carry both the UPDATE_BEFORE and UPDATE_AFTER data instead of being split into separate messages. This allows you to write raw change data to the sink.

The following table describes the benefits of YAML deployments for data ingestion compared with SQL deployments.

| YAML deployment for data ingestion | SQL deployment |
| --- | --- |
| Automatically identifies schemas and supports synchronization of all data from a database. | Requires you to manually write the CREATE TABLE and INSERT statements. |
| Supports multiple schema change policies. | Does not support schema changes. |
| Supports synchronization of original changelogs. | Destroys the structure of original changelogs. |
| Reads data from and writes data to multiple tables. | Reads data from and writes data to a single table. |

Compared with deployments that use the CREATE TABLE AS or CREATE DATABASE AS statement, YAML deployments are more powerful and support the following features:

  • Immediately synchronize schema changes of upstream tables without the need to wait for new data to be written.

  • Synchronize original changelogs and do not split UPDATE messages.

  • Synchronize more types of schema changes, such as changes made by using the TRUNCATE TABLE and DROP TABLE statements.

  • Establish mappings between tables and allow you to flexibly specify the names of sink tables.

  • Support flexible schema evolution and allow you to configure how schema changes are applied (see the sketch after this list).

  • Support WHERE condition-based filtering.

  • Support column pruning.
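
The following is a minimal sketch of how these features can appear in a YAML draft, assuming a hypothetical app_db.orders table. The schema.change.behavior option is an assumption based on open-source CDC connectors for Apache Flink; check Data ingestion development references for the option names supported by your VVR version.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      # Hypothetical table used for illustration.
      tables: app_db.orders
      server-id: 5400-5404

    sink:
      type: hologres
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}

    transform:
      - source-table: app_db.orders
        # Column pruning: only the listed columns are synchronized.
        projection: id, price, order_time
        # WHERE condition-based filtering: only rows that match the condition are synchronized.
        filter: price > 100
        description: prune columns and filter rows

    pipeline:
      name: MySQL to Hologres with transform
      # Assumed option (as in open-source Flink CDC): controls how upstream schema
      # changes are applied to the sink.
      schema.change.behavior: evolve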

YAML deployment for data ingestion vs DataStream deployment

The following table describes the benefits of YAML deployments for data ingestion compared with DataStream deployments.

| YAML deployment for data ingestion | DataStream deployment |
| --- | --- |
| Is designed for users of all levels, not just experts. | Requires you to be familiar with Java and distributed systems. |
| Hides underlying details so that you can develop drafts efficiently. | Requires you to be familiar with the framework of Realtime Compute for Apache Flink. |
| Is easy to understand because of the YAML syntax. | Requires you to understand Maven dependencies. |
| Is easy to reuse. | Makes existing code difficult to reuse. |

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.9 or later supports YAML deployments.

  • You can use a YAML deployment to synchronize data from only one source to only one sink. If you want to read data from multiple sources or write data to multiple sinks, you must create multiple YAML deployments (see the sketch after this list).

  • You cannot deploy YAML deployments in session clusters.
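
For example, to synchronize the same MySQL database to both Hologres and Apache Paimon, create two separate YAML deployments. The following is a minimal sketch; the paimon type string and the parameters omitted with ... are assumptions and must be completed based on the documentation of the corresponding connectors.

    # deployment-1.yaml: MySQL -> Hologres
    source:
      type: mysql
      tables: app_db.\.*
      # Other MySQL source parameters.
      ...

    sink:
      type: hologres
      # Hologres sink parameters.
      ...

    # deployment-2.yaml: a separate deployment that reads the same MySQL source
    # and writes to Apache Paimon.
    source:
      type: mysql
      tables: app_db.\.*
      ...

    sink:
      type: paimon
      # Apache Paimon sink parameters.
      ...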

Connectors supported for data ingestion

The following table describes the connectors that are supported as sources and sinks for data ingestion.

Note

If the supported connectors do not meet your business requirements for upstream and downstream storage, you can submit a ticket or join the DingTalk group to provide feedback.

| Connector | Source | Sink |
| --- | --- | --- |
| Kafka connector | × | ✓ |
| Hologres connector | × | ✓ |
| MySQL connector (supports ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases) | ✓ | × |
| Upsert Kafka connector | × | ✓ |
| Print connector | × | ✓ |
| StarRocks connector | × | ✓ |
| Apache Paimon connector | × | ✓ |

Procedure

  1. Log on to the management console of Realtime Compute for Apache Flink.

  2. Find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose Development > Data Ingestion.

  4. In the upper-left corner of the Drafts page, click New. In the New Draft dialog box, click Blank Draft and click Next.

    You can also develop a YAML draft from a template to synchronize data. In the New Draft dialog box, click Data Ingestion from MySQL to Starrocks, Data Ingestion from MySQL to Paimon, or Data Ingestion from MySQL to Hologres.

  5. On the page that appears, configure the Name and Location parameters, select a value from the Engine Version drop-down list, and then click OK.

  6. Configure the code of the YAML draft.

    # Required.
    source:
      # The connector type of the source.
      type: <Replace the value with your connector type of the source>
      # Configure the parameters for the source. For more information about the parameters, see the documentation of the corresponding connector. 
      ...
    
    # Required.
    sink:
      # The connector type of the sink.
      type: <Replace the value with your connector type of the sink>
      # Configure the parameters for the sink. For more information about the parameters, see the documentation of the corresponding connector. 
      ...
    
    # Optional.
    transform:
      # Configure a data transformation rule for the flink_test.customers table.
      - source-table: flink_test.customers
        # Configure the mappings. You can specify the columns that you want to synchronize and perform data transformation.
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # Configure the filter condition. Only the data records whose ID is greater than 10 are synchronized.
        filter: id > 10
        # Configure a description for the data transformation rule.
        description: append calculated columns based on source table
    
    # Optional.
    route:
      # Configure the routing rule to specify the mapping between a source table and a destination table.
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # Configure a description for the routing rule.
        description: sync customers table
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # Configure a description for the routing rule.
        description: sync customers_suffix table
    
    # Optional.
    pipeline:
      # The name of the draft.
      name: MySQL to Hologres Pipeline

    The following table describes the modules in the code.

    | Required | Module | Description |
    | --- | --- | --- |
    | Required | source | The start of the pipeline. The CDC connectors for Apache Flink capture the change data from the source. Note: Only the MySQL connector can be used in the source. For more information about the parameters that you must configure for the MySQL connector, see MySQL connector. You can use variables to mask sensitive information. For more information, see Manage variables and keys. |
    | Required | sink | The end of the pipeline. The CDC connectors for Apache Flink synchronize the change data that is captured from the source to the sink. Note: For the connectors that can be used in the sink, see Connectors supported for data ingestion. For more information about the parameters that you must configure for each sink connector, see the documentation of the corresponding connector. You can use variables to mask sensitive information. For more information, see Manage variables and keys. |
    | Optional | pipeline | Defines the basic configuration of the YAML draft, such as the pipeline name. |
    | Optional | transform | Defines data transformation rules that process data in a Realtime Compute for Apache Flink pipeline. This module supports ETL processing, WHERE condition-based filtering, column pruning, and computed column addition. Use this module if the change data captured by the CDC connectors for Apache Flink needs to be converted to adapt to specific downstream storage. |
    | Optional | route | Defines mappings between the upstream and downstream storage so that the captured change data can be synchronized to different sinks based on specific rules (see the sketch below the table). If you do not configure this module, all data in the database or the single table is synchronized. |

    For more information about the syntax and parameters of each module, see Data ingestion development references.
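
    For example, the route module can merge multiple tables in a sharded database into a single sink table. The following is a hedged sketch: the database and table names are hypothetical, and the regular expression in source-table follows the convention of open-source CDC connectors for Apache Flink, so verify the exact pattern syntax in Data ingestion development references.

    route:
      # Merge all shards, such as app_db.customers_01 and app_db.customers_02,
      # into a single sink table.
      - source-table: app_db.customers_.*
        sink-table: ods.customers
        description: merge sharded customers tables into one sink table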

    The following sample code provides an example on how to synchronize all tables from the app_db database in MySQL to a Hologres database. Sample code:

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  7. Optional. Click Validate.

    You can check the syntax, network connectivity, and access permissions.

References