全部產品
Search
文件中心

DataWorks:配置Kafka輸出

更新時間:Nov 28, 2024

Kafka輸出節點只需要選擇表,進列欄位映射即可完成配置。

前提條件

配置Kafka輸出節點前,您需要先配置好相應的輸入或轉換資料來源,即時同步支援的資料來源

背景資訊

寫入資料不支援去重,即如果任務重設位點或者Failover後再啟動,會導致重複資料寫入。

操作步驟

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與治理 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 滑鼠移至上方至建立表徵圖,單擊建立節點 > Data Integration > 即時同步

    您也可以展開商務程序,按右鍵目標商務程序,選擇建立節點 > Data Integration > 即時同步

  3. 建立節點對話方塊中,選擇同步方式單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑

  4. 單擊確認

  5. 在即時同步節點的編輯頁面,按一下滑鼠輸出 > Kafka並拖拽至編輯面板,連線已配置好的輸入或轉換節點。

  6. 單擊Kafka節點,在節點配置對話方塊中,配置各項參數。

    image

    參數

    描述

    資料來源

    選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至工作空間管理 > 資料來源管理 頁面進行建立。詳情請參見:配置Kafka資料來源

    主題

    Kafka的Topic名稱,是Kafka處理資源的訊息源(feeds of messages)的不同分類。

    每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。

    說明

    一個Kafka輸入僅支援一個Topic。

    鍵取值列

    指定哪些源端列的值拼接後作為Kafka記錄Key,如果選擇多列,使用逗號作為分隔字元拼接列值。如果不選,則Key為空白。

    值取值列

    指定哪些源端列的值拼接後作為Kafka記錄Value。如果不填寫,預設將所有列拼起來作為Value。拼接方式取決於選擇的寫入模式,詳情請參見:Kafka Writer的參數說明。

    鍵類型

    Kafka的Key的類型,決定了初始化KafkaProducer時的key.serializer配置,可選值包括STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT

    實值型別

    Kafka的Value的類型,決定了初始化KafkaProducer時的value.serializer配置,可選值包括STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT

    單次寫入位元組數

    一次寫入請求包含的位元組數,建議設定大於16000。

    寫入模式

    該配置項決定將源端列拼接作為寫入Kafka記錄Value的格式,可選值為textJSON

    • 配置為text,將所有列按照資料行分隔符號進行拼接。

    • 配置為JSON,將所有列拼接為JSON字串。

    例如,列配置為col1、col2和col3,源端某記錄這三列的值為a、b和c,寫入模式配置為text、資料行分隔符號配置為#時,對應寫入Kafka的記錄Value為字串a#b#c;寫入模式配置為JSON時,寫入Kafka的記錄Value為字串{"col1":"a","col2":"b","col3":"c"}

    資料行分隔符號

    當寫入模式配置為text,將源端列按照該配置項指定資料行分隔符號拼接作為寫入Kafka記錄的Value,支援配置單個或者多個字元作為分隔字元,支援以\u0001格式配置unicode字元,支援\t\n等逸出字元。預設值為\t

    配置參數

    建立Kafka資料消費用戶端KafkaConsumer可以指定擴充參數,例如bootstrap.serversackslinger.ms等,您可以基於kafkaConfig控制KafkaConsumer消費資料的行為。即時同步Kafka輸出節點,KafkaConsumer的預設acks參數為all,如果對效能有更高要求可以在配置參數中指定acks覆蓋預設值。acks取值如下:

    • 0:不進行寫入成功確認。

    • 1:確認主副本寫入成功。

    • all:確認所有副本寫入成功。

  7. 單擊工具列中的儲存表徵圖。