ApsaraMQ for RocketMQ provides SDKs for multiple programming languages to send and receive messages of different types. This topic describes how to use the SDK for Java to connect to an ApsaraMQ for RocketMQ broker to send and receive normal messages.
Prerequisites
The required resources are created in ApsaraMQ for RocketMQ. For more information, see Step 2: Create resources.
IntelliJ IDEA is installed. For more information, see IntelliJ IDEA.
You can use IntelliJ IDEA or Eclipse. In the examples of this topic, IntelliJ IDEA Ultimate is used.
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
Install the Java dependency library
Create a Java project in IntelliJ IDEA.
Add the following dependency to the pom.xml file to import the Java dependency library:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency>
ImportantIf you use a serverless ApsaraMQ for RocketMQ instance, take note of the version of the SDK for Java when you access the instance over the Internet. Only the SDK for Java of specific versions can access serverless ApsaraMQ for RocketMQ instances over the Internet. For more information, see Version description for accessing serverless instances over the Internet
Produce messages
In the created Java project, create a program that sends normal messages and run the program. Sample code:
package doc;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned.
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
/**
* When you initialize a producer, you can specify the topics that you want to use to check whether the topic settings are valid and prevent invalid topics from being started.
* You do not need to specify the topics for non-transactional messages. The broker dynamically checks whether the topics are valid.
* Note: To prevent the API operation that is called to query transactional messages from failing, you must specify topics for transactional messages in advance.
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// Send a normal message.
Message message = provider.newMessageBuilder()
.setTopic(topic)
// The message key. You can use a keyword to accurately find the message.
.setKeys("messageKey")
// The message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
// The message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. Take note of the result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
Consume messages
In the created Java project, create a program to subscribe to normal messages and run the program. ApsaraMQ for RocketMQ allows you to consume messages in simple mode and push mode. For more information, see SimpleConsumer and PushConsumer. You can select one of the modes to subscribe to messages. The following table describes the differences between simple consumers and push consumers.
Item | PushConsumer | SimpleConsumer |
API operation call | The callback operation is called to return the consumption result by using a message listener. Consumers can process the consumption logic only within the scope of the message listener. | Business applications implement message processing and call the corresponding operation to return the consumption result. |
Consumption concurrency management | ApsaraMQ forRocketMQ SDKs are used to manage the number of concurrent threads for message consumption. | The number of concurrent threads that are used for message consumption is based on the consumption logic of individual business applications. |
API flexibility | The API operations are encapsulated and provide poor flexibility. | The atomic operations provide great flexibility. |
Scenarios | This consumer type is suitable for development scenarios that do not require a custom process. | This consumer type is suitable for development scenarios that require custom processes. |
PushConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Specify the consumer group.
.setConsumerGroup(consumerGroup)
// Specify the subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the message listener.
.setMessageListener(messageView -> {
// Consume the messages and return the consumption result.
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// If you no longer require the push consumer, shut down the process.
//pushConsumer.close();
}
}
SimpleConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Specify the consumer group.
.setConsumerGroup(consumerGroup)
// Specify the timeout period for long polling requests.
.setAwaitDuration(awaitDuration)
// Specify the subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Specify the maximum number of messages to be pulled.
int maxMessageNum = 16;
// Specify the invisible time of the messages.
Duration invisibleDuration = Duration.ofSeconds(10);
// If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop.
// To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages.
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker.
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// If you no longer require the simple consumer, shut down the process.
// consumer.close();
}
}
Version description for accessing serverless instances over the Internet
Only SDKs of specific versions can access serverless ApsaraMQ for RocketMQ instances over the Internet.
SDK for Java 5.x
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must add the following information in the code.
Replace InstanceId
with the ID of your ApsaraMQ for RocketMQ instance.
The SDK for Java whose version is 5.2.0 or later
Add the following information in the code when you send messages:
producer.setNamespaceV2("InstanceId");
Add the following information in the code when you receive messages:
consumer.setNamespaceV2("InstanceId");
The SDK for Java whose version is 5.0.6 or later but earlier than 5.2.0
Add the following information in the code when you send and receive messages:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();
The SDK for Java whose version is earlier than 5.0.6: You cannot access a serverless ApsaraMQ for RocketMQ instance over the Internet.
TCP client SDK for Java 1.x
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must make sure that the version of RocketMQ 1.x TCP client SDK for Java is 1.9.0.Final or later and add the following information in the code:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
Replace InstanceId
with the ID of your ApsaraMQ for RocketMQ instance.
SDK parameters
Parameter | Example | Description |
endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | The endpoint of the ApsaraMQ for RocketMQ instance. For information about how to obtain the endpoint, see Obtain the endpoint of an instance.
|
topic | normal_test | The topic to which messages are sent or from which messages are consumed in the ApsaraMQ for RocketMQ instance. You must create the topic on the ApsaraMQ for RocketMQ instance in advance. For more information, see Create a topic. |
group | GID_test | The consumer group that is used by consumers to consume messages in the ApsaraMQ for RocketMQ instance. You must create the consumer group on the ApsaraMQ for RocketMQ instance in advance. For more information, see Create a consumer group. |
Instance UserName | 1XVg0hzgKm****** | The username of the ApsaraMQ for RocketMQ instance. If you access the instance over the Internet, you must specify the username. If you access the instance in a VPC, you need to specify the username only if the instance is a serverless instance and the authentication-free in VPCs feature is disabled for the instance. For information about how to obtain the username, see Obtain the username and password of an instance. |
Instance Password | ijSt8rEc45****** | The password of the ApsaraMQ for RocketMQ instance. If you access the instance over the Internet, you must specify the password. If you access the instance in a VPC, you need to specify the password only if the instance is a serverless instance and the authentication-free in VPCs feature is disabled for the instance. For information about how to obtain the password, see Obtain the username and password of an instance. |
Verify message consumption
After you consume a message, you can check the consumption status of the message in the ApsaraMQ for RocketMQ console.
Log on to the ApsaraMQ for RocketMQ console. On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane of the page that appears, click Message Tracing.
SDK references
For information about how to use SDKs for other programming languages to send and receive messages of other types, see Overview.