全部產品
Search
文件中心

Data Transmission Service:通過Flink SQL模式配置ETL任務

更新時間:Aug 31, 2024

Flink SQL是ETL為了簡化計算模型、降低使用門檻而設計的一套符合標準SQL語義的開發語言。相對於DAG模式(可視化拖拽方式),Flink SQL的功能更為強大,您可在Flink SQL的命令視窗中輸入DAG模式暫不支援的文法。本文將介紹如何通過Flink SQL模式配置ETL任務。

背景資訊

說明

此功能即將下線,僅部分使用者可以免費體驗,未曾使用過該功能的使用者已無法體驗,建議您在同步或遷移執行個體中配置ETL任務。更多資訊,請參見在DTS遷移或同步任務中配置ETL

  • 在配置ETL任務前,請您瞭解以下資訊:

    • 輸入/維表指ETL的源庫。

    • 輸出指經過ETL處理後寫入的目標庫。

  • 資料庫傳輸服務DTS為資料同步過程提供了流式的ETL功能,您可以在源庫和目標庫之間添加各種轉換組件,實現豐富的轉換操作,並將處理後的資料即時寫入目標庫。例如將兩張流表做JOIN操作後形成一張大表,寫入目標庫;或者給源表新增一個欄位,並為該欄位配置函數進行賦值,源表該欄位經過賦值轉換後寫入目標庫。

前提條件

  • 當前僅支援在華東1(杭州)、華東2(上海)、華北1(青島)、華北2(北京)、華北3(張家口)、華南1(深圳)、華南3(廣州)和中國香港建立ETL任務。

  • 當前源庫支援MySQLPolarDB MySQLOraclePostgreSQLDB2 iSeries(AS/400)DB2 LUWDRDS(PolarDB-X 1.0)、PolarDB PostgreSQLMariaDBPolarDB OracleSQLServerPolarDB-X 2.0

  • 當前目標庫支援MySQLPolarDB MySQLOracleAnalyticDB MySQL 3.0PolarDB PostgreSQLPostgreSQLDB2 LUWDB2 iSeries(AS/400)AnalyticDB PostgreSQLSQLServerMariaDBDRDS(PolarDB-X 1.0)、PolarDB OracleTablestore

  • 由於ETL功能暫不支援結構遷移,所以您需要根據轉換條件在目標庫側完成對應表結構的建立。例如A表中包含欄位1、欄位2和欄位3,B表中包含欄位2、欄位3和欄位4,對兩張表通過做JOIN操作後,需要輸出欄位2和欄位3,則需要在目標庫側建立做JOIN操作後的C表,C表中包含欄位2和欄位3。

  • 由於ETL功能暫不支援全量資料同步,所以您只能對增量資料進行即時轉換。

注意事項

  • 所有的源庫和目標庫屬於同一地區。

  • 所有流表均來源於同一執行個體。

  • 資料庫的庫名和表名唯一。

  • 當前暫不支援配置跨帳號的任務。

操作步驟

  1. 進入ETL任務的列表頁面。

    1. 登入Data Transmission Service控制台

    2. 在左側導覽列,單擊ETL

  2. 單擊左上方的新增資料流,在新增資料流對話方塊中,您需在資料流名稱配置ETL任務名稱,選擇開發方式FlinkSQL

  3. 單擊確認

  4. 流式ETL頁面的資料流資訊部分,添加源庫和目標庫。

    參數

    說明

    地區

    選擇資料來源所在地區。

    類型

    選擇庫表類型。

    • 配置源表資訊時,如源表為 流表 ,則需選擇流表;如源表為 維表 ,則需選擇維表

    • 配置目標表資訊時,則需選擇輸出

    資料庫類型

    選擇源庫或目標庫的資料庫類型。

    執行個體

    輸入執行個體名稱或執行個體ID,搜尋並選擇源和目標執行個體。

    重要

    您需要先在DMS中錄入源執行個體和目標執行個體。錄入方式,請參見執行個體管理

    資料庫

    選擇資料加工對象所屬的源庫或目標庫。

    物理表

    選擇資料加工對象所屬的源表或目標表。

    物理表別名

    為源表或目標表設定精簡易讀的別名,便於ETL在運行SQL語句時定位至具體的表。

  5. 流式ETL頁面的SQL命令視窗,添加用於配置ETL任務的SQL語句。

    本案例以如下SQL語句為例,配置ETL任務,將流表test_orders與維表product結合至目標表test_orders_new中。

    重要

    SQL語句間需以英文分號(;)分割。

    CREATE TABLE `etltest_test_orders` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'append',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `etltest_product` (
      `product_id` BIGINT,
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'product',
      'vertexType'= 'lookup'
    );
    CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
    SELECT
      `etltest_test_orders`.`order_id` AS `order_id`,
      `etltest_test_orders`.`user_id` AS `user_id`,
      `etltest_test_orders`.`product_id` AS `product_id`,
      `etltest_test_orders`.`total_price` AS `total_price`,
      `etltest_test_orders`.`order_date` AS `order_date`,
      `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
      `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
      `etltest_product`.`product_id` AS `product_id_0001011101`,
      `etltest_product`.`product_name` AS `product_name`,
      `etltest_product`.`product_price` AS `product_price`
    FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
    ;
    CREATE TABLE `test_orders_new` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'test_orders_new',
      'vertexType'= 'sink'
    );
    INSERT INTO `test_orders_new` (
      `order_id`,
      `user_id`,
      `product_id`,
      `total_price`,
      `order_date`,
      `product_name`,
      `product_price`
    )
    SELECT
      `etltest_test_orders_JOIN_etltest_product`.`order_id`,
      `etltest_test_orders_JOIN_etltest_product`.`user_id`,
      `etltest_test_orders_JOIN_etltest_product`.`product_id`,
      `etltest_test_orders_JOIN_etltest_product`.`total_price`,
      `etltest_test_orders_JOIN_etltest_product`.`order_date`,
      `etltest_test_orders_JOIN_etltest_product`.`product_name`,
      `etltest_test_orders_JOIN_etltest_product`.`product_price`
    FROM `etltest_test_orders_JOIN_etltest_product`;

    類型

    說明

    源表和目標表資訊

    • 您需使用CREATE TABLE語句定義源表和目標表資訊。

    • SQL語句的WITH從句中可設定三個參數:streamType aliasvertexType 。其中流表必須設定以上三個參數,維表和輸出僅需設定aliasvertexType

      • streamType 流類型。

        • Upsert:Upsert流。動態表中的資料支援通過INSERT、UPDATE和DELETE操作修改,當轉換為流時,會將INSERT和UPDATE操作編碼為upsert message,將DELETE操作編碼為delete message。

          說明

          該編碼方式要求動態表具有唯一鍵(可能是複合的)。

        • append: Append-only流。動態表中的資料僅支援INSERT操作修改,當轉換為流時僅需發送INSERT的資料。

      • alias:在步驟3配置源庫和目標庫時設定的物理表別名

    • vertexType :表類型。

      • stream:流表。

      • lookup:維表。

      • sink:目標表。

    資料加工的計算邏輯

    您需使用CREATE VIEW語句描述資料加工的計算邏輯。

    加工後的目標表資訊

    您需使用INSERT INTO語句定義加工後的目標表資訊。

  6. 配置完成源庫和目標庫資訊,以及SQL語句後,單擊產生 Flink SQL校正

    說明
    • 您也可以單擊發布,直接執行校正和預檢查。

    • 如Flink SQL校正成功,您可單擊ETL校正成功,查看Flink SQL校正詳情。

    • 如Flink SQL校正失敗,您可單擊ETL校正成功,根據提示資訊修複SQL語句,並重新進行產生Flink SQL校正。

  7. Flink SQL校正成功後,單擊發布進入預檢查階段。

  8. 預檢查通過率顯示為100%時,單擊下一步購買(免費)

    說明

    如果預檢查失敗,請單擊檢查失敗項後的查看詳情,根據提示資訊修複後,重新進行預檢查。

  9. 購買頁面,選擇鏈路規格並確認計算資源(CU)(公測期間,固定為2)。閱讀並勾選資料轉送(隨用隨付)服務條款公測協議條款

    說明

    ETL功能公測中,每個使用者可以免費建立並使用兩個ETL執行個體。

  10. 單擊購買並啟動,ETL任務正式開始。