ApsaraMQ for RocketMQ:Sample code

Last Updated: Aug 15, 2024

ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 3.x or 4.x SDKs. You can use these SDKs to connect to ApsaraMQ for RocketMQ 5.x instances to send and receive messages. This topic provides sample code for sending and receiving messages by using the RocketMQ 3.x or 4.x SDK for Java.

Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more functions and enhanced features. For more information, see Release notes.

  • The RocketMQ 3.x, 4.x, and TCP client SDKs receive maintenance only from Alibaba Cloud. We recommend that you use them only for existing business.

Send and receive normal messages

Send normal messages in synchronous transmission mode

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}
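
The catch block in the preceding sample only logs the failure. The following is a minimal sketch of one way to fill in the resend logic, assuming that a bounded number of attempts with a doubling pause between them is acceptable for your business. SendRetrySketch and callWithRetry are hypothetical names that are not part of the RocketMQ SDK. The helper is written against java.util.concurrent.Callable so that it runs without a broker.

```java
import java.util.concurrent.Callable;

final class SendRetrySketch {
    /**
     * Calls the action up to maxAttempts times. After each failed attempt except
     * the last one, the thread sleeps and the pause doubles. If every attempt
     * fails, an unchecked exception that wraps the last failure is thrown.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
                backoff *= 2;
            }
        }
        throw new IllegalStateException("Send failed, giving up", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice before it succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "sent";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the preceding producer, the send call could be wrapped as callWithRetry(() -> producer.send(msg), 3, 200). A resend can produce duplicate messages, so this approach assumes that consumers process messages idempotently.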

Send normal messages in asynchronous transmission mode

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQAsyncProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // The message is sent. 
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // Block the current thread for 3 seconds and wait for the result to return. 
        TimeUnit.SECONDS.sleep(3);

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}
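
The preceding sample blocks for a fixed 3 seconds before it shuts down the producer, which can cut off callbacks that are still pending or wait longer than necessary. The following is a minimal sketch of an alternative that counts callbacks with a java.util.concurrent.CountDownLatch and waits only until all of them have fired or a timeout elapses. To stay runnable without a broker, the sketch simulates asynchronous sends on a thread pool. AsyncSendLatchSketch, Callback, and sendAllAndWait are hypothetical names, and Callback is only a stand-in for SendCallback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncSendLatchSketch {
    /** Hypothetical stand-in for org.apache.rocketmq.client.producer.SendCallback. */
    interface Callback {
        void onSuccess(String msgId);

        void onException(Throwable throwable);
    }

    /**
     * Simulates count asynchronous sends and waits until every callback has
     * fired or the timeout elapses. Returns the number of successful sends.
     */
    static int sendAllAndWait(int count, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                final String msgId = "msg-" + i;
                Callback callback = new Callback() {
                    @Override
                    public void onSuccess(String id) {
                        succeeded.incrementAndGet();
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // Count failures too, so the waiting thread is released.
                        pending.countDown();
                    }
                };
                // In real code, this line would be producer.send(msg, sendCallback).
                pool.execute(() -> callback.onSuccess(msgId));
            }
            // Wait for all callbacks instead of sleeping for a fixed period.
            pending.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println("acknowledged: " + sendAllAndWait(128, 3));
    }
}
```

With the real SDK, the same pattern would call countDown() in both onSuccess and onException of the SendCallback, and call producer.shutdown() after await returns.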

Send normal messages in one-way transmission mode

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOnewayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}

Subscribe to normal messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Send and receive ordered messages

Send ordered messages

import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                        "YOUR MESSAGE TAG",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Note: This parameter is required. Ordered messages can be evenly distributed to each queue only after this parameter is configured. 
                // If you use an ApsaraMQ for RocketMQ 5.x instance, you can replace the following line with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Use a queue selection algorithm that is suitable for your business. The algorithm must always return the same queue for the same argument so that messages with the same sharding key stay in order. 
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
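
The selector in the preceding sample works because orderId is a non-negative integer. If your sharding key is a string, such as an order number, the key has to be mapped to a queue index first. The following is a minimal sketch of one such mapping, assuming that the key's hashCode is an acceptable distribution for your business. QueueIndexSketch and queueIndexFor are hypothetical names that are not part of the RocketMQ SDK.

```java
final class QueueIndexSketch {
    /**
     * Maps a sharding key to an index in the range [0, queueCount).
     * Math.floorMod is used instead of the % operator because hashCode()
     * can be negative, and a negative index would fail in List.get.
     */
    static int queueIndexFor(String shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 8;
        for (String key : new String[] {"OrderID188", "OrderID189", "OrderID188"}) {
            // The same key always maps to the same index.
            System.out.println(key + " -> queue " + queueIndexFor(key, queueCount));
        }
    }
}
```

Inside a MessageQueueSelector, the result would be used as mqs.get(queueIndexFor((String) arg, mqs.size())). The mapping changes if the number of queues changes, so the order of messages that are in flight during such a change is not covered by this sketch.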

Subscribe to ordered messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
         * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // If consumption fails, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead. The current queue is then suspended for a moment and the messages are retried. 
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Send and receive scheduled or delayed messages

Send scheduled or delayed messages

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQDelayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // The topic that you created in the ApsaraMQ for RocketMQ console. 
                Message msg = new Message("YOUR TOPIC",
                        // The message tag. 
                        "YOUR MESSAGE TAG",
                        // The message content. 
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // The delivery time of the delayed message, specified as a UNIX timestamp in milliseconds. The message is delivered to consumers after this point in time. For example, the following value is the current time plus 3000 milliseconds, so the message is delivered after 3 seconds. 
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // The scheduled time for sending the scheduled message. After you configure this parameter, the message is sent at the specified point in time. For example, if you set this parameter to 2021-08-10 18:45:00, the message is sent at 18:45:00 on August 10, 2021. 
                // Specify the time in the yyyy-MM-dd HH:mm:ss format. If you specify a time that is earlier than the current time, the message is immediately sent to the consumer. 
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Add logic here to resend or persist the message if the message fails to be sent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: Shutting down the producer releases the memory that it uses. If your application sends messages frequently, keep the producer running. 
        producer.shutdown();
    }
}
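
In the preceding sample, delayed and scheduled messages both set the same __STARTDELIVERTIME property to an absolute timestamp in milliseconds. The following is a minimal sketch of how the two values might be computed. The DeliveryTimes class and its afterDelay and atTime methods are hypothetical names used for illustration and are not part of any RocketMQ SDK. The sketch uses java.time instead of the SimpleDateFormat call in the sample and takes the time zone as an explicit argument, which is an assumption about how you want to interpret the scheduled time.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only. Not part of any RocketMQ SDK.
class DeliveryTimes {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Delayed message: the delivery timestamp is the given time plus the delay.
    static long afterDelay(long nowMillis, long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must not be negative");
        }
        return nowMillis + delayMillis;
    }

    // Scheduled message: the delivery timestamp is a wall-clock time
    // in the yyyy-MM-dd HH:mm:ss format, interpreted in the given time zone.
    static long atTime(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long delayed = afterDelay(System.currentTimeMillis(), 3000L);
        long scheduled = atTime("2021-08-10 18:45:00", ZoneId.systemDefault());
        // Either value would be passed to
        // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(value)).
        System.out.println("Delayed delivery timestamp: " + delayed);
        System.out.println("Scheduled delivery timestamp: " + scheduled);
    }
}
```

Passing the time zone explicitly avoids depending on the default time zone of the machine that runs the producer.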

Subscribe to scheduled or delayed messages

The sample code that is used to subscribe to scheduled or delayed messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Subscribe to normal messages.

Send and receive transactional messages

Send transactional messages

  1. The following sample code shows how to send a half message and execute a local transaction:

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
             * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
             * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
             * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. Note: Transactional messages cannot share a group with messages of other types. 
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
            // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
    
            // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
            // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override
                        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
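                            System.out.println("Start executing the local transaction: " + msg);
                            // Return UNKNOW to leave the transaction undecided. The broker later calls the transaction checker to obtain the final status. 
                            return LocalTransactionState.UNKNOW;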
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

  2. The following sample code shows how to implement the checker that the broker calls to confirm the status of a transactional message:

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    // Checks the status of the local transaction that is associated with an ApsaraMQ for RocketMQ transactional message. 
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        // The local transaction checker. For more information, see Transactional messages. 
        @Override
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }
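
In the preceding samples, the local transaction branch always returns UNKNOW and the checker always returns COMMIT_MESSAGE. A real checker would typically look up the actual outcome of the local transaction before it answers. The following is a minimal sketch of that decision logic, written without RocketMQ dependencies so that it can run on its own. The TransactionOutcomeStore class, its Decision enum, and the in-memory map are hypothetical and used for illustration only. In an actual checker, the three decisions would map to LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.ROLLBACK_MESSAGE, and LocalTransactionState.UNKNOW.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch for illustration only. Not part of any RocketMQ SDK.
class TransactionOutcomeStore {
    // Stands in for the three values of LocalTransactionState.
    enum Decision { COMMIT, ROLLBACK, UNKNOWN }

    // Assumption: outcomes are kept in memory. A real application would
    // more likely read them from the database that ran the local transaction.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    // Called by business code after the local transaction finishes.
    void record(String transactionKey, boolean succeeded) {
        outcomes.put(transactionKey, succeeded);
    }

    // Called when the status of a transaction is checked.
    Decision check(String transactionKey) {
        Boolean succeeded = outcomes.get(transactionKey);
        if (succeeded == null) {
            // No outcome recorded yet: stay undecided so that the check can be repeated.
            return Decision.UNKNOWN;
        }
        return succeeded ? Decision.COMMIT : Decision.ROLLBACK;
    }

    public static void main(String[] args) {
        TransactionOutcomeStore store = new TransactionOutcomeStore();
        store.record("order-1", true);
        store.record("order-2", false);
        System.out.println("order-1: " + store.check("order-1"));
        System.out.println("order-2: " + store.check("order-2"));
        System.out.println("order-3: " + store.check("order-3"));
    }
}
```

Returning the undecided state when no outcome has been recorded keeps the half message from being committed or rolled back before the local transaction has actually finished.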

Subscribe to transactional messages

The sample code that is used to subscribe to transactional messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Send and receive normal messages.