You can create and run extract, transform, load (ETL) tasks to cleanse, transform, and load data in ApsaraMQ for Kafka instances. This topic describes how to create an ETL task in the ApsaraMQ for Kafka console to process the data in a source topic and deliver the results to a destination topic.
Prerequisites
Before you run an ETL task, make sure that the following operations are performed:
Create a source topic and a destination topic on ApsaraMQ for Kafka instances. For more information, see Step 1: Create a topic.
Note: If you choose to manually create the auxiliary topics that an ETL task requires, you must also create the topics that store the information about the ETL task. For information about the parameters used to create these topics, see Step 3 in the "Create an ETL task" section of this topic.
Activate Function Compute. For more information, see Activate Function Compute.
Obtain the required permissions if you are a Resource Access Management (RAM) user. For more information, see Grant permissions to RAM users.
The following code provides an example of the policy that specifies the permissions:
{ "Version": "1", "Statement": [ { "Action": [ "kafka:CreateETLTask", "kafka:ListETLTask", "kafka:DeleteETLTask" ], "Resource": "*", "Effect": "Allow" } ] }
Background information
ETL is the process in which data is extracted, transformed, and loaded into a destination. You can write a function to implement the logic of the ETL task. ApsaraMQ for Kafka calls the function to process data in the source topic and send the data to the destination topic.
During data processing, Function Compute automatically creates the corresponding service and function. The name of the created service is in the _FC-kafka-<ETL task name> format.
The source topic and the destination topic on ApsaraMQ for Kafka instances must reside in the same region.
Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure the logging feature.
The ETL task feature of ApsaraMQ for Kafka is in public preview. This feature is independent of ApsaraMQ for Kafka instances. ApsaraMQ for Kafka does not charge you for this feature. If the ETL task that you create depends on other services, refer to the billing rules of the corresponding services.
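For illustration, the following is a minimal sketch of the kind of function that ApsaraMQ for Kafka invokes. It assumes, as described in the "Create an ETL task" section of this topic, that each message is passed to the function as a dictionary and that returning None filters the message out; the empty-value rule is only an example:

def deal_message(message):
    # Each message arrives as a dictionary whose 'key' and 'value' fields
    # you can read and modify.
    # Example rule: drop messages with an empty value, forward the rest unchanged.
    if not message.get('value'):
        return None  # Returning None filters the message out.
    return message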
Enable ETL
ETL tasks are created in the Connector Ecosystem Integration module of the ApsaraMQ for Kafka console. This module provides data filtering and transformation capabilities. For more information, see Overview.
The first time you use the ETL task feature, you must authorize ApsaraMQ for Kafka to access related services. After you confirm the authorization, the system automatically creates the service-linked role AliyunServiceRoleForAlikafkaETL. ApsaraMQ for Kafka can assume the role to access the services that are used during the processing of an ETL task. For more information, see Service-linked roles.
Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, choose Connector Ecosystem Integration > ETL Tasks.
On the ETL Tasks page, click Create Task.
In the Service Authorization message that appears, click OK.
Create an ETL task
This section describes how to create and deploy an ETL task to extract data from a source topic, process the data, and then load the processed data to a destination topic.
On the ETL Tasks page, click Create Task.
In the Configure Basic Information step, enter a task name and click Next.
In the Configure Source and Destination step, specify the data source, destination topic, and consumption information. Then, click Next.
Parameter
Description
Example
Instance
The instances to which the source topic and the destination topic belong.
alikafka_pre-cn-7pp2bz47****
alikafka_post-cn-2r42bvbm****
Topic
The source topic and the destination topic.
Note: The source topic and the destination topic must be different topics.
topic****
test****
Consumer Offset
The offset from which you want messages to be consumed. This parameter is displayed only after you click Advanced Settings. Valid values:
Earliest Offset: Consume messages from the earliest offset.
Latest Offset: Consume messages from the latest offset.
Latest Offset
Failure Handling Policy
Specifies whether to send subsequent messages if a message fails to be sent. This parameter is displayed only after you click Advanced Settings. Valid values:
Continue Subscription: Send subsequent messages if a message fails to be sent.
Stop Subscription: Do not send subsequent messages if a message fails to be sent.
Continue Subscription
Resource Creation Method
The method that is used to create auxiliary topics that are required by the ETL task. This parameter is displayed only after you click Advanced Settings. Valid values:
Auto
Manual
Auto
Consumer Group
The consumer group that is used by the ETL task. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. We recommend that you use etl-cluster as the prefix of the consumer group name.
etl-cluster-kafka
Task Offset Topic
The topic that is used to store consumer offsets. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
Topic: the name of the topic. We recommend that you use etl-offset as the prefix of the topic name.
Partitions: the number of partitions in the topic. This parameter must be set to a value greater than 1.
Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.
Note: You can set the Storage Engine parameter to Local Storage only if you create the topic on a Professional Edition instance.
cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.
etl-offset-kafka
Task Configuration Topic
The topic that is used to store task configurations. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
Topic: the name of the topic. We recommend that you use etl-config as the prefix of the topic name.
Partitions: the number of partitions in the topic. This parameter must be set to 1.
Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.
Note: You can set the Storage Engine parameter to Local Storage only if you create the topic on a Professional Edition instance.
cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.
etl-config-kafka
Task Status Topic
The topic that is used to store the task status. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
Topic: the name of the topic. We recommend that you use etl-status as the prefix of the topic name.
Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.
Note: You can set the Storage Engine parameter to Local Storage only if you create the topic on a Professional Edition instance.
cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.
etl-status-kafka
Dead-letter Queue Topic
The topic that is used to store the error data of the ETL framework. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, this topic can be the same as the Error Data Topic.
Topic: the name of the topic. We recommend that you use etl-error as the prefix of the topic name.
Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
Storage Engine: the storage engine that is used by the topic. This parameter can be set to Local Storage or Cloud Storage.
Note: You can set the Storage Engine parameter to Local Storage only if you create the topic on a Professional Edition instance.
etl-error-kafka
Error Data Topic
The topic that is used to store the error data of the sink. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, this topic can be the same as the Dead-letter Queue Topic.
Topic: the name of the topic. We recommend that you use etl-error as the prefix of the topic name.
Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
Storage Engine: the storage engine that is used by the topic. This parameter can be set to Local Storage or Cloud Storage.
Note: You can set the Storage Engine parameter to Local Storage only if you create the topic on a Professional Edition instance.
etl-error-kafka
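If you set the Resource Creation Method parameter to Manual, the settings above (partition counts and the compact cleanup policy) are the requirements that matter. As an illustration only, the following sketch expresses the same topic definitions with the open source kafka-python admin client. The endpoint and the replication factor are placeholder assumptions, and ApsaraMQ for Kafka topics are normally created in the console as described above, so treat this as a reference for the required settings rather than a supported procedure:

from kafka.admin import KafkaAdminClient, NewTopic

# Placeholder endpoint and replication factor; adjust for your environment.
admin = KafkaAdminClient(bootstrap_servers="<your-endpoint>:9092")
compact = {"cleanup.policy": "compact"}
admin.create_topics([
    # Task offset topic: more than 1 partition, compacted.
    NewTopic("etl-offset-kafka", num_partitions=12, replication_factor=3, topic_configs=compact),
    # Task configuration topic: exactly 1 partition, compacted.
    NewTopic("etl-config-kafka", num_partitions=1, replication_factor=3, topic_configs=compact),
    # Task status topic: 6 partitions recommended, compacted.
    NewTopic("etl-status-kafka", num_partitions=6, replication_factor=3, topic_configs=compact),
    # Dead-letter queue/error data topic: 6 partitions recommended.
    NewTopic("etl-error-kafka", num_partitions=6, replication_factor=3),
])
admin.close()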
In the Configure Function step, configure the parameters and click Create.
Before you click Create, you can click Test to test whether the function works as expected.
Parameter
Description
Example
Programming Language
The language in which the function is written. Set this parameter to Python 3.
Python3
Template
The function template that the system provides. After you select a function template, the system automatically populates the Code field with the code of the function template.
Add Prefix/Suffix
Code
The code that is used to process the message. ApsaraMQ for Kafka provides function templates that you can use to cleanse and transform data. You can modify the code of the selected function template based on your business requirements.
Note: You can import Python modules based on your requirements.
The message in the code is in the dictionary format. You only need to modify the key and the value.
Return the processed message. If the function is used to filter messages, return None.
def deal_message(message):
    for keyItem in message.keys():
        if keyItem == 'key':
            message[keyItem] = message[keyItem] + "KeySuffix"
            continue
        if keyItem == 'value':
            message[keyItem] = message[keyItem] + "ValueSuffix"
            continue
    return message
A quick local check of what this template does to a sample message is shown after this table.
Message Key
The key of the message to be processed in the source topic. This parameter is displayed only after you click Test Code.
demo
Message Content
The value of the message to be processed in the source topic.
{"key": "test"}
After the ETL task is created, you can view the task on the ETL Tasks page. The system automatically deploys the task.
Send a test message
After the ETL task is deployed, you can send a test message to the source ApsaraMQ for Kafka topic to test whether the data can be processed by the configured function and sent to the destination topic.
On the ETL Tasks page, find the ETL task that you created and click Test in the Actions column.
In the Send Message panel, enter the information of the test message and click OK to send the test message.
In the Message Key field, enter the key of the test message. Example: demo.
In the Message Content field, enter the content of the test message. Example: {"key": "test"}.
Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
If you want to send the test message to a specific partition, click Yes and enter a partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.
If you do not want to send the test message to a specific partition, click No.
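If you prefer to produce the test message programmatically instead of through the console, the following minimal sketch uses the open source kafka-python client. The endpoint and topic name are placeholders, and an unauthenticated default endpoint on port 9092 is assumed; adjust the client configuration to match your instance's access settings:

from kafka import KafkaProducer

# Placeholders: replace with your instance's default endpoint and source topic.
producer = KafkaProducer(bootstrap_servers="<your-default-endpoint>:9092")
producer.send(
    "<your-source-topic>",
    key=b"demo",               # Same key as the console example.
    value=b'{"key": "test"}',  # Same content as the console example.
    partition=0,               # Optional: target a specific partition.
)
producer.flush()
producer.close()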
View function logs
After you extract and process data in ApsaraMQ for Kafka, you can view the logs of the function to verify whether the destination topic receives the processed data. For more information, see Configure the logging feature.
View the details of an ETL task
After you create an ETL task, you can view its details in the ApsaraMQ for Kafka console.
On the ETL Tasks page, find the ETL task that you created and click Details in the Actions column.
On the Task Details page, view the details of the ETL task.
Delete an ETL task
If you no longer require an ETL task, you can delete the task in the ApsaraMQ for Kafka console.
On the ETL Tasks page, find the ETL task that you want to delete and click Delete in the Actions column.
In the Notes message that appears, click OK.