The integration of MaxCompute and Kafka provides efficient and reliable data processing and analytics capabilities, and is suitable for scenarios that require real-time processing of large-scale data streams and complex data analytics. This topic describes how to import data from ApsaraMQ for Kafka and from self-managed Apache Kafka to MaxCompute, and provides examples of importing data from self-managed Kafka.
Import data from ApsaraMQ for Kafka to MaxCompute
MaxCompute is closely integrated with ApsaraMQ for Kafka. You can directly use the MaxCompute sink connectors provided by ApsaraMQ for Kafka to continuously import data from specific topics into MaxCompute tables, without third-party tools or custom development. For more information about how to create MaxCompute sink connectors, see Create MaxCompute sink connectors.
Import data from self-managed Apache Kafka to MaxCompute
Prerequisites
A Kafka service of V2.2 or later is deployed, and a Kafka topic is created. We recommend that you deploy a Kafka service of V3.4.0.
A MaxCompute project and a MaxCompute table are created. For more information, see Create a MaxCompute project and Create tables.
Precautions
The Kafka-connector service allows you to write Kafka data of the TEXT, CSV, JSON, or FLATTEN type to MaxCompute. Take note of the following items when you write different types of Kafka data. For more information about data types, see the description of the format parameter.
The following table describes the requirements for a MaxCompute table to which Kafka data of the TEXT or JSON type is written.
| Field name | Data type | Required |
|---|---|---|
| topic | STRING | Yes |
| partition | BIGINT | Yes |
| offset | BIGINT | Yes |
| key | STRING for Kafka data of the TEXT type. STRING or JSON for Kafka data of the JSON type, based on the data type of the written data. | Required if you need to synchronize the key in a Kafka message to the MaxCompute table. For more information about the mode in which Kafka messages are synchronized to MaxCompute, see the description of the mode parameter. |
| value | STRING for Kafka data of the TEXT type. STRING or JSON for Kafka data of the JSON type, based on the data type of the written data. | Required if you need to synchronize the value in a Kafka message to the MaxCompute table. For more information about the mode in which Kafka messages are synchronized to MaxCompute, see the description of the mode parameter. |
| pt | STRING (partition field) | Yes |
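For example, the following statement creates a table that satisfies these requirements for Kafka data of the TEXT type; for JSON data, the value column can instead be of the JSON type. The same table layouts are used in the TEXT and JSON examples later in this topic.
CREATE TABLE IF NOT EXISTS table_text(
  topic STRING,
  `partition` BIGINT,
  `offset` BIGINT,
  key STRING,
  value STRING
) PARTITIONED BY (pt STRING);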
If you write Kafka data of the FLATTEN or CSV type to MaxCompute, the fields listed in the following table must be included and must be of the required data types. You can also configure custom fields based on the written data.
| Field name | Data type |
|---|---|
| topic | STRING |
| partition | BIGINT |
| offset | BIGINT |
| pt | STRING (partition field) |
If you write Kafka data of the CSV type to a MaxCompute table, the sequence and types of the custom fields in the MaxCompute table must be consistent with those of the Kafka data. Otherwise, the Kafka data cannot be correctly written. For a sample table definition, see the sketch after the FLATTEN example below.
If you write Kafka data of the FLATTEN type to a MaxCompute table, the custom field names in the MaxCompute table must be consistent with the field names in the Kafka data. This ensures that the Kafka data can be correctly written.
For example, if the Kafka data of the FLATTEN type that you want to write is {"A":a,"B":"b","C":{"D":"d","E":"e"}}, you can execute the following statement to create a MaxCompute table for storing the data:
CREATE TABLE IF NOT EXISTS table_flatten(
  topic STRING,
  `partition` BIGINT,
  `offset` BIGINT,
  A BIGINT,
  B STRING,
  C JSON
) PARTITIONED BY (pt STRING);
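Similarly, for Kafka data of the CSV type, the custom fields must follow the order of the values in the message. The following statement is a minimal sketch for the CSV messages used in the CSV example later in this topic (for example, 1103,zhangsan,china):
CREATE TABLE IF NOT EXISTS table_csv(
  topic STRING,
  `partition` BIGINT,
  `offset` BIGINT,
  id BIGINT,
  name STRING,
  region STRING
) PARTITIONED BY (pt STRING);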
Configure and start the Kafka-connector service
In the Linux environment, run the following command in the CLI or click the download link to download the kafka-connector-2.0.jar package:
wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar
To prevent dependency conflicts, we recommend that you create a subfolder such as connector in $KAFKA_HOME/libs to store the kafka-connector-2.0.jar package.
Note: If the deployment environment of the kafka-connector-2.0.jar package is not the same as the deployment environment of the Kafka data, you must configure and start the Kafka-connector service by following the instructions provided in aliware-kafka-demos.
In the $KAFKA_HOME/config directory, configure the connect-distributed.properties file. Add the following configurations to the connect-distributed.properties file:
## Add the following content.
plugin.path=<KAFKA_HOME>/libs/connector
## Update the values of the key.converter and value.converter parameters.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Run the following command in the $KAFKA_HOME/ directory to start the Kafka-connector service:
bin/connect-distributed.sh config/connect-distributed.properties &
Configure and start a Kafka-connector task
Create and configure the odps-sink-connector.json file and upload it to any location.
The following code and parameter descriptions explain the content of the odps-sink-connector.json file.
{
  "name": "Kafka connector task name",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "your_topic",
    "endpoint": "endpoint",
    "tunnel_endpoint": "your_tunnel endpoint",
    "project": "project",
    "schema": "default",
    "table": "your_table",
    "account_type": "account type (STS or ALIYUN)",
    "access_id": "access id",
    "access_key": "access key",
    "account_id": "account id for sts",
    "sts.endpoint": "sts endpoint",
    "region_id": "region id for sts",
    "role_name": "role name for sts",
    "client_timeout_ms": "STS token validity period (ms)",
    "format": "TEXT",
    "mode": "KEY",
    "partition_window_type": "MINUTE",
    "use_streaming": false,
    "buffer_size_kb": 65536,
    "sink_pool_size": "150",
    "record_batch_size": "8000",
    "runtime.error.topic.name": "kafka topic used when a runtime error occurs",
    "runtime.error.topic.bootstrap.servers": "kafka bootstrap servers of the error topic",
    "skip_error": "false"
  }
}
Common parameters
name (required): The name of the task. The name must be unique.

connector.class (required): The class name of the Kafka-connector service. Default value: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.

tasks.max (required): The maximum number of consumer processes in the Kafka-connector service. The value must be an integer greater than 0.

topics (required): The name of the Kafka topic.

endpoint (required): The endpoint of MaxCompute. You must configure this parameter based on the region and network connection type that you selected when you created the MaxCompute project. For more information about the endpoints of different network types in each region, see Endpoints.

tunnel_endpoint (optional): The public endpoint of MaxCompute Tunnel. If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed. For more information about the Tunnel endpoints of different network types in each region, see Endpoints.

project (required): The name of the MaxCompute project that you want to access.

schema (optional): Required if the destination MaxCompute project uses the three-layer schema model. Default value: default. If the destination MaxCompute project does not use the three-layer schema model, you do not need to configure this parameter. For more information about schemas, see Schema-related operations.

table (required): The name of the table in the destination MaxCompute project.

format (optional): The format of the written message. Valid values:
- TEXT: a string. This is the default value.
- BINARY: a byte array.
- CSV: a list of strings separated by commas (,).
- JSON: a JSON string. For more information about MaxCompute JSON data types, see Instructions for using the JSON type of MaxCompute (beta version).
- FLATTEN: a JSON string. The keys and values in the JSON string are parsed and written to the specified MaxCompute table. The keys in the JSON string must correspond to the column names in the MaxCompute table.
For more information about how to import messages in different formats, see Examples.

mode (optional): The mode in which messages are synchronized to MaxCompute. Valid values:
- KEY: Only the key of the message is retained and written to the destination MaxCompute table.
- VALUE: Only the value of the message is retained and written to the destination MaxCompute table.
- DEFAULT: Both the key and value of the message are retained and written to the destination MaxCompute table. This is the default value. If you set this parameter to DEFAULT, only data of the TEXT or BINARY type can be written.

partition_window_type (optional): The granularity at which data is partitioned based on the system time. Valid values: DAY, HOUR, and MINUTE. Default value: HOUR.

use_streaming (optional): Specifies whether to use Streaming Tunnel. Valid values:
- false: Streaming Tunnel is not used. This is the default value.
- true: Streaming Tunnel is used.

buffer_size_kb (optional): The internal buffer size of the odps partition writer. Unit: KB. The default size is 65,536 KB.

sink_pool_size (optional): The maximum number of threads for multi-threaded writing. The default value is the number of CPU cores in the system.

record_batch_size (optional): The maximum number of messages that a thread in a Kafka-connector task can send at the same time.

skip_error (optional): Specifies whether to skip records that trigger unknown errors. Valid values:
- false: The records are not skipped. This is the default value.
- true: The records are skipped.
Note:
- If the skip_error parameter is set to false and the runtime.error.topic.name parameter is not configured, subsequent data writes stop, the process is blocked, and an exception is logged when an unknown error occurs.
- If the skip_error parameter is set to true and the runtime.error.topic.name parameter is not configured, data continues to be written and the abnormal data is discarded.
- If the skip_error parameter is set to false and the runtime.error.topic.name parameter is configured, data continues to be written and the abnormal data is recorded in the topic specified by the runtime.error.topic.name parameter.
For more examples on processing abnormal data, see Process abnormal data.

runtime.error.topic.name (optional): The name of the Kafka topic to which abnormal data is written when an unknown error occurs.

runtime.error.topic.bootstrap.servers (optional): The bootstrap servers (Kafka broker addresses) of the Kafka cluster that hosts the topic to which abnormal data is written when an unknown error occurs.

account_type (required): The method that is used to access the destination MaxCompute service. Valid values: STS and ALIYUN. Default value: ALIYUN. You must configure different access credential parameters for different methods. For more information, see Access MaxCompute by using the ALIYUN method and Access MaxCompute by using the STS method in this topic.
Access MaxCompute by using the ALIYUN method: You must configure the following parameters in addition to the common parameters.
access_id: The AccessKey ID of your Alibaba Cloud account or of a RAM user within the Alibaba Cloud account. You can obtain the AccessKey ID from the AccessKey Pair page.

access_key: The AccessKey secret that corresponds to the AccessKey ID. You can obtain the AccessKey secret from the AccessKey Pair page.
Access MaxCompute by using the STS method: You must configure the following parameters in addition to the common parameters.
account_id: The ID of the account that is used to access the destination MaxCompute project. You can view your account ID in the Account Center.

region_id: The ID of the region in which the destination MaxCompute project resides. For more information about the ID of each region, see Endpoints.

role_name: The name of the role that is used to access the destination MaxCompute project. You can view the role name on the Roles page.

client_timeout_ms: The interval at which the STS token is refreshed. Unit: milliseconds. Default value: 11.

sts.endpoint: The STS service endpoint that is required when an STS token is used for identity authentication. For more information about the endpoints of different network types in each region, see Endpoints.
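All of the runnable examples in this topic use the ALIYUN method. The following snippet is a minimal, hypothetical sketch of an odps-sink-connector.json file that uses the STS method; the topic, endpoint, project, table, account, and role values are placeholders that you must replace with your own, and client_timeout_ms is left at the documented default.
{
  "name": "odps-sink-sts-example",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "your_topic",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "table": "your_table",
    "account_type": "STS",
    "access_id": "your_access_id",
    "access_key": "your_access_key",
    "account_id": "your_account_id",
    "region_id": "cn-shanghai",
    "role_name": "your_role_name",
    "sts.endpoint": "your_sts_endpoint",
    "client_timeout_ms": "11",
    "format": "TEXT",
    "mode": "VALUE"
  }
}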
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
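Kafka Connect exposes a REST API on the worker, which you can optionally use to confirm that the task was created and is running. The following commands are a sketch that assumes the worker runs on the local host with the default port 8083; replace <task_name> with the value of the name field in your odps-sink-connector.json file.
# List the connectors that are registered on the Connect worker.
curl http://localhost:8083/connectors
# Query the status of a specific connector and its tasks.
curl http://localhost:8083/connectors/<task_name>/status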
Examples
Write data of the TEXT type
Prepare data.
Create a MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);
Create Kafka data.
In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a Kafka topic named topic_text is created.
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text
Run the following command to create Kafka messages:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
>123 abc
>456 edf
(Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.
Note: If the Kafka-connector service is started, skip this step.
Create and configure the odps-sink-connector.json file, and upload it to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.
The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.
{
  "name": "odps-test-text",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_text",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "schema": "default",
    "table": "table_text",
    "account_type": "ALIYUN",
    "access_id": "LTAI5tM2iHkTd4W69nof****",
    "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
    "partition_window_type": "MINUTE",
    "mode": "VALUE",
    "format": "TEXT",
    "sink_pool_size": "150",
    "record_batch_size": "9000",
    "buffer_size_kb": "600000"
  }
}
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
Verify the result.
Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:
set odps.sql.allow.fullscan=true; select * from table_text;
The following result is returned:
# The mode value in the odps-sink-connector.json configuration file is VALUE. Therefore, only the value is retained and the key field is NULL.
+------------+-----------+--------+------+-------+------------------+
| topic      | partition | offset | key  | value | pt               |
+------------+-----------+--------+------+-------+------------------+
| topic_text | 0         | 0      | NULL | abc   | 07-13-2023 21:13 |
| topic_text | 0         | 1      | NULL | edf   | 07-13-2023 21:13 |
+------------+-----------+--------+------+-------+------------------+
Write data of the CSV type
Prepare data.
Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);
Create Kafka data.
In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a Kafka topic named topic_csv is created.
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv
Run the following command to create Kafka messages:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
>123 1103,zhangsan,china
>456 1104,lisi,usa
(Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.
Note: If the Kafka-connector service is started, skip this step.
Create and configure the odps-sink-connector.json file, and upload it to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.
The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.
{
  "name": "odps-test-csv",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_csv",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "schema": "default",
    "table": "table_csv",
    "account_type": "ALIYUN",
    "access_id": "LTAI5tM2iHkTd4W69nof****",
    "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
    "partition_window_type": "MINUTE",
    "format": "CSV",
    "mode": "VALUE",
    "sink_pool_size": "150",
    "record_batch_size": "9000",
    "buffer_size_kb": "600000"
  }
}
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
Verify the result.
Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:
set odps.sql.allow.fullscan=true; select * from table_csv;
The following result is returned:
+----------+-----------+--------+------+----------+--------+------------------+
| topic    | partition | offset | id   | name     | region | pt               |
+----------+-----------+--------+------+----------+--------+------------------+
| csv_test | 0         | 0      | 1103 | zhangsan | china  | 07-14-2023 00:10 |
| csv_test | 0         | 1      | 1104 | lisi     | usa    | 07-14-2023 00:10 |
+----------+-----------+--------+------+----------+--------+------------------+
Write data of the JSON type
Prepare data.
Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);
Create Kafka data.
In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a topic named topic_json is created.
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json
Run the following command to create Kafka messages:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
>123 {"id":123,"name":"json-1","region":"beijing"}
>456 {"id":456,"name":"json-2","region":"hangzhou"}
(Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.
Note: If the Kafka-connector service is started, skip this step.
Create and configure the odps-sink-connector.json file, and upload it to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.
The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.
{
  "name": "odps-test-json",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_json",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "schema": "default",
    "table": "table_json",
    "account_type": "ALIYUN",
    "access_id": "LTAI5tM2iHkTd4W69nof****",
    "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
    "partition_window_type": "MINUTE",
    "mode": "VALUE",
    "format": "JSON",
    "sink_pool_size": "150",
    "record_batch_size": "9000",
    "buffer_size_kb": "600000"
  }
}
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
Verify the result.
Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:
set odps.sql.allow.fullscan=true; select * from table_json;
The following result is returned:
# Write JSON data to the value field.
+------------+-----------+--------+------+------------------------------------------------+------------------+
| topic      | partition | offset | key  | value                                          | pt               |
+------------+-----------+--------+------+------------------------------------------------+------------------+
| Topic_json | 0         | 0      | NULL | {"id":123,"name":"json-1","region":"beijing"}  | 07-14-2023 00:28 |
| Topic_json | 0         | 1      | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
+------------+-----------+--------+------+------------------------------------------------+------------------+
Write data of the FLATTEN type
Prepare data.
Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
Create Kafka data.
In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a topic named topic_flatten is created.
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten
Run the following command to create Kafka messages:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
>123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}
>456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.
Note: If the Kafka-connector service is started, skip this step.
Create and configure the odps-sink-connector.json file, and upload it to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.
The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.
{
  "name": "odps-test-flatten",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_flatten",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "schema": "default",
    "table": "table_flatten",
    "account_type": "ALIYUN",
    "access_id": "LTAI5tM2iHkTd4W69nof****",
    "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
    "partition_window_type": "MINUTE",
    "mode": "VALUE",
    "format": "FLATTEN",
    "sink_pool_size": "150",
    "record_batch_size": "9000",
    "buffer_size_kb": "600000"
  }
}
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
Verify the result.
Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:
set odps.sql.allow.fullscan=true; select * from table_flatten;
The following result is returned:
# The JSON data is parsed and written to the MaxCompute table. The extendinfo field, which contains nested JSON, is written to a JSON column.
+---------------+-----------+--------+-----+--------+---------------------------------+------------------+
| topic         | partition | offset | id  | name   | extendinfo                      | pt               |
+---------------+-----------+--------+-----+--------+---------------------------------+------------------+
| topic_flatten | 0         | 0      | 123 | json-1 | {"sex":"M","region":"beijing"}  | 07-14-2023 01:33 |
| topic_flatten | 0         | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
+---------------+-----------+--------+-----+--------+---------------------------------+------------------+
Process abnormal data
Prepare data.
Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
Create Kafka data.
In the $KAFKA_HOME/bin/ directory, run the following commands to create the following Kafka topics:
topic_abnormal:
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
runtime_error:
sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
Note: If an unknown error occurs when data is written, the abnormal data is written to the runtime_error topic. In most cases, an unknown error occurs because the format of the Kafka data is not consistent with the format of the MaxCompute table.
Run the following command to create Kafka messages:
In the following messages, the data format of one message is not the same as the format of the MaxCompute table.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
>100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}
>101 {"id":101,"name":"json-4","extendinfos":"null"}
>102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.
Note: If the Kafka-connector service is started, skip this step.
Create and configure the odps-sink-connector.json file, and upload it to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.
The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.
{
  "name": "odps-test-runtime-error",
  "config": {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_abnormal",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "project": "project_name",
    "schema": "default",
    "table": "table_flatten",
    "account_type": "ALIYUN",
    "access_id": "LTAI5tM2iHkTd4W69nof****",
    "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
    "partition_window_type": "MINUTE",
    "mode": "VALUE",
    "format": "FLATTEN",
    "sink_pool_size": "150",
    "record_batch_size": "9000",
    "buffer_size_kb": "600000",
    "runtime.error.topic.name": "runtime_error",
    "runtime.error.topic.bootstrap.servers": "http://XXXX",
    "skip_error": "false"
  }
}
Run the following command to start the Kafka-connector task:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
Verify the result.
Query data in the MaxCompute table.
Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:
set odps.sql.allow.fullscan=true; select * from table_flatten;
The following result is returned:
# The record whose id is 101 does not match the schema of the MaxCompute table. Because the skip_error parameter is set to false and the runtime.error.topic.name parameter is configured, the abnormal record is routed to the runtime_error topic, the remaining records are written to the MaxCompute table, and subsequent writes are not blocked.
+--------------+-----------+--------+-----+--------+------------------------------------+------------------+
| topic        | partition | offset | id  | name   | extendinfo                         | pt               |
+--------------+-----------+--------+-----+--------+------------------------------------+------------------+
| flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 01:33 |
| flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
| flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
| flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
| flatten_test | 0         | 2      | 100 | json-3 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
| flatten_test | 0         | 4      | 102 | json-5 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
+--------------+-----------+--------+-----+--------+------------------------------------+------------------+
Query messages in the runtime_error topic.
In the $KAFKA_HOME/bin/ directory, run the following command to view the message write result:
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning
The following result is returned:
# Abnormal data is written to the runtime_error topic.
{"id":101,"name":"json-4","extendinfos":"null"}