This topic provides sample code for sending and receiving messages using the Apache RocketMQ Java SDK.
Sample code
If you use a Serverless instance, take note of the SDK version and other requirements for public network access. For more information, see SDK version requirements for public network access.
gRPC protocol SDK
For sample code for RocketMQ-Spring, see rocketmq-v5-client-spring-boot-samples.
The rocketmq-client-java SDK uses the gRPC protocol. The following table lists sample code for this SDK.
Important
When you send transactional messages using the SDK for the gRPC protocol, transaction checks are delayed if you do not set a topic when you start the producer. If the message is not sent within four hours, the half-transactional message may be discarded. Therefore, you must set a topic when you start a transaction producer.
Message type
Sample code for sending messages
Sample code for subscribing to messages
Synchronous sending: ProducerNormalMessageExample.java
Asynchronous sending: AsyncProducerExample.java
Synchronous subscription: SimpleConsumerExample.java
Asynchronous subscription: AsyncSimpleConsumerExample.java
Lightweight message
None
Remoting protocol SDK
For sample code for RocketMQ-Spring, see rocketmq-spring-boot-samples.
The rocketmq-client SDK uses the Remoting protocol. The following provides sample code for this SDK.
Normal messages
Send a normal message (synchronous)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQProducer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        producer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // If the message fails to be sent, retry sending it or save the data for later processing.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before the application exits, destroy the producer object.
        // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
        producer.shutdown();
    }
}
Send a normal message (asynchronous)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class RocketMQAsyncProducer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        producer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // The message is sent.
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // If the message fails to be sent, retry sending it or save the data for later processing.
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // If the message fails to be sent, retry sending it or save the data for later processing.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Block the current thread for 3 seconds to wait for the asynchronous send result.
        TimeUnit.SECONDS.sleep(3);

        // Before the application exits, destroy the producer object.
        // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
        producer.shutdown();
    }
}
Send a normal message (one-way)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQOnewayProducer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        producer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // If the message fails to be sent, retry sending it or save the data for later processing.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before the application exits, destroy the producer object.
        // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
        producer.shutdown();
    }
}
Subscribe to normal messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        consumer.setConsumerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        consumer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // Set this to the topic that you created in the ApsaraMQ for RocketMQ console.
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
Ordered messages
Send an ordered message
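The sample that follows keeps messages with the same order ID in the same queue by taking the ID modulo the number of queues. The routing rule can be sketched in isolation, without the SDK. QueueSelectionSketch and selectIndex are hypothetical names used only for illustration, and the queue count of 4 is an assumption. The sketch uses Math.floorMod instead of the % operator so that a negative ID still yields a valid index.

```java
class QueueSelectionSketch {
    // Hypothetical helper: maps an order ID to a queue index.
    // The same orderId and queueCount always produce the same index.
    static int selectIndex(int orderId, int queueCount) {
        // floorMod keeps the result in [0, queueCount) even for negative IDs.
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        for (int i = 0; i < 12; i++) {
            int orderId = i % 10;
            System.out.printf("message %d (orderId=%d) -> queue %d%n",
                i, orderId, selectIndex(orderId, queueCount));
        }
    }
}
```

Any deterministic function of the sharding key works here. What matters is that the same key always maps to the same queue while the queue count is unchanged.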
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class RocketMQOrderProducer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        producer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                    "YOUR MESSAGE TAG",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Important: You must set this configuration item to ensure that ordered messages are evenly distributed to queues.
                // In V5.x, you can replace the following line of code with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Select a partition selection algorithm that suits your needs to ensure that the same parameter produces the same result.
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
Subscribe to ordered messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        consumer.setConsumerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        consumer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // If consumption fails, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT to suspend and retry.
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
Scheduled and delayed messages
Send a scheduled or delayed message
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQDelayProducer {
    /**
     * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
     * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
     * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
     * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
     * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();
        // The group ID that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        producer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // Set this to the topic that you created in the ApsaraMQ for RocketMQ console.
                Message msg = new Message("YOUR TOPIC",
                    // Set the message tag.
                    "YOUR MESSAGE TAG",
                    // Message body.
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // To send a delayed message, set a delay time in milliseconds (ms). The message is delivered after the specified delay. For example, the message is delivered after 3 seconds.
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // To send a scheduled message, set a specific time for delivery. For example, the message is delivered at 18:45:00 on 2021-08-10.
                // The time format is yyyy-MM-dd HH:mm:ss. If the specified time is earlier than the current time, the message is immediately delivered to the consumer.
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // If the message fails to be sent, retry sending it or save the data for later processing.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before the application exits, destroy the producer object.
        // Note: Destroying the producer object saves system memory. To send messages frequently, do not destroy the producer object.
        producer.shutdown();
    }
}
The sample code for subscribing to scheduled and delayed messages is the same as that for normal messages.
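In the producer sample, both the delayed and the scheduled case come down to the same value: an absolute delivery time in epoch milliseconds, written to the __STARTDELIVERTIME property as a string. The following is a minimal sketch of how the two timestamps could be computed with java.time, without the SDK. DeliveryTimeSketch and its methods are hypothetical names, and the Asia/Shanghai zone is an assumption. The commented-out SimpleDateFormat line in the sample would use the JVM default time zone instead.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class DeliveryTimeSketch {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Delayed message: delivery time = current time + delay, in epoch milliseconds.
    static long delayedDeliveryTime(long nowMillis, Duration delay) {
        return nowMillis + delay.toMillis();
    }

    // Scheduled message: parse a wall-clock time in an explicit zone into epoch milliseconds.
    static long scheduledDeliveryTime(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long delayed = delayedDeliveryTime(System.currentTimeMillis(), Duration.ofSeconds(3));
        // The zone is an assumption; pick the zone in which the schedule is expressed.
        long scheduled = scheduledDeliveryTime("2021-08-10 18:45:00", ZoneId.of("Asia/Shanghai"));
        // Either value would be passed as String.valueOf(...) to the message property.
        System.out.println(String.valueOf(delayed));
        System.out.println(String.valueOf(scheduled));
    }
}
```

Naming the zone explicitly avoids a scheduled time that shifts when the producer runs on a host with a different default time zone.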
Transactional messages
Send a transactional message
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQTransactionProducer {

    private static RPCHook getAclRPCHook() {
        /**
         * If you access the instance over the public network, you must configure an RPCHook with the username and password of the instance.
         * You can obtain the username and password on the Intelligent Identity Recognition tab of the Resource Access Management console.
         * Important: Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.
         * If you access the instance from an Alibaba Cloud ECS instance over the internal network, you do not need to initialize the RPCHook. The server automatically obtains the information based on the VPC.
         * If you use a Serverless instance, you must set the username and password for public network access. If password-free access over the internal network is enabled, you do not need to set the username and password for internal network access.
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // When you use a public endpoint, configure the RPCHook.
        // The group ID that you created in the ApsaraMQ for RocketMQ console. Note: The group ID for transactional messages cannot be shared with other message types.
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
        // When you use a VPC endpoint, you do not need to configure the RPCHook.
        // If you use a Serverless instance, you must configure the RPCHook.
        // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
        // Set the connection type to Alibaba Cloud. This is required to use the cloud message trace feature. If you do not enable message traces, you do not need to run this code.
        transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
        // For SDKs V5.3.0 and later, you must also add the EnableTrace parameter in addition to setting AccessChannel to enable message traces.
        transactionMQProducer.setEnableTrace(true);
        // Set this to the endpoint that you obtained from the ApsaraMQ for RocketMQ console. Example: rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Important: Enter the domain name and port provided in the console. Do not add the http:// or https:// prefix. Do not use a resolved IP address.
        transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("Start to execute the local transaction: " + msg);
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("Received a transaction check request, MsgId: " + msg.getMsgId());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionMQProducer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message message = new Message("YOUR TRANSACTION TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                assert sendResult != null;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The sample code for subscribing to transactional messages is the same as that for normal messages.
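The transaction listener in the sample always returns UNKNOW from the local transaction and COMMIT_MESSAGE from the check, which is enough for a demo. In an application, the check callback would usually look up the real outcome of the local transaction. The sketch below illustrates that decision in plain Java, without the SDK. TransactionCheckSketch, its Resolution enum, and the in-memory map are hypothetical stand-ins: the enum mirrors the three LocalTransactionState values used in the sample, and a real application would more likely query its database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TransactionCheckSketch {
    // Stand-in for the three states a transaction check can return.
    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical record of local transaction outcomes, keyed by a business key.
    private final Map<String, Boolean> localOutcomes = new ConcurrentHashMap<>();

    // Called when the local transaction finishes, successfully or not.
    void recordOutcome(String businessKey, boolean succeeded) {
        localOutcomes.put(businessKey, succeeded);
    }

    // Called when a transaction check arrives for the given key.
    Resolution check(String businessKey) {
        Boolean succeeded = localOutcomes.get(businessKey);
        if (succeeded == null) {
            // Outcome not known yet: ask to be checked again later.
            return Resolution.UNKNOWN;
        }
        return succeeded ? Resolution.COMMIT : Resolution.ROLLBACK;
    }

    public static void main(String[] args) {
        TransactionCheckSketch sketch = new TransactionCheckSketch();
        System.out.println(sketch.check("order-1"));
        sketch.recordOutcome("order-1", true);
        System.out.println(sketch.check("order-1"));
    }
}
```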
Serverless instance public network access version guide
To access a Serverless instance of ApsaraMQ for RocketMQ over the public network, your SDK must meet the following version requirements. You must also add the specified code to your application.
Replace InstanceId with your actual instance ID.
SDK version: rocketmq-client ≥ 5.2.0
To send messages, add the following code:
producer.setNamespaceV2("InstanceId");
To consume messages, add the following code:
consumer.setNamespaceV2("InstanceId");
SDK version: rocketmq-client-java ≥ 5.0.6
To send and consume messages, add the following code:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();
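The minimum versions above (5.2.0 for rocketmq-client, 5.0.6 for rocketmq-client-java) are easy to get wrong when compared as plain strings, because "5.10.0" sorts before "5.2.0" alphabetically. As a rough illustration, the following sketch compares dotted version strings numerically. SdkVersionSketch and its methods are hypothetical helpers, not part of either SDK, and the sketch assumes purely numeric versions without suffixes such as -SNAPSHOT.

```java
class SdkVersionSketch {
    // Compares two dotted numeric versions segment by segment.
    // Returns a negative number, zero, or a positive number, like Integer.compare.
    static int compareVersions(String a, String b) {
        String[] left = a.split("\\.");
        String[] right = b.split("\\.");
        int length = Math.max(left.length, right.length);
        for (int i = 0; i < length; i++) {
            // Missing segments count as 0, so "5.2" equals "5.2.0".
            int l = i < left.length ? Integer.parseInt(left[i]) : 0;
            int r = i < right.length ? Integer.parseInt(right[i]) : 0;
            if (l != r) {
                return Integer.compare(l, r);
            }
        }
        return 0;
    }

    static boolean meetsMinimum(String version, String minimum) {
        return compareVersions(version, minimum) >= 0;
    }

    public static void main(String[] args) {
        // Minimum versions taken from the requirements listed above.
        System.out.println(meetsMinimum("5.1.4", "5.2.0"));
        System.out.println(meetsMinimum("5.10.0", "5.2.0"));
        System.out.println(meetsMinimum("5.0.6", "5.0.6"));
    }
}
```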