Configure an open source client for its first connection

Updated at: 2024-08-06 07:40

This topic describes how to initialize an open source MQTT client that connects to an ApsaraMQ for MQTT broker for the first time. It also explains how to handle a connection failure that occurs on that first attempt. The open source client SDK for Java is used as an example.

Download URL of the open source client SDK for Java

paho.mqtt.java

SDK version

The following code snippet shows the dependency of the SDK. We recommend that you use the latest version.

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Initialize the client

Create MqttClient

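final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
// recvClientId is the client ID of this client. Define it before you create the client.
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
// The maximum time to wait for a blocking action, such as connect or publish, to complete. Unit: milliseconds.
mqttClient.setTimeToWait(3000L);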

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // The callback that is fired when the connection is established. It also fires after each automatic reconnection (reconnect=true). You can subscribe to topics here.
    }

    @Override
    public void connectionLost(Throwable throwable) {
        // The callback that is fired when an established connection is lost. We recommend that you print logs to facilitate troubleshooting.
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // The callback that is fired when a message is received. Do not block this callback, and do not publish messages synchronously here. Otherwise, a deadlock or a heartbeat sending failure may occur, which disconnects the client.
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // The callback that is fired when delivery of a published message to the broker is complete.
    }
});
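Because messageArrived must not block, one common pattern is to hand each message to a worker thread and return immediately. The following is a minimal sketch that uses only the JDK. The class MessageHandoffSketch and its methods are hypothetical names for illustration and are not part of the Paho API.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: hands messages to a worker so that the MQTT callback thread returns at once.
class MessageHandoffSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Results recorded by the worker thread; stands in for real business logic.
    final List<String> processed = new CopyOnWriteArrayList<>();

    // Call this from messageArrived with the topic and the payload bytes.
    void onMessage(String topic, byte[] payload) {
        final String text = new String(payload, StandardCharsets.UTF_8);
        // Slow work, such as database writes or synchronous publishes, runs on the worker thread.
        worker.submit(() -> processed.add(topic + ":" + text));
    }

    // Stops accepting work and waits for queued messages to finish.
    boolean shutdown(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside the callback, the call would look like onMessage(topic, mqttMessage.getPayload()). A single worker thread keeps messages in arrival order; a larger pool trades ordering for throughput.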

Initialize configuration items for connection

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
mqttConnectOptions.setCleanSession(cleanSession);
mqttConnectOptions.setKeepAliveInterval(60); // The heartbeat interval. Unit: seconds. 
mqttConnectOptions.setAutomaticReconnect(true); // Make sure that you enable automatic reconnection. 
mqttConnectOptions.setMaxInflight(1000); // The maximum number of messages that can be in flight at the same time. 
.....

Connect for the first time

Initialize mqttClient and connect to the broker.

mqttClient.connect(mqttConnectOptions);

Troubleshoot reconnection failure

Problem description

If the first connection attempt fails, for example because of network jitter or latency, the client does not automatically try to reconnect to the broker. The following exception is thrown:

Caused by: java.net.ConnectException: Network is unreachable (connect failed)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)

Possible cause

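During the first connection attempt, the client has never been connected, so the internal wasConnected flag of the SDK is false. As a result, the connectionLost callback is not invoked and automatic reconnection is not triggered.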

Note

The following code snippet is part of the SDK source code. You do not need to add it to your code.

if (wasConnected && callback != null) {
    // Let the user know client has disconnected either normally or abnormally.
    callback.connectionLost(reason);
}
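The effect of this check can be illustrated with a small model. The following sketch is a simplified, hypothetical stand-in written for this topic, not Paho code. It assumes only what the snippet above shows: the callback runs if the client was connected before the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the wasConnected check; it is not Paho code.
class ConnectionLostGateSketch {
    private boolean connected = false;
    // Records each time the modeled connectionLost callback would run.
    final List<String> notifications = new ArrayList<>();

    // Models a connection attempt; only a success marks the client as connected.
    void attemptConnect(boolean success) {
        if (success) {
            connected = true;
        }
    }

    // Models a failure: the callback runs only if the client was connected before.
    void fail(String reason) {
        boolean wasConnected = connected;
        connected = false;
        if (wasConnected) {
            notifications.add("connectionLost: " + reason);
        }
    }
}
```

In this model, a failure that follows an unsuccessful first attempt records nothing, while a failure after a successful connection records one notification. This matches the behavior described above: a failed first attempt never reaches the reconnection logic.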

Solution

Because the SDK does not retry a failed first connection, we recommend that you retry the connection in a loop until it succeeds.

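for (;;) {
    try {
        mqttClient.connect(mqttConnectOptions);
        break; // The connection is established. Exit the retry loop.
    } catch (MqttException e) {
        // We recommend that you print logs to facilitate troubleshooting.
        log.error("Failed to connect to the broker. Retrying in 5 seconds.", e);
        // Thread.sleep can throw InterruptedException. Declare or handle it in the enclosing method.
        Thread.sleep(5000L);
    }
}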
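The retry loop can also be wrapped in a helper that handles thread interruption and reports how many attempts were needed. The following is a minimal sketch that uses only the JDK. The names FirstConnectRetrySketch and ConnectAction are hypothetical; ConnectAction stands in for a call such as mqttClient.connect(mqttConnectOptions).

```java
// Hypothetical helper for retrying the first connection; not part of the Paho API.
class FirstConnectRetrySketch {
    // Stand-in for a call such as mqttClient.connect(mqttConnectOptions).
    interface ConnectAction {
        void connect() throws Exception;
    }

    // Retries until connect() succeeds and returns the number of attempts made,
    // or -1 if the thread is interrupted while waiting between attempts.
    static int connectWithRetry(ConnectAction action, long retryIntervalMillis) {
        int attempts = 0;
        for (;;) {
            attempts++;
            try {
                action.connect();
                return attempts;
            } catch (Exception e) {
                // Print the failure to facilitate troubleshooting.
                System.err.println("Connect attempt " + attempts + " failed: " + e);
            }
            try {
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
                return -1;
            }
        }
    }
}
```

With the Paho client, the call would be FirstConnectRetrySketch.connectWithRetry(() -> mqttClient.connect(mqttConnectOptions), 5000L).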

Automatic client reconnection

Sample exception

If the client is disconnected from the broker after a successful connection, the connectionLost callback is invoked. If you print logs in the callback, output similar to the following is generated:

2023-06-20 17:10:26:972	 connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)

Source code analysis

Note

The following code snippet is part of the SDK source code. You do not need to add it to your code.

public void connectionLost(MqttException cause) {
    final String methodName = "connectionLost";
    // If there was a problem and a client callback has been set inform
    // the connection lost listener of the problem.
    try {
        if (mqttCallback != null && cause != null) {
            // @TRACE 708=call connectionLost
            log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
            mqttCallback.connectionLost(cause);
        }
        if(reconnectInternalCallback != null && cause != null){
            reconnectInternalCallback.connectionLost(cause);
        }
    } catch (java.lang.Throwable t) {
        // Just log the fact that a throwable has caught connection lost 
        // is called during shutdown processing so no need to do anything else
        // @TRACE 720=exception from connectionLost {0}
        log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
    }
}

Trigger a reconnection

Note

The following code snippet is part of the SDK source code. You do not need to add it to your code.

MqttReconnectCallback

public void connectionLost(Throwable cause) {
    if (automaticReconnect) {
        // Automatic reconnect is set so make sure comms is in resting
        // state
        comms.setRestingState(true);
        reconnecting = true;
        startReconnectCycle();
    }
}

MqttReconnectActionListener

The retry interval doubles after each failed attempt: 1s, 2s, 4s, 8s, and so on, up to the maximum interval, which is 128s by default.

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    // @Trace 502=Automatic Reconnect failed, rescheduling: {0}
    log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
    if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
        reconnectDelay = reconnectDelay * 2;
    }
    rescheduleReconnectCycle(reconnectDelay);
}
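The doubling rule in onFailure can be reproduced in isolation to see which delays it yields. The following sketch is not Paho code. It assumes an initial delay of 1,000 ms and a maximum of 128,000 ms, which matches the 1s to 128s pattern described above. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the doubling rule shown in onFailure; it is not Paho code.
class ReconnectDelaySketch {
    // Returns the delays, in milliseconds, used for the first `attempts` reconnection attempts.
    static List<Integer> delays(int initialDelayMillis, int maxDelayMillis, int attempts) {
        List<Integer> result = new ArrayList<>();
        int reconnectDelay = initialDelayMillis;
        for (int i = 0; i < attempts; i++) {
            result.add(reconnectDelay);
            // Same rule as the SDK snippet: double the delay while it is below the maximum.
            if (reconnectDelay < maxDelayMillis) {
                reconnectDelay = reconnectDelay * 2;
            }
        }
        return result;
    }
}
```

Calling ReconnectDelaySketch.delays(1000, 128000, 10) returns 1000, 2000, 4000, 8000, 16000, 32000, 64000, and then 128000 three times. Under this rule, a maximum that the doubling sequence does not reach exactly can be exceeded by the last doubling: with a maximum of 100000, the delay goes from 64000 to 128000.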

Successful reconnection

After the reconnection succeeds, the connectComplete callback is invoked with reconnect=true. Sample log:

2023-06-20 17:12:36:764	connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true
