ApsaraMQ for RocketMQ: Three modes to send normal messages

Last Updated: Dec 13, 2023

ApsaraMQ for RocketMQ allows you to send normal messages in three transmission modes: synchronous, asynchronous, and one-way. This topic describes how each mode works and the scenarios that it suits, provides sample code, and compares the three modes.

Prerequisites

Before you start, make sure that the following operations are performed:

Synchronous transmission

  • How synchronous transmission works

    In synchronous transmission mode, the sender sends the next message only after it receives a response for the previous message from the ApsaraMQ for RocketMQ broker.

  • Use scenarios

    The synchronous transmission mode suits scenarios in which each message must be confirmed as delivered, such as important email notifications, registration notifications, and promotional messages.

  • Sample code

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQProducer {
        /**
        * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
        * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             * If you do not want to enable the message trace feature, you can use the following method to create a producer:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Specify Alibaba Cloud as the access channel. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // Before you exit the application, shut down the producer object. 
            // Note: This operation is optional. 
            producer.shutdown();
        }
    }
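The catch block in the sample above leaves the resend logic to you. One possible shape for it is a bounded retry with a short backoff, sketched below using only the Java standard library. The class name SyncSendRetrySketch, the method sendWithRetry, and the simulated failure are hypothetical and are not part of the RocketMQ client. In a real producer, the Callable would wrap producer.send(msg). The client also provides its own retry setting (DefaultMQProducer.setRetryTimesWhenSendFailed), which may make an outer retry unnecessary.

```java
import java.util.concurrent.Callable;

/** Stdlib-only sketch: bounded retry around a synchronous send. Names are hypothetical. */
class SyncSendRetrySketch {

    /**
     * Calls sendOnce up to maxAttempts times and returns the first successful result.
     * If every attempt fails, the last exception is rethrown so that the caller
     * can persist the message and resend it later.
     */
    static <T> T sendWithRetry(Callable<T> sendOnce, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendOnce.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Linear backoff: wait longer after each failed attempt.
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for producer.send(msg): fails twice, then succeeds.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated broker timeout");
            }
            return "SEND_OK";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Because the sender blocks on each attempt, retries in synchronous mode delay every message behind the failing one. Keep maxAttempts and the backoff small if ordering or latency matters.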

Asynchronous transmission

  • How asynchronous transmission works

    In asynchronous transmission mode, the sender sends the next message immediately, without waiting for the ApsaraMQ for RocketMQ broker to respond to the previous message. To use this mode, you must implement the SendCallback interface. The response from the broker is delivered to the callback, which handles the success or failure of each send operation.

  • Use scenarios

    This mode is used for time-consuming processes in business scenarios that are sensitive to response time. For example, after you upload a video, a callback is used to enable transcoding. After the video is transcoded, a callback is used to push transcoding results.

  • Sample code

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQAsyncProducer {
        /**
        * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
        * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             * If you do not want to enable the message trace feature, you can use the following method to create a producer:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override public void onSuccess(SendResult result) {
                            // The message was sent to the broker successfully. 
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override public void onException(Throwable throwable) {
                            // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
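            // Before you exit the application, shut down the producer object. 
            // Note: In asynchronous mode, make sure that all callbacks have returned before you call shutdown(). Otherwise, in-flight messages may fail to be sent. 
            producer.shutdown();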
        }
    }
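Because asynchronous sends return before the broker responds, the application needs some way to know that every callback has fired before it shuts down the producer. A common approach is a CountDownLatch that each callback counts down. The sketch below shows that pattern using only the Java standard library. FakeAsyncProducer and Callback are hypothetical stand-ins for DefaultMQProducer and SendCallback, so treat this as an illustration of the waiting logic rather than of the client API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Stdlib-only sketch: wait for all asynchronous callbacks before shutdown. Names are hypothetical. */
class AsyncSendLatchSketch {

    /** Stand-in for SendCallback. */
    interface Callback {
        void onSuccess(String msgId);
        void onException(Throwable t);
    }

    /** Stand-in for the producer: acknowledges each message on a background thread. */
    static final class FakeAsyncProducer {
        private final ExecutorService io = Executors.newFixedThreadPool(4);

        void send(String body, Callback cb) {
            // Returns immediately; the "broker response" arrives later on another thread.
            io.submit(() -> cb.onSuccess("id-" + body));
        }

        void shutdown() {
            io.shutdownNow();
        }
    }

    /** Sends count messages asynchronously and returns how many were acknowledged before shutdown. */
    static int sendAllAndWait(int count, long timeoutSeconds) throws InterruptedException {
        FakeAsyncProducer producer = new FakeAsyncProducer();
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger acked = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                producer.send("msg" + i, new Callback() {
                    @Override public void onSuccess(String msgId) {
                        acked.incrementAndGet();
                        pending.countDown();
                    }

                    @Override public void onException(Throwable t) {
                        // Count failures down as well so that the wait cannot hang on them.
                        pending.countDown();
                    }
                });
            }
            // Block until every callback has fired or the timeout elapses.
            pending.await(timeoutSeconds, TimeUnit.SECONDS);
        } finally {
            producer.shutdown();
        }
        return acked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("acknowledged: " + sendAllAndWait(128, 5));
    }
}
```

The timeout on await keeps the application from hanging indefinitely if a callback never arrives. Choose a value that fits your send timeout.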

One-way transmission

  • How one-way transmission works

    In one-way transmission mode, the producer only sends messages. It does not wait for a response from the ApsaraMQ for RocketMQ broker, and no callback function is triggered. In this mode, a message can be sent within microseconds.

  • Use scenarios

    The one-way transmission mode suits scenarios that require very short send latency and can tolerate occasional message loss. For example, this mode can be used for log collection.

  • Sample code

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQOnewayProducer {
        /**
        * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
        * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             * If you do not want to enable the message trace feature, you can use the following method to create a producer:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // Before you exit the application, shut down the producer object. 
            // Note: This operation is optional. 
            producer.shutdown();
        }
    }

Comparison among the three transmission modes

Transmission mode   TPS       Response   Reliability
Synchronous         High      Yes        No message loss
Asynchronous        High      Yes        No message loss
One-way             Highest   No         Possible message loss

Subscribe to normal messages

Normal messages can be subscribed to in only one way, regardless of the transmission mode that is used to send them. Sample code:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
    public static void main(String[] args) throws MQClientException {
        /**
         * Create a consumer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a consumer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // The endpoint of the ApsaraMQ for RocketMQ instance. 
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}