全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink CDC資料攝入作業開發(公測中)

更新時間:Sep 03, 2025

Realtime ComputeFlink版基於Flink CDC,通過開發YAML作業的方式有效地實現了將資料從源端同步到目標端的資料攝入工作。本文將為您介紹Flink CDC資料攝入作業開發的操作步驟。

背景資訊

Flink CDC資料攝入基於Flink CDC完成Data Integration工作,通過YAML配置的方式可以輕鬆定義複雜的ETL流程,並自動轉化為Flink運算邏輯。高效支援整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自訂計算列同步等能力,還支援ETL處理、Where條件過濾、列裁剪和計算資料行,極大地簡化了Data Integration過程,有效提升了Data Integration的效率和可靠性。

Flink CDC優勢

在Realtime ComputeFlink版中,您可以選擇開發Flink CDC資料攝入作業、SQL作業或自行開發DataStream作業完成資料同步工作。下面介紹一下Flink CDC資料攝入作業相比於其他兩種開發方式的優勢。

Flink CDC vs Flink SQL

Flink CDC資料攝入作業和SQL作業在資料傳遞過程中使用不同的資料類型:

  • SQL傳遞RowData,Flink CDC傳遞DataChangeEvent和SchemaChangeEvent。SQL的每個RowData都有自己的變更類型,主要有4種類型:insert(+I),update before(-U),update after(+U)和delete(-D)。

  • Flink CDC使用SchemaChangeEvent傳遞Schema變更資訊,例如建立表,添加列、清空表等,DataChangeEvent用來傳遞資料變更,主要是insert,update和delete,update訊息中同時包含了update before和update after的內容,這使得您能夠寫入原始變更資料到目標端。

Flink CDC資料攝入作業相比SQL作業的優勢如下:

Flink CDC資料攝入

Flink SQL

自動識別Schema,支援整庫同步

需要人工寫Create Table和Insert語句

支援多策略的Schema變更

不支援Schema變更

支援原始Changelog同步

破壞原始Changelog結構

支援讀寫多個表

讀寫單個表

相對於CTAS或CDAS語句,Flink CDC作業功能也更為強大,可以支援:

  • 上遊表結構變更立即同步,不用等新資料寫入觸發。

  • 支援原始Changelog同步,Update訊息不拆分。

  • 同步更多類型的Schema變更,例如Truncate Table和Drop Table等變更。

  • 支援指定表的映射關係,靈活定義目標端表名。

  • 支援靈活的Schema Evolution行為,使用者可配置。

  • 支援WHERE條件過濾資料。

  • 支援裁剪欄位。

Flink CDC vs Flink DataStream

Flink CDC資料攝入作業相比DataStream作業的優勢如下:

Flink CDC資料攝入

Flink DataStream

為各層級使用者設計,不只是專家

需要熟悉Java和分布式系統

隱藏底層細節,便於開發

需要熟悉Flink架構

YAML格式容易理解和學習

需要瞭解Maven等工具管理相關依賴

已有作業方便複用

難以複用已有代碼

使用限制

  • 推薦使用Realtime Compute引擎VVR 11.1版本開發Flink CDC資料攝入作業,如果需要VVR 8.x版本開發,請選擇VVR 8.0.11版本開發。

  • 僅支援從一個源端流向一個目標端。從多個資料來源讀取或寫入多個目標端時需編寫多個Flink CDC作業。

  • 暫不支援將Flink CDC作業部署到Session叢集。

  • Flink CDC作業暫不支援配置自動調優。

Flink CDC資料攝入連接器

當前支援作為Flink CDC資料攝入源端和目標端的連接器如下表所示。

說明

歡迎您通過工單、DingTalk等渠道反饋感興趣的上下遊儲存,未來計劃適配更多上下遊以更好滿足您的需要。

支援的連接器

連接器

支援類型

Source

Sink

MySQL

說明

支援串連RDS MySQL版、PolarDB MySQL版及自建MySQL。

×

流式資料湖倉Paimon

×

訊息佇列Kafka

說明

僅Realtime Compute引擎 VVR 8.0.10 及更高版本支援。

Upsert Kafka

×

StarRocks

×

即時數倉Hologres

×

Log ServiceSLS

說明

僅Realtime Compute引擎 VVR 11.1 及更高版本支援。

×

MongoDB

說明

僅Realtime Compute引擎 VVR 11.2 及更高版本支援。

×

MaxCompute

×

說明

僅Realtime Compute引擎 VVR 11.1 及更高版本支援。

SelectDB

×

說明

僅Realtime Compute引擎 VVR 11.1 及更高版本支援。

Print

×

建立Flink CDC資料攝入作業

基於同步作業模板快速產生

  1. 登入Realtime Compute管理主控台

  2. 單擊目標工作空間操作列的控制台

  3. 在左側導覽列選擇資料開發 > 資料攝入

  4. 單擊image後,單擊從模板建立

  5. 選擇目標資料同步模板。

    目前僅支援MySQL到StarRocks、MySQL到Paimon和MySQL到Hologres這三個模板。

    image

  6. 填寫作業資訊(作業名稱、儲存位置和引擎版本)後,單擊確定

  7. 配置Flink CDC作業Source和Sink相關資訊。

    參數配置資訊可查看相關連接器文檔。

通過CTAS/CDAS作業產生

重要
  • 如果作業中包含多條CXAS語句,Flink將僅識別並轉換其中的第一條。

  • 由於Flink SQL與Flink CDC在內建函數支援上存在差異,產生的Transform規則可能無法直接使用,需您手動確認並調整。

  • 如果Source端是MySQL,且原有CTAS/CDAS作業仍在運行中,需要調整Flink CDC資料攝入作業Source的server id,避免和原有作業發生衝突。

  1. 登入Realtime Compute管理主控台

  2. 單擊目標工作空間操作列的控制台

  3. 在左側導覽列選擇資料開發 > 資料攝入

  4. 單擊image後,單擊從已有CTAS/CDAS作業產生,選中目標CTAS或CDAS作業後,單擊確定

    在目標選擇頁面中,系統僅展示合法的CTAS和CDAS作業,普通ETL作業以及存在語法錯誤的作業草稿將不會顯示。

  5. 填寫作業資訊(作業名稱、儲存位置和引擎版本)後,單擊確定

從開源社區作業遷移

  1. 登入Realtime Compute管理主控台

  2. 單擊目標工作空間操作列的控制台

  3. 在左側導覽列選擇資料開發 > 資料攝入

  4. 單擊image,選擇建立資料攝入草稿,填寫檔案名稱引擎版本,單擊建立

  5. 複製開源社區的Flink CDC作業。

  6. (可選)單擊深度檢查

    您可以進行文法檢測、網路連通性和存取權限檢查。

從零開始建立Flink CDC資料攝入作業

  1. 登入Realtime Compute管理主控台

  2. 單擊目標工作空間操作列的控制台

  3. 在左側導覽列選擇資料開發 > 資料攝入

  4. 單擊image,選擇建立資料攝入草稿,填寫檔案名稱引擎版本,單擊建立

  5. 配置Flink CDC作業開發資訊。

    # 必填
    source:
      # 資料來源類型
      type: <替換為您源端連接器類型>
      # 資料來源配置資訊,配置項詳情請參見對應連接器文檔。
      ...
    
    # 必填
    sink:
      # 目標類型
      type: <替換為您目標端連接器類型>
      # 資料目標配置資訊,配置項詳情請參見對應連接器文檔。
      ...
    
    # 可選
    transform:
      # 轉換規則,針對flink_test.customers表
      - source-table: flink_test.customers
        # 投影配置,指定要同步的列,並進行資料轉換
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # 過濾條件,只同步id大於10的資料
        filter: id > 10
        # 描述資訊,用於解釋轉換規則
        description: append calculated columns based on source table
    
    # 可選
    route:
      # 路由規則,指定源表和目標表之間的對應關係
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # 描述資訊,用於解釋路由規則
        description: sync customers table
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # 描述資訊,用於解釋路由規則
        description: sync customers_suffix table
    
    #可選
    pipeline:
      # 任務名稱
      name: MySQL to Hologres Pipeline
    說明

    在Flink CDC作業中,Key和Value之間必須用空格進行分隔,格式為Key: Value

    涉及的代碼塊說明詳情如下。

    是否必填

    代碼模組

    說明

    必填

    source(資料來源端)

    資料管道的起點,Flink CDC將從資料來源中捕獲變更資料。

    說明
    • 目前僅支援MySQL作為資料來源,具體的配置項詳情請參見MySQL

    • 您可以使用變數對敏感資訊進行設定,詳情請參見變數管理

    sink(資料目標端)

    資料管道的終點,Flink CDC將捕獲的資料變更傳輸到這些目標系統中。

    說明
    • 目前支援的目標端系統請參見Flink CDC資料攝入連接器,目標端配置項詳情請參見對應連接器文檔。

    • 您可以使用變數對敏感資訊進行設定,詳情請參見變數管理

    可選

    pipeline

    (資料管道)

    定義整個資料通道作業的一些基礎配置,例如pipeline名稱等。

    transform(資料轉換)

    填寫資料轉化規則。轉換是指對流經Flink管道的資料進行操作的過程。支援ETL處理、Where條件過濾,列裁剪和計算資料行。

    當Flink CDC捕獲的原始變更資料需要經過轉換以適應特定的下遊系統時,可以通過transform實現。

    route(路由)

    如果未配置該模組,則代表整庫或目標表同步。

    在某些情況下,捕獲的變更資料可能需要根據特定規則被發送到不同的目的地。路由機制允許您靈活指定上下遊的映射關係,將資料發送到不同的資料目標端。

    各模組文法結構和配置項說明詳情,請參見Flink CDC資料攝入作業開發參考

    以將MySQL中app_db資料庫下的所有表同步到Hologres的某個資料庫為例,程式碼範例如下。

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  6. (可選)單擊深度檢查

    您可以進行文法檢測、網路連通性和存取權限檢查。

相關文檔