All Products
Search
Document Center

ApsaraMQ for RocketMQ:Ordered messages

Last Updated:Aug 15, 2024

Ordered messages are a type of featured message in ApsaraMQ for RocketMQ. This topic describes the scenarios, working mechanism, limits, and usage notes of ordered messages.

Scenarios

Heterogeneous systems use status synchronization to maintain strong consistency in scenarios such as ordered event processing, transaction matchmaking, and real-time incremental data synchronization. The preceding scenarios require ordered delivery of messages from upstream applications to downstream applications when an event change occurs. ApsaraMQ for RocketMQ provides ordered messages to help you implement ordered message transmission.

Scenario 1: Transaction matchmaking

交易撮合

For example, in securities and stock trading scenarios, if multiple bidders offer the same bid price for a bid order, the bidder who first offers the bid price wins the bid. Therefore, the downstream order processing system must be designed to process orders in the order in which prices were offered.

Scenario 2: Real-time incremental data synchronization

Figure 1. Normal messages普通消息

Figure 2. Ordered messages顺序消息

For example, you want to perform incremental synchronization of data that is related to database modifications. You can use ordered messages in ApsaraMQ for RocketMQ to transmit messages from the upstream source database to the downstream query system. The messages can be binary logs of addition, deletion, and modification operations. The downstream system retrieves the messages in the order in which the messages are sent to make the database status updated in the same order. Ordered messages help you ensure consistency between the operations in the upstream system and the status data in the downstream system. If you use normal messages in this scenario, status inconsistency may occur.

Working mechanism

What is an ordered message?

Ordered messages are a type of featured message in ApsaraMQ for RocketMQ. Ordered messages are delivered to consumers in the order in which the messages are sent. This type of message allows you to implement ordered processing in business scenarios.

Compared with other types of messages, ordered messages are characterized by the order of message sending, storage, and delivery.

ApsaraMQ for RocketMQ uses message groups to identify the order of ordered messages. Before you send ordered messages, you must specify the message group to which each message belongs.

Important

Only messages of the same message group are processed in first in, first out (FIFO) order. The order of messages that belong to different message groups or do not belong to any message group is not ensured.

ApsaraMQ for RocketMQ ensures the processing order of messages in message groups. You can divide messages into different groups based on message consumption logic. This helps improve system concurrency and throughput while ensuring message processing order for specific workloads.

How to maintain the order of messages

The order of messages in ApsaraMQ for RocketMQ is maintained in the production and consumption stages.

  • Production order: ApsaraMQ for RocketMQ uses a protocol that is established between the producer and the broker to ensure that messages are serially sent from the producer to the broker and that the messages are stored and persisted in the same order as they are sent.

    To ensure the production order of messages, make sure that the following conditions are met:

    • Same message group

      The production order takes effect only for messages that belong to the same message group. The production order of messages that belong to different message groups or do not belong to any message group is not ensured. Before you send ordered messages, you can configure a message group for each message.

    • Single producer

      The production order takes effect only for messages that are produced by a single producer. ApsaraMQ for RocketMQ cannot determine the order of messages from different producers in different systems, regardless of whether the same message group is specified for the messages.

    • Serial transmission

      Producers in ApsaraMQ for RocketMQ support secure access from multiple threads. If a producer uses multiple threads to concurrently send messages, the system cannot determine the order of messages from different threads.

    After a producer that meets the preceding conditions sends messages to ApsaraMQ for RocketMQ, the system ensures that messages from the same message group are stored in the same queue based on the order in which the messages are sent. The following figure describes how ordered messages are stored in ApsaraMQ for RocketMQ brokers.顺序存储逻辑

    • ApsaraMQ for RocketMQ stores messages from the same message group in the same queue based on the order in which the messages are sent.

    • ApsaraMQ for RocketMQ can store messages from different message groups in the same queue, but not necessarily in the order in which the messages are sent.

    In the preceding figure, messages from MessageGroup 1 and MessageGroup 4 are stored in MessageQueue 1. ApsaraMQ for RocketMQ ensures that messages G1-M1, G1-M2, and G1-M3 from MessageGroup 1 are stored in the queue in the same order as they are sent. Messages G4-M1 and G4-M2 from MessageGroup 4 are also stored in the order in which the messages are sent. However, messages from MessageGroup 1 and MessageGroup 4 are stored in no particular order.

  • Consumption order: ApsaraMQ for RocketMQ uses a protocol that is established between the consumer and the broker to ensure that messages are consumed in the same order in which the messages are stored.

    To ensure the consumption order of messages, make sure that the following conditions are met:

    • Delivery order

      ApsaraMQ for RocketMQ ensures that messages are delivered in the same order in which they are stored by using a client SDK and the communications protocol of the broker. When consumer applications consume messages, the applications must follow the receive-process-acknowledge process to prevent messages from being out of order due to asynchronous consumption.

      Important

      If a consumer is of the push type, ApsaraMQ for RocketMQ ensures that the messages are delivered to the consumer one by one in the order in which they are stored. If a consumer is of the simple type, multiple messages may be pulled at a time by the consumer. In this case, the consumption order of the messages is implemented by the business application. For information about consumer types, see Consumer types.

    • Limited retries

      ApsaraMQ for RocketMQ limits the number of retries that failed ordered messages can perform. If a message fails to be delivered after the maximum number of retries is reached, ApsaraMQ for RocketMQ skips the message and continues consuming the subsequent messages. This prevents failed messages from constantly blocking message processing.

      In scenarios in which the consumption order is critical, we recommend that you specify an appropriate number of retries to prevent unordered message processing.

Combination of production order and consumption order

If you want messages to be processed in FIFO order, the production order and consumption order of the messages must be ensured. In most business scenarios, a producer may map to multiple consumers, but not all consumers require ordered message consumption. You can use flexible combinations of the production order and consumption order in different business scenarios. For example, you can send ordered messages but concurrently consume the messages to improve throughput. The following table describes different combinations of the production order and the consumption order.

Production order

Consumption order

Effect

Ordered

Ordered

The order of messages in the same message group is ensured.

Messages in the same message group are sent and consumed in the same order.

Ordered

Concurrent

Messages are concurrently and chronologically consumed.

Unordered

Ordered

The order of messages that are stored in the same queue is ensured.

ApsaraMQ for RocketMQ only ensures that messages are consumed in the same order as the are stored in a queue.

Unordered

Concurrent

Messages are concurrently and chronologically consumed.

Lifecycle of an ordered message

生命周期

  • Initialization

    The message is built and initialized by the producer and is ready to be sent to the broker.

  • Pending consumption

    The message is sent to the broker and is visible and available to the consumer.

  • Being consumed

    The message is obtained by the consumer and processed based on the local business logic of the consumer.

    In this process, the broker waits for the consumer to return the consumption result. If no response is received from the consumer in a specific period of time, ApsaraMQ for RocketMQ performs retries on the message. For more information, see Consumption retry.

  • Consumption commitment

    The consumer completes the consumption and commits the consumption result to the broker. The broker marks whether the current message is consumed.

    By default, ApsaraMQ for RocketMQ retains all messages. When the consumption result is committed, the message is marked as consumed instead of being immediately deleted. Messages are deleted only if the retention period expires or the system has insufficient storage space. Before a message is deleted, the consumer can re-consume the message.

  • Message deletion

    If the retention period of message expires or the storage space is insufficient, ApsaraMQ for RocketMQ deletes the earliest stored messages from the physical file in a rolling manner. For more information, see Message storage and cleanup.

Important
  • Message consumption failures or timeouts trigger the retry logic on a broker. If consumption retry is triggered for a message, the message reaches the end of its lifecycle. The message is retried as a new message with a new message ID.

  • If consumption retry is triggered for an ordered message, the subsequent messages can be processed only after the ordered message is processed.

Limits

Only topics whose MessageType is set to FIFO can be used to send and receive ordered messages.

Examples

Different from normal messages, ordered messages must have message groups specified for them. We recommend that you configure message groups at a fine-grained level based on your business requirements to allow for workload decoupling and concurrency scaling.

The following sample code provides an example on how to send and receive ordered messages in Java.

For information about the complete sample code, see Apache RocketMQ 5.x SDKs (recommended).

Sample code

Send ordered messages

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
         // Send an ordered message. 
        Message message = provider.newMessageBuilder()
                .setTopic("topic")
                // The message key. The system can use the key to locate the message. 
                .setKeys("messageKey")
                // The message tag. The consumer can use the tag to filter the message. 
                .setTag("messageTag")
                // The message group. We recommend that you do not include a large number of messages in the group. 
                .setMessageGroup("fifoGroup001")
                // The message body. 
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message. You must take note of the sending result and capture exceptions such as failures.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

Consume messages in push mode

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        // Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // The consumer group. 
                .setConsumerGroup(consumerGroup)
                // The subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // The message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, shut down the process. 
        //pushConsumer.close();
    }
}  

Consume messages in simple mode

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        // Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
                // The consumer group. 
                .setConsumerGroup(consumerGroup)
                // The timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // The subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
        // The maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // The invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            // Note: If the consumption of a message in a message group is not complete, the next message in the message group cannot be retrieved if you call the Receive function. 
            final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                // After consumption is complete, you must invoke ACK to commit the consumption result. 
                try {
                    consumer.ack(messageView);
                } catch (ClientException e) {
                    // If a message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message. 
                    e.printStackTrace();
                }
            });
        }
        // If you no longer require the simple consumer, shut down the process. 
        // consumer.close();
    }
}                                           

Obtain consumption retry logs of ordered messages

If ordered messages are consumed in Push mode, the messages are retried on the consumer client and the broker cannot obtain the details of the retry logs. If the delivery result displayed in the trace of an ordered message indicates that the message delivery failed, you must check the information about the maximum number of retries and the consumer client in the consumer client logs.

For information about the log path of a consumer client, see Log configuration.

You can search the following keywords to query the information about consumption failures in client logs:

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

Usage notes

Use serial consumption to prevent out-of-order message processing

We recommend that you use serial message consumption instead of batch consumption. Consumption of multiple messages at the same time may cause out-of-order message processing.

For example, messages 1, 2, 3, and 4 are sent in the 1-2-3-4 order and the order of batch consumption is 1-[2, 3](processed in batches but failed)-[2, 3](retry)-4. The system may repeatedly process Message 2 if Message 3 fails to be processed. As a result, messages are consumed out of order.

Avoid including a large number of messages in a message group

ApsaraMQ for RocketMQ ensures that messages in the same message group are stored in the same queue. A message group that contains a large number of messages causes the corresponding queue to be overloaded. This affects messaging performance and hinders scalability. When you configure message groups, you can use order IDs and user IDs as the message sequencing conditions. This ensures the order of messages of the same end user.

We recommend that you split messages in your business applications by message group. For example, in a scenario in which you do not need to ensure the order of messages of different users, you can use order IDs and user IDs as message group keywords to implement ordered processing of messages of the same end user.