すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:オープンソースクライアントの初回接続を構成する

最終更新日:Jan 14, 2025

このトピックでは、オープンソースの MQTT クライアントを初めて ApsaraMQ for MQTT ブローカーに接続するために使用する場合に、クライアントを初期化する方法について説明します。また、初回接続時に発生する接続失敗の問題の解決策も提供します。このトピックでは、Java 用のオープンソースクライアント SDK を例として使用します。

Java 用オープンソースクライアント SDK のダウンロード URL

paho.mqtt.java

SDKバージョン

次のコード スニペットは、SDK の依存関係を示しています。最新バージョンを使用することをお勧めします。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

クライアントを初期化する

作成 MqttClient

final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
mqttClient.setTimeToWait(3000L);

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // 接続が成功したときに起動されるコールバック。トピックをサブスクライブできます。
    }

    @Override
    public void connectionLost(Throwable throwable) {
        // 接続に失敗したときに起動されるコールバック。トラブルシューティングを容易にするためにログを出力することをお勧めします。
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // メッセージを受信したときに起動されるコールバック。このコールバックをブロックしないでください。ここでメッセージを同期的に送信することはできません。そうしないと、デッドロックが発生したり、ハートビートの送信に失敗したりして、リンクが切断される可能性があります。
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // ブローカーにメッセージを送信します。
    }
});

接続の構成項目を初期化する

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
connOpts.setCleanSession(cleanSession);
connOpts.setKeepAliveInterval(60); // ハートビート間隔。単位:秒。
connOpts.setAutomaticReconnect(true); // 自動再接続を有効にしてください。
connOpts.setMaxInflight(1000);
.....

初めて接続する

mqttClient を初期化し、ブローカーに接続します。

mqttClient.connect(mqttConnectOptions);

再接続エラーのトラブルシューティング

問題の説明

クライアントは、初回接続時にネットワークのジッターまたは遅延によって接続障害が発生した場合、ブローカーへの再接続を自動的には試行しません。次の例外がスローされます。

Caused by: java.net.ConnectException: Network is unreachable (connect failed)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)

考えられる原因

wasConnected=false 設定が使用されているため、接続エラーが発生した場合にクライアントは再接続に失敗します。

説明

SDK のソースコードには、次のコードスニペットが含まれています。スニペットを SDK コードに追加する必要はありません。

if (wasConnected && callback != null) {
    // クライアントが正常または異常のいずれかで切断されたことをユーザーに知らせます。
    callback.connectionLost(reason);
}

ソリューション

接続が成功するまで再接続を試みることをお勧めします。

for(;;) {
    try {
        mqttClient.connect(mqttConnectOptions);
        break;
    } catch (Throwable e) {
        log.error("",e); // トラブルシューティングを容易にするためにログを出力することをお勧めします。
        Thread.sleep(5000L);
    }
}

クライアントの自動再接続

サンプル例外

クライアントが接続成功後にブローカーから切断された場合、connectionLost エラーが報告されます。

2023-06-20 17:10:26:972	 connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)

ソースコード分析

説明

SDK のソースコードには、次のコードスニペットが含まれています。スニペットを SDK コードに追加する必要はありません。

public void connectionLost(MqttException cause) {
    final String methodName = "connectionLost";
    // 問題が発生し、クライアントコールバックが設定されている場合は、
    // 接続が失われたリスナーに問題を通知します。
    try {
        if (mqttCallback != null && cause != null) {
            // @TRACE 708=call connectionLost
            log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
            mqttCallback.connectionLost(cause);
        }
        if(reconnectInternalCallback != null && cause != null){
            reconnectInternalCallback.connectionLost(cause);
        }
    } catch (java.lang.Throwable t) {
        // Throwable が connectionLost をキャッチしたという事実だけをログに記録します。
        // シャットダウン処理中に呼び出されるため、他に何もする必要はありません。
        // @TRACE 720=exception from connectionLost {0}
        log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
    }
}

再接続をトリガーする

説明

SDK のソースコードには、次のコードスニペットが含まれています。スニペットを SDK コードに追加する必要はありません。

MqttReconnectCallback

public void connectionLost(Throwable cause) {
    if (automaticReconnect) {
        // 自動再接続が設定されているため、通信が休止状態になっていることを確認します。
        comms.setRestingState(true);
        reconnecting = true;
        startReconnectCycle();
    }
}

MqttReconnectActionListener

再試行間隔は、再試行回数の増加に伴い、次のパターンで増加します: 1 秒、2 秒、4 秒、8 秒...128 秒。デフォルトの最大間隔は 128 秒です。

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    // @Trace 502=自動再接続に失敗しました。再スケジュールしています: {0}
    log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
    if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
        reconnectDelay = reconnectDelay * 2;
    }
    rescheduleReconnectCycle(reconnectDelay);
}

再接続に成功しました

connectComplete コールバック:

2023-06-20 17:12:36:764	connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true