Kafka輸出節點只需要選擇表,進列欄位映射即可完成配置。
前提條件
配置Kafka輸出節點前,您需要先配置好相應的輸入或轉換資料來源,即時同步支援的資料來源。
背景資訊
寫入資料不支援去重,即如果任務重設位點或者Failover後再啟動,會導致有重複資料寫入。
操作步驟
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的 ,在下拉框中選擇對應工作空間後單擊進入資料開發。
滑鼠移至上方至表徵圖,單擊 。
您也可以展開商務程序,按右鍵目標商務程序,選擇
。在建立節點對話方塊中,選擇同步方式為單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑。
說明在建立節點對話方塊中,選擇同步方式為單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑。
單擊確認。
在即時同步節點的編輯頁面,按一下滑鼠 並拖拽至編輯面板,連線已配置好的輸入或轉換節點。
單擊Kafka節點,在節點配置對話方塊中,配置各項參數。
參數
描述
資料來源
選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至配置Kafka資料來源。
頁面進行建立。詳情請參見:主題
Kafka的Topic名稱,是Kafka處理資源的訊息源(feeds of messages)的不同分類。
每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。
說明一個Kafka輸入僅支援一個Topic。
鍵取值列
指定哪些源端列的值拼接後作為Kafka記錄Key,如果選擇多列,使用逗號作為分隔字元拼接列值。如果不選,則Key為空白。
值取值列
指定哪些源端列的值拼接後作為Kafka記錄Value。如果不填寫,預設將所有列拼起來作為Value。拼接方式取決於選擇的寫入模式,詳情請參見:Kafka Writer的參數說明。
鍵類型
Kafka的Key的類型,決定了初始化KafkaProducer時的key.serializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
實值型別
Kafka的Value的類型,決定了初始化KafkaProducer時的value.serializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
單次寫入位元組數
一次寫入請求包含的位元組數,建議設定大於16000。
寫入模式
該配置項決定將源端列拼接作為寫入Kafka記錄Value的格式,可選值為text和JSON。
配置為text,將所有列按照資料行分隔符號進行拼接。
配置為JSON,將所有列拼接為JSON字串。
例如,列配置為col1、col2和col3,源端某記錄這三列的值為a、b和c,寫入模式配置為text、資料行分隔符號配置為
#
時,對應寫入Kafka的記錄Value為字串a#b#c
;寫入模式配置為JSON時,寫入Kafka的記錄Value為字串{"col1":"a","col2":"b","col3":"c"}
。資料行分隔符號
當寫入模式配置為text,將源端列按照該配置項指定資料行分隔符號拼接作為寫入Kafka記錄的Value,支援配置單個或者多個字元作為分隔字元,支援以
\u0001
格式配置unicode字元,支援\t
、\n
等逸出字元。預設值為\t
。配置參數
建立Kafka資料消費用戶端KafkaConsumer可以指定擴充參數,例如bootstrap.servers、acks、linger.ms等,您可以基於kafkaConfig控制KafkaConsumer消費資料的行為。即時同步Kafka輸出節點,KafkaConsumer的預設acks參數為all,如果對效能有更高要求可以在配置參數中指定acks覆蓋預設值。acks取值如下:
0:不進行寫入成功確認。
1:確認主副本寫入成功。
all:確認所有副本寫入成功。
單擊工具列中的表徵圖。