
ApsaraMQ for MQTT: Send and receive messages between MQTT clients by using the Java SDK

Last Updated: Mar 11, 2026

When IoT devices or mobile apps need to exchange messages directly without a backend intermediary, you can connect both the publisher and subscriber as ApsaraMQ for MQTT clients. Each client uses the Eclipse Paho Java SDK to connect to the ApsaraMQ for MQTT broker over the Internet, publish messages to a topic, and receive messages from subscribed topics.

Messaging between clients

Prerequisites

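Before you begin, make sure that you have the resources that the following steps refer to: an ApsaraMQ for MQTT instance, a parent topic and a group ID created in the console, and an AccessKey pair.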

Endpoints

Specify the endpoint of your ApsaraMQ for MQTT instance when connecting through the client SDK. The endpoint format depends on your network access method:

| Access method | Endpoint format | Typical use case |
| --- | --- | --- |
| Public (Internet) | <instance-id>.mqtt.aliyuncs.com | IoT devices, mobile apps |
| VPC (private network) | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Backend applications in the cloud |

Find your endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console. The instance ID is displayed in the Basic Information section.

Important

Always use the domain name, not an IP address. IP addresses may change without notice when domain name resolution is updated. Alibaba Cloud is not responsible for connection failures caused by hardcoded IP addresses or IP-based firewall rules.
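
The endpoint formats above, together with the TCP and SSL ports used later in this topic (1883 and 8883), can be combined into a small helper. The following is a minimal sketch: the class and method names are hypothetical and not part of any SDK, and the endpoint shown in the console remains the authoritative value.

```java
/** Hypothetical helper (not part of any SDK): builds endpoint host names and broker URIs. */
final class MqttEndpoints {
    private static final String DOMAIN = ".mqtt.aliyuncs.com";

    private MqttEndpoints() {
    }

    /** Returns the domain name for an instance; vpc selects the private-network form. */
    static String host(String instanceId, boolean vpc) {
        if (instanceId == null || instanceId.isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        return instanceId + (vpc ? "-internal-vpc" : "") + DOMAIN;
    }

    /** Returns a broker URI: tcp://host:1883 for TCP or ssl://host:8883 for SSL. */
    static String brokerUri(String host, boolean ssl) {
        return (ssl ? "ssl://" : "tcp://") + host + (ssl ? ":8883" : ":1883");
    }

    public static void main(String[] args) {
        // "my-instance-id" is a placeholder; use the instance ID from the console.
        String host = host("my-instance-id", false);
        System.out.println(brokerUri(host, false));
    }
}
```

Because the helper only ever produces domain names, it also keeps hardcoded IP addresses out of the client configuration.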

Step 1: Add Maven dependencies

Clone or download the demo project, then verify that pom.xml includes the following dependencies:

<dependencies>
    <!-- Eclipse Paho: MQTT 3.1.1 client library -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <!-- Commons Codec: HMAC signature calculation for authentication -->
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <!-- Apache HttpClient: HTTP requests for token-based auth -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <!-- Fastjson: JSON parsing for API responses -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Alibaba Cloud SDK for MQTT: server-side API calls -->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
        <version>1.0.3</version>
    </dependency>
    <!-- Alibaba Cloud Core SDK: shared SDK infrastructure -->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.0</version>
    </dependency>
</dependencies>

For the Eclipse Paho Java Client source code and documentation, see Eclipse Paho Java Client.

Step 2: Set environment variables

Store your AccessKey pair in environment variables to prevent accidental credential exposure in source code.

export MQTT_AK_ENV=<your-access-key-id>
export MQTT_SK_ENV=<your-access-key-secret>

| Placeholder | Description | Example |
| --- | --- | --- |
| <your-access-key-id> | AccessKey ID from the RAM console | LTAI5tXxx |
| <your-access-key-secret> | AccessKey secret from the RAM console | xXxXxXx |

Important

Use a RAM user's AccessKey pair rather than the Alibaba Cloud account's AccessKey pair. The Alibaba Cloud account's AccessKey pair grants access to all API operations and poses a security risk if leaked. For details, see Configure an access credential.
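
If either variable is missing, System.getenv returns null and the failure only surfaces later, during authentication. One way to fail fast is to check both variables at startup. The following is a minimal sketch; the Credentials class is hypothetical, and the lookup function is injected only so that the check can be exercised without touching the real environment.

```java
import java.util.function.Function;

/** Hypothetical holder for the AccessKey pair read from MQTT_AK_ENV and MQTT_SK_ENV. */
final class Credentials {
    final String accessKeyId;
    final String accessKeySecret;

    private Credentials(String accessKeyId, String accessKeySecret) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
    }

    /** Reads both variables from the process environment. */
    static Credentials fromEnvironment() {
        return load(System::getenv);
    }

    /** Reads both variables through the given lookup and rejects missing or blank values. */
    static Credentials load(Function<String, String> lookup) {
        return new Credentials(require(lookup, "MQTT_AK_ENV"), require(lookup, "MQTT_SK_ENV"));
    }

    private static String require(Function<String, String> lookup, String name) {
        String value = lookup.apply(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

In an application, Credentials.fromEnvironment() would be called once before the client is created, so that a missing variable is reported by name.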

Step 3: Define connection parameters

Set the following parameters based on the resources you created in the ApsaraMQ for MQTT console:

// Instance ID from the console's Basic Information section
String instanceId = "<your-instance-id>";

// Endpoint from the console's Endpoints tab
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";

// Credentials from environment variables
String accessKey = System.getenv("MQTT_AK_ENV");
// The AccessKey secret is required for signature authentication mode
String secretKey = System.getenv("MQTT_SK_ENV");

// Client ID format: GroupID@@@DeviceID (max 64 characters)
// Each TCP connection must use a unique client ID.
// Duplicate client IDs cause connection conflicts and unexpected disconnections.
String clientId = "<your-group-id>@@@<your-device-id>";

// Parent topic created in the console
final String parentTopic = "<your-parent-topic>";

// Subtopic for message filtering (max 128 characters total)
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";

// QoS level: 0 (at most once), 1 (at least once), or 2 (exactly once)
final int qosLevel = 0;

Note

If the client publishes or subscribes to a topic that does not exist or that the client is not authorized to access, the broker closes the connection immediately.
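
Because an invalid client ID or topic only shows up as a dropped connection, it can help to check the documented limits before connecting. The following is a minimal sketch based on the limits stated in the comments above (GroupID@@@DeviceID, at most 64 characters; topic at most 128 characters in total). The MqttNames class is hypothetical, and measuring length with String.length() is an assumption about how the broker counts characters.

```java
/** Hypothetical helper that applies the client ID and topic limits described above. */
final class MqttNames {
    static final String SEPARATOR = "@@@";
    static final int MAX_CLIENT_ID_LENGTH = 64;
    static final int MAX_TOPIC_LENGTH = 128;

    private MqttNames() {
    }

    /** Builds GroupID@@@DeviceID and rejects empty parts or an over-long result. */
    static String clientId(String groupId, String deviceId) {
        if (groupId == null || groupId.isEmpty() || deviceId == null || deviceId.isEmpty()) {
            throw new IllegalArgumentException("groupId and deviceId must not be empty");
        }
        if (groupId.contains(SEPARATOR) || deviceId.contains(SEPARATOR)) {
            throw new IllegalArgumentException("groupId and deviceId must not contain " + SEPARATOR);
        }
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_CLIENT_ID_LENGTH) {
            throw new IllegalArgumentException("client ID exceeds " + MAX_CLIENT_ID_LENGTH + " characters");
        }
        return clientId;
    }

    /** Builds parentTopic/subtopic and rejects a result longer than the total limit. */
    static String topic(String parentTopic, String subtopic) {
        String topic = parentTopic + "/" + subtopic;
        if (topic.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException("topic exceeds " + MAX_TOPIC_LENGTH + " characters");
        }
        return topic;
    }
}
```

For example, MqttNames.clientId("GID_test", "device001") returns GID_test@@@device001. The group ID and device ID here are placeholders.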

Step 4: Create the MQTT client and register callbacks

Register callbacks before connecting. If you register callbacks after connect(), you may miss messages, especially when resuming a persistent session.

Create the MQTT client:

ConnectionOptionWrapper connectionOptionWrapper =
    new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);

final MemoryPersistence memoryPersistence = new MemoryPersistence();

// Protocol and port must match:
//   TCP  -> tcp://endpoint:1883
//   SSL  -> ssl://endpoint:8883
final MqttClient mqttClient = new MqttClient(
    "tcp://" + endPoint + ":1883", clientId, memoryPersistence);

// Timeout for waiting on broker responses (milliseconds)
mqttClient.setTimeToWait(5000);

final ExecutorService executorService = new ThreadPoolExecutor(
    1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
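
ConnectionOptionWrapper is a utility class in the demo project, and its source is not shown in this topic. To illustrate what signature mode involves, the following sketch computes a user name and password with the JDK only. It assumes that the user name has the form Signature|<AccessKey ID>|<instance ID> and that the password is the Base64-encoded HMAC-SHA1 of the client ID, keyed with the AccessKey secret. Both are assumptions for illustration; check them against the demo source before you rely on them. The class name SignatureSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch of signature-mode credentials; the formats are assumptions. */
final class SignatureSketch {
    private SignatureSketch() {
    }

    /** Assumed user name layout: Signature|<AccessKey ID>|<instance ID>. */
    static String userName(String accessKeyId, String instanceId) {
        return "Signature|" + accessKeyId + "|" + instanceId;
    }

    /** Assumed password: Base64(HMAC-SHA1(key = AccessKey secret, data = client ID)). */
    static String password(String clientId, String accessKeySecret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(accessKeySecret.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] digest = mac.doFinal(clientId.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA1 is unavailable or the key is invalid", e);
        }
    }
}
```

With Paho, values like these would be passed to MqttConnectOptions through setUserName(String) and setPassword(char[]). In the demo, the wrapper builds the connect options for you.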

Set up event callbacks to handle connection events and incoming messages:

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // Subscribe immediately after each connection (including reconnections)
        System.out.println("connect success");
        executorService.submit(() -> {
            try {
                final String[] topicFilter = {mq4IotTopic};
                final int[] qos = {qosLevel};
                mqttClient.subscribe(topicFilter, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Process incoming messages here.
        // Do not throw exceptions from this callback. The broker treats a
        // normally returned callback as message acknowledgment.
        // Implement deduplication to ensure idempotent consumption.
        System.out.println(
            "receive msg from topic " + topic + " , body is "
            + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : "
            + iMqttDeliveryToken.getTopics()[0]);
    }
});
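
The comment in messageArrived asks for deduplication so that a redelivered message is not processed twice. One possible approach is to remember a bounded window of recently seen message keys. The following is a minimal sketch: the RecentMessageFilter class is hypothetical, and the message key is assumed to be an application-level identifier, for example an ID field that the publisher puts in the payload.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical filter that remembers the most recent message keys, up to a fixed capacity. */
final class RecentMessageFilter {
    private final Map<String, Boolean> seen;

    RecentMessageFilter(final int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // Insertion-ordered map that drops the oldest key once the capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time a key is seen, and false for a repeat that is still remembered. */
    synchronized boolean firstTime(String messageKey) {
        return seen.put(messageKey, Boolean.TRUE) == null;
    }
}
```

Inside messageArrived, the handler would extract the key from the message and skip processing when firstTime returns false. A key that has already been evicted from the window is treated as new, so size the capacity to cover the longest expected redelivery delay.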

Key callback behaviors:

| Callback | Behavior |
| --- | --- |
| connectComplete | Called on each connection, including automatic reconnections. Re-subscribe here to restore subscriptions. |
| connectionLost | Called when the connection drops. Log the error or implement reconnection logic. |
| messageArrived | Called when a message arrives. Do not throw exceptions; a callback that returns normally is treated as acknowledgment of the message. Consume messages within the timeout period. Implement deduplication for idempotent processing. |
| deliveryComplete | Called when delivery of a published message completes. With Paho, this happens for QoS 0 once the message has been handed to the network, and for QoS 1 and QoS 2 once the broker's acknowledgment has been received. |
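
If you implement your own reconnection logic in connectionLost, avoid retrying in a tight loop. A common pattern is exponential backoff with an upper bound. The following is a minimal sketch of the delay calculation only; the ReconnectBackoff class and its parameters are hypothetical. Paho's MqttConnectOptions also provides setAutomaticReconnect(true), which may make manual retries unnecessary.

```java
/** Hypothetical helper that computes capped exponential backoff delays. */
final class ReconnectBackoff {
    private ReconnectBackoff() {
    }

    /**
     * Returns baseMillis doubled once per previous attempt, capped at maxMillis.
     * Attempt 0 is the first retry and waits baseMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Compare against max / 2 first so that doubling cannot overflow.
            delay = delay > maxMillis / 2 ? maxMillis : delay * 2;
        }
        return delay;
    }
}
```

A retry loop would sleep for delayMillis(attempt, 1000, 30000) before each call to connect and reset the attempt counter in connectComplete. Run the loop on a separate thread, such as the executor created earlier, rather than inside the callback itself.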

Step 5: Connect and publish messages

Connect to the broker and publish messages. This example sends both normal pub/sub messages and point-to-point (P2P) messages:

mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());

for (int i = 0; i < 10; i++) {
    // Publish a normal message to the subscribed topic
    MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
    message.setQos(qosLevel);
    mqttClient.publish(mq4IotTopic, message);

    // Publish a P2P message directly to a specific client
    // P2P topic format: {parentTopic}/p2p/{targetClientId}
    // The target client does not need to subscribe to this topic.
    // This demo targets its own client ID, so the client receives its own P2P messages.
    final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
    message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
    message.setQos(qosLevel);
    mqttClient.publish(p2pSendTopic, message);
}

// Keep the client running to continue receiving messages
Thread.sleep(Long.MAX_VALUE);

P2P messaging: When you know the exact target client, send a message to the topic {parentTopic}/p2p/{targetClientId}. The target client receives the message without subscribing. This simplifies scenarios where a message is intended for a single recipient.
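
The P2P topic format can be wrapped in a small helper so that the sender passes only the parent topic and the target client ID. The following is a minimal sketch; the P2pTopics class is hypothetical and not part of the SDK.

```java
/** Hypothetical helper that builds P2P topics in the form {parentTopic}/p2p/{targetClientId}. */
final class P2pTopics {
    private P2pTopics() {
    }

    /** Returns the topic that delivers a message directly to targetClientId. */
    static String p2pTopic(String parentTopic, String targetClientId) {
        if (parentTopic == null || parentTopic.isEmpty()) {
            throw new IllegalArgumentException("parentTopic must not be empty");
        }
        if (targetClientId == null || targetClientId.isEmpty()) {
            throw new IllegalArgumentException("targetClientId must not be empty");
        }
        return parentTopic + "/p2p/" + targetClientId;
    }

    public static void main(String[] args) {
        // Placeholder values: a sender addresses a different client, GID_test@@@deviceB.
        System.out.println(p2pTopic("demoTopic", "GID_test@@@deviceB"));
    }
}
```

In a two-client setup, the sender would publish to p2pTopic(parentTopic, receiverClientId), and the receiver would handle the message in messageArrived without subscribing to that topic.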

Complete code

The following listing combines all the steps above into a single runnable class. It is also available in the demo project on GitHub.

package com.aliyun.openservices.lmq.example.demo;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        // --- Connection parameters ---
        String instanceId = "<your-instance-id>";
        String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");
        String clientId = "<your-group-id>@@@<your-device-id>";
        final String parentTopic = "<your-parent-topic>";
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        final int qosLevel = 0;

        // --- Create client ---
        ConnectionOptionWrapper connectionOptionWrapper =
            new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        final MqttClient mqttClient = new MqttClient(
            "tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        mqttClient.setTimeToWait(5000);

        final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // --- Register callbacks before connecting ---
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
                executorService.submit(() -> {
                    try {
                        final String[] topicFilter = {mq4IotTopic};
                        final int[] qos = {qosLevel};
                        mqttClient.subscribe(topicFilter, qos);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                System.out.println(
                    "receive msg from topic " + topic + " , body is "
                    + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : "
                    + iMqttDeliveryToken.getTopics()[0]);
            }
        });

        // --- Connect and publish ---
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(mq4IotTopic, message);

            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

Verify the results

After running the application, query message traces in the ApsaraMQ for MQTT console to confirm that messages were sent and received. For detailed steps, see Query message traces.
