全部產品
Search
文件中心

IoT Platform:裝置訊息通過RocketMQ流轉到伺服器

更新時間:Jun 30, 2024

物聯網平台將裝置上報的資料流轉至訊息佇列RocketMQ的Topic中後,RocketMQ再將資料流轉到企業伺服器。本文介紹資料流轉的操作步驟。

前提條件

背景資訊

資料流轉流程圖:

訊息流程轉MQ

方案優勢:

  • 裝置使用MQTT協議接入物聯網平台,資料轉送鏈路支援TLS加密,保障資料不被篡改。MQTT協議說明,請參見MQTT協議規範

  • 通過訊息佇列RocketMQ削峰填穀,緩衝訊息,減輕伺服器同時接收大量裝置訊息的壓力。

操作步驟

  1. 登入物聯網平台控制台,建立產品和裝置。

    1. 執行個體概覽頁簽的全部環境下,找到對應的執行個體,單擊執行個體卡片。

      本樣本選擇地區華東2(上海)

    2. 在左側導覽列選擇裝置管理 > 產品,單擊建立產品,配置參數,單擊確認

      本樣本中,產品名稱為MQ_test,節點類型為直連裝置,其他參數使用預設值。

    3. 單擊查看產品詳情,在產品詳情頁面,單擊Topic類列表 > 自訂Topic,然後單擊自訂Topic類,定義一個用於裝置上報資料的Topic。

      本樣本中,定義的Topic類:/${YourProductKey}/${YourDeviceName}/user/data

    4. 在左側導覽列選擇裝置管理 > 裝置,單擊添加裝置,為產品MQ_test建立裝置。

      本樣本中,建立了一個名稱為MQdevice的裝置。

  2. 在訊息佇列RocketMQ控制台,建立Topic和消費者。

    1. 登入訊息佇列RocketMQ版控制台

    2. 在左側導覽列選擇執行個體列表,單擊建立執行個體,建立一個4.0系列的標準版執行個體,地區選擇華東2(上海)

      具體操作,請參見建立執行個體

      重要
      • RocketMQ執行個體所在地區必須與物聯網平台執行個體所在地區保持一致。

      • 僅支援將資料流轉到RocketMQ 4.x版本執行個體的Topic中。

    3. 執行個體列表頁面,單擊執行個體名稱。

    4. 執行個體詳情頁面,單擊建立 Group,配置如下圖所示,然後單擊確定

      物聯網裝置訊息

    5. 單擊建立Topic,訊息類型選擇普通訊息

      物聯網裝置訊息

    6. 建立訊息消費者,然後在RocketMQ控制台查看消費者狀態,確保消費者處於線上狀態,訂閱關係一致。

      本文以調用TCP協議的SDK為例,進行收發訊息。SDK擷取和使用的詳細內容,請參見調用TCP協議的SDK收發普通訊息

      Java語言的SDK程式碼範例如下:

      說明
      import com.aliyun.openservices.ons.api.Action;
      import com.aliyun.openservices.ons.api.ConsumeContext;
      import com.aliyun.openservices.ons.api.Consumer;
      import com.aliyun.openservices.ons.api.Message;
      import com.aliyun.openservices.ons.api.MessageListener;
      import com.aliyun.openservices.ons.api.ONSFactory;
      import com.aliyun.openservices.ons.api.PropertyKeyConst;
      import java.util.Properties;
      public class ConsumerTest {
          public static void main(String[] args) {
              Properties properties = new Properties();
              // 您在控制台建立的 Group ID
              properties.put(PropertyKeyConst.GROUP_ID, "XXX");
              // AccessKey 阿里雲身分識別驗證,在阿里雲伺服器管理主控台建立
              properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
              // SecretKey 阿里雲身分識別驗證,在阿里雲伺服器管理主控台建立
              properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
              // 設定 TCP 接入網域名稱,到控制台的執行個體基本資料中查看
              properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
              // 叢集訂閱者式 (預設)
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 廣播訂閱者式
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
              Consumer consumer = ONSFactory.createConsumer(properties);
              consumer.subscribe("iot_to_mq", "*", new MessageListener() { //訂閱多個 Tag
                  public Action consume(Message message, ConsumeContext context) {
                      System.out.println("Receive: " + message);
                      return Action.CommitMessage;
                  }
              });
              consumer.start();
              System.out.println("Consumer Started");
          }
      }
  3. 返回物聯網平台控制台在對應執行個體下,設定資料流轉規則,將裝置上報的資料轉寄至訊息佇列(RocketMQ)。

    1. 在左側導覽列選擇訊息轉寄 > 雲產品流轉

    2. 在雲產品流轉頁面,單擊建立規則

      重要

      若當前頁面顯示新版功能,先單擊右上方返回舊版,進入舊版功能頁面,再單擊建立規則

    3. 輸入規則名稱MQ流轉資料格式選擇為JSON,單擊確認

    4. 單擊編寫SQL,設定資料處理SQL,如下圖所示,然後單擊確認

      物聯網裝置訊息

    5. 單擊添加操作設定資料轉寄目的地,如下圖所示,然後單擊確認

      物聯網裝置訊息

    6. 所有設定完成後,返回至雲產品流轉頁面,單擊MQ流轉規則對應的啟動

      規則啟動後,物聯網平台會將規則SQL中定義的裝置上報訊息轉寄至訊息佇列(RocketMQ)的Topic中。

  4. 使用Java SDK類比裝置接入物聯網平台,並上報訊息。

    1. 下載Java SDK Demo,然後解壓。

    2. 在IntelliJ IDEA中,匯入Demo包中的樣本工程JavaLinkKitDemo。

    3. 在檔案device_id.json中輸入MQdevice的裝置認證資訊:productKeydeviceNamedeviceSecret

    4. 在檔案src\main\java\com.aliyun.alink.devicesdk.demo\MqttSample.java中修改MQTT Topic為裝置上報資料的Topic。

      本樣本中,使用的Topic是/{YourProductKey}/${YourDeviceName}/user/data

      /**
       * 發布介面樣本
       */
      public void publish() {
              MqttPublishRequest request = new MqttPublishRequest();
              // topic 使用者根據實際情境填寫
              request.topic = "/" + productKey + "/" + deviceName + "/user/data";
              ......
              ......
      }
    5. 在檔案src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java中修改MQTT接入網域名稱為您裝置的MQTT接入網域名稱。

      本樣本如下,查看接入網域名稱方法,請參見查看執行個體終端節點

      public void init(final DeviceInfoData deviceInfoData) {
              ......
              ......
              /**
               * 設定 Mqtt 初始化參數
               */
              IoTMqttClientConfig config = new IoTMqttClientConfig();
              config.productKey = deviceInfoData.productKey;
              config.deviceName = deviceInfoData.deviceName;
              config.deviceSecret = deviceInfoData.deviceSecret;
              config.channelHost = "iot-06****.mqtt.iothub.aliyuncs.com:1883";
              ......
              ......
      }
    6. 運行src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java檔案,啟動裝置。

    返回物聯網平台控制台在對應的執行個體下,選擇監控營運 > Log Service,查看該裝置的日誌資訊,發現裝置資料成功轉寄至RocketMQ。物聯網裝置訊息

  5. 在RocketMQ控制台查看訊息。

    1. 在本地運行訂閱訊息佇列RocketMQ資源的代碼。

      物聯網裝置訊息

    2. 返回訊息佇列RocketMQ控制台的執行個體詳情頁面,單擊訊息查詢,基於Topic或者Message ID查詢訊息。

      您可單擊詳情查看並下載流轉至訊息佇列RocketMQ中的訊息詳情。

      訊息內容如下:

      {"deviceName":"MQdevice"}