
ApsaraMQ for Kafka:Create an OSS sink connector

Last Updated:Sep 14, 2023

This topic describes how to create an OSS sink connector to synchronize data from a source topic in an ApsaraMQ for Kafka instance to Object Storage Service (OSS).

Prerequisites

Before you create an OSS sink connector, make sure that the following requirements are met:

  • The connector feature is enabled for the ApsaraMQ for Kafka instance.
  • A data source topic is created in the ApsaraMQ for Kafka instance.
  • An OSS bucket is created.
  • Function Compute is activated.

Usage notes

  • When you synchronize data from a topic in an ApsaraMQ for Kafka instance to an OSS bucket, make sure that the ApsaraMQ for Kafka instance and the OSS bucket reside in the same region and that Function Compute is available in that region. ApsaraMQ for Kafka synchronizes your data to Function Compute first, and Function Compute then synchronizes the data to OSS. For more information about the limits on connectors, see Limits.
  • OSS sink connectors export data by using Function Compute. Function Compute provides a certain amount of resources for free. When you use up this free quota, you are charged for the Function Compute resources that you use based on the billing rules. For more information, see Billing overview.
  • Function Compute allows you to query the logs of function calls. For more information, see Configure the logging feature.
  • ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer. Binary data is not supported.
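
If your producers emit binary payloads, convert them to UTF-8 strings before they reach the source topic. The following is a minimal sketch of one way to do this with Base64; the encoding choice is an assumption for illustration, not something the connector prescribes:

# A minimal sketch: wrap binary data in a UTF-8-safe string (Base64)
# before producing it to the source topic. The encoding choice is an
# illustration, not a connector requirement.
import base64

binary_payload = b"\x00\xff\x10"  # example binary data
utf8_value = base64.b64encode(binary_payload).decode("ascii")
# Produce utf8_value (a plain string) to the source topic instead of raw bytes.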

Create and deploy an OSS sink connector

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.

  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
  5. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, configure parameters and click Next. The following table describes the parameters.
      Important By default, Authorize to Create Service Linked Role is selected. This means that ApsaraMQ for Kafka will create a service-linked role based on the following rules:
      • If no service-linked role is created, ApsaraMQ for Kafka automatically creates a service-linked role for you to use the OSS sink connector to synchronize data from ApsaraMQ for Kafka to OSS.
      • If a service-linked role is available, ApsaraMQ for Kafka does not create a new role.
      For more information about service-linked roles, see Service-linked roles.
      Parameter: Name
      Description: The name of the connector. Specify a connector name based on the following naming conventions:
      • The connector name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).
      • The name must be unique within an ApsaraMQ for Kafka instance.
      The data synchronization task of the connector must use a consumer group that is named in the connect-Task name format. If you have not created such a consumer group, ApsaraMQ for Kafka automatically creates one for you.
      Example: kafka-oss-sink

      Parameter: Instance
      Description: The information about the ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed.
      Example: demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select ApsaraMQ for Kafka as the source service, configure parameters, and then click Next. The following table describes the parameters.
      Parameter: Data Source Topic
      Description: The name of the topic from which data is to be synchronized.
      Example: oss-test-input

      Parameter: Consumer Thread Concurrency
      Description: The number of concurrent consumer threads that are used to synchronize data from the source topic. Default value: 6. Valid values:
      • 1
      • 2
      • 3
      • 6
      • 12
      Example: 6

      Parameter: Consumer Offset
      Description: The consumer offset from which you want message consumption to start. Valid values:
      • Earliest Offset: Message consumption starts from the earliest consumer offset.
      • Latest Offset: Message consumption starts from the latest consumer offset.
      Example: Earliest Offset

      Parameter: VPC ID
      Description: The ID of the virtual private cloud (VPC) in which the source instance is deployed. Click Configure Runtime Environment to display the parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to configure this parameter.
      Example: vpc-bp1xpdnd3l***

      Parameter: vSwitch ID
      Description: The ID of the vSwitch to which the source instance is connected. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. By default, the vSwitch ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed.
      Example: vsw-bp1d2jgg81***

      Parameter: Failure Handling Policy
      Description: Specifies whether to retain the subscription to a partition after a message sending error occurs in that partition. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription. A log entry is generated for the error.
      • Stop Subscription: stops the subscription. A log entry is generated for the error.
      Example: Continue Subscription

      Parameter: Resource Creation Method
      Description: The method that is used to create the topics and the consumer group that are required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
      • Auto
      • Manual
      If you select Manual, you must create the required topics and group yourself; see the sketch after this procedure.
      Example: Auto

      Parameter: Connector Consumer Group
      Description: The name of the consumer group that is required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. We recommend that you start the name with connect-cluster.
      Example: connect-cluster-kafka-oss-sink

      Parameter: Task Offset Topic
      Description: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note: You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-offset-kafka-oss-sink

      Parameter: Task Configuration Topic
      Description: The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note: You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-config-kafka-oss-sink

      Parameter: Task Status Topic
      Description: The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note: You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-status-kafka-oss-sink

      Parameter: Dead-letter Queue Topic
      Description: The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
        Note: You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      Example: connect-error-kafka-oss-sink

      Parameter: Error Data Topic
      Description: The topic that is used to store the error data of the sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
        Note: You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      Example: connect-error-kafka-oss-sink
    3. In the Configure Destination Service step, select Object Storage Service as the destination service, set parameters, and then click Create. The following table describes the parameters.
      Parameter: Bucket Name
      Description: The name of the OSS bucket to which the data is to be synchronized.
      Example: bucket_test

      Parameter: AccessKey ID
      Description: The AccessKey ID of your Alibaba Cloud account.
      Example: LTAI4GG2RGAjppjK********

      Parameter: AccessKey Secret
      Description: The AccessKey secret of your Alibaba Cloud account.
      Example: WbGPVb5rrecVw3SQvEPw6R********

      Make sure that your Alibaba Cloud account is granted the following permissions according to the principle of least privilege:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
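
      The wildcard Resource in this example grants access to objects in all buckets. For stricter least-privilege scoping, you can limit the policy to the destination bucket, for example acs:oss:*:*:bucket_test and acs:oss:*:*:bucket_test/* (using the example bucket name above).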
      Note
      The AccessKey ID and AccessKey secret are passed to OSS as environment variables when the data synchronization task is created. After the task is created, ApsaraMQ for Kafka does not store the AccessKey ID or AccessKey secret of your Alibaba Cloud account.

      After the connector is created, you can view the connector on the Connectors page.
  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
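
If you set the Resource Creation Method parameter to Manual, you must create the internal topics with the settings described above before you deploy the connector. The following is a minimal sketch that uses the kafka-python admin client; the bootstrap endpoint, replication factor, and topic names are placeholder assumptions, and depending on your instance's edition and connection type you may need to create the topics in the ApsaraMQ for Kafka console instead:

# Sketch: create the connector's internal topics with the settings
# described above (Manual resource creation). Endpoint is a placeholder.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="alikafka-pre-cn-xxxx:9092")  # placeholder

compact = {"cleanup.policy": "compact"}  # required log cleanup policy
topics = [
    # Task offset topic: more than 1 partition, compact cleanup policy.
    NewTopic("connect-offset-kafka-oss-sink", 12, 3, topic_configs=compact),
    # Task configuration topic: exactly 1 partition, compact cleanup policy.
    NewTopic("connect-config-kafka-oss-sink", 1, 3, topic_configs=compact),
    # Task status topic: 6 partitions recommended, compact cleanup policy.
    NewTopic("connect-status-kafka-oss-sink", 6, 3, topic_configs=compact),
    # Dead-letter queue / error data topic: 6 partitions recommended.
    NewTopic("connect-error-kafka-oss-sink", 6, 3),
]
admin.create_topics(new_topics=topics)
admin.close()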

Send messages

After you deploy the OSS sink connector, send a message to the source topic in the ApsaraMQ for Kafka instance to test whether the message can be synchronized to OSS.

  1. On the Connectors page, find the connector that you want to use and click Test in the Actions column.

  2. In the Send Message panel, configure the required parameters to send a test message.

    • Set the Method of Sending parameter to Console.

      1. In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.

      2. In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.

      3. Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.

        • If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.

        • If you do not want to send the message to a specified partition, click No.

    • Set the Method of Sending parameter to Docker and run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.

    • Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. ApsaraMQ for Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types. A minimal SDK sketch follows this list.
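
For reference, the following minimal sketch sends the same test message as the console example by using the kafka-python library. The bootstrap endpoint is a placeholder, and instances that use other connection types may also require SASL or SSL settings, as described in the SDK topics:

# Sketch: send the test message from the console example with kafka-python.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="alikafka-pre-cn-xxxx:9092",  # placeholder endpoint
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: v.encode("utf-8"),  # connector expects UTF-8 strings
)
# Same key and value as the console test; specifying partition 0 is optional.
producer.send("oss-test-input", key="demo", value='{"key": "test"}', partition=0)
producer.flush()
producer.close()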

Verify the results

After you send a test message to the source topic in the ApsaraMQ for Kafka instance, you can check whether the message is synchronized to OSS on the Files page of the specified OSS bucket in the OSS console. For more information, see Overview.

If new objects are generated in the OSS bucket, the data is synchronized to OSS.

The data that is synchronized from ApsaraMQ for Kafka to OSS is in the following format:
[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]
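
You can also verify the results programmatically instead of in the console. The following is a minimal sketch that uses the oss2 Python SDK; the endpoint and credentials are placeholders, and it assumes that every listed object contains a JSON array in the preceding format:

# Sketch: list and read synchronized objects with the oss2 SDK.
# Endpoint, credentials, and bucket name are placeholders.
import json
import oss2

auth = oss2.Auth("<AccessKey ID>", "<AccessKey Secret>")
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "bucket_test")

# Iterate over the objects in the bucket and print the records in each one.
for obj in oss2.ObjectIterator(bucket):
    body = bucket.get_object(obj.key).read()
    records = json.loads(body)  # a JSON array of records, as shown above
    for record in records:
        print(obj.key, record["topic"], record["offset"], record["value"])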

Related operations

You can configure the Function Compute resources that are required by the OSS sink connector based on your requirements.

On the Connectors page, find the connector that you created, click More in the Actions column, and then select Configure Function.
You are redirected to the Function Compute console, where you can configure the resources as required.