全部產品
Search
文件中心

DataWorks:Kafka增量資料同步至MaxCompute

更新時間:Jun 26, 2024

本文以將Kafka增量資料同步至MaxCompute的一個實踐為例,為您介紹Kafka的分鐘、小時、天增量資料定時調度寫入MaxCompute小時、天分區表的配置詳情。

注意事項

  • Kafka的版本需要大於等於0.10.2小於等於2.2.x,且Kafka啟用了記錄時間戳記,並且記錄帶有正確的業務時間戳記。

  • 增量資料開始同步後,如果仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic的話,這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。

  • Kafka側參數同步結束策略原則上只有滿足以下條件可以選擇1分鐘讀取不到新資料,否則存在資料漏讀風險。

    • Kafka Topic中部分或全部分區存在長時間(10分鐘以上)無資料寫入情況。

    • 每個周期執行個體啟動後,不會有時間戳記小於結束時間參數的記錄寫入Kafka Topic。

建立資料來源

  • 準備用於運行資料同步任務的工作空間與Data Integration資源群組,操作詳情請參見新增和使用獨享Data Integration資源群組。本實踐下文以一個標準模式的工作空間使用獨享Data Integration資源群組為例,為您樣本操作詳情。

  • 建立Kafka資料來源,並完成Kafka資料來源與Data Integration資源群組之間的網路連通檢測,操作詳情請參見配置Kafka資料來源

    重要

    使用標準模式的工作空間時,您需要確認在Kafka資料來源的開發和生產環境對應的Kafka叢集中有同名Topic,用於開發和產生環境進行資料同步。

  • 準備MaxCompute資料來源,操作詳情請參見建立MaxCompute資料來源

建立離線同步任務

在資料開發(DataStudio)頁面的某個商務程序下,建立一個離線同步節點,根據介面提示配置節點的路徑、名稱等資訊,操作詳情請參見通過嚮導模式配置離線同步任務

配置對應資料來源與資源群組

在建立離線同步節點後,需根據資料來源我的資源群組以及資料去向,依次進行配置並測試連通性。

image

配置資料來源:Kafka側參數

配置離線同步節點的資料來源相關參數。本實踐將Kafka資料增量同步處理至MaxCompute,資料來源為Kafka資料,配置要點如下所示。資料來源kafka

說明

通用的Kafka資料來源的配置項介紹可查看Kafka Reader文檔,以下為本次實踐的配置參考。

配置項

配置要點

資料來源主題

選擇待同步的Kafka Topic。如果您使用的是標準模式的DataWorks,需要在對應開發和生產環境的Kafka叢集中有同名的Topic,此處的主題即選擇此同名的Topic即可。

說明

如果:

  • 開發環境的Topic不存在:則此處配置離線同步節點的主題時,下拉框中無法搜尋到待同步的Topic。

  • 生產環境的Topic不存在:則離線同步任務配置完,提交發布後,在生產環境周期調度運行時會因為沒法找到待同步的表而導致任務失敗。

消費群組ID

根據業務需要填寫,確保在Kafka叢集側唯一,便於在Kafka叢集側統計和監控消費情況。

Kafka版本

根據待同步資料的Kafka叢集實際情況選擇。

說明

Kafka的版本需要大於等於0.10.2、小於等於2.2.x。

讀取起始位點起始時間讀取結束位點結束時間

讀取起始點位讀取結束位點選擇指定時間起始時間結束時間分別設定為調度參數${startTime}${endTime}

這幾個參數明確了後續同步資料時從哪個資料開始同步,同步到哪個資料同步任務結束,本實踐的配置表明${startTime}時間的資料開始同步,一直到${endTime}時間的資料結束。${startTime}${endTime}在同步任務實際運行時會根據調度配置做參數替換。

時區

可置空或選擇預設使用DataWorks所在地區的伺服器時區。

說明

如果您此前有聯絡阿里雲支援人員修改過調度時區,這裡可選擇您修改後的時區。

鍵類型實值型別編碼

根據Kafka Topic記錄實際情況選擇。

同步結束策略

同步結束策略如果滿足以下條件可以選擇1分鐘讀取不到新資料,否則選擇到達指定結束位點

  • Kafka Topic中部分或全部分區存在長時間(10分鐘以上)無資料寫入情況。

  • 每個周期執行個體啟動後,不會有時間戳記小於結束時間參數的記錄寫入Kafka Topic。

進階配置

保持預設即可。

配置資料去向:MaxCompute側參數

配置離線同步節點的資料去向相關參數。本實踐將Kafka資料增量同步處理至MaxCompute,資料去向為MaxCompute表,配置要點如下所示。去向MaxCompute

配置項

配置要點

資料來源

選擇上述建立的MaxCompute資料來源。

選擇資料寫入的MaxCompute表。如果您使用的是標準類型的DataWorks工作空間,請確保在MaxCompute的開發環境和生產環境中存在同名且表結構一致的MaxCompute表。

說明

如果:

  • 開發環境不存在待同步的MaxCompute表,則選擇此處配置離線同步節點的去向表的下拉框中無法搜到待同步表。

  • 生產環境不存在待同步的MaxCompute表,同步任務提交發布後,資料同步任務調度運行時將會由於無法找到待同步表而導致同步任務運行失敗。

  • 開發環境和生產環境的表結構不一致,同步任務提交發布後,同步任務實際調度運行時的列對應關係,可能與此處離線同步節點配置的列對應關係不一致,最終導致資料寫入不正確。

如果待寫入的MaxCompute表未產生,您可以使用一鍵產生目標表結構功能快速建表,一鍵產生目標表結構會自動產生建表SQL,解讀詳情請參見下文的附錄:一鍵產生目標表結構

分區資訊

本實踐樣本的寫入MaxCompute表有一個分區列ds,在ds = 後的輸入框中配置調度參數${partition},表示每次進行資料同步時,向${partition}這個分區中寫入資料,${partition}會根據調度配置做參數替換。

說明

分區資訊會根據MaxCompute表的實際結構定義確定是否有該配置項以及配置項的表單數量,如果選擇寫入的是非分區表,則不會出現該配置項;如果選擇寫入的是分區表,則會根據分區表實際分區列個數和分區列名出現對應的表單項。

其他參數保持預設即可。

配置欄位對應

  1. 編輯欄位對應中Kafka側欄位定義。

    • Kafka側欄位中預設的6個欄位。

      欄位名

      含義

      __key__

      Kafka記錄的Key。

      __value__

      Kafka記錄的Value。

      __partition__

      Kafka記錄所在分區號,分區號為從0開始的整數。

      __headers__

      Kafka記錄的dHeaders。

      __offset__

      Kafka記錄在所在分區的位移量,位移量為從0開始的整數。

      __timestamp__

      Kafka記錄的13位整數毫秒時間戳記。

    • Kafka側欄位可自訂配置JSON解析,可以通過.(擷取子欄位)[](擷取數組元素)兩種文法,擷取Kafka記錄JSON格式的value欄位內容。

      重要

      如果JSON欄位名中帶有"."字元,由於會引發欄位定義文法歧義,無法通過欄位定義擷取欄位值。

      Kafka某條JSON格式的記錄value的資料樣本如下。

      {
            "a": {
            "a1": "hello"
            },
            "b": "world",
            "c":[
                  "xxxxxxx",
                  "yyyyyyy"
                  ],
            "d":[
                  {
                        "AA":"this",
                        "BB":"is_data"
                  },
                  {
                        "AA":"that",
                        "BB":"is_also_data"
                  }
              ],
           "a.b": "unreachable"
      }
      • 如果同步a1的資料“hello”,Kafka側欄位增加a.a1

      • 如果同步b的資料“world”,Kafka側欄位增加b

      • 如果同步c的資料“yyyyyyy”,Kafka側欄位增加c[1]

      • 如果同步AA的資料“this”,Kafka側欄位增加d[0].AA

      • Kafka側欄位定義增加a.b無法同步資料"unreachable"

  2. 指定Kafka側列定義與MaxCompute側列定義的對應關係。

    • 允許源頭表欄位或目標表欄位存在不參與映射的欄位,源頭表不參與映射的欄位同步執行個體不會讀取,目標端不參與映射的欄位將寫入NULL。

    • 不允許一個源頭表欄位對應到多個目標表欄位,也不允許一個目標表欄位對應到多個目標表欄位。

調度配置

本實踐樣本涉及的調度配置要點如下。通用的調度配置指導及全量調度相關參數的介紹請參見調度配置

  • 配置調度參數。

    在上述配置資料來源與資料去向時,使用了三個調度參數:${startTime}${endTime}${partition},在此處調度配置中需根據實際同步需求指定這三個調度參數的替換策略,以下為幾個典型情境的配置樣本。

    典型情境

    推薦配置

    情境樣本說明

    同步任務每5分鐘調度一次

    調度參數1

    • startTime=$[yyyymmddhh24mi-8/24/60]00

    • endTime=$[yyyymmddhh24mi-3/24/60]00

    • partition=$[yyyymmddhh24mi-8/24/60]

    如果同步任務在2022-11-22 10:00被調度啟動,則:

    • 會同步Kafka Topic中時間戳記範圍在2022-11-22 09:52(含)到2022-11-22 09:57(不含)的記錄。

    • 同步的Kafka資料寫入MaxCompute的202211220952分區中。

    • endTime設定比執行個體調度時間($[yyyymmddhh24mi])早三分鐘是為了確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中,避免漏讀。

    同步任務每小時調度一次

    調度參數22

    • startTime=$[yyyymmddhh24-1/24]0000

    • endTime=$[yyyymmddhh24]0000

    • partition=$[yyyymmddhh24]

    說明
    • 同步任務每2小時調度一次時,startTime=$[yyyymmddhh24-2/24]0000,另外調度參數保持不變。

    • 同步任務每3小時調度一次時,startTime=$[yyyymmddhh24-3/24]0000,另外調度參數保持不變。

    • 以此類推其他以小時為調度周期的情境下,調度參數的配置結果。

    如果同步任務在2022-11-22 10:05被調度啟動,則:

    • 會同步Kafka Topic中時間戳記範圍在2022-11-22 9:00(含)到2022-11-22 10:00(不含)的記錄。

    • 同步的Kafka資料寫入MaxCompute的2022112210分區中。

    同步任務每天調度一次

    調度參數3

    • startTime=$[yyyymmdd-1]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任務在2022-11-22 00:05被調度啟動,則:

    • 會同步Kafka Topic中時間戳記範圍在2022-11-21 00:00(含)到2022-11-22 00:00(不含)的記錄。

    • 同步的Kafka資料寫入MaxCompute的20221121分區中。

    同步任務每周調度一次

    調度參數4

    • startTime=$[yyyymmdd-7]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任務在2022-11-22 00:05被調度啟動,則:

    • 會同步Kafka Topic中時間戳記範圍在2022-11-15 00:00(含)到2022-11-22 00:00(不含)的記錄。

    • 同步的Kafka資料寫入MaxCompute的20221121分區中。

    同步任務每月調度一次

    調度參數4

    • startTime=$[add_months(yyyymmdd,-1)]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任務在2022-11-22 00:05被調度啟動,則:

    • 會同步Kafka Topic中時間戳記範圍在2022-10-22 00:00(含)到2022-11-22 00:00(不含)的記錄。

    • 同步的Kafka資料寫入MaxCompute的20221121分區中。

  • 配置調度周期。

    • 根據希望的調度間隔,設定調度周期。

      典型情境

      推薦配置

      情境樣本說明

      同步任務每5分鐘調度一次

      • 調度周期:分鐘

      • 開始時間:00:00

      • 時間間隔:05分鐘

      • 結束時間:23:59

      同步任務每小時調度一次

      • 調度周期:小時

      • 開始時間:00:15

      • 時間間隔:1小時

      • 結束時間:23:59

      開始時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

      同步任務每天調度一次

      • 調度周期:天

      • 定時調度時間:00:15

      定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

      同步任務每周調度一次

      • 調度周期:周

      • 指定時間:星期一

      • 定時調度時間:00:15

      定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

      同步任務每月調度一次

      • 調度周期:月

      • 指定時間:每月1號

      • 定時調度時間:00:15

      定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

      重要

      如果出現執行個體啟動後,仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic,則這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。

    • 設定重跑屬性。推薦勾選出錯後自動重跑,重跑次數設定3,重跑間隔設定2,讓同步執行個體出現異常的話能夠自動重跑實現自愈。

  • 配置調度資源群組。

    按照需要選擇調度資源群組,推薦使用獨享調度資源群組。

  • 配置調度依賴。

    本實踐同步任務無需在特定其他任務運行成功後才開始運行,選擇使用工作空間根節點即可。

Data Integration資源群組配置

選擇在建立資料來源時,與Kafka資料和MaxCompute資料來源都完成連通性檢查的Data Integration資源群組。

試運行驗證

完成上述配置後,您可以在資料開發(DataStudio)的離線節點頁面進行調試運行,驗證離線資料同步的結果是否符合預期。

  1. 您可單擊頁面頂部的帶參運行帶參運行按鈕,給Kafka側參數${startTime}${endTime},以及MaxCompute側參數${partition}賦值後,選擇有閒置調度資源群組執行試運行。帶參運行

  2. 等待試運行完成,確認運行成功無異常。

  3. 建立臨時查詢節點,執行檢查SQL查詢語句驗證資料正確寫入MaxCompute表中,且資料內容正確、資料量正確。

    select * from test_project.test_table where ds=2022112200 limit 10;
    select count(*) from test_project.test_table where ds=2022112200;

提交發布

試運行沒有問題後,您可以儲存離線節點的配置,並提交發布至營運中心,後續離線同步任務將會周期性(分鐘或小時或天)將Kafka的資料寫入MaxCompute的表中。提交發布的操作請參見發布任務

附錄:一鍵產生目標表結構

一鍵產生目標表結構功能會自動產生建表SQL語句,SQL語句定義了建表後的表名、欄位等資訊。解讀方式如下。一鍵產生目標表

  • MaxCompute表名與資料來源配置中Kafka的主題(Topic)配置結果一致。

  • MaxCompute表欄位有6個欄位,與Kafka記錄的對應關係如下。

    欄位名

    含義

    __key__

    Kafka記錄的Key。

    __value__

    Kafka記錄的Value。

    __partition__

    Kafka記錄所在分區號,分區號為從0開始的整數。

    __headers__

    Kafka記錄的dHeaders。

    __offset__

    Kafka記錄在所在分區的位移量,位移量為從0開始的整數。

    __timestamp__

    Kafka記錄的13位整數毫秒時間戳記。

  • MaxCompute表生命週期預設為100年。

您可根據實際需要修改預設建表SQL中的這些參數。此外,您也可以通過欄位對應定義對Kafka記錄的value做JSON解析,根據實際需要在預設建表SQL中添加對應JSON解析結果的欄位列。