全部產品
Search
文件中心

MaxCompute:Kafka資料移轉MaxCompute最佳實務

更新時間:Nov 26, 2024

本文為您介紹如何使用DataWorksData Integration,將Kafka叢集上的資料移轉至MaxCompute。

前提條件

  • 開通MaxCompute和DataWorks

  • 新增MaxCompute資料來源。詳情請參見建立MaxCompute資料來源

  • 在DataWorks上完成建立商務程序,本例使用DataWorks簡單模式。詳情請參見建立商務程序

  • 搭建Kafka叢集

    進行資料移轉前,您需要保證自己的Kafka叢集環境正常。本文使用阿里雲EMR服務自動化搭建Kafka叢集,詳細過程請參見Kafka快速入門

    本文使用的EMR Kafka版本資訊如下:

    • EMR版本:EMR-3.12.1

    • 叢集類型:Kafka

    • 軟體資訊:Ganglia 3.7.2,ZooKeeper 3.4.12,Kafka 2.11-1.0.1,Kafka-Manager 1.3.X.XX

    Kafka叢集使用專用網路,地區為華東1(杭州),主執行個體組ECS計算資源配置公網及內網IP。

背景資訊

Kafka是一款分布式發布與訂閱的訊息中介軟體,具有高效能、高吞量的特點被廣泛使用,每秒能處理上百萬的訊息。Kafka適用於流式資料處理,主要應用於使用者行為跟蹤、日誌收集等情境。

一個典型的Kafka叢集包含若干個生產者(Producer)、Broker、消費者(Consumer)以及一個Zookeeper叢集。Kafka叢集通過Zookeeper管理自身叢集的配置並進行服務協同。

Topic是Kafka叢集上最常用的訊息的集合,是一個訊息儲存邏輯概念。物理磁碟不儲存Topic,而是將Topic中具體的訊息按分區(Partition)儲存在叢集中各個節點的磁碟上。每個Topic可以有多個生產者向它發送訊息,也可以有多個消費者向它拉取(消費)訊息。

每個訊息被添加到分區時,會分配一個Offset(位移量,從0開始編號),是訊息在一個分區中的唯一編號。

步驟一:準備Kafka資料

您需要在Kafka叢集建立測試資料。為保證您可以順利登入EMR叢集Header主機,以及保證MaxCompute和DataWorks可以順利和EMR叢集Header主機通訊,請您首先配置EMR叢集Header主機安全性群組,允許存取TCP 22及TCP 9092連接埠。

  1. 登入EMR叢集Header主機地址。

    1. 進入EMR Hadoop控制台。

    2. 在頂部導覽列,單擊叢集管理

    3. 在顯示的頁面,找到您需要建立測試資料的叢集,進入叢集詳情頁。

    4. 在叢集詳情頁面,單擊主機列表,確認EMR叢集Header主機地址,並通過SSH連結遠程登入。

  2. 建立測試Topic。

    執行如下命令建立測試所使用的Topic testkafka。

    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
  3. 寫入測試資料。

    執行如下命令,可以類比生產者向Topic testkafka中寫入資料。由於Kafka用於處理流式資料,您可以持續不斷地向其中寫入資料。為保證測試結果,建議寫入10條以上的資料。

    kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    您可以同時再開啟一個SSH視窗,執行如下命令,類比消費者驗證資料是否已成功寫入Kafka。當資料寫入成功時,您可以看到已寫入的資料。

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning

步驟二:在DataWorks上建立目標表

在DataWorks上建立目標表用以接收Kafka資料。

  1. 進入資料開發頁面。

    1. 登入DataWorks控制台

    2. 單擊左側導覽列資料開發與治理 > 資料開發

    3. 在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 按右鍵商務程序,選擇建立 > MaxCompute >

  3. 在彈出的建立表對話方塊中,填寫表名稱,並單擊建立

    說明
    • 表名必須以字母開頭,不能包含中文或特殊字元。

    • 如果在資料開發中綁定多個MaxCompute資料來源,則按需選擇MaxCompute引擎執行個體

  4. 在表的編輯頁面,單擊DDL模式

  5. DDL對話方塊中,輸入如下建表語句,單擊產生表結構

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    其中的每一列,對應於DataWorksData IntegrationKafka Reader的預設列:

    • __key__表示訊息的key。

    • __value__表示訊息的完整內容 。

    • __partition__表示當前訊息所在分區。

    • __headers__表示當前訊息headers資訊。

    • __offset__表示當前訊息的位移量。

    • __timestamp__表示當前訊息的時間戳記。

    您還可以自主命名,詳情參見Kafka Reader

  6. 單擊提交到生產環境確認

步驟三:同步資料

  1. 建立獨享Data Integration資源群組。

    由於當前DataWorks的公用資源群組無法完美支援Kafka外掛程式,您需要使用獨享Data Integration資源群組完成資料同步。詳情請參見新增和使用獨享Data Integration資源群組

  2. 建立Data Integration節點。

    1. 進入資料開發頁面,按右鍵指定商務程序,選擇建立節點 > Data Integration > 離線同步

    2. 建立節點對話方塊中,輸入節點名稱,並單擊確認

  3. 在頂部功能表列上,單擊轉化指令碼表徵圖。

  4. 在指令碼模式下,單擊頂部功能表列上的**表徵圖。

  5. 配置指令碼,範例程式碼如下。

    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_source",// MaxCompute資料來源名稱
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1
            }
        }
    }

    您可以通過在Header主機上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查看group.id參數,及消費者的Group名稱。

    • 命令樣本

      kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    • 返回結果

      _emr-client-metrics-handler-group
      console-consumer-69493
      console-consumer-83505
      console-consumer-21030
      console-consumer-45322
      console-consumer-14773

    console-consumer-83505為例,您可以根據該參數在Header主機上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令確認beginOffsetendOffset參數。

    • 命令樣本。

      kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    • 返回結果

      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      testkafka                      6          0               0               0          -                                                 -                              -
      test                           6          3               3               0          -                                                 -                              -
      testkafka                      0          0               0               0          -                                                 -                              -
      testkafka                      1          1               1               0          -                                                 -                              -
      testkafka                      5          0               0               0          -                                                 -                              -
  6. 配置調度資源群組。

    1. 在節點編輯頁面的右側導覽列,單擊調度配置

    2. 資源屬性地區,選擇調度資源群組為您建立的獨享Data Integration資源群組。

      說明

      如果您需要將Kafka的資料周期性(例如每小時)寫入MaxCompute,您可以使用beginDateTimeendDateTime參數,設定資料讀取的時間區間為1小時,然後每小時調度一次Data Integration任務。詳情請參見Kafka Reader

  7. 單擊**表徵圖運行代碼。

  8. 您可以在作業記錄查看運行結果。

後續步驟

您可以建立一個資料開發工作單位運行SQL語句,查看當前表中是否已存在從雲訊息佇列 Kafka 版同步過來的資料。本文以select * from testkafka為例,具體步驟如下:

  1. 進入資料開發頁面。

    1. 登入DataWorks控制台

    2. 在左側導覽列,單擊工作空間。進入工作空間列表詳情介面。

    3. 在頂部切換至目標地區,找到已建立的工作空間,單擊操作列的快速進入 > 資料開發,進入資料開發頁面。

  2. 單擊左側的image表徵圖,進入臨時查詢頁面。單擊上面的image表徵圖。選擇建立 > ODPS SQL節點。

  3. 建立節點對話方塊中,輸入路徑名稱資訊。

  4. 單擊確認

  5. 在建立的節點頁面,輸入select * from testkafka,單擊image表徵圖,運行完成後,查看作業記錄。

    image