ApsaraMQ for RocketMQ allows you to send normal messages in synchronous, asynchronous, and one-way transmission modes. This topic describes the principles and scenarios of the three transmission modes and provides sample code. This topic also compares the three transmission modes.
Prerequisites
The SDK for Java is installed. For more information, see Prepare the environment.
The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.
Optional. Logging settings are configured. for more information, see Logging settings.
Synchronous transmission
How synchronous transmission works
In synchronous transmission mode, the sender sends a message only after a response for the previous message is received from the ApsaraMQ for RocketMQ broker.
Use scenarios
You can use the synchronous transmission mode in scenarios in which you want to send important notifications to emails, notification messages for registration, and promotional messages.
Sample code:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); // Cyclically send messages. for (int i = 0; i < 100; i++){ Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker. // For information about the format and configurations of tags, see the Best practices of topics and tags topic. "TagA", // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. // The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
Asynchronous transmission
How asynchronous transmission works
In asynchronous transmission mode, the sender sends a message without receiving a response for the previous message from the ApsaraMQ for RocketMQ broker. If you use the asynchronous transmission mode in ApsaraMQ for RocketMQ to send messages, you must write the implementation logic of the SendCallback operation. The sender sends another message immediately after it sends a message, without waiting for a response from the ApsaraMQ for RocketMQ broker. The sender calls the SendCallback operation to receive a response from the ApsaraMQ for RocketMQ broker and processes the response.
Use scenarios
This mode is used for time-consuming processes in business scenarios that are sensitive to response time. For example, after you upload a video, a callback is used to enable transcoding. After the video is transcoded, a callback is used to push transcoding results.
Sample code:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendCallback; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; import java.util.concurrent.TimeUnit; public class ProducerTest { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. "TagA", // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_100"); // Send the message in asynchronous mode. The result is returned to the producer after the producer calls the SendCallback operation. producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // The message is sent. System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); } }); // Block the current thread for 3 seconds and wait for the result to return. TimeUnit.SECONDS.sleep(3); // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
One-way transmission
How one-way transmission works
In one-way transmission mode, the producer only sends messages. The producer does not need to wait for responses from the ApsaraMQ for RocketMQ broker or trigger the callback function. In this mode, a message can be sent within microseconds.
Use scenarios
The one-way transmission mode is suitable for scenarios in which messages are transmitted in a short time but with low requirements on reliability. For example, this mode can be used for log collection.
Sample code:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); // Cyclically send messages. for (int i = 0; i < 100; i++){ Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // Message Tag, // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker. "TagA", // Message Body // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_" + i); // In one-way transmission mode, the producer does not wait for responses from the ApsaraMQ for RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not retried. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode. producer.sendOneway(msg); } // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
Comparison among the three transmission modes
Transmission mode | TPS | Response | Reliability |
Synchronous | High | Yes | No message loss |
Asynchronous | High | Yes | No message loss |
One-way | Highest | No | Possible message loss |
Subscribe to normal messages
For information about the sample code for subscribing to normal messages, see Subscribe to messages.