Kafka外掛程式基於Kafka SDK即時讀取Kafka資料。
背景資訊
支援阿里雲Kafka,以及>=0.10.2且<=2.2.x的自建Kafka版本。
對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,且Kafka資料結構可能不支援時間戳記,因此會引發同步任務延時統計錯亂,造成無法正確重設同步位點。
kafka資料來源配置詳情請參考:配置Kafka資料來源。
操作步驟
進入資料開發頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的 ,在下拉框中選擇對應工作空間後單擊進入資料開發。
滑鼠移至上方至表徵圖,單擊 。
您也可以展開商務程序,按右鍵目標商務程序,選擇
。在建立節點對話方塊中,選擇同步方式為單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑。
重要節點名稱必須是大小寫字母、中文、數字、底線(_)以及英文句號(.),且不能超過128個字元。
單擊確認。
在即時同步節點的編輯頁面,按一下滑鼠 並拖拽至編輯面板。
單擊Kafka節點,在節點配置對話方塊中,配置各項參數。
參數
描述
資料來源
選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至配置Kafka資料來源。
頁面進行建立。詳情請參見:主題
Kafka的Topic名稱,是Kafka處理資源的訊息源的不同分類。
每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。
說明一個Kafka輸入僅支援一個Topic。
鍵類型
Kafka的Key的類型,決定了初始化KafkaConsumer時的key.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
實值型別
Kafka的Value的類型,決定了初始化KafkaConsumer時的value.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
輸出模式
定義解析kafka記錄的方式
單行輸出:以無結構字串或者JSON對象解析kafka記錄,一個kafka記錄解析出一個輸出記錄。
多行輸出:以JSON數組解析Kafka記錄,一個JSON數組元素解析出一個輸出記錄,因此一個Kafka記錄可能解析出多個輸出記錄。
說明目前只在部分地區支援該配置項,如發現無該配置項請耐心等待功能在對應地區發布。
數組所在位置路徑
當輸出模式設定為多行輸出時,指定JSON數組在kafka記錄value中的路徑,路徑支援以
a.a1
的格式引用特定JSON對象中的欄位或者以a[0].a1
的格式引用特定JSON數組中的欄位,如果該配置項為空白,則將整個kafka記錄value作為一個JSON數組解析。注意解析的目標JSON數組必須是對象數組,例如
[{"a":"hello"},{"b":"world"}]
,不能是數值或字串數組,例如["a","b"]
。配置參數
建立Kafka資料消費用戶端KafkaConsumer 可以指定擴充參數,例如,bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,各版本Kafka叢集支援的KafkaConsumer 參數可以參考Kafka官方文檔,您可以基於kafkaConfig控制KafkaConsumer讀取資料的行為。即時同步Kafka輸入節點,KafkaConsumer預設使用隨機字串設定
group.id
,如果希望同步位點上傳到Kafka叢集指定群組,可以在配置參數中手動指定group.id
。即時同步Kafka輸入節點不依賴Kafka服務端維護的群組資訊管理位點,所以對配置參數中group.id
的設定不會影響同步任務啟動、重啟、Failover等情境下的讀取位點。輸出欄位
您可以自訂Kafka資料對外輸出的欄位名:
單擊添加更多欄位,輸入欄位名,並選擇類型,即可新增自訂欄位。
取值方式支援從kafka記錄中取得欄位值的方式,單擊右側按鈕可以在兩類取值方式間切換。
預置取值方式:提供6種可選預置從kafka記錄中取值的方式:
value:訊息體
key:訊息鍵
partition:分區號
offset:位移量
timestamp:訊息的毫秒時間戳記
headers:訊息頭
JSON解析取值:可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容,同時為了相容歷史邏輯,支援在選擇JSON解析取值時使用例如__value__這樣以兩個底線開頭的字串擷取kafka記錄的特定內容作為欄位值。Kafka的資料樣本如下。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }
不同情況下,輸出欄位的取值為:
如果同步Kafka記錄value,取值方式填寫__value__。
如果同步Kafka記錄key,取值方式填寫__key__。
如果同步Kafka記錄partition,取值方式填寫__partition__。
如果同步Kafka記錄offset,取值方式填寫__offset__。
如果同步Kafka記錄timestamp,取值方式填寫__timestamp__。
如果同步Kafka記錄headers,取值方式填寫__headers__。
如果同步a1的資料"hello",取值方式填寫a.a1。
如果同步b的資料"world,取值方式填寫b。
如果同步c的資料"yyyyyyy",取值方式填寫c[1]。
如果同步AA的資料"this",取值方式填寫d[0].AA。
滑鼠移至上方至相應欄位,單擊顯示的表徵圖,即可刪除該欄位。
情境樣本:在輸出模式選擇多行輸出情況下,將先根據數組所在位置路徑指定的JSON路徑解析出JSON數組,然後取出JSON數組中的每一個JSON對象,再根據定義的欄位名和取值方式解析組成輸出欄位,取值方式的定義與單行輸出模式一樣,可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容。Kafka執行個體資料如下:
{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }
當數組所在位置路徑填寫
c.c0
,輸出欄位定義兩個欄位,一個欄位名為AA
,取值方式為AA
,一個欄位名為BB
,取值方式為BB
,那麼該條Kafka記錄將解析得到如下兩條記錄:單擊工具列中的表徵圖。
說明一個Kafka輸入僅支援一個Topic。