物聯網平台將裝置上報的資料流轉至訊息佇列RocketMQ的Topic中後,RocketMQ再將資料流轉到企業伺服器。本文介紹資料流轉的操作步驟。
前提條件
登入阿里雲帳號。
已開通物聯網平台服務。
已開通訊息佇列RocketMQ服務。
如未開通,請登入訊息佇列 RocketMQ產品頁面,開通服務。
已準備開發環境。本樣本使用Java SDK開發的環境如下:
作業系統:Windows 10 64位
JDK版本:JDK8
整合式開發環境:IntelliJ IDEA社區版
背景資訊
資料流轉流程圖:
方案優勢:
裝置使用MQTT協議接入物聯網平台,資料轉送鏈路支援TLS加密,保障資料不被篡改。MQTT協議說明,請參見MQTT協議規範。
通過訊息佇列RocketMQ削峰填穀,緩衝訊息,減輕伺服器同時接收大量裝置訊息的壓力。
操作步驟
登入物聯網平台控制台,建立產品和裝置。
在執行個體概覽頁簽的全部環境下,找到對應的執行個體,單擊執行個體卡片。
本樣本選擇地區華東2(上海)。
在左側導覽列選擇 ,單擊建立產品,配置參數,單擊確認。
本樣本中,產品名稱為MQ_test,節點類型為直連裝置,其他參數使用預設值。
單擊查看產品詳情,在產品詳情頁面,單擊 ,然後單擊自訂Topic類,定義一個用於裝置上報資料的Topic。
本樣本中,定義的Topic類:
/${YourProductKey}/${YourDeviceName}/user/data
。在左側導覽列選擇MQ_test建立裝置。 ,單擊添加裝置,為產品
本樣本中,建立了一個名稱為MQdevice的裝置。
在訊息佇列RocketMQ控制台,建立Topic和消費者。
在左側導覽列選擇執行個體列表,單擊建立執行個體,建立一個4.0系列的標準版執行個體,地區選擇華東2(上海)。
具體操作,請參見建立執行個體。
重要RocketMQ執行個體所在地區必須與物聯網平台執行個體所在地區保持一致。
僅支援將資料流轉到RocketMQ 4.x版本執行個體的Topic中。
在執行個體列表頁面,單擊執行個體名稱。
在執行個體詳情頁面,單擊建立 Group,配置如下圖所示,然後單擊確定。
單擊建立Topic,訊息類型選擇普通訊息。
建立訊息消費者,然後在RocketMQ控制台查看消費者狀態,確保消費者處於線上狀態,訂閱關係一致。
本文以調用TCP協議的SDK為例,進行收發訊息。SDK擷取和使用的詳細內容,請參見調用TCP協議的SDK收發普通訊息。
Java語言的SDK程式碼範例如下:
說明查看AccessKey和SecretKey方法,請參見建立AccessKey。
RocketMQ詳細操作指導,請參見訊息佇列RocketMQ文檔。
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在控制台建立的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // AccessKey 阿里雲身分識別驗證,在阿里雲伺服器管理主控台建立 properties.put(PropertyKeyConst.AccessKey, "${AccessKey}"); // SecretKey 阿里雲身分識別驗證,在阿里雲伺服器管理主控台建立 properties.put(PropertyKeyConst.SecretKey, "${SecretKey}"); // 設定 TCP 接入網域名稱,到控制台的執行個體基本資料中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); // 叢集訂閱者式 (預設) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 廣播訂閱者式 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("iot_to_mq", "*", new MessageListener() { //訂閱多個 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
返回物聯網平台控制台,在對應執行個體下,設定資料流轉規則,將裝置上報的資料轉寄至訊息佇列(RocketMQ)。
在左側導覽列選擇 。
在雲產品流轉頁面,單擊建立規則。
重要若當前頁面顯示新版功能,先單擊右上方返回舊版,進入舊版功能頁面,再單擊建立規則。
輸入規則名稱MQ流轉,資料格式選擇為JSON,單擊確認。
單擊編寫SQL,設定資料處理SQL,如下圖所示,然後單擊確認。
單擊添加操作設定資料轉寄目的地,如下圖所示,然後單擊確認。
所有設定完成後,返回至雲產品流轉頁面,單擊MQ流轉規則對應的啟動。
規則啟動後,物聯網平台會將規則SQL中定義的裝置上報訊息轉寄至訊息佇列(RocketMQ)的Topic中。
使用Java SDK類比裝置接入物聯網平台,並上報訊息。
下載Java SDK Demo,然後解壓。
在IntelliJ IDEA中,匯入Demo包中的樣本工程JavaLinkKitDemo。
在檔案
device_id.json
中輸入MQdevice的裝置認證資訊:productKey、deviceName和deviceSecret。在檔案
src\main\java\com.aliyun.alink.devicesdk.demo\MqttSample.java
中修改MQTT Topic為裝置上報資料的Topic。本樣本中,使用的Topic是
/{YourProductKey}/${YourDeviceName}/user/data
。/** * 發布介面樣本 */ public void publish() { MqttPublishRequest request = new MqttPublishRequest(); // topic 使用者根據實際情境填寫 request.topic = "/" + productKey + "/" + deviceName + "/user/data"; ...... ...... }
在檔案
src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java
中修改MQTT接入網域名稱為您裝置的MQTT接入網域名稱。本樣本如下,查看接入網域名稱方法,請參見查看執行個體終端節點。
public void init(final DeviceInfoData deviceInfoData) { ...... ...... /** * 設定 Mqtt 初始化參數 */ IoTMqttClientConfig config = new IoTMqttClientConfig(); config.productKey = deviceInfoData.productKey; config.deviceName = deviceInfoData.deviceName; config.deviceSecret = deviceInfoData.deviceSecret; config.channelHost = "iot-06****.mqtt.iothub.aliyuncs.com:1883"; ...... ...... }
運行
src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java
檔案,啟動裝置。
返回物聯網平台控制台,在對應的執行個體下,選擇 ,查看該裝置的日誌資訊,發現裝置資料成功轉寄至RocketMQ。
在RocketMQ控制台查看訊息。
在本地運行訂閱訊息佇列RocketMQ資源的代碼。
返回訊息佇列RocketMQ控制台的執行個體詳情頁面,單擊訊息查詢,基於Topic或者Message ID查詢訊息。
您可單擊詳情查看並下載流轉至訊息佇列RocketMQ中的訊息詳情。
訊息內容如下:
{"deviceName":"MQdevice"}