ApsaraMQ for Kafka:Data cleansing

Last Updated:Sep 25, 2024

The data cleansing feature provides common templates for message processing, including the content splitting, dynamic routing, content enrichment, and content mapping templates. You can use the code in a template to process messages. You can also modify the code in the template based on your business requirements.

Background information

The data cleansing feature provides basic operator capabilities based on Function Compute. The data cleansing feature is supported by the following services: ApsaraMQ for RocketMQ, ApsaraMQ for Kafka, ApsaraMQ for MQTT, ApsaraMQ for RabbitMQ, and Simple Message Queue (formerly MNS). After you create a data cleansing task, you can log on to the Function Compute console to write custom code and modify the configurations of the corresponding function.

The following list describes each operator:

  • Content splitting: Splits message content based on regular expressions and sends the resulting messages one by one to the destination.

  • Dynamic routing: Matches message content against regular expressions. Messages whose content matches a rule are routed to the corresponding destination service. Messages whose content matches no rule are routed to the default destination service.

  • Content enrichment: Enriches message content based on enrichment sources. For example, if the original message contains an account ID, the account ID is used to query a database for the client region. The retrieved information is then added to the body of the source message, and the message is sent to the destination service.

  • Content mapping: Maps message content based on regular expressions. For example, you can mask sensitive fields in messages or trim messages to reduce their size.

This topic describes how to use the data cleansing feature in ApsaraMQ for Kafka.

Scenarios

Content splitting

The following message contains a list of students.

message:
[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]

You want to split the message into the following messages, each of which contains information about a single student, before you send the messages to specified destination services.

message:
    [Jack, Male, Class 4]
message:
    [Alice, Female, Class 3]
message:
    [John, Male, Class 4]

The following figure shows the process.

[Figure: content splitting process]
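
The splitting logic above can be sketched in Python as follows. This is a minimal illustration and not the code of the Content Splitting template: the function name `split_message`, the semicolon delimiter pattern, and the bracket handling are assumptions based on the sample message.

```python
import re


def split_message(message: str, pattern: str = r"\s*;\s*") -> list[str]:
    """Split one message body into several messages using a regular expression."""
    # Remove the surrounding brackets used in the sample message, if present.
    body = message.strip()
    if body.startswith("[") and body.endswith("]"):
        body = body[1:-1]
    # re.split cuts the body at every match of the delimiter pattern.
    parts = [part.strip() for part in re.split(pattern, body)]
    # Drop empty fragments and restore the bracket notation of the sample.
    return [f"[{part}]" for part in parts if part]


students = "[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]"
for single in split_message(students):
    # Each element would be sent to the destination as a separate message.
    print(single)
```

Passing a different `pattern` lets the same function split on another delimiter, such as a line break.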

Dynamic routing

The following message contains a list of toothpastes.

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

Each item in the list must be sent to specific destination topics based on the following custom routing rules:

  • Send the messages that start with BrandA to the BrandA-item-topic and BrandA-discount-topic topics.

  • Send the messages that start with BrandB to the BrandB-item-topic and BrandB-discount-topic topics.

  • Send other messages to the Unknown-brand-topic topic.

The following sample code shows the JSON format of the rules:

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

The following figure shows the process.

[Figure: dynamic routing process]
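
The rule evaluation can be sketched in Python as follows. This is an illustrative sketch and not the code of the Dynamic Routing template: the function name `route` is hypothetical, and the first-match-wins behavior is an assumption, because the rules above do not say what happens when several patterns match.

```python
import json
import re

# The routing rules from the JSON sample above.
RULES_JSON = """
{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {"regex": "^BrandA",
     "targetTopics": ["BrandA-item-topic", "BrandA-discount-topic"]},
    {"regex": "^BrandB",
     "targetTopics": ["BrandB-item-topic", "BrandB-discount-topic"]}
  ]
}
"""


def route(message: str, config: dict) -> list[str]:
    """Return the destination topics for one message."""
    for rule in config["rules"]:
        # Assumption: the first rule whose pattern matches decides the targets.
        if re.search(rule["regex"], message):
            return list(rule["targetTopics"])
    # No rule matched, so fall back to the default topic.
    return [config["defaultTopic"]]


config = json.loads(RULES_JSON)
items = [
    "BrandA, toothpaste, $12.98, 100g",
    "BrandB, toothpaste, $7.99, 80g",
    "BrandC, toothpaste, $1.99, 100g",
]
for item in items:
    print(item, "->", route(item, config))
```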

Content enrichment

In this example, access logs are enriched with the region of the host IP address. The following code snippet shows an access log entry of a service:

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

To determine where each IP address comes from, the mapping between IP addresses and regions is stored in a MySQL database table:

CREATE TABLE `tb_ip` (
    `IP` VARCHAR(256) NOT NULL,
    `Region` VARCHAR(256) NOT NULL,
    `ISP` VARCHAR(256) NOT NULL,
    PRIMARY KEY (`IP`)
);

The following code snippet shows the processed message.

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

The following figure shows the process.

[Figure: content enrichment process]
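
The lookup-and-fill step can be sketched in Python as follows. To keep the example self-contained, it uses an in-memory SQLite database from the standard library as a stand-in for the MySQL database. The function names, the sample IP address `192.0.2.10`, and the ISP value are hypothetical, and this is not the code of the Content Enrichment template.

```python
import json
import sqlite3


def open_lookup_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the tb_ip table (SQLite, not MySQL)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tb_ip ("
        " IP VARCHAR(256) NOT NULL,"
        " Region VARCHAR(256) NOT NULL,"
        " ISP VARCHAR(256) NOT NULL,"
        " PRIMARY KEY (IP))"
    )
    # Hypothetical sample row used only for this illustration.
    conn.execute(
        "INSERT INTO tb_ip VALUES (?, ?, ?)",
        ("192.0.2.10", "beijing", "example-isp"),
    )
    conn.commit()
    return conn


def enrich(message: str, conn: sqlite3.Connection) -> str:
    """Add a region field to a JSON access log entry if the IP is known."""
    record = json.loads(message)
    row = conn.execute(
        "SELECT Region FROM tb_ip WHERE IP = ?", (record["hostIP"],)
    ).fetchone()
    # Leave the message unchanged when the IP address has no mapping.
    if row is not None:
        record["region"] = row[0]
    return json.dumps(record)


conn = open_lookup_db()
log_entry = json.dumps({"accountID": "164901546557****", "hostIP": "192.0.2.10"})
print(enrich(log_entry, conn))
```

The parameterized query (`?` placeholder) keeps message content from being interpreted as SQL.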

Content mapping

The following message contains the registration information about the employees of a company. The information includes confidential content, such as employee IDs and mobile phone numbers.

James, Employee ID 1, 131 1111 1111
Mary, Employee ID 2, 132 2222 2222
David, Employee ID 3, 133 3333 3333

The confidential content in the preceding message must be masked before the message is sent to the destination services. The following example shows the masked message:

Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, *********** 

The following figure shows the process.

[Figure: content mapping process]
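
The masking step can be sketched in Python with regular expression substitutions. This is an illustrative sketch and not the code of the Content Mapping template. The function name `mask_record` and all three patterns are assumptions. In particular, the name rule keeps only the first two characters, so the third record is masked as Da* rather than the Dav* shown in the sample above.

```python
import re


def mask_record(line: str) -> str:
    """Mask the name, employee ID, and phone number in one record."""
    # Keep the first two characters of the leading name (assumed rule).
    line = re.sub(r"^(\w{2})\w*", r"\1*", line)
    # Replace the numeric employee ID with a single asterisk.
    line = re.sub(r"(Employee ID )\d+", r"\1*", line)
    # Replace an 11-digit phone number written as 3-4-4 groups with 11 asterisks.
    line = re.sub(r"\d{3} \d{4} \d{4}", "*" * 11, line)
    return line


records = [
    "James, Employee ID 1, 131 1111 1111",
    "Mary, Employee ID 2, 132 2222 2222",
    "David, Employee ID 3, 133 3333 3333",
]
for record in records:
    print(mask_record(record))
```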

Procedure

  1. Log on to the ApsaraMQ for Kafka console. In the left-side navigation pane, choose Message Integration > Tasks. On the Tasks page, click Create Task.

  2. In the Source, Filtering, Transformation, and Sink steps of the Create Task page, configure the event source, filtering rule, data cleansing template, and event target.

    [Figure: Create Task page with the Source, Filtering, Transformation, and Sink steps]

    Source and Sink

    Select an ApsaraMQ for Kafka instance as the source and a different ApsaraMQ for Kafka instance as the sink.

    Filtering

    Specify the matching rule. This step is optional. If you do not specify a rule, all events are matched. For more information, see Message filtering.

    Transformation

    Select a template provided by Function Compute. Valid values: Content Splitting, Content Mapping, Content Enrichment, and Dynamic Routing. You can select one of the preceding templates based on your business requirements. A template provides the basic logic for data processing. You can use the code in a template to process messages. You can also modify the code in the template based on your business requirements.

    In this example, the Content Splitting template is selected.

    [Figure: Content Splitting template selected in the Transformation step]