ApsaraMQ for Kafka: Message retrieval

Last Updated: Aug 29, 2024

ApsaraMQ for Kafka provides a message query feature that lets you query messages by offset or point in time in the ApsaraMQ for Kafka console. If that feature does not meet your requirements, use the message retrieval feature, which retrieves messages by partition, offset range, time range, and keywords in message keys and values. This topic describes how to enable the message retrieval feature and specify search conditions for a message retrieval task. It also describes how to suspend, resume, and delete a message retrieval task.

Prerequisites

A topic is created as the data source on your ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.

Background information

  • The message retrieval feature of ApsaraMQ for Kafka is built on the connector feature of ApsaraMQ for Kafka and the index feature of Tablestore. Specifically, messages in the source topic are passed to a connector, which forwards them to a Tablestore table. You can then use the index feature of Tablestore to retrieve the messages.

    Enabling the message retrieval feature creates a connector that synchronizes data from ApsaraMQ for Kafka to Tablestore. The connector name is in the ots-ms-{Topic name}-{Six random characters} format. You view and manage this connector on the Message Retrieval page, not on the Connectors page.

  • The first time you enable the message retrieval feature, ApsaraMQ for Kafka automatically activates Tablestore for you and creates a Tablestore instance and a table. One table is created in Tablestore for each topic that has the message retrieval feature enabled. The names of the automatically created Tablestore instances and tables are in the following formats:

    • Instance name: kfk-{Last 12 characters of the name of the ApsaraMQ for Kafka instance}

    • Table name: {Topic name}:kafka_topic_{Topic name}_{Six random characters}

  • For each topic that has the message retrieval feature enabled, ApsaraMQ for Kafka automatically creates four topics and two groups on the corresponding instance. The topics and groups are used to record the configurations and status of the connector. The names of the created topics and groups are in the following formats:

    • The topic that is used to record the consumer offsets of the connector: connect-offset-{Connector name}

    • The topic that is used to record the configurations of the connector: connect-config-{Connector name}

    • The topic that is used to record the status of the connector: connect-status-{Connector name}

    • The topic that is used to record the data of the dead-letter queue and exceptions: connect-error-{Connector name}

    • Consumer groups: connect-{Connector name} or connect-cluster-{Connector name}
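The naming formats above can be sketched as a small helper that lists the resources to expect for one connector. This is only an illustration of the formats stated in this topic. The function names and the sample connector name are hypothetical, and the six random characters are chosen by the service, so they cannot be predicted in advance.

```python
def tablestore_instance_name(kafka_instance_name: str) -> str:
    """Build kfk-{last 12 characters of the ApsaraMQ for Kafka instance name}."""
    return "kfk-" + kafka_instance_name[-12:]


def connector_resources(connector_name: str) -> dict:
    """List the topic and group names created for one connector."""
    return {
        "offset_topic": f"connect-offset-{connector_name}",
        "config_topic": f"connect-config-{connector_name}",
        "status_topic": f"connect-status-{connector_name}",
        "error_topic": f"connect-error-{connector_name}",
        # This topic gives two possible forms for the group name.
        "groups": [
            f"connect-{connector_name}",
            f"connect-cluster-{connector_name}",
        ],
    }


# Hypothetical connector name in the ots-ms-{Topic name}-{Six random characters} format.
resources = connector_resources("ots-ms-test-ab12cd")
for topic_role in ("offset_topic", "config_topic", "status_topic", "error_topic"):
    print(topic_role, "=", resources[topic_role])
```

A helper like this can be useful when you audit an instance and want to tell automatically created topics and groups apart from your own.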

Billing

The message retrieval feature of ApsaraMQ for Kafka is in public preview and is independent of ApsaraMQ for Kafka instances. ApsaraMQ for Kafka does not charge you for this feature. The automatically created Tablestore instances and tables are also free of charge during public preview. Alibaba Cloud does not provide a service level agreement (SLA) for the message retrieval feature. For information about the SLAs and billing of other services that are required to use the message retrieval feature, see the documentation of the related services.

Usage notes

  • The first time you enable the message retrieval feature for a topic on an ApsaraMQ for Kafka instance, only a Tablestore instance that resides in the same region as the ApsaraMQ for Kafka instance is created. The message retrieval feature is available in multiple regions. For more information, see Supported regions.

  • The first time you enable the message retrieval feature, ApsaraMQ for Kafka automatically creates the service-linked role AliyunServiceRoleForAlikafkaConnector to allow you to use the connector feature. If the service-linked role already exists, ApsaraMQ for Kafka does not create it again. For more information, see Service-linked roles.

  • By default, you can enable the message retrieval feature for up to three topics on an ApsaraMQ for Kafka instance at the same time. Up to 10 message retrieval results can be displayed for each topic.

  • Up to 1 KB of the content of each retrieved message can be displayed in the ApsaraMQ for Kafka console. If the content of a message exceeds 1 KB in size, the system automatically truncates the content. If you want to view the complete content of a message, download the message.

  • Up to 2 MB of data can be stored in each field in a Tablestore table. Messages that exceed 2 MB in size cannot be synchronized to Tablestore. As a result, such messages cannot be displayed on the Message Retrieval page in the ApsaraMQ for Kafka console.

  • Tablestore retains messages that are synchronized from an ApsaraMQ for Kafka instance for the same period of time as ApsaraMQ for Kafka retains messages. After the retention period expires, Tablestore automatically clears the messages and removes the related indexes. For information about the configuration and description of the message retention period in ApsaraMQ for Kafka, see Modify configurations for messages.

    The data expiration policy of Tablestore differs from that of ApsaraMQ for Kafka. Therefore, the messages returned by a message retrieval task may differ from the messages in the topic that meet the specified search conditions. In this case, the returned retrieval result prevails.
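The size limits in the usage notes can be checked on the client side before you send a message. The following is a minimal sketch, assuming that 1 KB means 1,024 bytes and that the 2 MB limit applies to the key and the value separately because each is stored in its own Tablestore field. Neither assumption is confirmed by this topic, and the function names are hypothetical.

```python
KB = 1024  # Assumption: 1 KB = 1,024 bytes.
MB = 1024 * KB

CONSOLE_DISPLAY_LIMIT = 1 * KB   # Content beyond this is truncated in the console.
TABLESTORE_FIELD_LIMIT = 2 * MB  # Larger fields cannot be stored in Tablestore.


def can_be_synchronized(key: bytes, value: bytes) -> bool:
    """Return True if neither field exceeds the Tablestore field limit.

    Assumption: the limit is evaluated per field (key and value separately).
    """
    return (
        len(key) <= TABLESTORE_FIELD_LIMIT
        and len(value) <= TABLESTORE_FIELD_LIMIT
    )


def console_preview(content: bytes) -> bytes:
    """Return the part of the content that fits within the display limit."""
    return content[:CONSOLE_DISPLAY_LIMIT]


small = b"x" * 100
large = b"x" * (2 * MB + 1)
print(can_be_synchronized(b"demo", small))
print(can_be_synchronized(b"demo", large))
print(len(console_preview(b"x" * 5000)))
```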

Enable the message retrieval feature

After you enable the message retrieval feature for a topic on an ApsaraMQ for Kafka instance, you can retrieve messages in the topic based on your business requirements.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. In the left-side navigation pane, click Instances.

  4. On the Instances page, click the name of the instance that you want to manage.

  5. In the left-side navigation pane, click Message Retrieval. On the page that appears, click Enable Message Retrieval.

  6. In the Enable Message Retrieval panel, configure the parameters and click OK.

    Note

    The first time you enable the message retrieval feature for an ApsaraMQ for Kafka instance, a message indicating that you have not enabled the connector feature for the instance appears after you click Enable Message Retrieval. In this case, click OK in the message and then configure the parameters in the Enable Message Retrieval panel.

    Table 1. Parameters configured for enabling the message retrieval feature

    Parameter: Data Source Topic
    Description: The topic for which you want to enable the message retrieval feature.
    Example: test

    Parameter: Consumer Offset
    Description: The offset from which you want to start message consumption. Default value: Latest Offset. Valid values:

    • Earliest Offset: consumes messages from the earliest offset.

    • Latest Offset: consumes messages from the latest offset.

    Example: Latest Offset

    Parameter: Record Time
    Description: The time when each retrieved message is recorded. Default value: Instant Time. Valid values:

    • Native Time: the message creation time that is recorded by ApsaraMQ for Kafka. This is the timestamp that is recorded by the producer when the message is sent or the value of the timestamp field that you specify for ProducerRecord.

    • Instant Time: the time when the message is synchronized to Tablestore.

      This value specifies the time when the message is synchronized to Tablestore instead of when the message is produced. Therefore, the value in the search result may be different from the time when the message is produced.

    Example: Native Time

    On the Message Retrieval page, you can view the topic for which you enabled the message retrieval feature.

Send messages for testing

After you enable the message retrieval feature for a topic on an ApsaraMQ for Kafka instance, you can send messages to the topic to start the connector for data synchronization and test whether the message retrieval task is created.

  1. On the Message Retrieval page, find the topic that you want to manage and perform operations based on the status of the message retrieval task.

    • If the message retrieval task is not in the Running or Suspended state, click Test in the Actions column.

    • If the message retrieval task is in the Running or Suspended state, choose More > Test in the Actions column.

  2. In the Start to Send and Consume Message panel, configure the parameters to send a message for testing.

    1. In the Message Key field, enter the message key. Example: demo.

    2. In the Message Content field, enter the message content. Example: {"key": "test"}.

    3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

      • If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

      • If you do not want to send the test message to a specific partition, click No.
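The console test above can also be reproduced from a client. The following sketch is not part of the console workflow. It assumes a producer object whose send method accepts topic, value, key, and partition arguments, as the KafkaProducer class of the kafka-python library does. The function send_test_message and the RecordingProducer stand-in are hypothetical names used only for a dry run without a broker.

```python
import json


def send_test_message(producer, topic, key, content, partition=None):
    """Send one test message through a producer object.

    Assumption: producer.send(topic, value=..., key=..., partition=...)
    is available, as in kafka-python's KafkaProducer.
    """
    return producer.send(
        topic,
        value=json.dumps(content).encode("utf-8"),
        key=key.encode("utf-8"),
        partition=partition,  # None lets the client choose the partition.
    )


class RecordingProducer:
    """Stand-in producer for dry runs. It only records what would be sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None, partition=None):
        self.sent.append((topic, key, value, partition))
        return len(self.sent) - 1


# Same example values as in the steps above: key "demo", partition 0.
dry_run = RecordingProducer()
send_test_message(dry_run, "test", "demo", {"key": "test"}, partition=0)
print(dry_run.sent[0])
```

To send a real message, you would pass a configured client producer instead of RecordingProducer. The endpoint and any authentication settings depend on your instance and are not covered here.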

Retrieve messages

  1. On the Message Retrieval page, find the topic that you want to manage and click Search in the Actions column.

  2. In the Search panel, perform the following operations to specify a search condition: Select an item from the search condition drop-down list, click Add Search Condition, specify the search condition in the Value column, and then click OK.

    Search condition: Partition
    Description: The partition from which you want to retrieve messages.

    Search condition: Offset Range
    Description: The offset range of the messages that you want to retrieve.

    Search condition: Key or Value
    Description: The key or content of the messages that you want to retrieve.

    The following items describe the rules that you must follow when you specify a search keyword:

    • Phrase matching is used as the search pattern. For example, if the message key is "ApsaraMQ for Kafka is a distributed, high-throughput, and scalable message queue service that is provided by Alibaba Cloud", you can set the search keyword to "distributed" or a combination of "Alibaba Cloud" and "distributed".

    • If the search keyword contains an asterisk (*) or a question mark (?), the wildcard matching pattern is used. The asterisk (*) specifies an arbitrary character string. The question mark (?) specifies an arbitrary character. Letters are not case-sensitive. For example, if the message content is "AliKafkaTest001qaz", you can specify the search keyword as "AliKafkaTe*".

      To improve retrieval efficiency, we recommend that you do not use keywords that start with a wildcard character. Make sure that strings that contain a wildcard character do not exceed 20 characters in length.

    For information about the usage and limits of tokenization, phrase matching, and wildcard matching, see Tokenization, Match phrase query, and Wildcard query.

    Search condition: Time Range
    Description: The time range in which the messages that you want to retrieve fall. You can select a time range within the last three days or specify a custom time range. The time range that you can specify is accurate to the minute.

    Actions: Click the delete icon to clear the search condition.

    To clear all search conditions, click Clear All above the search conditions list.

    Note

    If multiple search conditions are specified, messages that meet all the search conditions are retrieved.
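The wildcard rules for search keywords can be approximated locally to check a keyword before you submit it. The sketch below is an emulation written for illustration, not the Tablestore implementation. It assumes that a wildcard keyword must match the whole key or value and that an asterisk can also match an empty string. All function names are hypothetical.

```python
import re

MAX_WILDCARD_KEYWORD_LENGTH = 20


def is_wildcard_keyword(keyword: str) -> bool:
    """A keyword that contains * or ? uses the wildcard matching pattern."""
    return "*" in keyword or "?" in keyword


def check_keyword(keyword: str) -> list:
    """Return warnings based on the recommendations for wildcard keywords."""
    warnings = []
    if is_wildcard_keyword(keyword):
        if keyword[0] in "*?":
            warnings.append("keyword starts with a wildcard character")
        if len(keyword) > MAX_WILDCARD_KEYWORD_LENGTH:
            warnings.append("wildcard keyword is longer than 20 characters")
    return warnings


def wildcard_matches(keyword: str, text: str) -> bool:
    """Emulate wildcard matching: * is any string, ? is any one character.

    Assumptions: the keyword must match the whole text, and * may match
    an empty string. Letters are compared without regard to case.
    """
    parts = []
    for ch in keyword:
        if ch == "*":
            parts.append(".*")
        elif ch == "?":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    pattern = "".join(parts)
    flags = re.IGNORECASE | re.DOTALL
    return re.fullmatch(pattern, text, flags=flags) is not None


print(wildcard_matches("AliKafkaTe*", "AliKafkaTest001qaz"))
print(check_keyword("*Test001qaz"))
```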

    On the Topic Search page, view the retrieved messages.

    Table 2. Parameters included in the retrieval results

    Parameter: Partition
    Description: The partition from which the message is retrieved.

    Parameter: Offset
    Description: The offset of the message.

    Parameter: Key
    Description: The message key. The key is transformed into a string.

    Parameter: Value
    Description: The message value, which is also known as message content. The message value is transformed into a string.

    Parameter: Created At
    Description: The time when the message was created or when the message was synchronized to Tablestore.

    The message creation time is the timestamp that was recorded by the producer when the message was sent or the value of the timestamp field that you specified for ProducerRecord.

    Note

    • If a value is specified for the timestamp field, the specified value is displayed.

    • If no value is specified for the timestamp field, the system time when the message is sent is displayed.

    • A value in the format of 1970/x/x x:x:x indicates that the timestamp field is set to 0 or an invalid value.

    Parameter: Actions
    Description:

    • Click Download Key to download the message key.

    • Click Download Value to download the message content.

    Important

    • Up to 1 KB of content for each retrieved message can be displayed in the ApsaraMQ for Kafka console. If a retrieved message exceeds 1 KB in size, the system automatically truncates the content. If you want to view the complete content of a message, download the message.

    • You can download up to 10 MB of messages at a time. If the total size of the retrieved messages exceeds 10 MB, only the first 10 MB of message content can be downloaded.
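The rules for the Created At value can be modeled as follows. This is a sketch of the behavior described in this topic, not the service implementation. The function names are hypothetical, timestamps are assumed to be in milliseconds since the Unix epoch, and the result is rendered in UTC, whereas the console may use a different time zone.

```python
from datetime import datetime, timezone


def native_timestamp_ms(specified_timestamp_ms, send_time_ms):
    """Return the specified timestamp, or the send time if none was specified."""
    if specified_timestamp_ms is not None:
        return specified_timestamp_ms
    return send_time_ms


def created_at(record_time, message_timestamp_ms, synced_at_ms):
    """Pick the time shown in Created At for the given Record Time setting.

    record_time is "Native Time" or "Instant Time", as configured when
    the message retrieval feature was enabled.
    """
    if record_time == "Native Time":
        chosen_ms = message_timestamp_ms
    elif record_time == "Instant Time":
        chosen_ms = synced_at_ms
    else:
        raise ValueError(f"unknown Record Time value: {record_time}")
    return datetime.fromtimestamp(chosen_ms / 1000, tz=timezone.utc)


# A timestamp field set to 0 falls on the Unix epoch, which is why the
# console shows a date in 1970 for such messages under Native Time.
sync_ms = 1724889600000  # Hypothetical synchronization time.
ts_ms = native_timestamp_ms(0, send_time_ms=sync_ms - 500)
print(created_at("Native Time", ts_ms, sync_ms).year)
print(created_at("Instant Time", ts_ms, sync_ms).date())
```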

View the details of a message retrieval task

After you enable the message retrieval feature for a topic, a message retrieval task is automatically created. You can view the details of the task, such as the topics and groups that are automatically created on the ApsaraMQ for Kafka instance, and the names of the instance and table that are automatically created in Tablestore. You can also navigate to the details page of the Tablestore table from the Task Details page.

On the Message Retrieval page, find the topic that you want to manage and click Details in the Actions column.

On the Task Details page, you can view the details of the message retrieval task that is created for the topic. You can also click Tablestore next to Destination Service in the Basic Information section to go to the table details page.

View consumption details

After you enable the message retrieval feature for a topic, you can view the consumption progress of the active groups in each partition of the topic. This helps you obtain information about message consumption and accumulation.

On the Message Retrieval page, find the topic that you want to manage and click Consumption Progress in the Actions column.

On the Consumption Details page, you can view the consumption status of the groups that subscribe to the topic in each partition of the topic.

Suspend a message retrieval task

  1. On the Message Retrieval page, find the topic that you want to manage and choose More > Suspend in the Actions column.

  2. In the message that appears, click OK.

Resume a message retrieval task

You can resume a suspended message retrieval task based on your business requirements.

  1. On the Message Retrieval page, find the topic that you want to manage and choose More > Enable in the Actions column.

  2. In the message that appears, click OK.

Delete a message retrieval task

After you delete a message retrieval task for a topic, the relevant Tablestore table and search index are also deleted. The topic no longer provides the message retrieval feature. If you want to use the message retrieval feature for the topic again, re-create a message retrieval task and wait for data synchronization to complete.

  1. On the Message Retrieval page, find the topic that you want to manage and perform one of the following operations based on the status of the message retrieval task:

    • If the task is not in the Running or Suspended state, click Delete in the Actions column.

    • If the task is in the Running or Suspended state, choose More > Delete in the Actions column.

  2. In the message that appears, click OK.

    On the Message Retrieval page, you can no longer view the topic for which the message retrieval task is deleted.