全部產品
Search
文件中心

DataWorks:配置Kafka輸入

更新時間:Nov 19, 2024

Kafka外掛程式基於Kafka SDK即時讀取Kafka資料。

背景資訊

說明
  • 支援阿里雲Kafka,以及>=0.10.2且<=2.2.x的自建Kafka版本。

  • 對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,且Kafka資料結構可能不支援時間戳記,因此會引發同步任務延時統計錯亂,造成無法正確重設同步位點。

kafka資料來源配置詳情請參考:配置Kafka資料來源

操作步驟

  1. 進入資料開發頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與治理 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 滑鼠移至上方至建立表徵圖,單擊建立節點 > Data Integration > 即時同步

    您也可以展開商務程序,按右鍵目標商務程序,選擇建立節點 > Data Integration > 即時同步

  3. 建立節點對話方塊中,選擇同步方式單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑

    重要

    節點名稱必須是大小寫字母、中文、數字、底線(_)以及英文句號(.),且不能超過128個字元。

  4. 單擊確認

  5. 在即時同步節點的編輯頁面,按一下滑鼠輸入 > Kafka並拖拽至編輯面板。

  6. 單擊Kafka節點,在節點配置對話方塊中,配置各項參數。

    image

    參數

    描述

    資料來源

    選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至工作空間管理 > 資料來源管理 頁面進行建立。詳情請參見:配置Kafka資料來源

    主題

    Kafka的Topic名稱,是Kafka處理資源的訊息源的不同分類。

    每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。

    說明

    一個Kafka輸入僅支援一個Topic。

    鍵類型

    Kafka的Key的類型,決定了初始化KafkaConsumer時的key.deserializer配置,可選值包括STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT

    實值型別

    Kafka的Value的類型,決定了初始化KafkaConsumer時的value.deserializer配置,可選值包括STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT

    輸出模式

    定義解析kafka記錄的方式

    • 單行輸出:以無結構字串或者JSON對象解析kafka記錄,一個kafka記錄解析出一個輸出記錄。

    • 多行輸出:以JSON數組解析Kafka記錄,一個JSON數組元素解析出一個輸出記錄,因此一個Kafka記錄可能解析出多個輸出記錄。

    說明

    目前只在部分地區支援該配置項,如發現無該配置項請耐心等待功能在對應地區發布。

    數組所在位置路徑

    當輸出模式設定為多行輸出時,指定JSON數組在kafka記錄value中的路徑,路徑支援以a.a1的格式引用特定JSON對象中的欄位或者以a[0].a1的格式引用特定JSON數組中的欄位,如果該配置項為空白,則將整個kafka記錄value作為一個JSON數組解析。

    注意解析的目標JSON數組必須是對象數組,例如[{"a":"hello"},{"b":"world"}],不能是數值或字串數組,例如["a","b"]

    配置參數

    建立Kafka資料消費用戶端KafkaConsumer 可以指定擴充參數,例如,bootstrap.serversauto.commit.interval.mssession.timeout.ms等,各版本Kafka叢集支援的KafkaConsumer 參數可以參考Kafka官方文檔,您可以基於kafkaConfig控制KafkaConsumer讀取資料的行為。即時同步Kafka輸入節點,KafkaConsumer預設使用隨機字串設定group.id,如果希望同步位點上傳到Kafka叢集指定群組,可以在配置參數中手動指定group.id。即時同步Kafka輸入節點不依賴Kafka服務端維護的群組資訊管理位點,所以對配置參數中group.id的設定不會影響同步任務啟動、重啟、Failover等情境下的讀取位點。

    輸出欄位

    您可以自訂Kafka資料對外輸出的欄位名:

    • 單擊添加更多欄位,輸入欄位名,並選擇類型,即可新增自訂欄位。

      取值方式支援從kafka記錄中取得欄位值的方式,單擊右側箭頭按鈕可以在兩類取值方式間切換。

      • 預置取值方式:提供6種可選預置從kafka記錄中取值的方式:

        • value:訊息體

        • key:訊息鍵

        • partition:分區號

        • offset:位移量

        • timestamp:訊息的毫秒時間戳記

        • headers:訊息頭

      • JSON解析取值:可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容,同時為了相容歷史邏輯,支援在選擇JSON解析取值時使用例如__value__這樣以兩個底線開頭的字串擷取kafka記錄的特定內容作為欄位值。Kafka的資料樣本如下。

        {
           "a": {
           "a1": "hello"
           },
           "b": "world",
           "c":[
              "xxxxxxx",
              "yyyyyyy"
              ],
           "d":[
              {
                 "AA":"this",
                 "BB":"is_data"
              },
              {
                 "AA":"that",
                 "BB":"is_also_data"
              }
            ]
        }
        • 不同情況下,輸出欄位的取值為:

          • 如果同步Kafka記錄value,取值方式填寫__value__

          • 如果同步Kafka記錄key,取值方式填寫__key__

          • 如果同步Kafka記錄partition,取值方式填寫__partition__

          • 如果同步Kafka記錄offset,取值方式填寫__offset__

          • 如果同步Kafka記錄timestamp,取值方式填寫__timestamp__

          • 如果同步Kafka記錄headers,取值方式填寫__headers__

          • 如果同步a1的資料"hello",取值方式填寫a.a1

          • 如果同步b的資料"world,取值方式填寫b

          • 如果同步c的資料"yyyyyyy",取值方式填寫c[1]

          • 如果同步AA的資料"this",取值方式填寫d[0].AA

    • 滑鼠移至上方至相應欄位,單擊顯示的刪除表徵圖,即可刪除該欄位。

    情境樣本:在輸出模式選擇多行輸出情況下,將先根據數組所在位置路徑指定的JSON路徑解析出JSON數組,然後取出JSON數組中的每一個JSON對象,再根據定義的欄位名和取值方式解析組成輸出欄位,取值方式的定義與單行輸出模式一樣,可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容。Kafka執行個體資料如下:

    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }

    當數組所在位置路徑填寫c.c0,輸出欄位定義兩個欄位,一個欄位名為AA,取值方式為AA,一個欄位名為BB,取值方式為BB,那麼該條Kafka記錄將解析得到如下兩條記錄:記錄

  7. 單擊工具列中的儲存表徵圖。

    說明

    一個Kafka輸入僅支援一個Topic。