ApsaraMQ for RocketMQ: Scheduled and delayed messages

Last Updated: Aug 14, 2024

Scheduled messages and delayed messages are featured messages provided by ApsaraMQ for RocketMQ. This topic describes the scenarios, working mechanism, limits, examples, and usage notes of scheduled messages and delayed messages.

Scenarios

Note

Scheduled messages and delayed messages are essentially the same. They are delivered from brokers to consumers at a specific time. In this topic, delayed messages are also considered as scheduled messages.

Accurate and reliable time-based event triggers are required in scenarios such as distributed timed scheduling and task timeout processing. ApsaraMQ for RocketMQ provides scheduled messages to help you simplify the development of timed scheduling tasks and implement high-performance, scalable, and reliable timed triggering.

Scenario 1: Distributed timed scheduling

Figure: Scheduled messages

A distributed timed scheduling scenario involves tasks that require various time granularity levels, such as a task that cleans up files at 5 o'clock every day and a task that triggers message pushing every 2 minutes. Traditional database-based timed scheduling solutions are complex and inefficient in distributed scenarios. In comparison, scheduled messages in ApsaraMQ for RocketMQ allow you to encapsulate multiple types of time triggers.
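A scheduled message fires once, so a recurring task needs the next trigger time to be computed each time. The following sketch shows one possible way to compute that timestamp for the two examples above, a daily wall-clock time and a fixed interval. The class and method names are hypothetical, and the pattern of sending a new scheduled message after each trigger is an assumption, not something this topic prescribes.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZonedDateTime;

// Hypothetical helper: computes the delivery timestamp for the next run of a recurring task.
final class NextTriggerTime {
    private NextTriggerTime() {
    }

    // Next occurrence of a daily wall-clock time strictly after 'now', in epoch milliseconds.
    static long nextDailyAt(ZonedDateTime now, LocalTime timeOfDay) {
        ZonedDateTime candidate = now.with(timeOfDay);
        if (!candidate.isAfter(now)) {
            // Today's slot has already passed, so use the same time tomorrow.
            candidate = candidate.plusDays(1);
        }
        return candidate.toInstant().toEpochMilli();
    }

    // Next tick of a fixed-interval task, measured from the previous trigger time.
    static long nextAfter(long previousTriggerMillis, Duration interval) {
        return previousTriggerMillis + interval.toMillis();
    }
}
```

The returned value can be passed to setDeliveryTimestamp when the next message is built. Measuring the interval from the previous trigger time rather than from the time the consumer finished its work keeps the schedule from drifting.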

Scenario 2: Task timeout processing

Figure: Task timeout processing

A typical scenario that involves task timeout processing is e-commerce payment, where an unpaid order is canceled after it remains unpaid for a specific time period instead of being canceled immediately. In this case, you can use scheduled messages in ApsaraMQ for RocketMQ to check and trigger timeout tasks.

Task timeout processing based on scheduled messages provides the following benefits:

  • Various time granularity levels and simplified development: Scheduled messages in ApsaraMQ for RocketMQ are not limited to fixed time increments. You can trigger tasks at any time granularity level and without business deduplication.

  • High performance and scalability: Scheduled messages in ApsaraMQ for RocketMQ offer high concurrency and scalability. Traditional database scanning methods, by contrast, are complex to implement and can cause performance bottlenecks due to frequent API calls for scanning.
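The payment timeout flow can be sketched as follows. This is an in-memory model under stated assumptions: the class OrderTimeoutSketch, its Status enum, and the method names are hypothetical, and the order store stands in for whatever database the business uses. It only shows where the delivery timestamp comes from and what the consumer checks when the message arrives.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of order timeout handling driven by a scheduled message.
final class OrderTimeoutSketch {
    enum Status { UNPAID, PAID, CANCELED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    // Called when an order is created. Returns the delivery timestamp (epoch milliseconds)
    // to set on the timeout-check message for this order.
    long createOrder(String orderId, long createdAtMillis, Duration paymentWindow) {
        orders.put(orderId, Status.UNPAID);
        return createdAtMillis + paymentWindow.toMillis();
    }

    // Called when the payment succeeds.
    void markPaid(String orderId) {
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    // Called by the consumer when the timeout-check message is delivered.
    // The order is canceled only if it is still unpaid.
    Status onTimeoutCheck(String orderId) {
        orders.replace(orderId, Status.UNPAID, Status.CANCELED);
        return orders.get(orderId);
    }
}
```

Because the check only moves an order from UNPAID to CANCELED, handling the same timeout-check message a second time leaves the order unchanged.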

Working mechanism

What is a scheduled message?

Scheduled messages are a type of featured message provided by ApsaraMQ for RocketMQ. After a scheduled message is sent to the broker, the message can be consumed only after a specific period of time or at a specific time. You can use scheduled messages to implement delayed scheduling and triggering in distributed scenarios.

Time setting rules

  • The scheduled or delayed time for scheduled messages in ApsaraMQ for RocketMQ is represented as a timestamp, not a time period.

  • The scheduled time is a Unix timestamp in milliseconds. You must convert the scheduled time of message delivery to a Unix timestamp in milliseconds.

  • The scheduled time must be within the allowed time range. If the scheduled time exceeds the range, the scheduled time does not take effect and the messages are immediately delivered by the broker.

  • The following items describe the maximum scheduled time for instances of different types:

    • Subscription and pay-as-you-go Standard Edition instances and serverless Standard Edition and Professional Edition instances: 7 days.

    • Subscription and pay-as-you-go Professional Edition and Enterprise Platinum Edition instances: 40 days.

  • The scheduled time must be later than the current time. If the scheduled time is set to a time earlier than the current time, the scheduled time does not take effect and the messages are immediately delivered by the broker.
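Because an out-of-range or past timestamp causes immediate delivery rather than an error, it may help to check the value on the client before sending. The following sketch shows one such check. The class and constant names are hypothetical, treating the upper bound as inclusive is an assumption, and the check uses the client clock, which may differ slightly from the broker clock.

```java
import java.time.Duration;

// Hypothetical client-side check that mirrors the time setting rules above.
final class DeliveryTimeCheck {
    // Maximum scheduled time per the limits above. Pick the one that matches your instance type.
    static final Duration MAX_7_DAYS = Duration.ofDays(7);
    static final Duration MAX_40_DAYS = Duration.ofDays(40);

    private DeliveryTimeCheck() {
    }

    // Returns true if the delivery time is later than 'now' and within the allowed range.
    // Assumption: a delivery time exactly at the maximum is still accepted.
    static boolean isSchedulable(long nowMillis, long deliveryMillis, Duration maxRange) {
        return deliveryMillis > nowMillis
                && deliveryMillis - nowMillis <= maxRange.toMillis();
    }
}
```

A caller could log a warning or reject the request when isSchedulable returns false, instead of letting the message be delivered immediately.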

Examples:

  • Scheduled messages: If the current time is 2022-06-09 17:30:00 (UTC+8) and you want to deliver messages at 19:20:00 on the same day, the scheduled time is 2022-06-09 19:20:00 and the Unix timestamp in milliseconds is 1654773600000.

  • Delayed messages: If the current time is 2022-06-09 17:30:00 (UTC+8) and you want to deliver messages after 1 hour, the message delivery time is 2022-06-09 18:30:00 and the Unix timestamp in milliseconds is 1654770600000.
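The two conversions in the examples can be reproduced with java.time. In the following sketch, the class DeliveryTimestamps and its method names are hypothetical. The time zone Asia/Shanghai (UTC+8) is chosen because the timestamps in the examples match the listed wall-clock times only in UTC+8.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts a scheduled time or a delay to a Unix timestamp in milliseconds.
final class DeliveryTimestamps {
    private DeliveryTimestamps() {
    }

    // Scheduled message: a wall-clock time in a given time zone.
    static long atLocalTime(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Delayed message: the current time plus a delay.
    static long afterDelay(long nowMillis, Duration delay) {
        return nowMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        long now = atLocalTime(LocalDateTime.of(2022, 6, 9, 17, 30, 0), zone);
        // Scheduled for 19:20:00 on the same day: prints 1654773600000
        System.out.println(atLocalTime(LocalDateTime.of(2022, 6, 9, 19, 20, 0), zone));
        // Delayed by 1 hour from 17:30:00: prints 1654770600000
        System.out.println(afterDelay(now, Duration.ofHours(1)));
    }
}
```

In production code, the current time would typically come from System.currentTimeMillis(), as in the producer sample in this topic.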

Lifecycle of a scheduled message

Figure: Lifecycle of a scheduled message

  • Initialization

    The message is built and initialized by the producer and is ready to be sent to the broker.

  • Being scheduled

    The message is sent to the broker and is stored in a time-based storage system until the specified delivery time is reached. An index is not immediately created for the message.

  • Pending consumption

    At the specified time, the message is written into a regular storage engine, where the message is visible for consumers and waits for consumption by consumers.

  • Being consumed

    The message is obtained by the consumer and processed based on the local business logic of the consumer.

    In this process, the broker waits for the consumer to return the consumption result. If no response is received from the consumer in a specific period of time, ApsaraMQ for RocketMQ performs retries on the message. For more information, see Consumption retry.

  • Consumption commitment

    The consumer completes the consumption and commits the consumption result to the broker. The broker marks whether the current message is consumed.

    By default, ApsaraMQ for RocketMQ retains all messages. When the consumption result is committed, the message is marked as consumed instead of being immediately deleted. Messages are deleted only if the retention period expires or the system has insufficient storage space. Before a message is deleted, the consumer can re-consume the message.

  • Message deletion

    If the retention period of a message expires or the storage space is insufficient, ApsaraMQ for RocketMQ deletes the earliest stored messages from the physical file in a rolling manner. For more information, see Message storage and cleanup.
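The transition from "Being scheduled" to "Pending consumption" can be illustrated with a toy model. The following sketch is not how the broker is implemented. It is a single-threaded, in-memory stand-in with hypothetical names, in which a priority queue plays the role of the time-based storage system and a plain queue plays the role of the regular storage that consumers can see.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Toy model only: messages stay invisible until their delivery time is reached.
final class ToyTimerStore {
    private static final class Entry {
        final long deliveryMillis;
        final String body;

        Entry(long deliveryMillis, String body) {
            this.deliveryMillis = deliveryMillis;
            this.body = body;
        }
    }

    // "Being scheduled": entries wait here, ordered by delivery time.
    private final PriorityQueue<Entry> scheduled =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliveryMillis));
    // "Pending consumption": entries that consumers can see.
    private final Queue<String> visible = new ArrayDeque<>();

    void store(long deliveryMillis, String body) {
        scheduled.add(new Entry(deliveryMillis, body));
    }

    // Moves every entry whose delivery time has been reached into the visible queue
    // and returns the number of entries moved.
    int release(long nowMillis) {
        int moved = 0;
        while (!scheduled.isEmpty() && scheduled.peek().deliveryMillis <= nowMillis) {
            visible.add(scheduled.poll().body);
            moved++;
        }
        return moved;
    }

    // Returns the next visible message, or null if none is visible yet.
    String poll() {
        return visible.poll();
    }
}
```

In this model, a message stored with a future delivery time cannot be returned by poll until release has been called with a time at or after that delivery time, which mirrors why a scheduled message is not visible to consumers before its scheduled time.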

Limits

Message type consistency

Scheduled messages can be sent only to topics whose MessageType is set to Delay.

Time granularity

The scheduled time for scheduled messages in ApsaraMQ for RocketMQ is accurate to milliseconds. The default time granularity value is 1,000 ms.

The status of scheduled messages in ApsaraMQ for RocketMQ can be persistently stored. If the messaging system experiences a failure and is restarted, messages are still delivered based on the specified delivery time. However, if the storage system experiences an exception or is restarted, latency may occur in delivering scheduled messages.

Examples

Unlike normal messages, scheduled messages must have a delivery timestamp specified for them.

The following code provides an example of how to send and receive scheduled messages in Java.

For information about the complete sample code of messaging, see Apache RocketMQ 5.x SDKs (recommended).

Sample code

Send scheduled and delayed messages

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Send scheduled and delayed messages.
        // Specify a Unix timestamp in milliseconds. In this example, the specified timestamp indicates that the message is delivered in 10 minutes from the current time. 
        long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
        Message message = provider.newMessageBuilder()
                // Use the topic variable declared above instead of a hard-coded string.
                .setTopic(topic)
                // The message key. You can use a keyword to accurately find the message. 
                .setKeys("messageKey")
                // Specify the message tag. The consumer can use the tag to filter messages. 
                .setTag("messageTag")
                .setDeliveryTimestamp(deliverTimeStamp)
                // The message body.
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message. Take note of the result and capture exceptions such as failures. 
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

Consume scheduled and delayed messages in push mode

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Specify the message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, shut down the process. 
        //pushConsumer.close();
    }
}

Consume scheduled and delayed messages in simple mode

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Specify the maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // Specify the invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker. 
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // If you no longer require the simple consumer, shut down the process. 
        // consumer.close();
    }
}

Usage notes

Do not schedule the same delivery time for a large number of messages

Scheduled messages are stored in a time-based storage system before they are delivered to consumers at the specified delivery time. If you specify the same delivery time for a large number of scheduled messages, the system has to simultaneously process the messages at the delivery time. This puts the system under heavy load and results in delays in message delivery.
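One way to avoid a spike at a single delivery time is to spread the timestamps over a short window. The following sketch adds a random offset to a base delivery time. The class and method names are hypothetical, and the approach assumes that the business can tolerate a message being delivered slightly later than the base time.

```java
import java.time.Duration;
import java.util.Random;

// Hypothetical helper: spreads delivery timestamps across a window to avoid a burst.
final class DeliveryJitter {
    private DeliveryJitter() {
    }

    // Returns a delivery timestamp in the range [baseMillis, baseMillis + window).
    // If the window is zero or negative, the base time is returned unchanged.
    static long spread(long baseMillis, Duration window, Random random) {
        long windowMillis = window.toMillis();
        if (windowMillis <= 0) {
            return baseMillis;
        }
        // nextDouble() is in [0.0, 1.0), so the offset stays below windowMillis
        // for windows of practical size.
        long offset = (long) (random.nextDouble() * windowMillis);
        return baseMillis + offset;
    }
}
```

For example, calling spread with a window of 60 seconds for each message in a batch distributes the batch over one minute instead of a single instant. Passing the Random instance as a parameter keeps the helper easy to test with a fixed seed.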

FAQ

Can I revoke a scheduled message or change its scheduled time before the scheduled time is reached?

No, you cannot change the scheduled time of a scheduled message before the scheduled time is reached.

What happens if I specify a scheduled time that is earlier than the current time for a scheduled message?

If you specify a scheduled time that is earlier than the current time for a scheduled message, the scheduled time does not take effect, and the message is immediately delivered.

Why am I unable to query a sent scheduled message in the ApsaraMQ for RocketMQ console?

A scheduled message is visible to consumers and can be queried in the ApsaraMQ for RocketMQ console only after the scheduled time is reached.