Kafka外掛程式基於Kafka SDK即時讀取Kafka資料。
背景資訊
說明
- 支援阿里雲Kafka,以及>=0.10.2且<=2.2.x的自建Kafka版本。
- 對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,且Kafka資料結構可能不支援時間戳記,因此會引發同步任務延時統計錯亂,造成無法正確重設同步位點。
操作步驟
進入資料開發頁面。
登入DataWorks控制台。
在左側導覽列,單擊工作空間列表。
選擇工作空間所在地區後,單擊相應工作空間後的 。
滑鼠移至上方至表徵圖,單擊 。
您也可以展開商務程序,按右鍵目標商務程序,選擇
。在建立節點對話方塊中,選擇同步方式為單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑。
重要節點名稱必須是大小寫字母、中文、數字、底線(_)以及英文句號(.),且不能超過128個字元。
單擊確認。
- 在即時同步節點的編輯頁面,按一下滑鼠 並拖拽至編輯面板。
- 單擊Kafka節點,在節點配置對話方塊中,配置各項參數。
參數 描述 資料來源 選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至配置Kafka資料來源。 頁面進行建立。詳情請參見:主題 Kafka的Topic名稱,是Kafka處理資源的訊息源的不同分類。 每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。
說明 一個Kafka輸入僅支援一個Topic。鍵類型 Kafka的Key的類型,決定了初始化KafkaConsumer時的key.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 實值型別 Kafka的Value的類型,決定了初始化KafkaConsumer時的value.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 輸出模式 定義解析kafka記錄的方式 - 單行輸出:以無結構字串或者JSON對象解析kafka記錄,一個kafka記錄解析出一個輸出記錄。
- 多行輸出:以JSON數組解析kafka記錄,一個JSON數組元素解析出一個輸出記錄,因而一個kafka記錄可能解析出多個輸出記錄。
說明 目前只在部分地區支援該配置項,如發現無該配置項請耐心等待功能在對應地區發布。數組所在位置路徑 當輸出模式設定為多行輸出時,指定JSON數組在kafka記錄value中的路徑,路徑支援以 a.a1
的格式引用特定JSON對象中的欄位或者以a[0].a1
的格式引用特定JSON數組中的欄位,如果該配置項為空白,則將整個kafka記錄value作為一個JSON數組解析。注意解析的目標JSON數組必須是對象數組,例如
[{"a":"hello"},{"b":"world"}]
,不能是數值或字串數組,例如["a","b"]
。配置參數 建立Kafka資料消費用戶端KafkaConsumer 可以指定擴充參數,例如,bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,各版本Kafka叢集支援的KafkaConsumer 參數可以參考Kafka官方文檔,您可以基於kafkaConfig控制KafkaConsumer讀取資料的行為。即時同步Kafka輸入節點,KafkaConsumer預設使用隨機字串設定 group.id
,如果希望同步位點上傳到Kafka叢集指定群組,可以在配置參數中手動指定group.id
。即時同步Kafka輸入節點不依賴Kafka服務端維護的群組資訊管理位點,所以對配置參數中group.id
的設定不會影響同步任務啟動、重啟、Failover等情境下的讀取位點。輸出欄位 您可以自訂Kafka資料對外輸出的欄位名: - 單擊添加更多欄位,輸入欄位名,並選擇類型,即可新增自訂欄位。取值方式支援從kafka記錄中取得欄位值的方式,單擊右側按鈕可以在兩類取值方式間切換。
- 預置取值方式:提供6種可選預置從kafka記錄中取值的方式:
- value:訊息體
- key:訊息鍵
- partition:分區號
- offset:位移量
- timestamp:訊息的毫秒時間戳記
- headers:訊息頭
- JSON解析取值:可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容,同時為了相容歷史邏輯,支援在選擇JSON解析取值時使用例如__value__這樣以兩個底線開頭的字串擷取kafka記錄的特定內容作為欄位值。Kafka的資料樣本如下。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }
- 不同情況下,輸出欄位的取值為:
- 如果同步kafka記錄value,取值方式填寫__value__。
- 如果同步kafka記錄key,取值方式填寫__key__。
- 如果同步kafka記錄partition,取值方式填寫__partition__。
- 如果同步kafka記錄offset,取值方式填寫__offset__。
- 如果同步kafka記錄timestamp,取值方式填寫__timestamp__。
- 如果同步kafka記錄headers,取值方式填寫__headers__。
- 如果同步a1的資料"hello",取值方式填寫a.a1。
- 如果同步b的資料"world,取值方式填寫b。
- 如果同步c的資料"yyyyyyy",取值方式填寫c[1]。
- 如果同步AA的資料"this",取值方式填寫d[0].AA。
- 不同情況下,輸出欄位的取值為:
- 預置取值方式:提供6種可選預置從kafka記錄中取值的方式:
- 滑鼠移至上方至相應欄位,單擊顯示的表徵圖,即可刪除該欄位。
當數組所在位置路徑填寫{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }
c.c0
,輸出欄位定義兩個欄位,一個欄位名為AA
,取值方式為AA
,一個欄位名為BB
,取值方式為BB
,那麼該條Kafka記錄將解析得到如下兩條記錄: - 單擊工具列中的表徵圖。說明 一個Kafka輸入僅支援一個Topic。