
Data Transmission Service: Configure an ETL task in Flink SQL mode

Last Updated: Dec 23, 2024

Flink SQL is a programming language that Alibaba Cloud provides to simplify the extract, transform, load (ETL) computing model and to lower the skill requirements for users. Flink SQL is compatible with standard SQL syntax. Compared with the directed acyclic graph (DAG) mode, Flink SQL provides more advanced capabilities: in the Flink SQL script editor, you can enter statements that are not supported in DAG mode. This topic describes how to configure an ETL task in Flink SQL mode.

Background information

Note

This feature will be unavailable soon and is available free of charge only to specific users who have activated it. We recommend that you configure ETL in a data synchronization or migration instance instead. For more information, see Configure ETL in a data migration or synchronization task.

  • Before you configure an ETL task, take note of the following information:

    • Input/Dimension Table indicates the source database of the ETL task.

    • Output indicates the destination database of the ETL task.

  • DTS provides the streaming ETL feature for the data synchronization process. You can add a variety of transformation components between the source and destination databases to transform data and write the processed data to the destination database in real time. For example, you can join two stream tables into a large table and write the data of the large table to the destination database. You can also add a field to the source table and configure a function to assign values to the field. Then, you can write the field to the destination database.
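
    For example, the following statements are a minimal sketch of the second case: a new field is computed from existing fields with a built-in function and written to the destination table. The table names, columns, and aliases (src_orders, dst_orders, orders, orders_new, full_name) are hypothetical, and the sketch assumes that the corresponding physical tables are configured in the Data Flow Information section as described later in this topic.

    CREATE TABLE `src_orders` (
      `order_id` BIGINT,
      `first_name` STRING,
      `last_name` STRING
    ) WITH (
      'streamType'= 'append',
      'alias'= 'orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `dst_orders` (
      `order_id` BIGINT,
      `full_name` STRING
    ) WITH (
      'alias'= 'orders_new',
      'vertexType'= 'sink'
    );
    -- The built-in CONCAT_WS function assigns a value to the new full_name field.
    INSERT INTO `dst_orders`
    SELECT `order_id`, CONCAT_WS(' ', `first_name`, `last_name`) AS `full_name`
    FROM `src_orders`;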

Prerequisites

  • An ETL task is created in one of the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Zhangjiakou), China (Shenzhen), China (Guangzhou), and China (Hong Kong).

  • The source database belongs to one of the following types: MySQL, PolarDB for MySQL, Oracle, PostgreSQL, iSeries DB2 (AS/400), Db2 for LUW, PolarDB-X 1.0, PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, and PolarDB-X 2.0.

  • The destination database belongs to one of the following types: MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, Db2 for LUW, iSeries DB2 (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X 1.0, PolarDB for Oracle, and Tablestore.

  • The schemas of the tables in the destination database are created, because the ETL feature does not support schema migration. For example, Table A contains Field 1, Field 2, and Field 3, and Table B contains Field 2, Field 3, and Field 4. If you want to join Table A and Table B into a table that contains Field 2 and Field 3, you must create Table C that contains Field 2 and Field 3 in the destination database, as shown in the sketch after this list.

  • The ETL feature does not support full data synchronization. Therefore, you can transform only incremental data in real time.
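
    For the Table A and Table B example above, you would create Table C in the destination database before you start the ETL task. The following statement is a minimal sketch that is executed directly in the destination database, not in the ETL script; the table name, field names, and data types are hypothetical and must match your actual schema.

    -- Hypothetical DDL executed in the destination database.
    CREATE TABLE table_c (
      field_2 BIGINT,
      field_3 VARCHAR(255)
    );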

Usage notes

  • The source and destination databases must reside in the same region.

  • All stream tables must belong to the same instance.

  • The names of all databases and tables must be unique.

  • The source and destination databases must be within the same Alibaba Cloud account.

Procedure

  1. Go to the Streaming ETL page.

    1. Log on to the DTS console.

    2. In the left-side navigation pane, click ETL.

  2. Click Create Data Flow. In the Create Data Flow dialog box, enter a name for the ETL task in the Data Flow Name field and select FlinkSQL as Development Method.

  3. Click OK.

  4. In the Data Flow Information section of the Streaming ETL page, add the source and destination databases.

    The following list describes the parameters:

    • Region: The region in which the source or destination database resides.

    • Type: The type of the table.

      • If the source table is a stream table (a table that is updated in real time and can be joined with a dimension table in data association queries), select Stream Table. If the source table is a dimension table (a table that is not updated in real time and is typically used to assemble real-time data into a wide table for analysis), select Dimension Table.

      • If you configure the destination table, select Output.

    • Database Type: The type of the source or destination database.

    • Instance: The name or ID of the source or destination instance.

      Important

      Before you set this parameter, you must register the source and destination instances with Data Management (DMS). For more information, see Instance Management.

    • Database: The source or destination database to which the data transformation objects belong.

    • Physical Table: The source or destination table to which the data transformation objects belong.

    • Alias of Physical Table: The readable alias of the source or destination table. The alias helps you identify the table when you write SQL statements for the ETL task.

  5. On the Streaming ETL page, enter SQL statements in the script editor to configure an ETL task.

    The following SQL statements show how to configure an ETL task to join a stream table named test_orders with a dimension table named product and insert the results into the destination table test_orders_new.

    Important

    Each SQL statement must end with a semicolon (;).

    CREATE TABLE `etltest_test_orders` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
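      -- pt is a processing-time attribute for the lookup join; the watermark allows events on order_date to arrive up to 5 seconds late.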
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'append',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `etltest_product` (
      `product_id` BIGINT,
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'product',
      'vertexType'= 'lookup'
    );
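    -- The view performs a processing-time temporal (lookup) join to enrich each order with product details.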
    CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
    SELECT
      `etltest_test_orders`.`order_id` AS `order_id`,
      `etltest_test_orders`.`user_id` AS `user_id`,
      `etltest_test_orders`.`product_id` AS `product_id`,
      `etltest_test_orders`.`total_price` AS `total_price`,
      `etltest_test_orders`.`order_date` AS `order_date`,
      `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
      `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
      `etltest_product`.`product_id` AS `product_id_0001011101`,
      `etltest_product`.`product_name` AS `product_name`,
      `etltest_product`.`product_price` AS `product_price`
    FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
    ;
    CREATE TABLE `test_orders_new` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'test_orders_new',
      'vertexType'= 'sink'
    );
    INSERT INTO `test_orders_new` (
      `order_id`,
      `user_id`,
      `product_id`,
      `total_price`,
      `order_date`,
      `product_name`,
      `product_price`
    )
    SELECT
      `etltest_test_orders_JOIN_etltest_product`.`order_id`,
      `etltest_test_orders_JOIN_etltest_product`.`user_id`,
      `etltest_test_orders_JOIN_etltest_product`.`product_id`,
      `etltest_test_orders_JOIN_etltest_product`.`total_price`,
      `etltest_test_orders_JOIN_etltest_product`.`order_date`,
      `etltest_test_orders_JOIN_etltest_product`.`product_name`,
      `etltest_test_orders_JOIN_etltest_product`.`product_price`
    FROM `etltest_test_orders_JOIN_etltest_product`;

    The following list describes the types of statements in the script:

    • Source and destination tables

      • You must execute CREATE TABLE statements to define the source and destination tables.

      • You can set three parameters in the WITH clause of a CREATE TABLE statement: streamType, alias, and vertexType. You must set all three parameters for a stream table. You need to set only the alias and vertexType parameters for a dimension table and the destination table.

        • streamType: the type of the stream. ETL converts a stream into a dynamic table and performs continuous queries on the dynamic table to generate a new dynamic table. In this process, the dynamic table is continuously changed by INSERT, UPDATE, and DELETE operations. When the new dynamic table is written to the destination database, it is converted back into a stream. You must specify this parameter to define how the changes of the dynamic table are encoded during this conversion. Valid values:

          • upsert: an upsert stream. The data in the dynamic table can be modified by INSERT, UPDATE, and DELETE operations. When the dynamic table is converted into a stream, INSERT and UPDATE operations are encoded as upsert messages, and DELETE operations are encoded as delete messages.

            Note

            A dynamic table that is converted into an upsert stream requires a unique key. The key can be composite.

          • append: an append-only stream. The data in the dynamic table can be modified only by INSERT operations. When the dynamic table is converted into a stream, only the inserted rows are emitted.

        • alias: the alias of the physical table that you set when you configured the source and destination databases in Step 4.

        • vertexType: the type of the table. Valid values:

          • stream: a stream table.

          • lookup: a dimension table.

          • sink: a destination table.

    • Computing logic of data transformation

      You must execute a CREATE VIEW statement to describe the computing logic of the data transformation.

    • Transformed destination table

      You must execute an INSERT INTO statement to write the transformed data to the destination table.
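
    For reference, the following statement is a minimal sketch of a stream table whose changes include UPDATE and DELETE operations and that is therefore declared as an upsert stream. The table name, columns, and alias (etltest_orders_status, orders_status) are hypothetical; as noted above, an upsert stream requires a unique key, which is typically the primary key of the underlying physical table.

    CREATE TABLE `etltest_orders_status` (
      `order_id` BIGINT,
      `order_status` STRING,
      `pt` AS PROCTIME()
    ) WITH (
      'streamType'= 'upsert',
      'alias'= 'orders_status',
      'vertexType'= 'stream'
    );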

  6. After you configure the source database, destination database, and SQL statements, click Generate Flink SQL Validation.

    Note

    • You can also click Publish to validate the SQL statements and run a precheck.

    • If the Flink SQL validation succeeds, you can click View ETL validation details to view the details of the Flink SQL validation.

    • If the Flink SQL validation fails, you can click View ETL validation details to view the details of the Flink SQL validation. You can edit the SQL statements based on the error message and then perform Flink SQL validation again.

  7. After the Flink SQL validation succeeds, click Publish to run a precheck.

  8. Wait until the success rate becomes 100%. Then, click Next: Purchase Instance (Free).

    Note

    If the task fails to pass the precheck, click View Details next to each failed item. Troubleshoot the issues based on the causes and run a precheck again.

  9. On the Purchase Instance page, select the Instance Class and confirm the Compute Units (CUs). The value is fixed at 2 during public preview. Then, read and select the check box to agree to the Data Transmission Service (Pay-as-you-go) Service Terms and the Service Terms for Public Preview.

    Note

    During public preview, each user can create two ETL instances for free.

  10. Click Buy and Start to start the ETL task.