All Products
Search
Document Center

ApsaraMQ for RocketMQ:Sample code

Last Updated:Oct 27, 2025

This topic provides sample code for sending and receiving messages using the Apache RocketMQ Java SDK.

Sample code

Important

If you use a Serverless instance, take note of the SDK version and other requirements for public network access. For more information, see SDK version requirements for public network access.

gRPC protocol SDK

Remoting protocol SDK

  • For sample code for RocketMQ-Spring, see rocketmq-spring-boot-samples.

  • The rocketmq-client SDK uses the Remoting protocol. The following provides sample code for this SDK.

    Normal messages

    Send a normal message (synchronous)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQProducer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            producer.setProducerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            producer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            producer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // If the message fails to be sent, retry sending it or save the data for later processing.
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // Before the application exits, destroy the producer object.
            // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
            producer.shutdown();
        }
    }

    Send a normal message (asynchronous)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class RocketMQAsyncProducer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            producer.setProducerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            producer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            producer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult result) {
                            // The message is sent.
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override
                        public void onException(Throwable throwable) {
                            // If the message fails to be sent, retry sending it or save the data for later processing.
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // If the message fails to be sent, retry sending it or save the data for later processing.
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
            // Block the current thread for 3 seconds to wait for the asynchronous send result.
            TimeUnit.SECONDS.sleep(3);
    
            // Before the application exits, destroy the producer object.
            // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
            producer.shutdown();
        }
    }

    Send a normal message (one-way)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQOnewayProducer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            producer.setProducerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            producer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            producer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // If the message fails to be sent, retry sending it or save the data for later processing.
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // Before the application exits, destroy the producer object.
            // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
            producer.shutdown();
        }
    }

    Subscribe to normal messages

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQPushConsumer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            consumer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            // Set this to the topic that you created in the ApsaraMQ for RocketMQ console.
            consumer.subscribe("YOUR TOPIC", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("Receive New Messages: %s %n", msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }

    Ordered messages

    Send an ordered message

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.List;
    
    public class RocketMQOrderProducer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            producer.setProducerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            producer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            producer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    int orderId = i % 10;
                    Message msg = new Message("YOUR ORDER TOPIC",
                            "YOUR MESSAGE TAG",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Important: You must set this configuration item to ensure that ordered messages are evenly distributed to queues.
                    // In V5.x, you can replace the following line of code with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                    msg.putUserProperty("__SHARDINGKEY", orderId + "");
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // Select a partition selection algorithm that suits your needs to ensure that the same parameter produces the same result.
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }

    Subscribe to ordered messages

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQOrderConsumer {
        private static RPCHook getAclRPCHook() {
            /**
             * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
             * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
             * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
             * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
             * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            consumer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            consumer.subscribe("YOUR ORDER TOPIC", "*");
    
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // If consumption fails, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT to suspend and retry.
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }

    Scheduled and delayed messages

    Send a scheduled or delayed message

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQDelayProducer {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // The group ID that you created in the ApsaraMQ for RocketMQ console.
            producer.setProducerGroup("YOUR GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            producer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            producer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    // Set this to the topic that you created in the ApsaraMQ for RocketMQ console.
                    Message msg = new Message("YOUR TOPIC",
                            // Set the message tag.
                            "YOUR MESSAGE TAG",
                            // Message body.
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // To send a delayed message, set a delay time in milliseconds (ms). The message is delivered after the specified delay. For example, the message is delivered after 3 seconds.
                    long delayTime = System.currentTimeMillis() + 3000;
                    msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
    
                    // To send a scheduled message, set a specific time for delivery. For example, the message is delivered at 18:45:00 on 2021-08-10.
                    // The time format is yyyy-MM-dd HH:mm:ss. If the specified time is earlier than the current time, the message is immediately delivered to the consumer.
                    // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                    // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // If the message fails to be sent, retry sending it or save the data for later processing.
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // Before the application exits, destroy the producer object.
            // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
            producer.shutdown();
        }
    }

    The sample code for subscribing to scheduled and delayed messages is the same as that for normal messages.

    Transactional messages

    Send a transactional message

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
             * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
             * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
             * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
             * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // When you use a public endpoint, configure the RPCHook.
            // The group ID that you created in the ApsaraMQ for RocketMQ console. Note: The group ID for transactional messages cannot be shared with other message types.
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // When you use a VPC endpoint, you do not need to configure the RPCHook.
            // If you use a Serverless instance, you must configure the RPCHook.
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
            transactionMQProducer.setEnableTrace(true);
    
            // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionListener(new TransactionListener() {
    
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    System.out.println("Start to execute the local transaction: " + msg);
                    return LocalTransactionState.UNKNOW;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println("Received a transaction check request, MsgId: " + msg.getMsgId());
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    The sample code for subscribing to transactional messages is the same as for normal messages.

Serverless Instance Public Network Access Version Guide

To access a Serverless instance of ApsaraMQ for RocketMQ over the public network, your SDK must meet the following version requirements. You must also add the specified code to your application.

Note

Replace InstanceId with your actual instance ID.

  • SDK version: rocketmq-client ≥ 5.2.0

    To send messages, add the following code: producer.setNamespaceV2("InstanceId");

    To consume messages, add the following code: consumer.setNamespaceV2("InstanceId");

  • SDK version: rocketmq-client-java ≥ 5.0.6

    To send and consume messages, add the following code:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();