This topic describes how to initialize an open source MQTT client when the client is used to connect to an ApsaraMQ for MQTT broker for the first time. This topic also provides a solution to the connection failure issue that occurs during connection for the first time. In this topic, the open source client SDK for Java is used as an example.
Download URL of the open source client SDK for Java
paho.mqtt.java
SDK version
The following code snippet shows the dependency of the SDK. We recommend that you use the latest version.
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
Initialize the client
Create MqttClient
final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
mqttClient.setTimeToWait(3000L);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
Initialize configuration items for connection
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
connOpts.setCleanSession(cleanSession);
connOpts.setKeepAliveInterval(60);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(1000);
.....
Connect for the first time
Initialize mqttClient
and connect to the broker.
mqttClient.connect(mqttConnectOptions);
Troubleshoot reconnection failure
Problem description
The client does not automatically try to reconnect to the broker after a connection failure caused by network jitters or latency occurs during connection for the first time. The following exception is thrown:
Caused by: java.net.ConnectException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)
Possible cause
The wasConnected=false
configuration is used, which causes the client to fail to reconnect upon a connection failure.
Note
The following code snippet is included in the source code of the SDK. You do not need to add the snippet to the SDK code.
if (wasConnected && callback != null) {
callback.connectionLost(reason);
}
Solution
We recommend that you try to reconnect until the connection is successful.
for(;;) {
try {
mqttClient.connect(mqttConnectOptions);
break;
} catch (Throwable e) {
log.error("",e);
Thread.sleep(5000L);
}
}
Automatic client reconnection
Sample exception
If the client is disconnected from the broker after a successful connection, the connectionLost
error is reported.
2023-06-20 17:10:26:972 connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
Source code analysis
Note
The following code snippet is included in the source code of the SDK. You do not need to add the snippet to the SDK code.
public void connectionLost(MqttException cause) {
final String methodName = "connectionLost";
try {
if (mqttCallback != null && cause != null) {
log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
mqttCallback.connectionLost(cause);
}
if(reconnectInternalCallback != null && cause != null){
reconnectInternalCallback.connectionLost(cause);
}
} catch (java.lang.Throwable t) {
log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
}
}
Trigger a reconnection
Note
The following code snippet is included in the source code of the SDK. You do not need to add the snippet to the SDK code.
MqttReconnectCallback
public void connectionLost(Throwable cause) {
if (automaticReconnect) {
comms.setRestingState(true);
reconnecting = true;
startReconnectCycle();
}
}
MqttReconnectActionListener
The retry interval increases along the number of retry attempts in the following pattern: 1s, 2s, 4s, 8s...128s. The default maximum interval is 128s.
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}
Successful reconnection
connectComplete
callback:
2023-06-20 17:12:36:764 connect success to: tcp: