ApsaraMQ for RocketMQ: Send and receive messages with the Java SDK

Last Updated: Mar 10, 2026

After you create ApsaraMQ for RocketMQ resources such as a topic and a consumer group, you can integrate messaging into your Java application. This guide shows how to add the 5.x Java SDK dependency, connect to your instance, send normal messages, and consume them with a push consumer or a simple consumer.

Before you begin

Make sure that you have an ApsaraMQ for RocketMQ instance with a topic and a consumer group already created.

Add the SDK dependency

  1. Create a Java project in your IDE.

  2. Add the following dependency to pom.xml:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>

Gather connection parameters

Collect the following values from the ApsaraMQ for RocketMQ console before writing any code:

| Parameter | Example | Where to find it |
| --- | --- | --- |
| endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | Instance Details page > Endpoints tab. Use the VPC endpoint for internal access or the public endpoint for Internet access. See Get the endpoint of an instance. |
| topic | normal_test | The topic you created for sending and receiving messages. Must be pre-created. See Create a topic. |
| consumerGroup | GID_test | The consumer group you created. Must be pre-created. See Create a consumer group. |
| InstanceId | rmq-cn-xxx | The ID of your ApsaraMQ for RocketMQ instance. Required only for serverless instances accessed over the Internet. |
| Instance username | 1XVg0hzgKm****** | Access Control page > Intelligent Authentication tab. See Get the username and password of an instance. |
| Instance password | ijSt8rEc45****** | Same location as the username. |
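
One way to keep these values out of source code is to read them from the environment and validate them before building any client. The sketch below is only an illustration: the ConnectionSettings class and the ROCKETMQ_* variable names are hypothetical and not part of the SDK, and the host:port check assumes endpoints shaped like the example in the table.

```java
import java.util.Map;
import java.util.Optional;

/** Holds the connection values from the table above. Hypothetical helper, not an SDK class. */
final class ConnectionSettings {
    final String endpoints;
    final String topic;
    final String consumerGroup;
    final Optional<String> instanceId;

    private ConnectionSettings(String endpoints, String topic, String consumerGroup,
                               Optional<String> instanceId) {
        this.endpoints = endpoints;
        this.topic = topic;
        this.consumerGroup = consumerGroup;
        this.instanceId = instanceId;
    }

    /** Reads settings from a map such as System.getenv(). The key names are assumptions. */
    static ConnectionSettings fromEnv(Map<String, String> env) {
        String endpoints = require(env, "ROCKETMQ_ENDPOINTS");
        // The example endpoint ends in ":8080", so reject values without a numeric port.
        int colon = endpoints.lastIndexOf(':');
        if (colon <= 0 || !endpoints.substring(colon + 1).matches("\\d{1,5}")) {
            throw new IllegalArgumentException("Expected host:port but got: " + endpoints);
        }
        // The instance ID is optional: only serverless Internet access needs it.
        Optional<String> instanceId = Optional.ofNullable(env.get("ROCKETMQ_INSTANCE_ID"))
                .map(String::trim)
                .filter(value -> !value.isEmpty());
        return new ConnectionSettings(endpoints, require(env, "ROCKETMQ_TOPIC"),
                require(env, "ROCKETMQ_CONSUMER_GROUP"), instanceId);
    }

    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value.trim();
    }
}
```

Calling ConnectionSettings.fromEnv(System.getenv()) at startup would fail fast on a missing topic or a malformed endpoint. The instance username and password could be read the same way.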

When credentials are required

Whether you need to provide a username, password, and namespace depends on your access method:

| Access method | Username and password | Namespace (instance ID) |
| --- | --- | --- |
| VPC (standard instance) | Not required. The broker gets credentials from VPC information automatically. | Not required. |
| VPC (serverless instance, authentication-free enabled) | Not required. | Not required. |
| VPC (serverless instance, authentication-free disabled) | Required. | Not required. |
| Internet (standard instance) | Required. Enable Internet access on the instance first. | Not required. |
| Internet (serverless instance) | Required. | Required. Set via .setNamespace("InstanceId"). |
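
The rules in this table can be written down as data so that a misconfiguration is caught before the client connects. The following is a minimal sketch; the AccessMethod enum and its constant names are made up for illustration and do not exist in the SDK.

```java
import java.util.ArrayList;
import java.util.List;

/** Encodes the credential table above. Illustrative only, not an SDK type. */
enum AccessMethod {
    VPC_STANDARD(false, false),
    VPC_SERVERLESS_AUTH_FREE(false, false),
    VPC_SERVERLESS_AUTH_REQUIRED(true, false),
    INTERNET_STANDARD(true, false),
    INTERNET_SERVERLESS(true, true);

    private final boolean needsCredentials;
    private final boolean needsNamespace;

    AccessMethod(boolean needsCredentials, boolean needsNamespace) {
        this.needsCredentials = needsCredentials;
        this.needsNamespace = needsNamespace;
    }

    boolean needsCredentials() {
        return needsCredentials;
    }

    boolean needsNamespace() {
        return needsNamespace;
    }

    /** Lists the settings this access method requires but that were not supplied. */
    List<String> missing(String username, String password, String instanceId) {
        List<String> result = new ArrayList<>();
        if (needsCredentials && isBlank(username)) {
            result.add("username");
        }
        if (needsCredentials && isBlank(password)) {
            result.add("password");
        }
        if (needsNamespace && isBlank(instanceId)) {
            result.add("instanceId");
        }
        return result;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}
```

An application could call missing(...) for its access method at startup and refuse to start if the returned list is not empty.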

Send messages

Create a ProducerExample.java file in your project and run it:

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        // Instance endpoint (VPC endpoint for internal access, public endpoint for Internet access)
        String endpoints = "<your-endpoint>";
        // Topic (must be pre-created in the console)
        String topic = "<your-topic>";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();

        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey")
                .setTag("messageTag")
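                // Encode explicitly so the payload does not depend on the platform default charset
                .setBody("messageBody".getBytes(java.nio.charset.StandardCharsets.UTF_8))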
                .build();
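        try {
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources once sending is done
            try {
                producer.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
        }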
    }
}

Replace the placeholders with your values:

| Placeholder | Description |
| --- | --- |
| <your-endpoint> | Instance endpoint, for example rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080 |
| <your-topic> | Topic name, for example normal_test |
| <your-instance-id> | Instance ID (serverless Internet access only) |
| <instance-username> | Instance username (Internet access or serverless VPC without authentication-free) |
| <instance-password> | Instance password (Internet access or serverless VPC without authentication-free) |

Note: Calling .setTopics() during producer initialization validates the topic settings up front. This is optional for normal messages, which are validated dynamically at send time, but required for transactional messages to prevent query API failures.

Receive messages

ApsaraMQ for RocketMQ provides two consumer types:

| | Push consumer | Simple consumer |
| --- | --- | --- |
| How it works | The SDK delivers messages to a callback (message listener). | Your application pulls messages and explicitly acknowledges each one. |
| Concurrency | Managed by the SDK. | Managed by your application. |
| Flexibility | Lower. The SDK encapsulates the consumption flow. | Higher. Atomic operations let you build custom workflows. |
| Best for | Most use cases where you need to process incoming messages. | Scenarios that require fine-grained control over pulling, processing, or acknowledgment. |

Start with a push consumer unless you need custom control over the consumption flow.
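
The difference in control flow between the two consumer types can be seen in a toy in-memory model. This sketch does not use the RocketMQ API at all: ToyBroker and its methods are invented here purely to contrast "the library calls you" with "you call the library", and it skips retries, invisibility timeouts, and concurrency.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

/** Toy in-memory broker that mimics only the control flow. Not the RocketMQ API. */
final class ToyBroker {
    private final Queue<String> pending = new ArrayDeque<>();
    final List<String> acked = new ArrayList<>();

    void publish(String body) {
        pending.add(body);
    }

    /** Push style: the library drives the listener and acknowledges on success. */
    void deliverTo(Predicate<String> listener) {
        String body;
        while ((body = pending.poll()) != null) {
            if (listener.test(body)) {
                acked.add(body);
            }
            // A real client would schedule a retry on failure; the toy simply drops it.
        }
    }

    /** Simple style, step 1: the application asks for up to max messages. */
    List<String> receive(int max) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < max && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        return batch;
    }

    /** Simple style, step 2: the application confirms each message itself. */
    void ack(String body) {
        acked.add(body);
    }
}
```

With deliverTo, the application only supplies the listener. With receive and ack, the application decides when to pull, how to process, and when to confirm, which is the extra flexibility the comparison above refers to.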

Push consumer

Create a PushConsumerExample.java file and run it:

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        // Subscribe to all messages in the topic (tag = "*")
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();

        // Keep the consumer running
        Thread.sleep(Long.MAX_VALUE);

        // To shut down the consumer gracefully, call:
        // pushConsumer.close();
    }
}
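
Because the main thread in the example sleeps indefinitely, the commented-out close() call is never reached. One possible way to still close the consumer when the process is stopped is a JVM shutdown hook. The sketch below uses only the JDK; ShutdownSupport is a hypothetical helper, and it assumes the consumer can be treated as a java.io.Closeable.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical helper, not part of the RocketMQ SDK. */
final class ShutdownSupport {
    private ShutdownSupport() {
    }

    /** Registers a JVM shutdown hook that closes the resource, and returns the hook thread. */
    static Thread closeOnShutdown(Closeable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "consumer-shutdown");
        // The hook runs on Ctrl+C, SIGTERM, or a normal JVM exit.
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Under that assumption, calling ShutdownSupport.closeOnShutdown(pushConsumer) right after build() would leave the blocking sleep in place while still closing the consumer when the process is stopped.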

Simple consumer

Create a SimpleConsumerExample.java file and run it:

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // Uncomment for serverless instances accessed over the Internet:
                // .setNamespace("<your-instance-id>")
                // Uncomment for Internet access or serverless VPC without authentication-free:
                // .setCredentialProvider(new StaticSessionCredentialsProvider("<instance-username>", "<instance-password>"))
                .build();

        // Long polling timeout
        Duration awaitDuration = Duration.ofSeconds(10);
        // Subscribe to all messages in the topic (tag = "*")
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setAwaitDuration(awaitDuration)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();

        // Maximum messages per pull
        int maxMessageNum = 16;
        // Messages stay invisible to other consumers for this duration after being received
        Duration invisibleDuration = Duration.ofSeconds(10);

        // Poll for messages in a loop. For real-time consumption, use multiple threads.
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // ACK each message to commit the consumption result to the broker
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
        // To shut down the consumer gracefully, call:
        // consumer.close();
    }
}
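
The while (true) loop in the example never exits, so the close() call after it cannot run. A stoppable variant of the receive, process, acknowledge cycle might look like the following sketch. PollingLoop is a hypothetical class that takes the receive and ack steps as plain functions so it can run without a broker; in a real application they would wrap consumer.receive(...) and consumer.ack(...), including handling of their checked exceptions. Unlike the example, it acknowledges a message only after the handler returns normally.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical polling loop that can be stopped from another thread. */
final class PollingLoop<T> {
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Asks the loop to exit after the current batch. */
    void stop() {
        running.set(false);
    }

    /** Repeats receive, handle, ack until stop() is called. Returns the number of acked messages. */
    int run(Supplier<List<T>> receive, Consumer<T> handle, Consumer<T> ack) {
        int acked = 0;
        while (running.get()) {
            for (T message : receive.get()) {
                try {
                    handle.accept(message);
                    // Acknowledge only after the handler finished without an exception.
                    ack.accept(message);
                    acked++;
                } catch (RuntimeException e) {
                    // Not acknowledged: the message becomes visible again after the invisible duration.
                    e.printStackTrace();
                }
            }
        }
        return acked;
    }
}
```

Once run(...) returns, the caller can close the consumer in the normal flow of the program.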

Verify message delivery

Check the delivery status in the ApsaraMQ for RocketMQ console:

  1. Log on to the ApsaraMQ for RocketMQ console.

  2. On the Instances page, click your instance name.

  3. In the left-side navigation pane, click Message Tracing.

SDK versions required for serverless Internet access

Accessing a serverless ApsaraMQ for RocketMQ instance over the Internet requires a minimum SDK version. Replace InstanceId in the examples with your actual instance ID.

SDK for Java 5.x (rocketmq-client-java)

Minimum version: 5.0.6

Set the namespace in ClientConfiguration:

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();

SDK for Java 5.x (rocketmq-client)

Minimum version: 5.2.0

Set the namespace on the producer and consumer separately:

// Producer
producer.setNamespaceV2("InstanceId");

// Consumer
consumer.setNamespaceV2("InstanceId");

TCP client SDK for Java 1.x

Minimum version: 1.9.0.Final

Set the namespace through properties:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
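
To check a dependency version against the minimums listed above, a numeric comparison of the dotted parts is enough. The sketch below is a hypothetical helper, not part of any SDK. It assumes versions look like the ones in this section and ignores a trailing non-numeric qualifier such as ".Final".

```java
import java.util.Arrays;

/** Hypothetical helper for comparing dotted version strings such as "5.0.7" or "1.9.0.Final". */
final class SdkVersion {
    private SdkVersion() {
    }

    /** Returns true if actual is the same as or newer than minimum. */
    static boolean atLeast(String actual, String minimum) {
        int[] a = parse(actual);
        int[] m = parse(minimum);
        for (int i = 0; i < Math.max(a.length, m.length); i++) {
            // Missing parts count as zero, so "5.2" equals "5.2.0".
            int ai = i < a.length ? a[i] : 0;
            int mi = i < m.length ? m[i] : 0;
            if (ai != mi) {
                return ai > mi;
            }
        }
        return true;
    }

    /** Parses leading numeric parts and stops at the first non-numeric one. */
    private static int[] parse(String version) {
        String[] parts = version.split("\\.");
        int[] numbers = new int[parts.length];
        int count = 0;
        for (String part : parts) {
            if (!part.matches("\\d+")) {
                break;
            }
            numbers[count++] = Integer.parseInt(part);
        }
        return Arrays.copyOf(numbers, count);
    }
}
```

For example, SdkVersion.atLeast("5.0.7", "5.0.6") compares the version from the dependency snippet in this guide against the rocketmq-client-java minimum.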
