After you create ApsaraMQ for RocketMQ resources such as a topic and a consumer group, you can integrate messaging into your Java application. This guide shows how to add the 5.x Java SDK dependency, connect to your instance, send normal messages, and consume them with a push consumer or a simple consumer.
Before you begin
Make sure that you have:
Created resources in ApsaraMQ for RocketMQ, including a topic and a consumer group (Step 2 in this series)
A Java IDE such as IntelliJ IDEA or Eclipse (examples use IntelliJ IDEA Ultimate)
Add the SDK dependency
Create a Java project in your IDE.
Add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>
Gather connection parameters
Collect the following values from the ApsaraMQ for RocketMQ console before writing any code:
| Parameter | Example | Where to find it |
|---|---|---|
| endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | Instance Details page > Endpoints tab. Use the VPC endpoint for internal access or the public endpoint for Internet access. See Get the endpoint of an instance. |
| topic | normal_test | The topic you created for sending and receiving messages. Must be pre-created. See Create a topic. |
| consumerGroup | GID_test | The consumer group you created. Must be pre-created. See Create a consumer group. |
| InstanceId | rmq-cn-xxx | The ID of your ApsaraMQ for RocketMQ instance. Required only for serverless instances accessed over the Internet. |
| Instance username | 1XVg0hzgKm****** | Access Control page > Intelligent Authentication tab. See Get the username and password of an instance. |
| Instance password | ijSt8rEc45****** | Same location as the username. |
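Rather than hard-coding these values, you may prefer to load them from the environment. The following is a minimal sketch under stated assumptions: the ConnectionSettings class and the ROCKETMQ_* variable names are hypothetical and are not part of the SDK or the console. It uses only the Java standard library and treats the instance ID, username, and password as optional because not every access method needs them.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical holder for the connection parameters in the table above. */
final class ConnectionSettings {
    final String endpoints;
    final String topic;
    final String consumerGroup;
    // Only some access methods need these, so they may be absent.
    final Optional<String> instanceId;
    final Optional<String> username;
    final Optional<String> password;

    private ConnectionSettings(Map<String, String> env) {
        // The variable names below are illustrative assumptions.
        this.endpoints = required(env, "ROCKETMQ_ENDPOINTS");
        this.topic = required(env, "ROCKETMQ_TOPIC");
        this.consumerGroup = required(env, "ROCKETMQ_CONSUMER_GROUP");
        this.instanceId = optional(env, "ROCKETMQ_INSTANCE_ID");
        this.username = optional(env, "ROCKETMQ_USERNAME");
        this.password = optional(env, "ROCKETMQ_PASSWORD");
    }

    /** Builds settings from a map such as the one returned by System.getenv(). */
    static ConnectionSettings fromEnv(Map<String, String> env) {
        return new ConnectionSettings(env);
    }

    /** Returns the trimmed value, or fails fast if it is missing or empty. */
    private static String required(Map<String, String> env, String key) {
        return optional(env, key).orElseThrow(
                () -> new IllegalStateException("Missing required setting: " + key));
    }

    /** Returns the trimmed value, or an empty Optional if it is missing or empty. */
    private static Optional<String> optional(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(value.trim());
    }
}
```

A caller could then write ConnectionSettings.fromEnv(System.getenv()) and pass the resulting fields to the builders used in the examples later in this guide.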
When credentials are required
Whether you need to provide a username, password, and namespace depends on your access method:
| Access method | Username and password | Namespace (instance ID) |
|---|---|---|
| VPC (standard instance) | Not required. The broker gets credentials from VPC information automatically. | Not required. |
| VPC (serverless instance, authentication-free enabled) | Not required. | Not required. |
| VPC (serverless instance, authentication-free disabled) | Required. | Not required. |
| Internet (standard instance) | Required. Enable Internet access on the instance first. | Not required. |
| Internet (serverless instance) | Required. | Required. Set via .setNamespace("InstanceId"). |
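The table above can also be captured in code so that an application can check its configuration before connecting. This is only a sketch: the AccessMethod enum and its constant names are hypothetical, and the two flags simply restate the "Required" and "Not required" cells of the table.

```java
/** Hypothetical encoding of the access-method table above. */
enum AccessMethod {
    VPC_STANDARD(false, false),
    VPC_SERVERLESS_AUTH_FREE_ENABLED(false, false),
    VPC_SERVERLESS_AUTH_FREE_DISABLED(true, false),
    INTERNET_STANDARD(true, false),
    INTERNET_SERVERLESS(true, true);

    private final boolean credentialsRequired;
    private final boolean namespaceRequired;

    AccessMethod(boolean credentialsRequired, boolean namespaceRequired) {
        this.credentialsRequired = credentialsRequired;
        this.namespaceRequired = namespaceRequired;
    }

    /** True if a username and password must be set on the client configuration. */
    boolean credentialsRequired() {
        return credentialsRequired;
    }

    /** True if the instance ID must be set as the namespace. */
    boolean namespaceRequired() {
        return namespaceRequired;
    }
}
```

For example, AccessMethod.INTERNET_SERVERLESS.namespaceRequired() returns true, which corresponds to the one row where .setNamespace() is needed.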
Send messages
Create a ProducerExample.java file in your project and run it:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerExample {
    public static void main(String[] args) throws ClientException, IOException {
        // Instance endpoint (VPC endpoint for internal access, public endpoint for Internet access)
        String endpoints = "<your-endpoint>";
        // Topic (must be pre-created in the console)
        String topic = "<your-topic>";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();

        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey")
                .setTag("messageTag")
                // Encode explicitly so the body does not depend on the platform default charset
                .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
                .build();

        try {
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources once sending is done
            producer.close();
        }
    }
}
Replace the placeholders with your values:
| Placeholder | Description |
|---|---|
| <your-endpoint> | Instance endpoint, for example rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080 |
| <your-topic> | Topic name, for example normal_test |
| <your-instance-id> | Instance ID (serverless Internet access only) |
| <instance-username> | Instance username (Internet access or serverless VPC without authentication-free) |
| <instance-password> | Instance password (Internet access or serverless VPC without authentication-free) |
Calling .setTopics() when you build the producer validates the topic settings up front. This is optional for normal messages, which are validated dynamically at send time, but required for transactional messages to prevent query API failures.
Receive messages
ApsaraMQ for RocketMQ provides two consumer types:
| | Push consumer | Simple consumer |
|---|---|---|
| How it works | The SDK delivers messages to a callback (message listener). | Your application pulls messages and explicitly acknowledges each one. |
| Concurrency | Managed by the SDK. | Managed by your application. |
| Flexibility | Lower. The SDK encapsulates the consumption flow. | Higher. Atomic operations let you build custom workflows. |
| Best for | Most use cases where you need to process incoming messages. | Scenarios that require fine-grained control over pulling, processing, or acknowledgment. |
Start with a push consumer unless you need custom control over the consumption flow.
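To make the simple consumer row of the table more concrete, the following sketch shows the "process, then explicitly acknowledge" flow on one batch of received messages. It is an illustration under stated assumptions, not SDK code: AckAfterProcessing and handleBatch are hypothetical names, and the handler and ack callbacks are plain java.util.function stand-ins for your business logic and for the consumer's acknowledge call. A message whose handler fails is deliberately left unacknowledged.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical helper that acknowledges only the messages that were processed. */
final class AckAfterProcessing {
    private AckAfterProcessing() {
    }

    /**
     * Runs the handler on every message in the batch and acknowledges each
     * message whose handler returned true. Returns the number acknowledged.
     */
    static <M> int handleBatch(List<M> batch, Predicate<M> handler, Consumer<M> ack) {
        int acknowledged = 0;
        for (M message : batch) {
            boolean processed;
            try {
                processed = handler.test(message);
            } catch (RuntimeException e) {
                // Treat an exception as a processing failure and skip the ack.
                processed = false;
            }
            if (processed) {
                ack.accept(message);
                acknowledged++;
            }
        }
        return acknowledged;
    }
}
```

With a push consumer, the SDK plays this role for you: the message listener returns a ConsumeResult and the SDK handles acknowledgment. With a simple consumer, the ack callback would wrap the consumer's ack call, which throws a checked ClientException and therefore needs its own try/catch inside the lambda.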
Push consumer
Create a PushConsumerExample.java file and run it:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        // Subscribe to all messages in the topic (tag = "*")
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();

        // Keep the consumer running
        Thread.sleep(Long.MAX_VALUE);

        // To shut down the consumer gracefully, call:
        // pushConsumer.close();
    }
}
Simple consumer
Create a SimpleConsumerExample.java file and run it:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        // Long polling timeout
        Duration awaitDuration = Duration.ofSeconds(10);
        // Subscribe to all messages in the topic (tag = "*")
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setAwaitDuration(awaitDuration)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();

        // Maximum messages per pull
        int maxMessageNum = 16;
        // Messages stay invisible to other consumers for this duration after being received
        Duration invisibleDuration = Duration.ofSeconds(10);

        // Poll for messages in a loop. For real-time consumption, use multiple threads.
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // ACK each message to commit the consumption result to the broker
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }

        // To shut down the consumer gracefully, call:
        // consumer.close();
    }
}
Verify message delivery
Check the delivery status in the ApsaraMQ for RocketMQ console:
Log on to the ApsaraMQ for RocketMQ console.
On the Instances page, click your instance name.
In the left-side navigation pane, click Message Tracing.
SDK versions required for serverless Internet access
Accessing a serverless ApsaraMQ for RocketMQ instance over the Internet requires a minimum SDK version. Replace InstanceId in the examples with your actual instance ID.
SDK for Java 5.x (rocketmq-client-java)
Minimum version: 5.0.6
Set the namespace in ClientConfiguration:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .setNamespace("InstanceId")
        .setCredentialProvider(sessionCredentialsProvider)
        .build();
SDK for Java 5.x (rocketmq-client)
Minimum version: 5.2.0
Set the namespace on the producer and consumer separately:
// Producer
producer.setNamespaceV2("InstanceId");
// Consumer
consumer.setNamespaceV2("InstanceId");
TCP client SDK for Java 1.x
Minimum version: 1.9.0.Final
Set the namespace through properties:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
What's next
Send and receive messages in other programming languages. See the SDK overview.
Learn more about PushConsumer and SimpleConsumer.