Flink SQL是ETL為了簡化計算模型、降低使用門檻而設計的一套符合標準SQL語義的開發語言。相對於DAG模式(可視化拖拽方式),Flink SQL的功能更為強大,您可在Flink SQL的命令視窗中輸入DAG模式暫不支援的文法。本文將介紹如何通過Flink SQL模式配置ETL任務。
背景資訊
此功能即將下線,僅部分使用者可以免費體驗,未曾使用過該功能的使用者已無法體驗,建議您在同步或遷移執行個體中配置ETL任務。更多資訊,請參見在DTS遷移或同步任務中配置ETL。
在配置ETL任務前,請您瞭解以下資訊:
輸入/維表指ETL的源庫。
輸出指經過ETL處理後寫入的目標庫。
資料庫傳輸服務DTS為資料同步過程提供了流式的ETL功能,您可以在源庫和目標庫之間添加各種轉換組件,實現豐富的轉換操作,並將處理後的資料即時寫入目標庫。例如將兩張流表做JOIN操作後形成一張大表,寫入目標庫;或者給源表新增一個欄位,並為該欄位配置函數進行賦值,源表該欄位經過賦值轉換後寫入目標庫。
前提條件
當前僅支援在華東1(杭州)、華東2(上海)、華北1(青島)、華北2(北京)、華北3(張家口)、華南1(深圳)、華南3(廣州)和中國香港建立ETL任務。
當前源庫支援MySQL、PolarDB MySQL、Oracle、PostgreSQL、DB2 iSeries(AS/400)、DB2 LUW、DRDS(PolarDB-X 1.0)、PolarDB PostgreSQL、MariaDB、PolarDB Oracle、SQLServer、PolarDB-X 2.0。
當前目標庫支援MySQL、PolarDB MySQL、Oracle、AnalyticDB MySQL 3.0、PolarDB PostgreSQL、PostgreSQL、DB2 LUW、DB2 iSeries(AS/400)、AnalyticDB PostgreSQL、SQLServer、MariaDB、DRDS(PolarDB-X 1.0)、PolarDB Oracle、Tablestore。
由於ETL功能暫不支援結構遷移,所以您需要根據轉換條件在目標庫側完成對應表結構的建立。例如A表中包含欄位1、欄位2和欄位3,B表中包含欄位2、欄位3和欄位4,對兩張表通過做JOIN操作後,需要輸出欄位2和欄位3,則需要在目標庫側建立做JOIN操作後的C表,C表中包含欄位2和欄位3。
由於ETL功能暫不支援全量資料同步,所以您只能對增量資料進行即時轉換。
注意事項
所有的源庫和目標庫屬於同一地區。
所有流表均來源於同一執行個體。
資料庫的庫名和表名唯一。
當前暫不支援配置跨帳號的任務。
操作步驟
進入ETL任務的列表頁面。
在左側導覽列,單擊ETL。
單擊左上方的,在新增資料流對話方塊中,您需在資料流名稱配置ETL任務名稱,選擇開發方式為FlinkSQL。
單擊確認。
在流式ETL頁面的資料流資訊部分,添加源庫和目標庫。
參數
說明
地區
選擇資料來源所在地區。
類型
選擇庫表類型。
配置源表資訊時,如源表為流表(即時發生變化的表,可以關聯一個維表,用於資料關聯查詢),則需選擇流表;如源表為維表(更新不頻繁或非即時更新的表,一般用於結合即時資料拼裝成寬表進行資料分析),則需選擇維表。
配置目標表資訊時,則需選擇輸出。
資料庫類型
選擇源庫或目標庫的資料庫類型。
執行個體
輸入執行個體名稱或執行個體ID,搜尋並選擇源和目標執行個體。
重要您需要先在DMS中錄入源執行個體和目標執行個體。錄入方式,請參見執行個體管理。
資料庫
選擇資料加工對象所屬的源庫或目標庫。
物理表
選擇資料加工對象所屬的源表或目標表。
物理表別名
為源表或目標表設定精簡易讀的別名,便於ETL在運行SQL語句時定位至具體的表。
在流式ETL頁面的SQL命令視窗,添加用於配置ETL任務的SQL語句。
本案例以如下SQL語句為例,配置ETL任務,將流表test_orders與維表product結合至目標表test_orders_new中。
重要SQL語句間需以英文分號(;)分割。
CREATE TABLE `etltest_test_orders` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `dts_etl_schema_db_table` STRING, `dts_etl_db_log_time` BIGINT, `pt` AS PROCTIME(), WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND ) WITH ( 'streamType'= 'append', 'alias'= 'test_orders', 'vertexType'= 'stream' ); CREATE TABLE `etltest_product` ( `product_id` BIGINT, `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'product', 'vertexType'= 'lookup' ); CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS SELECT `etltest_test_orders`.`order_id` AS `order_id`, `etltest_test_orders`.`user_id` AS `user_id`, `etltest_test_orders`.`product_id` AS `product_id`, `etltest_test_orders`.`total_price` AS `total_price`, `etltest_test_orders`.`order_date` AS `order_date`, `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`, `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`, `etltest_product`.`product_id` AS `product_id_0001011101`, `etltest_product`.`product_name` AS `product_name`, `etltest_product`.`product_price` AS `product_price` FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id ; CREATE TABLE `test_orders_new` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'test_orders_new', 'vertexType'= 'sink' ); INSERT INTO `test_orders_new` ( `order_id`, `user_id`, `product_id`, `total_price`, `order_date`, `product_name`, `product_price` ) SELECT `etltest_test_orders_JOIN_etltest_product`.`order_id`, `etltest_test_orders_JOIN_etltest_product`.`user_id`, `etltest_test_orders_JOIN_etltest_product`.`product_id`, `etltest_test_orders_JOIN_etltest_product`.`total_price`, `etltest_test_orders_JOIN_etltest_product`.`order_date`, `etltest_test_orders_JOIN_etltest_product`.`product_name`, `etltest_test_orders_JOIN_etltest_product`.`product_price` FROM `etltest_test_orders_JOIN_etltest_product`;
類型
說明
源表和目標表資訊
您需使用CREATE TABLE語句定義源表和目標表資訊。
SQL語句的WITH從句中可設定三個參數:streamType 、alias、vertexType 。其中流表必須設定以上三個參數,維表和輸出僅需設定alias和vertexType 。
streamType :流類型。ETL在處理資料時會將流轉換為動態表,在該動態表上進行持續查詢(即動態表會被INSERT、UPDATE、DELETE操作持續更改),產生一個新的動態表。最終寫入目標庫時,再將新的動態表會轉化為流。當新的動態錶轉化為流時,您需要指定本參數,對動態表前後更改資訊進行編碼。
Upsert:Upsert流。動態表中的資料支援通過INSERT、UPDATE和DELETE操作修改,當轉換為流時,會將INSERT和UPDATE操作編碼為upsert message,將DELETE操作編碼為delete message。
說明該編碼方式要求動態表具有唯一鍵(可能是複合的)。
append: Append-only流。動態表中的資料僅支援INSERT操作修改,當轉換為流時僅需發送INSERT的資料。
alias:在步驟3配置源庫和目標庫時設定的物理表別名。
vertexType :表類型。
stream:流表。
lookup:維表。
sink:目標表。
資料加工的計算邏輯
您需使用CREATE VIEW語句描述資料加工的計算邏輯。
加工後的目標表資訊
您需使用INSERT INTO語句定義加工後的目標表資訊。
配置完成源庫和目標庫資訊,以及SQL語句後,單擊產生 Flink SQL校正。
說明您也可以單擊發布,直接執行校正和預檢查。
如Flink SQL校正成功,您可單擊,查看Flink SQL校正詳情。
如Flink SQL校正失敗,您可單擊,根據提示資訊修複SQL語句,並重新進行產生Flink SQL校正。
Flink SQL校正成功後,單擊發布進入預檢查階段。
預檢查通過率顯示為100%時,單擊下一步購買(免費)。
說明如果預檢查失敗,請單擊檢查失敗項後的查看詳情,根據提示資訊修複後,重新進行預檢查。
在購買頁面,選擇鏈路規格並確認計算資源(CU)(公測期間,固定為2)。閱讀並勾選資料轉送(隨用隨付)服務條款和公測協議條款。
說明ETL功能公測中,每個使用者可以免費建立並使用兩個ETL執行個體。
單擊購買並啟動,ETL任務正式開始。