ApsaraMQ for RocketMQ: Send and receive scheduled messages and delayed messages

Last Updated: Nov 22, 2024

This topic provides sample code that shows how to use the TCP client SDK for Java of the Community Edition to send and receive scheduled messages and delayed messages.

Background information

  • Scheduled messages: messages that ApsaraMQ for RocketMQ brokers deliver to consumers at a specified point in time.
  • Delayed messages: messages that ApsaraMQ for RocketMQ brokers deliver to consumers after a specified period of time has elapsed.

For more information, see Scheduled messages and delayed messages.
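
In the sample code on this page, both kinds of message end up as an absolute delivery timestamp in milliseconds that is written to the `__STARTDELIVERTIME` property. The following minimal sketch illustrates the difference between the two definitions above. The class `DeliveryTimestamps` and its methods are hypothetical helpers for illustration and are not part of any SDK.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that computes the delivery timestamp for both kinds of message. */
final class DeliveryTimestamps {

    /** Delayed message: delivery time = send time + delay period. */
    static long forDelay(Instant sendTime, Duration delay) {
        return sendTime.plus(delay).toEpochMilli();
    }

    /** Scheduled message: delivery time = the specified point in time. */
    static long forSchedule(Instant deliverAt) {
        return deliverAt.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // A message delayed by 3 seconds, relative to the moment it is sent.
        System.out.println("delayed:   " + forDelay(now, Duration.ofSeconds(3)));
        // A message scheduled for a fixed point in time, independent of the send time.
        System.out.println("scheduled: " + forSchedule(Instant.parse("2021-08-10T10:45:00Z")));
    }
}
```

Either result can be converted with `String.valueOf(...)` before it is stored as a message property.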

Important

Scheduled messages and delayed messages are configured differently, and behave differently, in Apache RocketMQ and ApsaraMQ for RocketMQ. Apache RocketMQ supports delayed messages but not scheduled messages, so it provides no dedicated interface for scheduled messages. ApsaraMQ for RocketMQ supports both delayed messages and scheduled messages. It lets you configure scheduled times and delay periods that are accurate to the second, and it provides higher concurrency. We recommend that you send and receive scheduled messages and delayed messages on the cloud. For more information, see the following sections.
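
To illustrate the difference in precision, the following sketch assumes an open source Apache RocketMQ 4.x broker that uses the default `messageDelayLevel` setting, where a delay is chosen from a fixed list of levels through `Message.setDelayTimeLevel(int)` instead of being given as a timestamp. This assumption is not stated in this topic. The class `DelayLevels` is a hypothetical helper that maps a requested delay to the smallest default level that covers it.

```java
import java.time.Duration;

/** Hypothetical helper: maps a requested delay to a default Apache RocketMQ 4.x delay level. */
final class DelayLevels {
    // Assumption: the broker keeps the default messageDelayLevel configuration.
    private static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts a token such as "30s", "5m", or "1h" to milliseconds. */
    static long toMillis(String token) {
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return value * 1_000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalArgumentException("Unknown unit in " + token);
        }
    }

    /** Returns the 1-based level whose delay is the smallest one >= requested, or -1 if none is. */
    static int levelFor(Duration requested) {
        String[] tokens = DEFAULT_LEVELS.split(" ");
        for (int i = 0; i < tokens.length; i++) {
            if (toMillis(tokens[i]) >= requested.toMillis()) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // A 3-second delay has no exact level, so it is rounded up to the 5-second level.
        System.out.println(levelFor(Duration.ofSeconds(3)));
    }
}
```

Under this assumption, a 3-second delay cannot be expressed exactly with delay levels, whereas the timestamp-based property used on this page can express it directly.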

Prerequisites

Before you start, make sure that the following operations are performed:

Send scheduled messages and delayed messages

The following sample code shows how to send scheduled messages and delayed messages by using the TCP client SDK for Java of the Community Edition:

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */ 
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /**
         * Specify Alibaba Cloud as the access channel. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
         * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try { 
                /* Specify the topic that you created in the ApsaraMQ for RocketMQ console. */
                Message msg = new Message("YOUR TOPIC",
                    /* The message tag. */
                    "YOUR MESSAGE TAG",
                    /* The message body. */
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /* Delayed message: set the delivery time to the current time plus the delay period. Unit: milliseconds. For example, if you want the message to be delivered 3 seconds after it is sent, add 3000 to the current timestamp. */
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                /**
                 * Scheduled message: the point in time at which the message is delivered. For example, if you want the scheduled message to be delivered at 18:45:00 on August 10, 2021, specify the value as 2021-08-10 18:45:00.
                 * The value of this parameter is in the format of yyyy-MM-dd HH:mm:ss. If you specify a time that is earlier than the current time, the message is immediately delivered to the consumer.
                 * To use the following code, also import java.text.SimpleDateFormat.
                 * long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                 * msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
                 */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Add logic here to resend or persist the message if it fails to be sent and must be sent again.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer object. 
        // Note: This operation is optional. 
        producer.shutdown();
    }
}
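
The commented-out scheduled-message code above parses a `yyyy-MM-dd HH:mm:ss` string with `SimpleDateFormat`, which interprets the string in the default time zone of the JVM. The following minimal sketch shows one way to make the time zone explicit by using `java.time`, and to check whether the resulting time is already in the past, in which case the message is delivered immediately as described above. The class `ScheduledTimeParser` and its method names are hypothetical and are not part of the SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that converts a scheduled time string to epoch milliseconds. */
final class ScheduledTimeParser {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses the text as a local date-time in the given zone and returns epoch milliseconds. */
    static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    /** Returns true if the delivery time is not later than the given current time. */
    static boolean isInPast(long deliverAtMillis, long nowMillis) {
        return deliverAtMillis <= nowMillis;
    }

    public static void main(String[] args) {
        long deliverAt = toEpochMillis("2021-08-10 18:45:00", ZoneId.of("Asia/Shanghai"));
        System.out.println("deliver at: " + deliverAt);
        System.out.println("in the past: " + isInPast(deliverAt, System.currentTimeMillis()));
    }
}
```

The returned value can be passed to `msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(...))` in the same way as `timeStamp` in the sample code. The time zone `Asia/Shanghai` is only an example.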

Subscribe to scheduled messages and delayed messages

The following sample code shows how to subscribe to scheduled messages and delayed messages by using the TCP client SDK for Java of the Community Edition:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * Create a consumer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a consumer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // The endpoint of the ApsaraMQ for RocketMQ instance. 
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                // Print the received messages and acknowledge that they are consumed.
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
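
When you test scheduled or delayed delivery, it can help to compare the time at which a message arrives with the delivery time that the producer requested. The following minimal sketch assumes that the `__STARTDELIVERTIME` property set by the producer is still readable on the received message, which this topic does not confirm. The class `DeliveryLag` is a hypothetical helper and is not part of the SDK.

```java
import java.util.OptionalLong;

/** Hypothetical helper that measures how late a message arrived relative to its requested delivery time. */
final class DeliveryLag {
    static final String START_DELIVER_TIME = "__STARTDELIVERTIME";

    /**
     * Returns receivedAtMillis minus the requested delivery time,
     * or an empty result if the property is missing or is not a number.
     */
    static OptionalLong lagMillis(String startDeliverTime, long receivedAtMillis) {
        if (startDeliverTime == null || startDeliverTime.isEmpty()) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(receivedAtMillis - Long.parseLong(startDeliverTime));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        // Simulate a message whose requested delivery time was 120 ms before it arrived.
        long receivedAt = System.currentTimeMillis();
        System.out.println(lagMillis(String.valueOf(receivedAt - 120), receivedAt));
    }
}
```

Inside `consumeMessage`, the helper could be called for each message as `DeliveryLag.lagMillis(msg.getUserProperty(DeliveryLag.START_DELIVER_TIME), System.currentTimeMillis())`. A result is only meaningful if the clocks of the producer and the consumer are reasonably synchronized.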