
ApsaraMQ for Kafka: Create a MaxCompute sink connector

Last Updated: Nov 07, 2024

This topic describes how to create a MaxCompute sink connector to export data from a data source topic of an ApsaraMQ for Kafka instance to a MaxCompute table.

Prerequisites

The following requirements must be met:

  • ApsaraMQ for Kafka

    • The connector feature is enabled for the ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.

    • A topic is created in the ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.

      A topic named maxcompute-test-input is used in this example.

  • MaxCompute

    • A MaxCompute table is created on the MaxCompute client. For more information, see Create tables.

      In this example, a MaxCompute table named test_kafka is created in a project named connector_test. You can execute the following statement to create the table:

      CREATE TABLE IF NOT EXISTS test_kafka (topic STRING, partition BIGINT, offset BIGINT, key STRING, value STRING) PARTITIONED BY (pt STRING);
  • Optional: EventBridge

    Note

    You need to activate EventBridge only if the instance that contains the data source topic resides in the China (Hangzhou) or China (Chengdu) region.

Precautions

  • You can export data from a data source topic of an ApsaraMQ for Kafka instance only to a MaxCompute table that resides in the same region. For more information about the limits on connectors, see Limits.

  • If the instance that contains the data source topic is in the China (Hangzhou) or China (Chengdu) region, the connector task is published to EventBridge.

    • At present, EventBridge is free of charge. For more information, see Billing.

    • When you create a connector, EventBridge uses the AliyunServiceRoleForEventBridgeSourceKafka service-linked role to access ApsaraMQ for Kafka.

      • If the service-linked role is not available, EventBridge automatically creates it for you.

      • If the service-linked role is already available, EventBridge does not create a new one.

      For more information about service-linked roles, see Service-linked roles.

    • You cannot view the operational logs of connector tasks that are published to EventBridge. After a connector task is completed, you can view the consumption details of the groups that subscribe to the data source topic to see the status of the connector task. For more information, see View consumer details.

Procedure

To export data from a data source topic of an ApsaraMQ for Kafka instance to a MaxCompute table by using a MaxCompute sink connector, perform the following steps:

  1. Grant ApsaraMQ for Kafka the permissions to access MaxCompute.

  2. Optional: Create the topics and group that are required by a MaxCompute sink connector.

    If you do not want to manually create the topics and group, skip this step and set the Resource Creation Method parameter to Auto in the next step.

    Important

    Specific topics that are required by a MaxCompute sink connector must use a local storage engine. If the major version of your ApsaraMQ for Kafka instance is 0.10.2, topics that use a local storage engine cannot be created manually and must be created automatically.

    1. Create the topics that are required by a MaxCompute sink connector

    2. Create the group that is required by a MaxCompute sink connector

  3. Create and deploy a MaxCompute sink connector

  4. Verify the result.

    1. Send a test message

    2. View data in the MaxCompute table

Create a RAM role

You cannot select ApsaraMQ for Kafka as the trusted service when you create a RAM role. Therefore, select a service that can serve as the trusted service first, and then manually modify the trust policy of the RAM role.

  1. Log on to the RAM console.

  2. In the left-side navigation pane, choose Identities > Roles.

  3. On the Roles page, click Create Role.

  4. In the Create Role panel, perform the following operations:

    1. Select Alibaba Cloud Service as the trusted entity and click Next.

    2. Set the Role Type parameter to Normal Service Role. In the RAM Role Name field, enter AliyunKafkaMaxComputeUser1. From the Select Trusted Service drop-down list, select MaxCompute. Then, click OK.

  5. On the Roles page, find and click AliyunKafkaMaxComputeUser1.

  6. On the AliyunKafkaMaxComputeUser1 page, click the Trust Policy Management tab and then click Edit Trust Policy.

  7. In the Edit Trust Policy panel, replace fc in the script with alikafka and click OK.

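    After the modification, the trust policy should look similar to the following example. This is a sketch based on the standard RAM trust policy format; the exact script in your console may differ, and the only required change is replacing fc.aliyuncs.com with alikafka.aliyuncs.com:

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "alikafka.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }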

Add permissions

To use a MaxCompute sink connector to export messages to a MaxCompute table, you must grant the following permissions to the RAM role.

Object    Operation        Description
Project   CreateInstance   The permissions to create instances in projects.
Table     Describe         The permissions to read the metadata of tables.
Table     Alter            The permissions to modify the metadata of tables and to create and delete partitions.
Table     Update           The permissions to overwrite data in tables and insert data into tables.

For more information about the preceding permissions and how to grant these permissions, see MaxCompute permissions.

To grant the required permissions to AliyunKafkaMaxComputeUser1, perform the following steps:

  1. Log on to the MaxCompute client.

  2. Run the following command to add the AliyunKafkaMaxComputeUser1 RAM role as a RAM user:

    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    Note

    Replace <accountid> with the ID of your Alibaba Cloud account.

  3. Grant the RAM user the minimum permissions that are required to access MaxCompute.

    1. Run the following command to grant the RAM user the permissions on the connector_test project:

      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note

      Replace <accountid> with the ID of your Alibaba Cloud account.

    2. Run the following command to grant the RAM user the permissions on the test_kafka table:

      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note

      Replace <accountid> with the ID of your Alibaba Cloud account.
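
  4. Optional: Verify that the permissions are granted. This verification step is a suggested addition rather than part of the original procedure. Run the following command on the MaxCompute client, and replace <accountid> with the ID of your Alibaba Cloud account:

    show grants for `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;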

Create the topics that are required by a MaxCompute sink connector

In the ApsaraMQ for Kafka console, you can manually create the five topics that a MaxCompute sink connector requires. The five topics are the task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. The five topics differ in partition count and storage engine. For more information, see Parameters in the Configure Source Service step.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

    Important

    You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance that is deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, specify the properties of the topic and click OK.

    • Name: The topic name. Example: demo.

    • Description: The topic description. Example: demo test.

    • Partitions: The number of partitions in the topic. Example: 12.

    • Storage Engine: The type of the storage engine that is used to store messages in the topic. Example: Cloud Storage.

      Note: You can specify the storage engine type only if you use a Professional Edition instance. If you use a Standard Edition instance, Cloud Storage is used by default.

      ApsaraMQ for Kafka supports the following types of storage engines:

      • Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, high durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.

      • Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.

    • Message Type: The message type of the topic. Example: Normal Message. Valid values:

      • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, the order of the messages stored in the affected partitions may not be preserved. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.

      • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, messages are still stored in the partitions in the order in which they are sent, but messages in some partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.

    • Log Cleanup Policy: The log cleanup policy that is used by the topic. Example: Compact.

      If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.

      ApsaraMQ for Kafka provides the following log cleanup policies:

      • Delete: the default log cleanup policy. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes the earliest stored messages to ensure service availability.

      • Compact: the log compaction policy that is used in Apache Kafka. Log compaction ensures that the latest value is retained for messages that have the same key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.

        Important: You can use log-compacted topics only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.

    • Tag: The tags that you want to attach to the topic. Example: demo.

    After a topic is created, you can view the topic on the Topics page.
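
The console is the supported way to create these topics, and the ApsaraMQ-specific Storage Engine setting cannot be expressed through the open-source Kafka API. That said, if your instance also accepts requests from the open-source Kafka AdminClient, the following Java sketch illustrates how the partition-count and cleanup.policy requirements map to topic configurations. The bootstrap address is a placeholder, and the replication factor of 3 mirrors the three-replica storage described earlier; treat this as an illustration, not the official procedure.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateConnectorTopics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder: the default endpoint of your ApsaraMQ for Kafka instance.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Task offset topic: more than 1 partition, compact cleanup policy.
                NewTopic offset = new NewTopic("connect-offset-kafka-maxcompute-sink", 12, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                // Task configuration topic: exactly 1 partition, compact cleanup policy.
                NewTopic config = new NewTopic("connect-config-kafka-maxcompute-sink", 1, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                // Task status topic: 6 partitions recommended, compact cleanup policy.
                NewTopic status = new NewTopic("connect-status-kafka-maxcompute-sink", 6, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                admin.createTopics(Arrays.asList(offset, config, status)).all().get();
            }
        }
    }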

Create the group that is required by a MaxCompute sink connector

In the ApsaraMQ for Kafka console, you can manually create the group that is required by a MaxCompute sink connector. The name of the group must be in the connect-<task name> format. For example, if the connector task is named kafka-maxcompute-sink, name the group connect-kafka-maxcompute-sink. For more information, see Parameters in the Configure Source Service step.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Groups.

  5. On the Groups page, click Create Group.

  6. In the Create Group panel, enter a group name in the Group ID field and a group description in the Description field, attach tags to the group, and then click OK.

    After a consumer group is created, you can view the consumer group on the Groups page.

Create and deploy a MaxCompute sink connector

To create and deploy a MaxCompute sink connector that is used to export data from ApsaraMQ for Kafka to MaxCompute, perform the following steps:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Connectors.

  5. On the Connectors page, click Create Connector.

  6. In the Create Connector wizard, perform the following steps:

    1. In the Configure Basic Information step, set the following parameters and click Next.

      • Name: The name of the connector. Example: kafka-maxcompute-sink. Take note of the following rules when you specify a connector name:

        • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).

        • Each connector name must be unique within an ApsaraMQ for Kafka instance.

        The name of the group that is used by the connector task must be in the connect-<task name> format. If you have not already created such a group, ApsaraMQ for Kafka automatically creates one for you.

      • Instance: The information about the ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed. Example: demo alikafka_post-cn-st21p8vj****.

    2. In the Configure Source Service step, select ApsaraMQ for Kafka as the source service, set the following parameters, and then click Next.

      Note

      If you have created the topics and consumer group in advance, set the Resource Creation Method parameter to Manual and enter the names of the created resources in the fields below. Otherwise, set the Resource Creation Method parameter to Auto.

      Parameters in the Configure Source Service step

      • Data Source Topic: The name of the data source topic from which data is to be exported. Example: maxcompute-test-input.

      • Consumer Thread Concurrency: The number of concurrent consumer threads that export data from the data source topic. Default value: 6. Valid values: 1, 2, 3, 6, and 12. Example: 6.

      • Consumer Offset: The offset from which consumption starts. Example: Earliest Offset. Valid values:

        • Earliest Offset: Consumption starts from the earliest offset.

        • Latest Offset: Consumption starts from the latest offset.

      • VPC ID: The ID of the virtual private cloud (VPC) in which the data export task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the ApsaraMQ for Kafka instance. You do not need to change the value. Example: vpc-bp1xpdnd3l***.

      • vSwitch ID: The ID of the vSwitch on which the data export task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the ApsaraMQ for Kafka instance. The default value is the vSwitch ID that you specified when you deployed the ApsaraMQ for Kafka instance. Example: vsw-bp1d2jgg81***.

      • Failure Handling Policy: Specifies whether to retain the subscription to a partition in which a message send failure occurs. Click Configure Runtime Environment to display the parameter. Example: Continue Subscription. Valid values:

        • Continue Subscription: retains the subscription to the partition in which the error occurs and returns the logs.

        • Stop Subscription: stops the subscription to the partition in which the error occurs and returns the logs.

      • Resource Creation Method: The method that is used to create the topics and group required by the MaxCompute sink connector. Click Configure Runtime Environment to display the parameter. Valid values: Auto and Manual. Example: Auto.

      • Connector Consumer Group: The group that is used by the data export task of the connector. Click Configure Runtime Environment to display the parameter. The name of the group must be in the connect-<task name> format. Example: connect-kafka-maxcompute-sink.

      • Task Offset Topic: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter. Example: connect-offset-kafka-maxcompute-sink.

        • Topic: We recommend that you start the topic name with connect-offset.

        • Partitions: The number of partitions in the topic must be greater than 1.

        • Storage Engine: The storage engine of the topic must be set to Local Storage.

        • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      • Task Configuration Topic: The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter. Example: connect-config-kafka-maxcompute-sink.

        • Topic: We recommend that you start the topic name with connect-config.

        • Partitions: The topic can contain only one partition.

        • Storage Engine: The storage engine of the topic must be set to Local Storage.

        • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      • Task Status Topic: The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter. Example: connect-status-kafka-maxcompute-sink.

        • Topic: We recommend that you start the topic name with connect-status.

        • Partitions: We recommend that you set the number of partitions in the topic to 6.

        • Storage Engine: The storage engine of the topic must be set to Local Storage.

        • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      • Dead-letter Queue Topic: The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-maxcompute-sink.

        • Topic: We recommend that you start the topic name with connect-error.

        • Partitions: We recommend that you set the number of partitions in the topic to 6.

        • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.

      • Error Data Topic: The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-maxcompute-sink.

        • Topic: We recommend that you start the topic name with connect-error.

        • Partitions: We recommend that you set the number of partitions in the topic to 6.

        • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.

    3. In the Configure Destination Service step, select MaxCompute as the destination service, set the following parameters, and then click Create.

      Note

      If the instance that contains the data source topic is in the China (Hangzhou) or China (Chengdu) region, the Service Authorization dialog box appears when you select MaxCompute as the destination service. Click OK in the dialog box, set the parameters that are described below, and then click Create.

      • Endpoint: The endpoint of MaxCompute. For more information, see Endpoints. Example: http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api.

        • VPC endpoint: We recommend that you use the VPC endpoint because it has lower latency. The VPC endpoint can be used if the ApsaraMQ for Kafka instance and the MaxCompute project are in the same region.

        • Public endpoint: We recommend that you do not use the public endpoint because it has higher latency. The public endpoint can be used if the ApsaraMQ for Kafka instance and the MaxCompute project are in different regions. To use the public endpoint, you must enable Internet access for the connector. For more information, see Enable Internet access for a connector.

      • Workspace: The name of the MaxCompute project to which you want to export data. Example: connector_test.

      • Table: The name of the MaxCompute table to which you want to export data. Example: test_kafka.

      • Region for Table: The region where the MaxCompute table is created. Example: China (Hangzhou).

      • Alibaba Cloud Account ID: The ID of the Alibaba Cloud account that is used to access MaxCompute. Example: 188***.

      • RAM Role: The name of the RAM role that is assumed by ApsaraMQ for Kafka. For more information, see Create a RAM role. Example: AliyunKafkaMaxComputeUser1.

      • Mode: The mode in which messages are exported to MaxCompute. Default value: DEFAULT. Example: DEFAULT. Valid values:

        • KEY: Only the keys of messages are retained and written into the Key column of the MaxCompute table.

        • VALUE: Only the values of messages are retained and written into the Value column of the MaxCompute table.

        • DEFAULT: Both the keys and the values of messages are retained. Keys are written into the Key column and values are written into the Value column of the MaxCompute table.

          Important: In DEFAULT mode, the CSV format is not supported. You can select only the TEXT and BINARY formats.

      • Format: The format in which messages are exported to MaxCompute. Default value: TEXT. Example: TEXT. Valid values:

        • TEXT: strings

        • BINARY: byte arrays

        • CSV: strings separated by commas (,)

          Important: If you set this parameter to CSV, the DEFAULT mode is not supported. Only the KEY and VALUE modes are supported.

          • KEY mode: Only the keys of messages are retained. Keys are separated by commas (,) and written into the MaxCompute table in the order of indexes.

          • VALUE mode: Only the values of messages are retained. Values are separated by commas (,) and written into the MaxCompute table in the order of indexes.

      • Partition: The frequency at which partitions are created. Default value: HOUR. Example: HOUR. For example, the pt value 11-17-2020 14 shown in the verification section of this topic is an HOUR-level partition. Valid values:

        • DAY: writes data into a new partition every day.

        • HOUR: writes data into a new partition every hour.

        • MINUTE: writes data into a new partition every minute.

      • Time Zone: The time zone of the ApsaraMQ for Kafka producer client that sends messages to the data source topic. Default value: GMT+08:00. Example: GMT+08:00.

      After the connector is created, you can view it on the Connectors page.

  7. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

Send a test message

After you deploy the MaxCompute sink connector, you can send a message to the data source topic in ApsaraMQ for Kafka to test whether the message can be exported to MaxCompute.

  1. On the Connectors page, find the connector that you want to manage and click Test in the Actions column.

  2. In the Send Message panel, configure the parameters to send a message for testing.

    • If you set the Sending Method parameter to Console, perform the following steps:

      1. In the Message Key field, enter the message key. Example: demo.

      2. In the Message Content field, enter the message content. Example: {"key": "test"}.

      3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

        • If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

        • If you do not want to send the test message to a specific partition, click No.

    • If you set the Sending Method parameter to Docker, run the Docker command in the Run the Docker container to produce a sample message section to send the test message.

    • If you set the Sending Method parameter to SDK, select an SDK for the required programming language or framework and an access method to send and subscribe to the test message. A minimal Java producer sketch follows this list.
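
    The following Java sketch is one programmatic way to produce the test message. It is a minimal example rather than the official SDK sample: it assumes that the client runs in the same VPC as the instance and can reach the instance's default endpoint over PLAINTEXT, and the bootstrap address is a placeholder. If your access method requires SASL or SSL, add the corresponding settings from the console's SDK examples.

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;

      public class SendTestMessage {
          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              // Placeholder: the default endpoint of your ApsaraMQ for Kafka instance.
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092");
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                  // Same key and content as the console example above.
                  ProducerRecord<String, String> record =
                          new ProducerRecord<>("maxcompute-test-input", "demo", "{\"key\": \"test\"}");
                  producer.send(record).get(); // block until the broker acknowledges the message
              }
          }
      }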

View data in the MaxCompute table

After you send a message to the data source topic in ApsaraMQ for Kafka, you can log on to the MaxCompute client to check whether the message is received.

To view the test_kafka table, perform the following steps:

  1. Log on to the MaxCompute client.

  2. Run the following command to view the partitions of the table:

    show partitions test_kafka;

    In this example, the following result is returned:

    pt=11-17-2020 15
    
    OK
  3. Run the following command to view the data stored in the partitions:

    select * from test_kafka where pt = "11-17-2020 14";

    In this example, the following result is returned:

    +----------------------+------------+------------+-----+-------+---------------+
    | topic                | partition  | offset     | key | value | pt            |
    +----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input| 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +----------------------+------------+------------+-----+-------+---------------+