Realtime ComputeFlink版基於Flink CDC,通過開發YAML作業的方式有效地實現了將資料從源端同步到目標端的資料攝入工作。本文將為您介紹資料攝入YAML作業開發的操作步驟。
背景資訊
資料攝入模組整合了Flink CDC連接器,相對於CDAS和CTAS,它通過YAML配置的方式可以輕鬆定義複雜的ETL流程,並自動轉化為Flink運算邏輯。除支援整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自訂計算列同步等能力,還支援ETL處理、Where條件過濾、列裁剪和計算資料行,極大地簡化了Data Integration過程,有效提升了Data Integration的效率和可靠性。
使用限制
僅Realtime Compute引擎VVR 8.0.9及以上版本支援YAML作業。
說明目前VVR 8.0.9對接Flink CDC 3.0,CDC與VVR版本對應關係請參見CDC與VVR版本對應關係。
僅支援從一個源端流向一個目標端。從多個資料來源讀取或寫入多個目標端時需編寫多個YAML作業。
暫不支援將YAML作業部署到Session叢集。
資料攝入連接器
當前支援作為資料攝入源端和目標端的連接器如下表所示。
類型 | 連接器 |
資料來源(Source) | MySQL |
資料目標(Sink) | 訊息佇列Kafka |
Upsert Kafka | |
即時數倉Hologres | |
流式資料湖倉Paimon | |
StarRocks | |
操作步驟
單擊目標工作空間操作列的控制台。
在左側導覽列選擇
。單擊建立,選擇空白的資料攝入草稿,單擊下一步。
您也可以直接選擇目標資料同步模板(MySQL到Starrocks資料同步、MySQL到Paimon資料同步或MySQL到Hologres資料同步)快速配置YAML作業開發資訊。
填寫作業名稱,儲存位置和選擇引擎版本後,單擊確定。
配置YAML作業開發資訊。
# 必填 source: # 資料來源類型 type: <替換為您源端連接器類型> # 資料來源配置資訊,配置項詳情請參見對應連接器文檔。 ... # 必填 sink: # 目標類型 type: <替換為您目標端連接器類型> # 資料目標配置資訊,配置項詳情請參見對應連接器文檔。 ... # 可選 transform: # 轉換規則,針對flink_test.customers表 - source-table: flink_test.customers # 投影配置,指定要同步的列,並進行資料轉換 projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # 過濾條件,只同步id大於10的資料 filter: id > 10 # 描述資訊,用於解釋轉換規則 description: append calculated columns based on source table # 可選 route: # 路由規則,指定源表和目標表之間的對應關係 - source-table: flink_test.customers sink-table: db.customers_o # 描述資訊,用於解釋路由規則 description: sync customers table - source-table: flink_test.customers_suffix sink-table: db.customers_s # 描述資訊,用於解釋路由規則 description: sync customers_suffix table #可選 pipeline: # 任務名稱 name: MySQL to Hologres Pipeline
涉及的代碼塊說明詳情如下。
是否必填
代碼模組
說明
必填
source(資料來源端)
資料管道的起點,Flink CDC將從資料來源中捕獲變更資料。
說明目前僅支援MySQL作為資料來源。
您可以使用變數對敏感資訊進行設定,詳情請參見變數管理。
sink(資料目標端)
資料管道的終點,Flink CDC將捕獲的資料變更傳輸到這些目標系統中。
可選
pipeline
(資料管道)
定義整個資料通道作業的一些基礎配置,例如pipeline名稱等。
transform(資料轉換)
填寫資料轉化規則。轉換是指對流經Flink管道的資料進行操作的過程。支援ETL處理、Where條件過濾,列裁剪和計算資料行。
當Flink CDC捕獲的原始變更資料需要經過轉換以適應特定的下遊系統時,可以通過transform實現。
route(路由)
如果未配置該模組,則代表整庫或目標表同步。
在某些情況下,捕獲的變更資料可能需要根據特定規則被發送到不同的目的地。路由機制允許您靈活指定上下遊的映射關係,將資料發送到不同的資料目標端。
以將MySQL中app_db資料庫下的所有表同步到Hologres的某個資料庫為例,程式碼範例如下。
source: type: mysql hostname: <hostname> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 sink: type: hologres name: Hologres Sink endpoint: <endpoint> dbname: <database-name> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: Sync MySQL Database to Hologres
(可選)單擊深度檢查。
您可以進行文法檢測、網路連通性和存取權限檢查。
相關文檔
YAML作業開發完成後,您需要部署上線,詳情請參見部署作業。