本文為您介紹如何使用DataWorksData Integration,將Kafka叢集上的資料移轉至MaxCompute。
前提條件
新增MaxCompute資料來源。詳情請參見建立MaxCompute資料來源。
在DataWorks上完成建立商務程序,本例使用DataWorks簡單模式。詳情請參見建立商務程序。
搭建Kafka叢集
進行資料移轉前,您需要保證自己的Kafka叢集環境正常。本文使用阿里雲EMR服務自動化搭建Kafka叢集,詳細過程請參見Kafka快速入門。
本文使用的EMR Kafka版本資訊如下:
EMR版本:EMR-3.12.1
叢集類型:Kafka
軟體資訊:Ganglia 3.7.2,ZooKeeper 3.4.12,Kafka 2.11-1.0.1,Kafka-Manager 1.3.X.XX
Kafka叢集使用專用網路,地區為華東1(杭州),主執行個體組ECS計算資源配置公網及內網IP。
背景資訊
Kafka是一款分布式發布與訂閱的訊息中介軟體,具有高效能、高吞量的特點被廣泛使用,每秒能處理上百萬的訊息。Kafka適用於流式資料處理,主要應用於使用者行為跟蹤、日誌收集等情境。
一個典型的Kafka叢集包含若干個生產者(Producer)、Broker、消費者(Consumer)以及一個Zookeeper叢集。Kafka叢集通過Zookeeper管理自身叢集的配置並進行服務協同。
Topic是Kafka叢集上最常用的訊息的集合,是一個訊息儲存邏輯概念。物理磁碟不儲存Topic,而是將Topic中具體的訊息按分區(Partition)儲存在叢集中各個節點的磁碟上。每個Topic可以有多個生產者向它發送訊息,也可以有多個消費者向它拉取(消費)訊息。
每個訊息被添加到分區時,會分配一個Offset(位移量,從0開始編號),是訊息在一個分區中的唯一編號。
步驟一:準備Kafka資料
您需要在Kafka叢集建立測試資料。為保證您可以順利登入EMR叢集Header主機,以及保證MaxCompute和DataWorks可以順利和EMR叢集Header主機通訊,請您首先配置EMR叢集Header主機安全性群組,允許存取TCP 22及TCP 9092連接埠。
登入EMR叢集Header主機地址。
進入EMR Hadoop控制台。
在頂部導覽列,單擊叢集管理。
在顯示的頁面,找到您需要建立測試資料的叢集,進入叢集詳情頁。
在叢集詳情頁面,單擊主機列表,確認EMR叢集Header主機地址,並通過SSH連結遠程登入。
建立測試Topic。
執行如下命令建立測試所使用的Topic testkafka。
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create
寫入測試資料。
執行如下命令,可以類比生產者向Topic testkafka中寫入資料。由於Kafka用於處理流式資料,您可以持續不斷地向其中寫入資料。為保證測試結果,建議寫入10條以上的資料。
kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
您可以同時再開啟一個SSH視窗,執行如下命令,類比消費者驗證資料是否已成功寫入Kafka。當資料寫入成功時,您可以看到已寫入的資料。
kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
步驟二:在DataWorks上建立目標表
在DataWorks上建立目標表用以接收Kafka資料。
進入資料開發頁面。
登入DataWorks控制台。
單擊左側導覽列 。
在下拉框中選擇對應工作空間後單擊進入資料開發。
按右鍵商務程序,選擇 。
在彈出的建立表對話方塊中,填寫表名稱,並單擊建立。
說明表名必須以字母開頭,不能包含中文或特殊字元。
如果在資料開發中綁定多個MaxCompute資料來源,則按需選擇MaxCompute引擎執行個體。
在表的編輯頁面,單擊DDL模式。
在DDL對話方塊中,輸入如下建表語句,單擊產生表結構。
CREATE TABLE testkafka ( key string, value string, partition1 string, timestamp1 string, offset string, t123 string, event_id string, tag string ) ;
其中的每一列,對應於DataWorksData IntegrationKafka Reader的預設列:
__key__表示訊息的key。
__value__表示訊息的完整內容 。
__partition__表示當前訊息所在分區。
__headers__表示當前訊息headers資訊。
__offset__表示當前訊息的位移量。
__timestamp__表示當前訊息的時間戳記。
您還可以自主命名,詳情參見Kafka Reader。
單擊提交到生產環境並確認。
步驟三:同步資料
建立獨享Data Integration資源群組。
由於當前DataWorks的公用資源群組無法完美支援Kafka外掛程式,您需要使用獨享Data Integration資源群組完成資料同步。詳情請參見新增和使用獨享Data Integration資源群組。
建立Data Integration節點。
進入資料開發頁面,按右鍵指定商務程序,選擇
。在建立節點對話方塊中,輸入節點名稱,並單擊確認。
在頂部功能表列上,單擊表徵圖。
在指令碼模式下,單擊頂部功能表列上的表徵圖。
配置指令碼,範例程式碼如下。
{ "type": "job", "steps": [ { "stepType": "kafka", "parameter": { "server": "47.xxx.xxx.xxx:9092", "kafkaConfig": { "group.id": "console-consumer-83505" }, "valueType": "ByteArray", "column": [ "__key__", "__value__", "__partition__", "__timestamp__", "__offset__", "'123'", "event_id", "tag.desc" ], "topic": "testkafka", "keyType": "ByteArray", "waitTime": "10", "beginOffset": "0", "endOffset": "3" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "compress": false, "datasource": "odps_source",// MaxCompute資料來源名稱 "column": [ "key", "value", "partition1", "timestamp1", "offset", "t123", "event_id", "tag" ], "emptyAsNull": false, "table": "testkafka" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "throttle": false, "concurrent": 1 } } }
您可以通過在Header主機上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查看group.id參數,及消費者的Group名稱。
命令樣本
kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list
返回結果
_emr-client-metrics-handler-group console-consumer-69493 console-consumer-83505 console-consumer-21030 console-consumer-45322 console-consumer-14773
以console-consumer-83505為例,您可以根據該參數在Header主機上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令確認beginOffset及endOffset參數。
命令樣本。
kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
返回結果
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testkafka 6 0 0 0 - - - test 6 3 3 0 - - - testkafka 0 0 0 0 - - - testkafka 1 1 1 0 - - - testkafka 5 0 0 0 - - -
配置調度資源群組。
在節點編輯頁面的右側導覽列,單擊調度配置。
在資源屬性地區,選擇調度資源群組為您建立的獨享Data Integration資源群組。
說明如果您需要將Kafka的資料周期性(例如每小時)寫入MaxCompute,您可以使用beginDateTime及endDateTime參數,設定資料讀取的時間區間為1小時,然後每小時調度一次Data Integration任務。詳情請參見Kafka Reader。
單擊表徵圖運行代碼。
您可以在作業記錄查看運行結果。
後續步驟
您可以建立一個資料開發工作單位運行SQL語句,查看當前表中是否已存在從雲訊息佇列 Kafka 版同步過來的資料。本文以select * from testkafka
為例,具體步驟如下:
進入資料開發頁面。
登入DataWorks控制台。
在左側導覽列,單擊工作空間。進入工作空間列表詳情介面。
在頂部切換至目標地區,找到已建立的工作空間,單擊操作列的
,進入資料開發頁面。
單擊左側的表徵圖,進入臨時查詢頁面。單擊上面的表徵圖。選擇
節點。在建立節點對話方塊中,輸入路徑、名稱資訊。
單擊確認。
在建立的節點頁面,輸入
select * from testkafka
,單擊表徵圖,運行完成後,查看作業記錄。