This topic describes how to create an OSS sink connector to synchronize data from a source topic in a ApsaraMQ for Kafka instance to Object Storage Service (OSS).
Prerequisites
- The connector feature is enabled for your ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.
- A topic is created in the ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.
- An OSS bucket is created in the OSS console. For more information, see Create buckets.
- Function Compute is activated. For more information, see Create a function in the Function Compute console.
Usage notes
- When you synchronize data from a topic in a ApsaraMQ for Kafka instance to an OSS bucket, make sure that the Message Queue for Apache Kafka instance and OSS bucket reside in the same region and that Function Compute is available in the specified region. Message Queue for Apache Kafka synchronizes your data to Function Compute first, which then synchronizes the data to OSS. For more information about the limits on connectors, see Limits.
- OSS sink connectors export data by using Function Compute. Function Compute provides a certain amount of resources for free. When you use up this free quota, you are charged for the Function Compute resources that you use based on the billing rules. For more information, see Billing overview.
- Function Compute allows you to query the logs of function calls. For more information, see Configure the logging feature.
- ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer. Message Queue for Apache Kafka does not support binary data.
Create and deploy an OSS sink connector
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
- In the Create Connector wizard, perform the following steps:
- In the Configure Basic Information step, configure parameters and click Next. The following table describes the parameters. Important By default, Authorize to Create Service Linked Role is selected. This means that ApsaraMQ for Kafka will create a service-linked role based on the following rules:
- If no service-linked role is created, ApsaraMQ for Kafka automatically creates a service-linked role for you to use the OSS sink connector to synchronize data from ApsaraMQ for Kafka to OSS.
- If a service-linked role is available, ApsaraMQ for Kafka does not create a new role.
Parameter Description Example Name The name of the connector. Specify a connector name based on the following naming conventions: - The connector name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).
- The name must be unique within a ApsaraMQ for Kafka instance.
The data synchronization task of the connector must use a consumer group that is named in the connect-Task name format.Group If you have not created such a consumer group, Message Queue for Apache Kafka automatically creates one for you.Group
kafka-oss-sink Instance The information about the Message Queue for Apache Kafka instance. By default, the name and ID of the instance are displayed. demo alikafka_post-cn-st21p8vj**** - In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, configure parameters, and then click Next. The following table describes the parameters.
Parameter Description Example Data Source Topic The name of the topic from which data is to be synchronized. oss-test-input Consumer Thread Concurrency The number of concurrent consumer threads that are used to synchronize data from the source topic. Default value: 6. Valid values: - 1
- 2
- 3
- 6
- 12
6 Consumer Offset The consumer offset from which you want message consumption to start. Valid values: - Earliest Offset: Message consumption starts from the earliest consumer offset.
- Latest Offset: Message consumption starts from the latest consumer offset.
Earliest Offset VPC ID The ID of the virtual private cloud (VPC) in which the source instance is deployed. Click Configure Runtime Environment to display the parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to configure this parameter. vpc-bp1xpdnd3l*** vSwitch ID The ID of the vSwitch to which the source instance is connected. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. By default, the vSwitch ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. vsw-bp1d2jgg81*** Failure Handling Policy Specifies whether to retain the subscription to a partition after a message sending error occurs in that partition. Click Configure Runtime Environment to display the parameter. Valid values: - Continue Subscription: retains the subscription. A log entry is generated for the error.
- Stop Subscription: stops the subscription. A log entry is generated for the error.
Note- For more information, see Manage a connector.
- For information about how to troubleshoot errors based on error codes, see Error codes.
Continue Subscription Resource Creation Method The method to create the topic and group that are required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. - Auto
- Manual
Auto Connector Consumer Group The name of the consumer group that is required by the OSS sink connector.Group Click Configure Runtime Environment to display the parameter. We recommend that you start the name with connect-cluster.Group connect-cluster-kafka-oss-sink Task Offset Topic The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter. - Topic: We recommend that you start the topic name with connect-offset.
- Partitions: The number of partitions in the topic must be greater than 1.
- Storage Engine: You must set the storage engine of the topic to Local Storage. Note You can set the storage engine to Local Storage only when you create a topic for Professional Edition Message Queue for Apache Kafka instances.
- cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-offset-kafka-oss-sink Task Configuration Topic The topic that is used to store the task configurations. Click Configure Runtime Environment to display the parameter. - Topic: We recommend that you start the topic name with connect-config.
- Partitions: The topic can contain only one partition.
- Storage Engine: You must set the storage engine of the topic to Local Storage. Note You can set the storage engine to Local Storage only when you create a topic for Professional Edition Message Queue for Apache Kafka instances.
- cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-config-kafka-oss-sink Task Status Topic The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter. - Topic: We recommend that you start the topic name with connect-status.
- Partitions: We recommend that you set the number of partitions in the topic to 6.
- Storage Engine: You must set the storage engine of the topic to Local Storage. Note You can set the storage engine to Local Storage only when you create a topic for Professional Edition Message Queue for Apache Kafka instances.
- cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-status-kafka-oss-sink Dead-letter Queue Topic The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create a topic and use the topic as both the dead-letter queue topic and the error data topic. - Topic: We recommend that you start the topic name with connect-error.
- Partitions: We recommend that you set the number of partitions in the topic to 6.
- Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage. Note You can set the storage engine to Local Storage only when you create a topic for Professional Edition Message Queue for Apache Kafka instances.
connect-error-kafka-oss-sink Error Data Topic The topic that is used to store the error data of the sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create a topic and use the topic as both the dead-letter queue topic and the error data topic. - Topic: We recommend that you start the topic name with connect-error.
- Partitions: We recommend that you set the number of partitions in the topic to 6.
- Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage. Note You can set the storage engine to Local Storage only when you create a topic for Professional Edition Message Queue for Apache Kafka instances.
connect-error-kafka-oss-sink - In the Configure Destination Service step, select Object Storage Service as the destination service, set parameters, and then click Create. The following table describes the parameters.
Parameter Description Example Bucket Name The name of the OSS bucket to which the data is to be synchronized. bucket_test AccessKey ID The AccessKey ID of your Alibaba Cloud account. LTAI4GG2RGAjppjK******** AccessKey Secret The AccessKey secret of your Alibaba Cloud account. WbGPVb5rrecVw3SQvEPw6R******** Make sure that your Alibaba Cloud account is granted the following permissions according to the principle of least privilege:
{ "Version": "1", "Statement": [ { "Action": [ "oss:GetObject", "oss:PutObject" ], "Resource": "*", "Effect": "Allow" } ] }
NoteThe AccessKey ID and AccessKey secret are passed to OSS as environment variables when the data synchronization task is created. After the task is created, ApsaraMQ for Kafka does not store the AccessKey ID or AccessKey secret of your Alibaba Cloud account.ApsaraMQ for Kafka
After the connector is created, you can view the connector on the Connectors page.
- In the Configure Basic Information step, configure parameters and click Next. The following table describes the parameters.
- Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
Send messages
After you deploy the OSS sink connector, send a message to the source topic in the ApsaraMQ for Kafka instance to test whether the message can be synchronized to OSS.
On the Connectors page, find the connector that you want to use and click Test in the Actions column.
In the Send Message panel, configure the required parameters to send a test message.
Set the Method of Sending parameter to Console.
In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
If you do not want to send the message to a specified partition, click No.
Set the Method of Sending parameter to Docker and run the docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. Message Queue for Apache Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
Verify the results
After you send a test message to the source topic in the ApsaraMQ for Kafka instance, you can check whether the message is synchronized to OSS on the Files page of the specified OSS bucket in the OSS console. For more information, see Overview.
If new objects are generated in the OSS bucket, the data is synchronized to OSS.
[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]
Related operations
You can configure the Function Compute resources that are required by the OSS sink connector based on your requirements.