This topic describes the sample code for sending and receiving messages by using Apache RocketMQ SDK for Java.
Sample code
If you use a serverless ApsaraMQ for RocketMQ instance, take note of the version of the SDK for Java when you access the instance over the Internet. Only the SDK for Java of specific versions can access serverless ApsaraMQ for RocketMQ instances over the Internet. For more information, see Version description for accessing serverless instances over the Internet.
rocketmq-client-java
Message type | Sample code for sending messages | Sample code for receiving messages | |
|
| ||
rocketmq-client
Normal messages
Send normal messages in synchronous transmission mode
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQProducer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQProducer producer = new DefaultMQProducer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
producer.setProducerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
producer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
producer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
// The logic to resend or persist a message if the message fails to be sent and needs to be re-sent.
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
Send normal messages in asynchronous transmission mode
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class RocketMQAsyncProducer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQProducer producer = new DefaultMQProducer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
producer.setProducerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
producer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
producer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// The message is sent.
System.out.println("send message success. msgId= " + result.getMsgId());
}
@Override
public void onException(Throwable throwable) {
// Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println("send message failed.");
throwable.printStackTrace();
}
});
} catch (Exception e) {
// Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// Block the current thread for 3 seconds and wait for the result to return.
TimeUnit.SECONDS.sleep(3);
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
Send normal messages in one-way transmission mode
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQOnewayProducer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQProducer producer = new DefaultMQProducer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
producer.setProducerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
producer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
producer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
} catch (Exception e) {
// Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
Subscribe to normal messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class RocketMQPushConsumer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
consumer.setConsumerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
consumer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
consumer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
consumer.setNamesrvAddr("YOUR ACCESS POINT");
// The topic that you created in the ApsaraMQ for RocketMQ console.
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Ordered messages
Send ordered messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class RocketMQOrderProducer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQProducer producer = new DefaultMQProducer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
producer.setProducerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
producer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
producer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// Note: This parameter is required. Ordered messages can be evenly distributed to each queue only after this parameter is configured.
// If you use an ApsaraMQ for RocketMQ 5.x instance, you can replace the following line with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// Select a partition selection algorithm that is suitable for your business. The algorithm can be used to ensure that the results of the same parameter are the same.
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
Subscribe to ordered messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook() {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
consumer.setConsumerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
consumer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
consumer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
consumer.setNamesrvAddr("YOUR ACCESS POINT");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// If consumption fails, the request is suspended and retried. The following value is returned: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Scheduled and delayed messages
Send scheduled or delayed messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQDelayProducer {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// DefaultMQProducer producer = new DefaultMQProducer();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
producer.setProducerGroup("YOUR GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
producer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
producer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
// The topic that you created in the ApsaraMQ for RocketMQ console.
Message msg = new Message("YOUR TOPIC",
// The message tag.
"YOUR MESSAGE TAG",
// The message content.
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// The delay time for sending the delayed message. Unit: milliseconds. After you configure this parameter, the message is sent after the specified period of time. For example, if you set this parameter to 3000, the message is sent after 3 seconds.
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
// The scheduled time for sending the scheduled message. After you configure this parameter, the message is sent at the specified point in time. For example, if you set this parameter to 2021-08-10 18:45:00, the message is sent at 18:45:00 on August 10, 2021.
// Specify the time in the yyyy-MM-dd HH:mm:ss format. If you specify a time that is earlier than the current time, the message is immediately sent to the consumer.
// longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
// msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
// Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
The sample code that is used to subscribe to scheduled or delayed messages are the same as the sample code that is used to subscribe to normal messages.
Transactional messages
Send transactional messages
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQTransactionProducer {
private static RPCHook getAclRPCHook() {
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. Note: Transactional messages cannot share a group with messages of other types.
TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
// If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook.
// If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook.
// TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
// Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty.
transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
// If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
transactionMQProducer.setEnableTrace(true);
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("Start executing the local transaction: " + msg);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionMQProducer.start();
for (int i = 0; i < 10; i++) {
try {
Message message = new Message("YOUR TRANSACTION TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
assert sendResult != null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
The sample code that is used to subscribe to transactional messages is the same as the sample code that is used to subscribe to normal messages.
Version description for accessing serverless instances over the Internet
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must make sure that the version of the SDK for Java meets the following conditions and add the following information in the code:
Replace InstanceId
with the ID of your ApsaraMQ for RocketMQ instance.
rocketmq-client: version 5.2.0 or later
Add the following information in the code when you send messages:
producer.setNamespaceV2("InstanceId");
Add the following information in the code when you receive messages:
consumer.setNamespaceV2("InstanceId");
rocketmq-client-java: version 5.0.6 or later
Add the following information in the code when you send and receive messages:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();