ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 1.x TCP client SDK for Java. You can use the SDK to connect to an ApsaraMQ for RocketMQ 5.x instance to send and receive messages. This topic provides the sample code that is used to send and receive messages by using RocketMQ 1.x TCP client SDK for Java.
We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more functions and enhanced features. For more information, see Release notes.
The RocketMQ 3.x, 4.x, and TCP client SDKs are in maintenance mode only. We recommend that you use them only for existing workloads.
Version requirements for accessing serverless instances over the Internet
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must make sure that the version of RocketMQ 1.x TCP client SDK for Java is 1.9.0.Final or later and add the following information in the code:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
Replace InstanceId with the ID of your ApsaraMQ for RocketMQ instance.
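The version requirement above can be checked when the application starts. The following is a minimal sketch that compares a version string against 1.9.0.Final by its numeric components. The class name SdkVersionCheck and the method isAtLeast are hypothetical helpers that are not part of the SDK, and how you obtain the version string of the SDK in use (for example, from your build file) is left to you.

```java
final class SdkVersionCheck {
    // Hypothetical helper: compares only the leading numeric components
    // (major.minor.patch) and ignores qualifiers such as ".Final".
    static boolean isAtLeast(String version, String minimum) {
        int[] actual = numericParts(version);
        int[] required = numericParts(minimum);
        for (int i = 0; i < 3; i++) {
            if (actual[i] != required[i]) {
                return actual[i] > required[i];
            }
        }
        return true;
    }

    // Parses up to three leading numeric components; missing ones count as 0.
    private static int[] numericParts(String version) {
        int[] parts = new int[3];
        String[] tokens = version.split("\\.");
        for (int i = 0; i < 3 && i < tokens.length; i++) {
            if (!tokens[i].matches("\\d+")) {
                break;
            }
            parts[i] = Integer.parseInt(tokens[i]);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(isAtLeast("1.9.0.Final", "1.9.0.Final"));
        System.out.println(isAtLeast("1.8.8.5.Final", "1.9.0.Final"));
    }
}
```

If the check fails, the application can stop before it creates a producer or consumer instead of failing later at connection time.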
Send and receive normal messages
Send normal messages in synchronous transmission mode
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
// Send messages in a loop.
for (int i = 0; i < 100; i++){
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
// For information about the format of a tag and how to specify a tag, see Message filtering.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies.
// The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is a business-specific attribute of a message and should be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_" + i);
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// The message failed to be sent. Add the logic to resend or persist the message here.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
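The catch block in the preceding sample leaves the resend logic to you. One possible approach is a bounded retry with a growing delay, sketched below. The class SendRetry and its methods are hypothetical and not part of the SDK. The Supplier stands in for a call such as producer.send(msg), and the sketch assumes that a failed send surfaces as a RuntimeException.

```java
import java.util.function.Supplier;

final class SendRetry {
    // Hypothetical helper: runs the send action up to maxAttempts times and
    // sleeps backoffMillis * attempt between failed attempts.
    static <T> T sendWithRetry(Supplier<T> sendAction, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendAction.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // All attempts failed. The caller can persist the message for later handling.
        throw last;
    }

    // Simulated usage: the send action fails twice and succeeds on the third attempt.
    static int demoAttempts() {
        int[] calls = {0};
        sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "msg-" + calls[0];
        }, 5, 10L);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("attempts=" + demoAttempts());
    }
}
```

In the sample above, the call would look similar to sendWithRetry(() -> producer.send(msg), 3, 200L). A retried send can deliver the same message more than once, so consumers should tolerate duplicates.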
Send normal messages in asynchronous transmission mode
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods.
"Hello MQ".getBytes());
// The message key. The key is a business-specific attribute of a message and should be globally unique whenever possible. If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_100");
// Send the message in asynchronous transmission mode. The send result is returned to the producer through the SendCallback callback.
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// The message is sent.
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// The message failed to be sent. Add the logic to resend or persist the message here.
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// Block the current thread for 3 seconds and wait for the asynchronous result to return.
TimeUnit.SECONDS.sleep(3);
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
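The preceding sample blocks for a fixed 3 seconds to wait for the asynchronous result. As an alternative, the main thread can wait on a latch that the callback releases, which returns as soon as the result arrives. The following sketch shows the pattern with JDK classes only. The method fakeSendAsync is a hypothetical stand-in for producer.sendAsync(msg, callback) and does not contact a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AsyncWait {
    // Hypothetical stand-in for producer.sendAsync(msg, callback):
    // invokes the callback on another thread.
    static void fakeSendAsync(String body, Consumer<String> onSuccess) {
        CompletableFuture.runAsync(() -> onSuccess.accept("id-of-" + body));
    }

    // Returns true if the callback fired within the timeout.
    static boolean sendAndAwait(String body, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        fakeSendAsync(body, msgId -> {
            System.out.println("send message success. msgId=" + msgId);
            done.countDown();
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback received: " + sendAndAwait("Hello MQ", 3000));
    }
}
```

With the SDK, both onSuccess and onException would call countDown() so that the main thread is released in either case before producer.shutdown() is called.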
Send normal messages in one-way transmission mode
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
// Send messages in a loop.
for (int i = 0; i < 100; i++){
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is a business-specific attribute of a message and should be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_" + i);
// In one-way transmission mode, the producer does not wait for responses from the ApsaraMQ for RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not retried. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode.
producer.sendOneway(msg);
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
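The producer samples above state that the endpoint must be the domain name and port number shown in the console, without an http:// or https:// prefix and without a resolved IP address. The following sketch checks a configured value against these rules before it is passed to the SDK. The class EndpointCheck is a hypothetical helper, and its pattern matching is an approximation that rejects only IPv4-style addresses.

```java
final class EndpointCheck {
    // Hypothetical helper: accepts "host:port" where host is a domain name,
    // and rejects URL prefixes and dotted IPv4 addresses.
    static boolean looksLikeConsoleEndpoint(String endpoint) {
        if (endpoint == null) {
            return false;
        }
        String value = endpoint.trim();
        if (value.startsWith("http://") || value.startsWith("https://")) {
            return false;
        }
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            return false; // the host or the port is missing
        }
        String host = value.substring(0, colon);
        String port = value.substring(colon + 1);
        if (!port.matches("\\d{1,5}")) {
            return false;
        }
        if (host.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
            return false; // a resolved IP address was used
        }
        return host.matches("[A-Za-z0-9]([A-Za-z0-9.-]*[A-Za-z0-9])?");
    }

    public static void main(String[] args) {
        System.out.println(looksLikeConsoleEndpoint("rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
        System.out.println(looksLikeConsoleEndpoint("http://rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
    }
}
```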
Send normal messages by using multiple threads
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// Initialize the producer configurations.
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
// The created producer and consumer are thread-safe and can be shared among threads. Do not create a producer instance or consumer instance for each thread.
// Two threads share the producer and concurrently send the message to ApsaraMQ for RocketMQ.
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies.
// The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The message failed to be sent. Add the logic to resend or persist the message here.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The message failed to be sent. Add the logic to resend or persist the message here.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
anotherThread.start();
// (Optional) If you no longer require the producer instance, shut down the producer and release the allocated resources.
// producer.shutdown();
}
}
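The preceding sample creates two threads by hand. When more workers are needed, the same idea of one shared producer can be expressed with a thread pool. The following sketch uses JDK classes only. FakeProducer is a hypothetical stand-in for the thread-safe Producer instance and only counts the messages that it is asked to send.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedSenderDemo {
    // Hypothetical stand-in for a thread-safe producer.
    static final class FakeProducer {
        private final AtomicInteger sent = new AtomicInteger();

        String send(String body) {
            return "msg-" + sent.incrementAndGet();
        }

        int sentCount() {
            return sent.get();
        }
    }

    // Sends messageCount messages from threadCount workers that share one producer.
    static int sendFromPool(int threadCount, int messageCount) {
        FakeProducer producer = new FakeProducer(); // created once, shared by all workers
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < messageCount; i++) {
            final int n = i;
            pool.execute(() -> producer.send("Hello MQ " + n));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return producer.sentCount();
    }

    public static void main(String[] args) {
        System.out.println("sent=" + sendFromPool(2, 100));
    }
}
```

Waiting for the pool to terminate before shutting down the producer avoids closing the producer while a worker is still sending.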
Subscribe to normal messages in push mode
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// The clustering consumption mode. This is the default mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// The broadcasting consumption mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe to multiple tags.
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// Subscribe to another topic and all of its tags. To unsubscribe from a topic, delete the code for subscription and restart the consumer.
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
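The preceding sample uses two subscription expressions: TagA||TagB for multiple tags and * for all tags. The following sketch models locally which message tags such an expression accepts. The class TagExpression is a hypothetical illustration. The actual filtering is performed on the ApsaraMQ for RocketMQ broker, and this sketch does not describe how the broker implements it.

```java
import java.util.HashSet;
import java.util.Set;

final class TagExpression {
    // Hypothetical local model of a tag subscription expression:
    // "*" accepts every tag; otherwise tags are separated by "||".
    static boolean matches(String expression, String messageTag) {
        String expr = expression.trim();
        if (expr.equals("*")) {
            return true;
        }
        Set<String> accepted = new HashSet<>();
        for (String tag : expr.split("\\|\\|")) {
            accepted.add(tag.trim());
        }
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagB"));
        System.out.println(matches("TagA||TagB", "TagC"));
        System.out.println(matches("*", "TagC"));
    }
}
```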
Batch subscribe to normal messages in push mode
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// The maximum number of messages to be consumed at a time. In this example, the value is specified as 128. If the number of messages cached in the specified topic reaches this value, the SDK immediately calls the callback method. This way, the consumer can consume the messages. Valid values: 1 to 1024. Default value: 32.
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// The maximum wait time between two consecutive batches. In this example, the value is specified as 10 seconds. If the specified wait time is reached, the SDK immediately calls the callback method. This way, the consumer can consume the messages. Valid values: 0 to 450. Default value: 0. Unit: seconds.
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// Process multiple messages at a time.
return Action.CommitMessage;
}
});
// Start the batch consumer.
batchConsumer.start();
System.out.println("Consumer start success.");
// Wait a specific period of time to prevent the process from exiting.
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
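The two batch parameters in the preceding sample work together: the callback is invoked when the batch size is reached or when the wait time elapses. The following sketch illustrates this size-or-timeout trigger with a JDK BlockingQueue. The class BatchTrigger is hypothetical and is not how the SDK is implemented. It only shows the behavior that the comments describe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchTrigger {
    // Collects up to maxSize items, or returns earlier if no further item
    // arrives before maxAwaitMillis has elapsed.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long maxAwaitMillis) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxAwaitMillis);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                // After the deadline, take only items that are already queued.
                T item = remaining > 0
                        ? queue.poll(remaining, TimeUnit.NANOSECONDS)
                        : queue.poll();
                if (item == null) {
                    break; // the wait time elapsed and the queue is empty
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    // Simulated usage: five queued messages are delivered as batches of 3 and 2.
    static int[] demoBatchSizes() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("m" + i);
        }
        int first = nextBatch(queue, 3, 200).size();  // size limit reached
        int second = nextBatch(queue, 3, 200).size(); // wait time reached
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        int[] sizes = demoBatchSizes();
        System.out.println(sizes[0] + ", " + sizes[1]);
    }
}
```

A larger batch size reduces the number of callback invocations, and a longer wait time gives partially filled batches more time to fill at the cost of added latency.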
Subscribe to normal messages in pull mode
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class PullConsumerClient {
public static void main(String[] args){
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
PullConsumer consumer = ONSFactory.createPullConsumer(properties);
// Start the consumer.
consumer.start();
// Query all partitions in topic-xxx.
Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
// Assign the partitions from which you want to pull messages. In this example, all partitions of the topic are assigned.
consumer.assign(topicPartitions);
while (true) {
// The timeout period for pulling messages. In this example, the value is specified as 3000 milliseconds.
List<Message> messages = consumer.poll(3000);
System.out.printf("Received message: %s %n", messages);
}
}
}
Send and receive ordered messages
Send ordered messages
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Date;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Order_global_topic",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"send order global msg".getBytes()
);
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey(orderId);
// The sharding key, which is used to identify the partition for an ordered message. The sharding key is different from the message key of a normal message.
// For globally ordered messages, you can set this field to any non-empty string.
String shardingKey = String.valueOf(orderId);
try {
// Send the message. If no exception is thrown, the message is sent.
SendResult sendResult = producer.send(msg, shardingKey);
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
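The way the preceding sample derives sharding keys can be sketched without the SDK. The following minimal sketch reuses the "biz_" + i % 10 calculation and groups message indexes by key, so you can see which messages share a key and keep their relative send order. The class name ShardingKeyDemo and its methods are hypothetical and are not part of the SDK. The sketch does not show how the broker maps keys to partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of the SDK.
final class ShardingKeyDemo {
    // Same key calculation as the preceding sample: "biz_" + i % 10.
    static String shardingKeyFor(int i) {
        return "biz_" + i % 10;
    }

    // Groups message indexes by sharding key and preserves send order within each key.
    static Map<String, List<Integer>> groupByShardingKey(int messageCount) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (int i = 0; i < messageCount; i++) {
            groups.computeIfAbsent(shardingKeyFor(i), k -> new ArrayList<>()).add(i);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Print each sharding key with the indexes of the messages that use it.
        groupByShardingKey(30).forEach((key, indexes) -> System.out.println(key + " -> " + indexes));
    }
}
```

With 1,000 messages and a modulus of 10, the sample produces 10 distinct sharding keys, and each key is shared by 100 messages.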
Subscribe to ordered messages
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// The wait time in milliseconds before a retry is performed on the ordered message if the message fails to be consumed. Valid values: 10 to 30000.
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// The maximum number of retries that can be performed on the message if the message fails to be consumed.
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// Create the ordered consumer. After you subscribe to the topic, call the start() method only once to start the consumer.
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Order_global_topic",
// Subscribe to messages that contain the specified tags in the specified topic.
// 1. An asterisk (*) specifies that the consumer subscribes to all messages.
// 2. TagA || TagB || TagC specifies that the consumer subscribes to messages that contain TagA, TagB, or TagC.
"*",
new MessageOrderListener() {
/**
* 1. If the message fails to be consumed or an exception occurs during message processing, OrderAction.Suspend is returned.
* 2. If the message is processed, OrderAction.Success is returned.
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}
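The listener in the preceding sample always returns OrderAction.Success. The following minimal sketch shows one way to map the outcome of business logic to the two results that the listener comment describes. To keep the sketch compilable without the SDK, it uses a stand-in enum named Action instead of OrderAction. The class OrderedHandlerSketch and the method handle are hypothetical. Treating every RuntimeException as a reason to suspend is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical wrapper for illustration only; it is not part of the SDK.
final class OrderedHandlerSketch {
    // Stand-in for the SDK's OrderAction so that the sketch compiles without the SDK.
    enum Action { Success, Suspend }

    // Runs the business logic and maps its outcome to an action:
    // Success if the logic returns normally, Suspend if it throws.
    static Action handle(byte[] body, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(new String(body, StandardCharsets.UTF_8));
            return Action.Success;
        } catch (RuntimeException e) {
            // In the real listener, returning Suspend requests a retry after the configured wait time.
            return Action.Suspend;
        }
    }

    public static void main(String[] args) {
        byte[] body = "send order global msg".getBytes(StandardCharsets.UTF_8);
        System.out.println(handle(body, text -> System.out.println("Processed: " + text)));
    }
}
```

In a real MessageOrderListener, the same try/catch structure would sit inside the consume method and return OrderAction.Success or OrderAction.Suspend.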
Send and receive scheduled messages
Send scheduled messages
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Topic",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"tag",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_100");
try {
// The timestamp that indicates when the ApsaraMQ for RocketMQ broker delivers the message to the consumer. Unit: milliseconds. For example, if you set this parameter to 2016-03-07 16:21:00, the broker delivers the message at 16:21:00 on March 7, 2016.
// The value must be later than the current time. If the specified time is earlier than the current time, the message is immediately delivered to the consumer. The sample time below is in the past, so replace it with a future time before you run the code.
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// Send the message. If no exception is thrown, the message is sent.
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
}
catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
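SimpleDateFormat parses the time string in the default time zone of the JVM, so the same string can produce different timestamps on different hosts. The following minimal sketch computes the delivery timestamp with java.time and an explicit time zone. The class DeliverTimeSketch and its methods are hypothetical and are not part of the SDK. The returned value is what you would pass to msg.setStartDeliverTime.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; it is not part of the SDK.
final class DeliverTimeSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts a wall-clock time in the given time zone to epoch milliseconds.
    static long toEpochMillis(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    // Returns true if the delivery time is later than the given current time.
    static boolean isInFuture(long deliverTimeMillis, long nowMillis) {
        return deliverTimeMillis > nowMillis;
    }

    public static void main(String[] args) {
        long deliverTime = toEpochMillis("2016-03-07 16:21:00", ZoneId.of("Asia/Shanghai"));
        System.out.println("Deliver time: " + deliverTime);
        System.out.println("In the future: " + isInFuture(deliverTime, System.currentTimeMillis()));
    }
}
```

Checking isInFuture before you send the message makes it explicit when a scheduled time has already passed and the message would be delivered immediately.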
Subscribe to scheduled messages
The sample code for subscribing to scheduled messages is the same as the sample code for subscribing to normal messages. For more information, see Send and receive normal messages.
Send and receive delayed messages
Send delayed messages
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Topic",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"tag",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query and resend the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_100");
try {
// The point in time at which the message is delivered, calculated as the current time plus the delay. The value must be later than the current time. Maximum delay: 3456000000 milliseconds (40 days).
// In the following example, the message is delivered after a delay of 3 seconds.
long delayTime = System.currentTimeMillis() + 3000;
// The point in time when the ApsaraMQ for RocketMQ broker starts to deliver the delayed message.
msg.setStartDeliverTime(delayTime);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
SendResult sendResult = producer.send(msg);
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
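The delay calculation in the preceding sample can be wrapped in a small check. The following minimal sketch rejects delays that are not positive or that exceed the 40-day maximum stated in the sample comment, and then returns the delivery timestamp. The class DelaySketch and the method deliverTimeAfter are hypothetical and are not part of the SDK. The sketch assumes that the maximum applies to the delay relative to the current time.

```java
// Hypothetical helper for illustration only; it is not part of the SDK.
final class DelaySketch {
    // Maximum delay from the sample comment: 3456000000 milliseconds (40 days).
    static final long MAX_DELAY_MILLIS = 40L * 24 * 60 * 60 * 1000;

    // Returns the delivery timestamp for a delay that starts at nowMillis.
    static long deliverTimeAfter(long nowMillis, long delayMillis) {
        if (delayMillis <= 0 || delayMillis > MAX_DELAY_MILLIS) {
            throw new IllegalArgumentException(
                    "delayMillis must be greater than 0 and at most " + MAX_DELAY_MILLIS + ": " + delayMillis);
        }
        return nowMillis + delayMillis;
    }

    public static void main(String[] args) {
        // Same 3-second delay as the preceding sample.
        long deliverTime = deliverTimeAfter(System.currentTimeMillis(), 3000L);
        System.out.println("Deliver time: " + deliverTime);
    }
}
```

The result can be passed to msg.setStartDeliverTime in place of the inline calculation.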
Subscribe to delayed messages
The sample code for subscribing to delayed messages is the same as the sample code for subscribing to normal messages. For more information, see Send and receive normal messages.
Send and receive transactional messages
Send transactional messages
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console. Note: Transactional messages cannot share a group with messages of other types.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// Before you initialize the producer, you must register a checker to check the status of the local transaction.
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("Execute the local transaction and commit the transaction status.");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// The local transaction checker.
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}
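The checker in the preceding sample always commits. In practice, a checker usually looks up the actual outcome of the local transaction. The following minimal sketch shows one possible shape for that lookup: the local transaction logic records its outcome under a business key, and the check returns the recorded outcome or an undetermined status if none exists yet. The class TxCheckSketch, the stand-in enum Status, and the use of the message key as the lookup key are all assumptions for illustration and are not part of the SDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical outcome store for illustration only; it is not part of the SDK.
final class TxCheckSketch {
    // Stand-in for the SDK's TransactionStatus so that the sketch compiles without the SDK.
    enum Status { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, Status> outcomes = new ConcurrentHashMap<>();

    // Called by the local transaction logic once the outcome is known.
    void record(String messageKey, Status outcome) {
        outcomes.put(messageKey, outcome);
    }

    // Called when a status check arrives: returns the recorded outcome,
    // or UNKNOWN if the local transaction has not finished yet.
    Status check(String messageKey) {
        return outcomes.getOrDefault(messageKey, Status.UNKNOWN);
    }

    public static void main(String[] args) {
        TxCheckSketch store = new TxCheckSketch();
        System.out.println(store.check("ORDERID_100"));
        store.record("ORDERID_100", Status.COMMIT);
        System.out.println(store.check("ORDERID_100"));
    }
}
```

An in-memory map is lost when the process restarts, so a real checker would more likely query the database that holds the business data.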
Subscribe to transactional messages
The sample code for subscribing to transactional messages is the same as the sample code for subscribing to normal messages. For more information, see Send and receive normal messages.