全部產品
Search
文件中心

ApsaraMQ for MQTT:快速使用MQTT的Java SDK收發訊息(終端和雲端訊息收發)

更新時間:Dec 27, 2024

本文介紹如何快速使用雲Message QueueTT 版的Java SDK實現MQTT終端和雲端服務的訊息收發。

前提條件

背景資訊

MQTT終端和雲端服務互動流程如下圖所示。終端裝置和雲端服務可分別通過對應的SDK接入雲Message QueueTT 版,實現終端和雲端服務的雙向通訊。終端雲端互動本文以公網環境為例,介紹使用Java SDK實現訊息收發。更多訊息收發範例程式碼,請參見終端Demo工程雲端Demo工程

存取點說明

終端和雲端服務與雲Message QueueTT 版通訊時,需要在各自的SDK代碼中設定雲Message QueueTT 版執行個體的存取點資訊,通過存取點和雲Message QueueTT 版服務端串連。

  • 終端SDK存取點格式

    使用終端SDK接入雲Message QueueTT 版時,需要填寫的存取點格式如下:

    • 公网接入点MQTT執行個體ID.mqtt.aliyuncs.com

    • VPC 接入点MQTT執行個體ID-internal-vpc.mqtt.aliyuncs.com

    終端SDK存取點也可以直接在雲Message QueueTT 版控制台執行個體詳情頁面的存取點頁簽中查看。

  • 雲端SDK存取點格式

    使用雲端SDK接入雲Message QueueTT 版時,需要填寫的存取點格式如下:

    重要

    僅執行個體地區屬於中國內地的執行個體支援雲端SDK接入。

    • 公网接入点MQTT執行個體ID-server-internet.mqtt.aliyuncs.com

    • VPC 接入点MQTT執行個體ID-server-internal.mqtt.aliyuncs.com

說明

MQTT執行個體ID可在雲Message QueueTT 版控制台執行個體詳情頁面的基础信息地區查看。

終端SDK存取點和雲端SDK存取點同時支援公网接入点VPC 接入点公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版

重要

SDK使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題雲Message QueueTT 版產品方概不負責:

  • 終端或雲端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。

  • 終端或雲端網路側對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。

調用終端SDK發送訊息

  1. 下載第三方的開源Java SDK。下載地址為Eclipse Paho Java Client
  2. 下載終端SDK的Demo樣本作為您代碼開發的參考。下載地址為mqtt-java-demo

  3. 解壓該Demo工程包至您指定的檔案夾。
  4. 在IntelliJ IDEA中,匯入解壓後的檔案以建立相應的工程,並確認pom.xml中已包含以下依賴。

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. MQ4IoTProducerDemo.java類中,按代碼注釋說明填寫相應參數,主要涉及您已在建立資源中所建立的MQTT資源資訊。然後執行Main函數運行程式碼完成訊息發送。

    範例程式碼如下。

    說明

    在使用範例程式碼前,需要配置環境變數,通過環境變數讀取存取憑證。關於配置環境變數的方法,請參見配置訪問憑證

    雲Message QueueTT 版的AccessKey ID和AccessKey Secret的環境變數名稱分別為MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    public class MQ4IoTProducerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * 您建立的雲Message QueueTT 版的執行個體ID。
             */
            String instanceId = "XXXXX";
            /**
             * 設定終端SDK的存取點,進入雲Message QueueTT 版控制台執行個體詳情頁面的存取點頁簽查看。
             * 存取點地址必須填寫分配的網域名稱,不得使用IP地址直接連接,否則可能會導致用戶端異常。
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
             * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
             * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
             * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * MQTT用戶端ID,由業務系統分配,需要保證每個TCP串連都不一樣,保證全域唯一,如果不同的用戶端對象(TCP串連)使用了相同的clientId會導致串連異常斷開。
             * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在雲Message QueueTT 版控制台建立,DeviceID由業務方自己設定,clientId總長度不得超過64個字元。
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * 雲Message QueueTT 版訊息的一級Topic,需要在控制台建立才能使用。
             * 如果使用了沒有建立或者沒有被授權的Topic會導致鑒權失敗,服務端會斷開用戶端串連。
             */
            final String parentTopic = "XXXXX";
            /**
             * 雲Message QueueTT 版支援子級Topic,用來做自訂的過濾,此處為樣本,可以填寫任一字元串。
             * 需要注意的是,完整的Topic長度不得超過128個字元。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS參數代表傳輸品質,可選0,1,2。詳細資料,請參見名詞解釋。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * 用戶端協議和連接埠。用戶端使用的協議和連接埠必須匹配,如果是SSL加密則設定ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
             /**
             * 設定用戶端發送逾時時間,防止無限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 用戶端串連成功後就需要儘快訂閱需要的Topic。
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                     /**
                     * 消費訊息的回調介面,需要確保該介面不拋異常,該介面運行返回即代表訊息消費成功。
                     * 消費訊息需要保證在規定時間內完成,如果消費耗時超過服務端約定的逾時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好等冪去重處理。
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                 /**
                 * 發送普通訊息時,Topic必須和接收方訂閱的Topic一致,或者符合萬用字元匹配規則。
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * 雲Message QueueTT 版支援點對點訊息,即如果發送方明確知道該訊息只需要給特定的一個裝置接收,且知道對端的clientId,則可以直接發送點對點訊息。
                 * 點對點訊息不需要經過訂閱關係匹配,可以簡化訂閱者的邏輯。點對點訊息的Topic格式規範是 {{parentTopic}}/p2p/{{targetClientId}}。
                 */
                String receiverId = "xxx";
                final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

調用雲端SDK接收訊息

  1. 下載雲Message QueueTT 版提供的雲端SDK。下載地址為雲端SDK版本說明

  2. 下載雲端SDK的Demo樣本做為您代碼開發的參考。下載地址為mqtt-server-sdk-demo

  3. 解壓該Demo工程包至您指定的檔案夾。
  4. 在IntelliJ IDEA中,匯入解壓後的檔案以建立相應的工程,並確認pom.xml中已包含以下依賴。

    <dependencies>
            <dependency>
                <groupId>com.alibaba.mqtt</groupId>
                <artifactId>server-sdk</artifactId>
                <version>1.0.0.Final</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
    </dependencies>
  5. MQTTConsumerDemo.java類中,按代碼注釋說明填寫相應參數,主要涉及您已在建立資源中所建立好的MQTT資源資訊。然後執行Main函數運行程式碼完成訊息接收。

    範例程式碼如下。

    說明

    在使用範例程式碼前,需要配置環境變數,通過環境變數讀取存取憑證。關於配置環境變數的方法,請參見配置訪問憑證

    雲Message QueueTT 版的AccessKey ID和AccessKey Secret的環境變數名稱分別為MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.MessageListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.MessageProperties;
    
    public class MQTTConsumerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * 設定雲端SDK的存取點,請參見存取點說明中的雲端SDK存取點格式。
             * 存取點地址必須填寫分配的網域名稱,不得使用IP地址直接連接,否則可能會導致服務端異常。
             */
            String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
    
            /**
             * 使用的協議和連接埠必須匹配,該參數值固定為5672。
             */
            int port = "5672";
    
            /**
             * 您建立的雲Message QueueTT 版的執行個體ID。
             */
            String instanceId = "post-cn-jaj3h8i****";
    
            /**
             * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
             * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
             * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
             * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
    
            /**
             * 雲Message QueueTT 版訊息的一級Topic,需要在控制台建立才能使用。
             * 由於雲端SDK訂閱訊息一般用於雲上應用進行訊息匯總和分析等情境,因此,雲端SDK訂閱訊息不支援設定子級Topic。
             * 如果使用了沒有建立或者沒有被授權的Topic會導致鑒權失敗,服務端會斷開用戶端串連。
             */
            String firstTopic = "firstTopic";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
                @Override
                public void process(String msgId, MessageProperties messageProperties, byte[] payload) {
                    System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload));
                }
            });
        }
    
    }
    說明

    雲端SDK訊息發送的範例程式碼,請參見MQTTProducerDemo.java