When IoT devices or mobile apps need to exchange messages directly without a backend intermediary, you can connect both the publisher and subscriber as ApsaraMQ for MQTT clients. Each client uses the Eclipse Paho Java SDK to connect to the ApsaraMQ for MQTT broker over the Internet, publish messages to a topic, and receive messages from subscribed topics.

Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for MQTT instance, a group ID, and a parent topic, all created in the console
An AccessKey pair for authentication
A JDK installed
An IDE such as IntelliJ IDEA or Eclipse
Endpoints
Specify the endpoint of your ApsaraMQ for MQTT instance when connecting through the client SDK. The endpoint format depends on your network access method:
| Access method | Endpoint format | Typical use case |
|---|---|---|
| Public (Internet) | <instance-id>.mqtt.aliyuncs.com | IoT devices, mobile apps |
| VPC (private network) | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Backend applications in the cloud |
Find your endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console. The instance ID is displayed in the Basic Information section.
Always use the domain name, not an IP address. IP addresses may change without notice when domain name resolution is updated. Alibaba Cloud is not responsible for connection failures caused by hardcoded IP addresses or IP-based firewall rules.
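As a rough illustration of the endpoint formats in the table above, the following sketch assembles a broker URI from an instance ID. The class and method names (`EndpointSketch`, `brokerUri`) are hypothetical and not part of any SDK. The ports 1883 (TCP) and 8883 (SSL) are the ones this guide uses when creating the client in Step 4.

```java
public class EndpointSketch {
    /**
     * Hypothetical helper, not part of the Paho or Alibaba Cloud SDKs.
     * Builds a broker URI from the domain-name endpoint formats listed above.
     */
    public static String brokerUri(String instanceId, boolean vpc, boolean ssl) {
        if (instanceId == null || instanceId.isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        // VPC endpoints add the "-internal-vpc" suffix to the instance ID.
        String host = instanceId + (vpc ? "-internal-vpc" : "") + ".mqtt.aliyuncs.com";
        // The URI scheme and the port must match: tcp -> 1883, ssl -> 8883.
        return (ssl ? "ssl://" : "tcp://") + host + ":" + (ssl ? 8883 : 1883);
    }

    public static void main(String[] args) {
        System.out.println(brokerUri("<instance-id>", false, false));
        System.out.println(brokerUri("<instance-id>", true, true));
    }
}
```

Building the URI from the domain name, rather than from a resolved IP address, keeps the client in line with the guidance above.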
Step 1: Add Maven dependencies
Clone or download the demo project, then verify that pom.xml includes the following dependencies:
<dependencies>
    <!-- Eclipse Paho: MQTT 3.1.1 client library -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <!-- Commons Codec: HMAC signature calculation for authentication -->
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <!-- Apache HttpClient: HTTP requests for token-based auth -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <!-- Fastjson: JSON parsing for API responses -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Alibaba Cloud SDK for MQTT: server-side API calls -->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
        <version>1.0.3</version>
    </dependency>
    <!-- Alibaba Cloud Core SDK: shared SDK infrastructure -->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.0</version>
    </dependency>
</dependencies>

For the Eclipse Paho Java Client source code and documentation, see Eclipse Paho Java Client.
Step 2: Set environment variables
Store your AccessKey pair in environment variables to prevent accidental credential exposure in source code.
export MQTT_AK_ENV=<your-access-key-id>
export MQTT_SK_ENV=<your-access-key-secret>

| Placeholder | Description | Example |
|---|---|---|
| <your-access-key-id> | AccessKey ID from the RAM console | LTAI5tXxx |
| <your-access-key-secret> | AccessKey secret from the RAM console | xXxXxXx |
Use a RAM user's AccessKey pair rather than the Alibaba Cloud account's AccessKey pair. The Alibaba Cloud account's AccessKey pair grants access to all API operations and poses a security risk if leaked. For details, see Configure an access credential.
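A missing variable otherwise surfaces only later as an authentication failure, so it can help to check for the credentials at startup. The sketch below is one possible way to do that. The class `CredentialsSketch` and its `require` method are hypothetical names introduced for illustration; only the variable names `MQTT_AK_ENV` and `MQTT_SK_ENV` come from this guide.

```java
import java.util.Map;

public class CredentialsSketch {
    /**
     * Hypothetical helper: returns the value of a required variable,
     * or fails early with a message that names the missing variable.
     */
    public static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            String accessKey = require(System.getenv(), "MQTT_AK_ENV");
            String secretKey = require(System.getenv(), "MQTT_SK_ENV");
            // Never print the credentials themselves; report only that they were found.
            System.out.println("Credentials loaded: "
                    + (!accessKey.isEmpty() && !secretKey.isEmpty()));
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Passing the environment in as a `Map` keeps the check easy to exercise without changing the real process environment.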
Step 3: Define connection parameters
Set the following parameters based on the resources you created in the ApsaraMQ for MQTT console:
// Instance ID from the console's Basic Information section
String instanceId = "<your-instance-id>";
// Endpoint from the console's Endpoints tab
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
// Credentials from environment variables
String accessKey = System.getenv("MQTT_AK_ENV");
// The AccessKey secret is required for signature authentication mode
String secretKey = System.getenv("MQTT_SK_ENV");
// Client ID format: GroupID@@@DeviceID (max 64 characters)
// Each TCP connection must use a unique client ID.
// Duplicate client IDs cause connection conflicts and unexpected disconnections.
String clientId = "<your-group-id>@@@<your-device-id>";
// Parent topic created in the console
final String parentTopic = "<your-parent-topic>";
// Subtopic for message filtering (max 128 characters total)
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
// QoS level: 0 (at most once), 1 (at least once), or 2 (exactly once)
final int qosLevel = 0;

If the client publishes or subscribes to a topic that does not exist or that the client is not authorized to access, the broker closes the connection immediately.
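Because a bad topic makes the broker close the connection, checking the parameters locally before connecting may save some debugging time. The following is a minimal sketch of such checks, based only on the limits stated in the comments above (client ID in `GroupID@@@DeviceID` form with at most 64 characters, topic with at most 128 characters). The class and method names are hypothetical, and the broker may enforce further rules that this sketch does not cover.

```java
public class ParameterChecks {
    static final int MAX_CLIENT_ID_LENGTH = 64;
    static final int MAX_TOPIC_LENGTH = 128;
    static final String SEPARATOR = "@@@";

    /** Checks the GroupID@@@DeviceID shape and the 64-character limit. */
    public static boolean isValidClientId(String clientId) {
        if (clientId == null || clientId.length() > MAX_CLIENT_ID_LENGTH) {
            return false;
        }
        int sep = clientId.indexOf(SEPARATOR);
        // Require a non-empty group ID before the separator
        // and a non-empty device ID after it.
        return sep > 0 && sep + SEPARATOR.length() < clientId.length();
    }

    /** Checks that the topic is the parent topic or one of its subtopics, within 128 characters. */
    public static boolean isValidTopic(String parentTopic, String topic) {
        if (parentTopic == null || parentTopic.isEmpty() || topic == null) {
            return false;
        }
        return topic.length() <= MAX_TOPIC_LENGTH
                && (topic.equals(parentTopic) || topic.startsWith(parentTopic + "/"));
    }

    public static void main(String[] args) {
        System.out.println(isValidClientId("GID_demo@@@device001"));
        System.out.println(isValidTopic("demoTopic", "demoTopic/testMq4Iot"));
    }
}
```

These checks cannot tell whether the topic exists or whether the client is authorized; only the broker knows that.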
Step 4: Create the MQTT client and register callbacks
Register callbacks before connecting. If you register callbacks after connect(), you may miss messages -- especially when resuming a persistent session.
Create the MQTT client:
ConnectionOptionWrapper connectionOptionWrapper =
    new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
// Protocol and port must match:
// TCP -> tcp://endpoint:1883
// SSL -> ssl://endpoint:8883
final MqttClient mqttClient = new MqttClient(
    "tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// Timeout for waiting on broker responses (milliseconds)
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
    1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

Set up event callbacks to handle connection events and incoming messages:
mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // Subscribe immediately after each connection (including reconnections)
        System.out.println("connect success");
        executorService.submit(() -> {
            try {
                final String[] topicFilter = {mq4IotTopic};
                final int[] qos = {qosLevel};
                mqttClient.subscribe(topicFilter, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Process incoming messages here.
        // Do not throw exceptions from this callback. The broker treats a
        // normally returned callback as message acknowledgment.
        // Implement deduplication to ensure idempotent consumption.
        System.out.println(
            "receive msg from topic " + topic + " , body is "
                + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : "
            + iMqttDeliveryToken.getTopics()[0]);
    }
});

Key callback behaviors:
| Callback | Behavior |
|---|---|
| connectComplete | Called on each connection, including automatic reconnections. Re-subscribe here to restore subscriptions. |
| connectionLost | Called when the connection drops. Log the error or implement reconnection logic. |
| messageArrived | Called when a message arrives. Do not throw exceptions -- the broker treats a normally returned callback as acknowledgment. Consume messages within the timeout period. Implement deduplication for idempotent processing. |
| deliveryComplete | Called when a QoS 1 or QoS 2 message is delivered. |
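The deduplication recommended for messageArrived can take many forms. The sketch below shows one simple option: a bounded, in-memory record of recently seen message keys. It is an illustration under assumptions, not the method used by the demo project. The class `RecentMessageFilter` is hypothetical, and it assumes that your application can derive a unique key for each message (for example, an ID that the publisher places in the payload), which this guide does not define.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecentMessageFilter {
    private final Map<String, Boolean> seen;

    /** Remembers at most {@code capacity} keys; the oldest key is forgotten first. */
    public RecentMessageFilter(final int capacity) {
        // Insertion-ordered map: removeEldestEntry evicts the key that was added first.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /**
     * Returns true the first time a key is seen, and false for a repeat
     * that is still inside the remembered window.
     */
    public synchronized boolean firstTime(String messageKey) {
        return seen.put(messageKey, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentMessageFilter filter = new RecentMessageFilter(1000);
        System.out.println(filter.firstTime("msg-1")); // first delivery
        System.out.println(filter.firstTime("msg-1")); // redelivery of the same key
    }
}
```

Inside messageArrived, a check such as `if (filter.firstTime(key)) { ... }` would skip repeats. A memory-only window is lost on restart, so applications that need stronger guarantees would likely persist the keys instead.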
Step 5: Connect and publish messages
Connect to the broker and publish messages. This example sends both normal pub/sub messages and point-to-point (P2P) messages:
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
    // Publish a normal message to the subscribed topic
    MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
    message.setQos(qosLevel);
    mqttClient.publish(mq4IotTopic, message);
    // Publish a P2P message directly to a specific client
    // P2P topic format: {parentTopic}/p2p/{targetClientId}
    // The target client does not need to subscribe to this topic.
    final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
    message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
    message.setQos(qosLevel);
    mqttClient.publish(p2pSendTopic, message);
}
// Keep the client running to continue receiving messages
Thread.sleep(Long.MAX_VALUE);

P2P messaging: When you know the exact target client, send a message to the topic {parentTopic}/p2p/{targetClientId}. The target client receives the message without subscribing. This simplifies scenarios where a message is intended for a single recipient.
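The example above sends the P2P message to its own client ID, so the same client receives it. To address a different device, substitute that device's client ID. The small sketch below builds the topic from the documented {parentTopic}/p2p/{targetClientId} format. The class name, method name, and sample IDs are hypothetical.

```java
public class P2pTopicSketch {
    /** Builds a P2P topic in the form {parentTopic}/p2p/{targetClientId}. */
    public static String p2pTopic(String parentTopic, String targetClientId) {
        if (parentTopic == null || parentTopic.isEmpty()
                || targetClientId == null || targetClientId.isEmpty()) {
            throw new IllegalArgumentException("parentTopic and targetClientId are required");
        }
        return parentTopic + "/p2p/" + targetClientId;
    }

    public static void main(String[] args) {
        // Hypothetical IDs: another device in the same group as the sender.
        System.out.println(p2pTopic("demoTopic", "GID_demo@@@device002"));
    }
}
```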
Complete code
The following listing combines all the steps above into a single runnable class. It is also available in the demo project on GitHub.
package com.aliyun.openservices.lmq.example.demo;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        // --- Connection parameters ---
        String instanceId = "<your-instance-id>";
        String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");
        String clientId = "<your-group-id>@@@<your-device-id>";
        final String parentTopic = "<your-parent-topic>";
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        final int qosLevel = 0;

        // --- Create client ---
        ConnectionOptionWrapper connectionOptionWrapper =
            new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        final MqttClient mqttClient = new MqttClient(
            "tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // --- Register callbacks before connecting ---
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
                executorService.submit(() -> {
                    try {
                        final String[] topicFilter = {mq4IotTopic};
                        final int[] qos = {qosLevel};
                        mqttClient.subscribe(topicFilter, qos);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                System.out.println(
                    "receive msg from topic " + topic + " , body is "
                        + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : "
                    + iMqttDeliveryToken.getTopics()[0]);
            }
        });

        // --- Connect and publish ---
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(mq4IotTopic, message);
            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

Verify the results
After running the application, query message traces in the ApsaraMQ for MQTT console to confirm that messages were sent and received. For detailed steps, see Query message traces.