This topic describes how to create a MaxCompute sink connector to export data from an ApsaraMQ for Kafka topic to a MaxCompute table.
Prerequisites
For information about the prerequisites, see Prerequisites.
Usage notes
If you want to use the partition feature of MaxCompute, you must create an additional partition key column whose name is time and type is string when you create a table.
Step 1: Create MaxCompute resources
Create a table on the MaxCompute client. For more information, see Create a table.
In this example, a table named kafka_to_maxcompute is created. The table contains three columns, and the partition feature is enabled. The following code shows the statement that is executed to create the table:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);
The following code shows the statement that is executed to create the table if the partition feature is disabled:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);
If the statement is executed, the following result is displayed.
On the Tables page, view the information about the created table.
Step 2: Create and start a MaxCompute sink connector
Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, choose .
On the Tasks page, click Create Task.
In the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure other parameters.
Task Creation
In the Source step, set the Data Provider parameter to Message Queue for Apache Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
Parameter
Description
Example
Region
The region where the ApsaraMQ for Kafka instance resides.
China (Hangzhou)
Message Queue for Apache Kafka Instance
The ID of the ApsaraMQ for Kafka instance in which the data that you want to route are produced.
alikafka_post-cn-9hdsbdhd****
Topic
The topic on the ApsaraMQ for Kafka instance in which the data that you want to route are produced.
guide-sink-topic
Group ID
The ID of the group on the ApsaraMQ for Kafka instance in which the data that you want to route are produced.
Quickly Create: The system automatically creates a group whose ID is in the GID_EVENTBRIDGE_xxx format.
Use Existing Group: Select the ID of an existing group that is not being used. If you select an existing group that is being used, the publishing and subscription of existing messages are affected.
Use Existing Group
Consumer Offset
Latest Offset: Messages are consumed from the latest offset.
Earliest Offset: Messages are consumed from the earliest offset.
Latest Offset
Network Configuration
If cross-border data transmission is required, select Internet. In other cases, select Basic Network.
Basic Network
Data Format
The data format feature is used to encode binary data delivered from the source into a specific data format. Multiple data formats are supported. If you do not have special requirements on encoding, specify Json as the value.
Json: Binary data is encoded into JSON-formatted data based on UTF-8 encoding and then put into the payload. This is the default value.
Text: Binary data is encoded into strings based on UTF-8 encoding and then put into the payload.
Binary: Binary data is encoded into strings based on Base64 encoding and then put into the payload.
Json
Messages
The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.
2000
Interval (Unit: Seconds)
The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation.
3
In the Filtering step, define a data pattern to filter data. For more information, see Message filtering.
In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.
In the Sink step, set the Service Type parameter to MaxCompute acs.maxcompute and then follow the on-screen instructions to configure other parameters. The following table describes the parameters.
Parameter
Description
Example
AccessKey ID
The AccessKey ID of your Alibaba Cloud account. The AccessKey ID is used to access MaxCompute.
LTAI5tHPVCZywsoEVvFu****
AccessKey Secret
The AccessKey secret of your Alibaba Cloud account.
4RAKUQpZtUntDgvoKu0tvrkrOM****
MaxCompute Project Name
The MaxCompute project that you created.
test_compute
MaxCompute Table Name
The MaxCompute table that you created.
kafka_to_maxcompute
MaxCompute Table Input Parameter
After you select the MaxCompute table, the column name and type of the table are displayed here. You need to configure only the Value Extraction Rule parameter. The following code shows how to configure the value extraction rule of a message. In this example, the value of the topic column is extracted from the topic field of the message. Therefore, the Value Extraction Rule parameter is set to
$.topic
.{ 'data': { 'topic': 't_test', 'partition': 2, 'offset': 1, 'timestamp': 1717048990499, 'headers': { 'headers': [], 'isReadOnly': False }, 'key': 'MaxCompute-K1', 'value': 'MaxCompute-V1' }, 'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1', 'source': 'acs:alikafka', 'specversion': '1.0', 'type': 'alikafka:Topic:Message', 'datacontenttype': 'application/json; charset=utf-8', 'time': '2024-05-30T06:03:10.499Z', 'aliyunaccountid': '1413397765616316' }
topic:
$.data.topic
valuename:
$.data.value
valueage:
$.data.offset
Partitioning
Enable
Disable
Enable
Partition Dimension
This parameter is required only if the Partitioning parameter is set to Enable.
Time variables {yyyy}, {MM}, {dd}, {HH}, and {mm} are supported. These variables correspond to year, month, day, hour, and minute, respectively. Time variables are case-sensitive.
Constants are supported.
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
Network Settings
VPC: Messages in ApsaraMQ for Kafka are delivered to MaxCompute in a virtual private cloud (VPC).
Internet: Messages in ApsaraMQ for Kafka are delivered to MaxCompute over the Internet.
Internet
VPC
The VPC ID. This parameter is required only if you set the Network Settings parameter to VPC.
vpc-bp17fapfdj0dwzjkd****
vSwitch
The vSwitch ID. This parameter is required only if you set the Network Settings parameter to VPC.
vsw-bp1gbjhj53hdjdkg****
Security Group
The security group ID. This parameter is required only if you set the Network Settings parameter to VPC.
test_group
Task Property
Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.
Click Save. On the Tasks page, find the MaxCompute sink connector that you created. When the status in the Status column changes from Starting to Running, the connector is created.
Step 3: Test the MaxCompute sink connector
On the Tasks page, find the MaxCompute sink connector that you created and click the name of the source topic in the Event Source column.
- On the Topic Details page, click Send Message.
In the Start to Send and Consume Message panel, configure the parameters based on the following figure and click OK.
Go to the MaxCompute console and execute the following SQL statement to query the information about the partition.
show PARTITIONS kafka_to_maxcompute;
The following result is returned.
Execute the following statement based on the partition information to query data in the partition:
SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";
The following result is returned.