全部產品
Search
文件中心

MaxCompute:使用Kafka(離線與即時)

更新時間:Jun 19, 2024

MaxCompute與Kafka的整合能夠提供高效、可靠的資料處理和分析能力,適用於需要即時處理、大規模資料流和複雜資料分析的情境。本文介紹訊息佇列Kafka版和自建Kafka資料的寫入流程,以及自建Kafka資料的寫入樣本。

Kafka資料寫入MaxCompute流程:阿里雲全託管Kafka

MaxCompute與訊息佇列Kafka版服務緊密整合,藉助訊息佇列Kafka版服務的MaxCompute Sink Connector,無需第三方工具及二次開發,即可滿足將指定Topic資料持續匯入MaxCompute資料表的需求,操作詳情請參見建立MaxCompute Sink Connector

Kafka資料寫入MaxCompute流程:自建開源Kafka

前提條件

  • 已部署V2.2及以上版本的Kafka服務(推薦最新版本V3.4.0),並已建立Kafka Topic資訊。

  • 已建立MaxCompute專案和表。具體操作,請參見建立MaxCompute專案建立表

注意事項

Kafka-connector服務支援TEXTCSVJSONFLATTEN類型的Kafka資料寫入,不同類型的注意事項詳情如下。關於資料類型的詳情介紹,請參見資料類型說明

  • TEXTJSON類型的Kafka資料寫入MaxCompute時,MaxCompute表要求如下:

    欄位名稱

    欄位類型

    是否為固定欄位

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    key

    • TEXT類型Kafka資料寫入時,欄位類型必須為STRING。

    • JSON類型Kafka資料寫入時,根據寫入的資料類型設定,支援STRING與JSON。

    需要將Kafka訊息的中的Key值同步到MaxCompute表中時,此欄位為固定欄位。關於Kafka訊息同步到MaxCompute的模式,詳情請參見mode

    value

    • TEXT類型Kafka資料寫入時,欄位類型必須為STRING。

    • JSON類型Kafka資料寫入時,根據寫入的資料類型設定,支援STRING與JSON。

    需要將Kafka訊息的中的Value值同步到MaxCompute表中時,此欄位為固定欄位。關於Kafka訊息同步到MaxCompute的模式,詳情請參見mode

    pt

    STRING(分區欄位)

  • FLATTENCSV類型的Kafka資料寫入MaxCompute時,必須包含以下欄位和欄位類型,您可以根據寫入資料的內容自訂其他欄位。

    欄位名稱

    欄位類型

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING(分區欄位)

    • CSV類型的Kafka資料寫入MaxCompute表中時,MaxCompute表中自訂的欄位順序和欄位類型,必須與Kafka寫入的資料保持一致,以確保資料能正確寫入。

    • FLATTEN類型的Kafka資料寫入MaxCompute表中時,MaxCompute表中自訂的欄位名稱必須Kafka資料中欄位名稱保持一致,以確保資料能正確寫入。

      例如:要寫入的FLATTEN類型的Kafka資料內容為{"A":a,"B":"b","C":{"D":"d","E":"e"}},那MaxCompute表資訊如下所示。

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

配置並啟動Kafka-connector服務

  1. 以Linux環境為例,在命令視窗執行以下命令或下載連結,下載kafka-connector-2.0.jar包。

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    為防止依賴衝突,建議在$KAFKA_HOME/libs下建立一個子檔案夾,例如connector,用來放置kafka-connector-2.0.jar

    說明

    kafka-connector-2.0.jar包與Kafka的部署環境不一致,配置並啟動Kafka-connector服務的操作詳情,請參見配置Kafka-connector

  2. $KAFKA_HOME/config目錄下,配置connect-distributed.properties檔案。

    connect-distributed.properties檔案中補充以下內容。

    ##新增以下內容
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ##更新key.converter和value.converter參數值
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. $KAFKA_HOME/路徑下,執行以下命令,啟動Kafka-connector服務。

    ##啟動命令
    bin/connect-distributed.sh config/connect-distributed.properties &

配置並啟動Kafka-connector任務

  1. 建立並配置odps-sink-connector.json設定檔,並將odps-sink-connector.json檔案上傳至任意位置。

    odps-sink-connector.json設定檔內容與參數介紹如下。

    {
      "name": "Kafka connector task name",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "account type (STS or ALIYUN)",
        "access_id": "access id",
        "access_key": "access key",
        "account_id": "account id for sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "region id for sts",
        "role_name": "role name for sts",
        "client_timeout_ms": "STS Token valid period (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"kafka topic when runtime errors happens",
        "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue",
        "skip_error":"false"
      }
    }
    • 公用參數

      參數名

      是否必填

      說明

      name

      任務名稱,且名稱必須保持唯一。

      connector.class

      啟動Kafka connector服務的類名,預設值為com.aliyun.odps.kafka.connect.MaxComputeSinkConnector

      tasks.max

      Kafka connector中消費者進程最大個數,必須為大於0的整數。

      topics

      Kafka的Topic名稱。

      endpoint

      MaxCompute服務的串連地址。

      您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見Endpoint

      tunnel_endpoint

      Tunnel服務的外網訪問連結。

      如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網路對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。

      各地區及網路對應的Tunnel Endpoint值,請參見Endpoint

      project

      訪問的目標MaxCompute專案名稱。

      schema

      • 若目標MaxCompute專案配置Schema三層模型,則需要此參數,且預設值為default

      • 若目標MaxCompute專案未配置Schema三層模型,則無需配置此參數。

      關於Schema的介紹詳情,請參見Schema操作

      table

      目標MaxCompute專案的表名稱。

      format

      寫入的訊息格式。取值如下:

      • TEXT(預設值):訊息的格式為字串。

      • BINARY:訊息的格式為位元組數組。

      • CSV:訊息的格式為逗號(,)分隔的字串。

      • JSON:訊息格式為JSON資料類型的字串。關於MaxCompute JSON類型的詳情,請參見MaxCompute JSON類型使用指南(試用Beta版本)

      • FLATTEN:訊息格式為JSON資料類型的字串,JSON中的Key和Value會被解析,寫入到對應的MaxCompute表中,其中JSON資料中的Key和需要與MaxCompute的表列名對應。

      關於不同格式訊息匯入的案例,詳情請參見使用樣本

      mode

      訊息同步到MaxCompute的模式。取值說明如下:

      • KEY:只保留訊息的Key,並將Key值寫入目標MaxCompute表中。

      • VALUE:只保留訊息的Value,並將Value值寫入目標MaxCompute表中。

      • DEFAULT(預設值):同時保留訊息的Key和Value,並將Key和Value值都寫入目標MaxCompute表中。

        DEFAULT模式下,只支援TEXTBINARY格式資料寫入。

      partition_window_type

      按照系統時間進行資料分區。取值為DAYHOUR(預設值)、MINUTE

      use_streaming

      是否使用流式資料通道。取值說明如下:

      • false(預設值):不使用。

      • true:使用。

      buffer_size_kb

      odps partition writer內部緩衝區的大小,單位KB。預設65536 KB。

      sink_pool_size

      多線程寫入的最大線程數,預設為系統CPU核心數。

      record_batch_size

      一個Kafka-connector任務內部的一個線程最多可以一次並行發送訊息數量。

      skip_error

      是否跳過發生未知錯誤的記錄。取值說明如下:

      • false(預設值):不會跳過。

      • true:跳過。

        說明
        • skip_errorfalse且未配置runtime.error.topic.name參數,若遇到未知錯誤,會停止後續的資料寫入,進程會被阻塞並在日誌中拋出異常。

        • skip_error取值trueruntime.error.topic.name未配置,寫入資料的進程會繼續寫入,異常資料會被丟棄。

        • skip_errorfalse且已配置runtime.error.topic.name參數,寫入資料的進程會繼續寫入,異常資料會被記錄到runtime.error.topic.name配置的Topic

        異常資料處理樣本詳情,請參見異常資料處理樣本

      runtime.error.topic.name

      將資料寫入時發生的未知錯誤的資料寫入至Kafka的Topic名稱。

      runtime.error.topic.bootstrap.servers

      將資料寫入時發生的未知錯誤的資料寫入至Kafka的bootstrap servers地址。

      account_type

      訪問目標MaxCompute服務的方式,支援STSALIYUN兩種方式,預設ALIYUN

      不同方式訪問MaxCompute需要配置不同的訪問憑證參數,詳情請參見通過ALIYUN方式訪問MaxCompute通過STS方式訪問MaxComput

    • 通過ALIYUN方式訪問MaxCompute,除公用參數外還需配置以下參數。

      參數名

      說明

      access_id

      阿里雲帳號或RAM帳號的AccessKey ID。

      您可以進入AccessKey管理頁面擷取AccessKey ID。

      access_key

      AccessKey ID對應的AccessKey Secret。

      您可以進入AccessKey管理頁面擷取AccessKey Secret。

    • 通過STS方式訪問MaxCompute,除公用參數外還需配置以下參數。

      參數名

      說明

      account_id

      訪問目標MaxCompute專案的帳號ID。您可以進入帳號中心查看您的帳號ID。

      region_id

      訪問目標MaxCompute專案的地區ID。各地區對應的地區ID,請參見服務存取點

      role_name

      訪問目標MaxCompute專案的角色名稱。您可以進入角色頁面查看角色名稱。

      client_timeout_ms

      STS Token重新整理的時間間隔,單位為毫秒(ms),預設值為11(ms)。

      sts.endpoint

      使用臨時安全性權杖(STS)進行身份認證時需要的STS 服務地址。

      各地區及網路對應的Endpoint值,請參見服務存取點

  2. 執行以下命令,啟動Kafka-connector資料轉送任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

使用樣本

TEXT類型資料寫入

  1. 資料準備。

    • 通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • 建立Kafka資料。

      $KAFKA_HOME/bin/目錄下,執行以下命令,建立Kafka Topic。以topic_text為例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      執行以下命令,建立Kafka訊息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (可選)啟動Kafka-connector服務。具體操作,請參見配置並啟動Kafka-connector服務

    說明

    Kafka-connector服務已啟動,可跳過此步驟。

  3. 建立並配置odps-sink-connector.json檔案,並將odps-sink-connector.json檔案上傳至任意位置。本文以$KAFKA_HOME/config路徑為例。

    odps-sink-connector.json檔案內容樣本如下,關於odps-sink-connector.json檔案詳情介紹,請參見配置並啟動Kafka-connector任務

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. 執行以下命令,啟動Kafka-connector資料轉送任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果驗證。

    通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    返回結果如下:

    # 這裡由於我們odps-sink-connector.json設定檔中的mode值為VALUE,所以只保留value的內容,key欄位為NULL
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

CSV類型資料寫入

  1. 資料準備。

    • 通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • 建立Kafka資料。

      $KAFKA_HOME/bin/目錄下,執行以下命令,建立Kafka Topic。以topic_csv為例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      執行以下命令,建立Kafka訊息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (可選)啟動Kafka-connector服務。具體操作,請參見配置並啟動Kafka-connector服務

    說明

    Kafka-connector服務已啟動,可跳過此步驟。

  3. 建立並配置odps-sink-connector.json檔案,並將odps-sink-connector.json檔案上傳至任意位置。本文以$KAFKA_HOME/config路徑為例。

    odps-sink-connector.json檔案內容樣本如下,關於odps-sink-connector.json檔案詳情介紹,請參見配置並啟動Kafka-connector任務

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 執行以下命令,啟動Kafka-connector資料轉送任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果驗證。

    通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    返回結果如下:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

JSON類型資料寫入

  1. 資料準備。

    • 通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • 建立Kafka資料。

      $KAFKA_HOME/bin/目錄下,執行以下命令,建立Kafka Topic。以topic_json為例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      執行以下命令,建立Kafka訊息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (可選)啟動Kafka-connector服務。具體操作,請參見配置並啟動Kafka-connector服務

    說明

    Kafka-connector服務已啟動,可跳過此步驟。

  3. 建立並配置odps-sink-connector.json檔案,並將odps-sink-connector.json檔案上傳至任意位置。本文以$KAFKA_HOME/config路徑為例。

    odps-sink-connector.json檔案內容樣本如下,關於odps-sink-connector.json檔案詳情介紹,請參見配置並啟動Kafka-connector任務

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 執行以下命令,啟動Kafka-connector資料轉送任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果驗證。

    通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    返回結果如下:

    # json 資料被成功寫入value欄位中
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

FLATTEN類型資料寫入

  1. 資料準備。

    • 通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • 建立Kafka資料。

      $KAFKA_HOME/bin/目錄下,執行以下命令,建立Kafka Topic。以topic_flatten為例。

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      執行以下命令,建立Kafka訊息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (可選)啟動Kafka-connector服務。具體操作,請參見配置並啟動Kafka-connector服務

    說明

    Kafka-connector服務已啟動,可跳過此步驟。

  3. 建立並配置odps-sink-connector.json檔案,並將odps-sink-connector.json檔案上傳至任意位置。本文以$KAFKA_HOME/config路徑為例。

    odps-sink-connector.json檔案內容樣本如下,關於odps-sink-connector.json檔案詳情介紹,請參見配置並啟動Kafka-connector任務

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 執行以下命令,啟動Kafka-connector任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果驗證。

    通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    返回結果如下:

    # json資料被解析寫入MaxCompute表中,且支援json巢狀型別exteninfo為JSON欄位
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

異常資料處理樣本

  1. 資料準備。

    • 通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • 建立Kafka資料。

      $KAFKA_HOME/bin/目錄下,執行以下命令,建立Kafka Topic。

      • topic_abnormalTopic。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error異常訊息Topic。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        說明

        當資料寫入發生未知錯誤(通常是Kafka資料與MaxCompute表格式不匹配),異常資料會被寫入到runtime_errorTopic中。

      執行以下命令,建立Kafka訊息。

      以下訊息中,其中一條資料格式與目標MaxCompute表格式不匹配。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (可選)啟動Kafka-connector服務。具體操作,請參見配置並啟動Kafka-connector服務

    說明

    Kafka-connector服務已啟動,可跳過此步驟。

  3. 建立並配置odps-sink-connector.json檔案,並將odps-sink-connector.json檔案上傳至任意位置。本文以$KAFKA_HOME/config路徑為例。

    odps-sink-connector.json檔案內容樣本如下,關於odps-sink-connector.json檔案詳情介紹,請參見配置並啟動Kafka-connector任務

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "LTAI5tM2iHkTd4W69nof****",
        "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. 執行以下命令,啟動Kafka-connector任務。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果驗證。

    • 查詢MaxCompute表資料

      通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      返回結果如下:

      # 我們看到最後兩條資料,因為設定了skip_error參數為true,所以id為101的資料沒有被寫入MaxCompute,且沒有block後面資料的寫入。
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • 查詢runtime_errorTopic的訊息

      $KAFKA_HOME/bin/目錄下,執行以下命令,查看訊息寫入結果。

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      返回結果如下:

      # 異常資料被成功寫入runtime_error訊息佇列中
      {"id":101,"name":"json-4","extendinfos":"null"}