MaxCompute與Kafka的整合能夠提供高效、可靠的資料處理和分析能力,適用於需要即時處理、大規模資料流和複雜資料分析的情境。本文介紹訊息佇列Kafka版和自建Kafka資料的寫入流程,以及自建Kafka資料的寫入樣本。
Kafka資料寫入MaxCompute流程:阿里雲全託管Kafka
MaxCompute與訊息佇列Kafka版服務緊密整合,藉助訊息佇列Kafka版服務的MaxCompute Sink Connector,無需第三方工具及二次開發,即可滿足將指定Topic資料持續匯入MaxCompute資料表的需求,操作詳情請參見建立MaxCompute Sink Connector。
Kafka資料寫入MaxCompute流程:自建開源Kafka
前提條件
已部署V2.2及以上版本的Kafka服務(推薦最新版本V3.4.0),並已建立Kafka Topic資訊。
已建立MaxCompute專案和表。具體操作,請參見建立MaxCompute專案和建立表。
注意事項
Kafka-connector服務支援TEXT、CSV、JSON和FLATTEN類型的Kafka資料寫入,不同類型的注意事項詳情如下。關於資料類型的詳情介紹,請參見資料類型說明。
TEXT和JSON類型的Kafka資料寫入MaxCompute時,MaxCompute表要求如下:
欄位名稱
欄位類型
是否為固定欄位
topic
STRING
是
partition
BIGINT
是
offset
BIGINT
是
key
TEXT類型Kafka資料寫入時,欄位類型必須為STRING。
JSON類型Kafka資料寫入時,根據寫入的資料類型設定,支援STRING與JSON。
需要將Kafka訊息的中的Key值同步到MaxCompute表中時,此欄位為固定欄位。關於Kafka訊息同步到MaxCompute的模式,詳情請參見mode。
value
TEXT類型Kafka資料寫入時,欄位類型必須為STRING。
JSON類型Kafka資料寫入時,根據寫入的資料類型設定,支援STRING與JSON。
需要將Kafka訊息的中的Value值同步到MaxCompute表中時,此欄位為固定欄位。關於Kafka訊息同步到MaxCompute的模式,詳情請參見mode。
pt
STRING(分區欄位)
是
FLATTEN和CSV類型的Kafka資料寫入MaxCompute時,必須包含以下欄位和欄位類型,您可以根據寫入資料的內容自訂其他欄位。
欄位名稱
欄位類型
topic
STRING
partition
BIGINT
offset
BIGINT
pt
STRING(分區欄位)
CSV類型的Kafka資料寫入MaxCompute表中時,MaxCompute表中自訂的欄位順序和欄位類型,必須與Kafka寫入的資料保持一致,以確保資料能正確寫入。
FLATTEN類型的Kafka資料寫入MaxCompute表中時,MaxCompute表中自訂的欄位名稱必須Kafka資料中欄位名稱保持一致,以確保資料能正確寫入。
例如:要寫入的FLATTEN類型的Kafka資料內容為
{"A":a,"B":"b","C":{"D":"d","E":"e"}}
,那MaxCompute表資訊如下所示。CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
配置並啟動Kafka-connector服務
以Linux環境為例,在命令視窗執行以下命令或下載連結,下載
kafka-connector-2.0.jar
包。wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar
為防止依賴衝突,建議在
$KAFKA_HOME/libs
下建立一個子檔案夾,例如connector
,用來放置kafka-connector-2.0.jar
包。說明若
kafka-connector-2.0.jar
包與Kafka的部署環境不一致,配置並啟動Kafka-connector
服務的操作詳情,請參見配置Kafka-connector。在
$KAFKA_HOME/config
目錄下,配置connect-distributed.properties
檔案。在
connect-distributed.properties
檔案中補充以下內容。##新增以下內容 plugin.path=<KAFKA_HOME>/libs/connector ##更新key.converter和value.converter參數值 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
在
$KAFKA_HOME/
路徑下,執行以下命令,啟動Kafka-connector
服務。##啟動命令 bin/connect-distributed.sh config/connect-distributed.properties &
配置並啟動Kafka-connector任務
建立並配置
odps-sink-connector.json
設定檔,並將odps-sink-connector.json
檔案上傳至任意位置。odps-sink-connector.json
設定檔內容與參數介紹如下。{ "name": "Kafka connector task name", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "endpoint", "tunnel_endpoint": "your_tunnel endpoint", "project": "project", "schema":"default", "table": "your_table", "account_type": "account type (STS or ALIYUN)", "access_id": "access id", "access_key": "access key", "account_id": "account id for sts", "sts.endpoint": "sts endpoint", "region_id": "region id for sts", "role_name": "role name for sts", "client_timeout_ms": "STS Token valid period (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"kafka topic when runtime errors happens", "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue", "skip_error":"false" } }
公用參數
參數名
是否必填
說明
name
是
任務名稱,且名稱必須保持唯一。
connector.class
是
啟動
Kafka connector
服務的類名,預設值為com.aliyun.odps.kafka.connect.MaxComputeSinkConnector
。tasks.max
是
Kafka connector
中消費者進程最大個數,必須為大於0的整數。topics
是
Kafka的Topic名稱。
endpoint
是
MaxCompute服務的串連地址。
您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見Endpoint。
tunnel_endpoint
否
Tunnel服務的外網訪問連結。
如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網路對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。
各地區及網路對應的Tunnel Endpoint值,請參見Endpoint。
project
是
訪問的目標MaxCompute專案名稱。
schema
否
若目標MaxCompute專案配置Schema三層模型,則需要此參數,且預設值為default。
若目標MaxCompute專案未配置Schema三層模型,則無需配置此參數。
關於Schema的介紹詳情,請參見Schema操作。
table
是
目標MaxCompute專案的表名稱。
format
否
寫入的訊息格式。取值如下:
TEXT(預設值):訊息的格式為字串。
BINARY:訊息的格式為位元組數組。
CSV:訊息的格式為逗號(,)分隔的字串。
JSON:訊息格式為JSON資料類型的字串。關於MaxCompute JSON類型的詳情,請參見MaxCompute JSON類型使用指南(試用Beta版本)。
FLATTEN:訊息格式為JSON資料類型的字串,JSON中的Key和Value會被解析,寫入到對應的MaxCompute表中,其中JSON資料中的Key和需要與MaxCompute的表列名對應。
關於不同格式訊息匯入的案例,詳情請參見使用樣本。
mode
否
訊息同步到MaxCompute的模式。取值說明如下:
KEY:只保留訊息的Key,並將Key值寫入目標MaxCompute表中。
VALUE:只保留訊息的Value,並將Value值寫入目標MaxCompute表中。
DEFAULT(預設值):同時保留訊息的Key和Value,並將Key和Value值都寫入目標MaxCompute表中。
DEFAULT模式下,只支援TEXT和BINARY格式資料寫入。
partition_window_type
否
按照系統時間進行資料分區。取值為DAY、HOUR(預設值)、MINUTE。
use_streaming
否
是否使用流式資料通道。取值說明如下:
false(預設值):不使用。
true:使用。
buffer_size_kb
否
odps partition writer內部緩衝區的大小,單位KB。預設65536 KB。
sink_pool_size
否
多線程寫入的最大線程數,預設為系統CPU核心數。
record_batch_size
否
一個Kafka-connector任務內部的一個線程最多可以一次並行發送訊息數量。
skip_error
否
是否跳過發生未知錯誤的記錄。取值說明如下:
false(預設值):不會跳過。
true:跳過。
說明當skip_error為false且未配置runtime.error.topic.name參數,若遇到未知錯誤,會停止後續的資料寫入,進程會被阻塞並在日誌中拋出異常。
當skip_error取值true且runtime.error.topic.name未配置,寫入資料的進程會繼續寫入,異常資料會被丟棄。
當skip_error為false且已配置runtime.error.topic.name參數,寫入資料的進程會繼續寫入,異常資料會被記錄到runtime.error.topic.name配置的Topic中。
異常資料處理樣本詳情,請參見異常資料處理樣本。
runtime.error.topic.name
否
將資料寫入時發生的未知錯誤的資料寫入至Kafka的Topic名稱。
runtime.error.topic.bootstrap.servers
否
將資料寫入時發生的未知錯誤的資料寫入至Kafka的bootstrap servers地址。
account_type
是
訪問目標MaxCompute服務的方式,支援STS、ALIYUN兩種方式,預設ALIYUN。
不同方式訪問MaxCompute需要配置不同的訪問憑證參數,詳情請參見通過ALIYUN方式訪問MaxCompute和通過STS方式訪問MaxComput。
通過ALIYUN方式訪問MaxCompute,除公用參數外還需配置以下參數。
參數名
說明
access_id
阿里雲帳號或RAM帳號的AccessKey ID。
您可以進入AccessKey管理頁面擷取AccessKey ID。
access_key
AccessKey ID對應的AccessKey Secret。
您可以進入AccessKey管理頁面擷取AccessKey Secret。
通過STS方式訪問MaxCompute,除公用參數外還需配置以下參數。
參數名
說明
account_id
訪問目標MaxCompute專案的帳號ID。您可以進入帳號中心查看您的帳號ID。
region_id
訪問目標MaxCompute專案的地區ID。各地區對應的地區ID,請參見服務存取點。
role_name
訪問目標MaxCompute專案的角色名稱。您可以進入角色頁面查看角色名稱。
client_timeout_ms
STS Token重新整理的時間間隔,單位為毫秒(ms),預設值為11(ms)。
sts.endpoint
使用臨時安全性權杖(STS)進行身份認證時需要的STS 服務地址。
各地區及網路對應的Endpoint值,請參見服務存取點。
執行以下命令,啟動Kafka-connector資料轉送任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
使用樣本
TEXT類型資料寫入
資料準備。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);
建立Kafka資料。
在
$KAFKA_HOME/bin/
目錄下,執行以下命令,建立Kafka Topic。以topic_text
為例。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text
執行以下命令,建立Kafka訊息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
(可選)啟動
Kafka-connector
服務。具體操作,請參見配置並啟動Kafka-connector服務。說明若
Kafka-connector
服務已啟動,可跳過此步驟。建立並配置
odps-sink-connector.json
檔案,並將odps-sink-connector.json
檔案上傳至任意位置。本文以$KAFKA_HOME/config
路徑為例。odps-sink-connector.json
檔案內容樣本如下,關於odps-sink-connector.json
檔案詳情介紹,請參見配置並啟動Kafka-connector任務。{ "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "LTAI5tM2iHkTd4W69nof****", "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
執行以下命令,啟動Kafka-connector資料轉送任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
結果驗證。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。
set odps.sql.allow.fullscan=true; select * from table_text;
返回結果如下:
# 這裡由於我們odps-sink-connector.json設定檔中的mode值為VALUE,所以只保留value的內容,key欄位為NULL +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
CSV類型資料寫入
資料準備。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);
建立Kafka資料。
在
$KAFKA_HOME/bin/
目錄下,執行以下命令,建立Kafka Topic。以topic_csv
為例。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv
執行以下命令,建立Kafka訊息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
(可選)啟動
Kafka-connector
服務。具體操作,請參見配置並啟動Kafka-connector服務。說明若
Kafka-connector
服務已啟動,可跳過此步驟。建立並配置
odps-sink-connector.json
檔案,並將odps-sink-connector.json
檔案上傳至任意位置。本文以$KAFKA_HOME/config
路徑為例。odps-sink-connector.json
檔案內容樣本如下,關於odps-sink-connector.json
檔案詳情介紹,請參見配置並啟動Kafka-connector任務。{ "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "LTAI5tM2iHkTd4W69nof****", "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
執行以下命令,啟動Kafka-connector資料轉送任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
結果驗證。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。
set odps.sql.allow.fullscan=true; select * from table_csv;
返回結果如下:
+-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
JSON類型資料寫入
資料準備。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);
建立Kafka資料。
在
$KAFKA_HOME/bin/
目錄下,執行以下命令,建立Kafka Topic。以topic_json
為例。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json
執行以下命令,建立Kafka訊息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
(可選)啟動
Kafka-connector
服務。具體操作,請參見配置並啟動Kafka-connector服務。說明若
Kafka-connector
服務已啟動,可跳過此步驟。建立並配置
odps-sink-connector.json
檔案,並將odps-sink-connector.json
檔案上傳至任意位置。本文以$KAFKA_HOME/config
路徑為例。odps-sink-connector.json
檔案內容樣本如下,關於odps-sink-connector.json
檔案詳情介紹,請參見配置並啟動Kafka-connector任務。{ "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "LTAI5tM2iHkTd4W69nof****", "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
執行以下命令,啟動Kafka-connector資料轉送任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
結果驗證。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。
set odps.sql.allow.fullscan=true; select * from table_json;
返回結果如下:
# json 資料被成功寫入value欄位中 +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
FLATTEN類型資料寫入
資料準備。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
建立Kafka資料。
在
$KAFKA_HOME/bin/
目錄下,執行以下命令,建立Kafka Topic。以topic_flatten
為例。./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten
執行以下命令,建立Kafka訊息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(可選)啟動
Kafka-connector
服務。具體操作,請參見配置並啟動Kafka-connector服務。說明若
Kafka-connector
服務已啟動,可跳過此步驟。建立並配置
odps-sink-connector.json
檔案,並將odps-sink-connector.json
檔案上傳至任意位置。本文以$KAFKA_HOME/config
路徑為例。odps-sink-connector.json
檔案內容樣本如下,關於odps-sink-connector.json
檔案詳情介紹,請參見配置並啟動Kafka-connector任務。{ "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "LTAI5tM2iHkTd4W69nof****", "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
執行以下命令,啟動Kafka-connector任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
結果驗證。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。
set odps.sql.allow.fullscan=true; select * from table_flatten;
返回結果如下:
# json資料被解析寫入MaxCompute表中,且支援json巢狀型別exteninfo為JSON欄位 +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
異常資料處理樣本
資料準備。
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,建立目標MaxCompute表。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
建立Kafka資料。
在
$KAFKA_HOME/bin/
目錄下,執行以下命令,建立Kafka Topic。topic_abnormal
Topic。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
runtime_error
異常訊息Topic。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
說明當資料寫入發生未知錯誤(通常是Kafka資料與MaxCompute表格式不匹配),異常資料會被寫入到
runtime_error
Topic中。
執行以下命令,建立Kafka訊息。
以下訊息中,其中一條資料格式與目標MaxCompute表格式不匹配。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(可選)啟動
Kafka-connector
服務。具體操作,請參見配置並啟動Kafka-connector服務。說明若
Kafka-connector
服務已啟動,可跳過此步驟。建立並配置
odps-sink-connector.json
檔案,並將odps-sink-connector.json
檔案上傳至任意位置。本文以$KAFKA_HOME/config
路徑為例。odps-sink-connector.json
檔案內容樣本如下,關於odps-sink-connector.json
檔案詳情介紹,請參見配置並啟動Kafka-connector任務。{ "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "LTAI5tM2iHkTd4W69nof****", "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }
執行以下命令,啟動Kafka-connector任務。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
結果驗證。
查詢MaxCompute表資料
通過使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢資料寫入結果。
set odps.sql.allow.fullscan=true; select * from table_flatten;
返回結果如下:
# 我們看到最後兩條資料,因為設定了skip_error參數為true,所以id為101的資料沒有被寫入MaxCompute,且沒有block後面資料的寫入。 +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+
查詢
runtime_error
Topic的訊息在
$KAFKA_HOME/bin/
目錄下,執行以下命令,查看訊息寫入結果。sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning
返回結果如下:
# 異常資料被成功寫入runtime_error訊息佇列中 {"id":101,"name":"json-4","extendinfos":"null"}