物联网平台将设备上报的数据流转至消息队列RocketMQ的Topic中后,RocketMQ再将数据流转到企业服务器。本文介绍数据流转的操作步骤。
前提条件
已注册阿里云账号。
已开通物联网平台服务。
已开通消息队列RocketMQ服务。
如未开通,请登录消息队列 RocketMQ产品页面,开通服务。
已准备开发环境。本示例使用Java SDK开发的环境如下:
- 操作系统:Windows 10 64位
- JDK版本:JDK8
- 集成开发环境:IntelliJ IDEA社区版
背景信息
数据流转流程图:
方案优势:
设备使用MQTT协议接入物联网平台,数据传输链路支持TLS加密,保障数据不被篡改。MQTT协议说明,请参见MQTT协议规范。
通过消息队列RocketMQ削峰填谷,缓冲消息,减轻服务器同时接收大量设备消息的压力。
操作步骤
登录物联网平台控制台,创建产品和设备。
在实例概览页签的全部环境下,找到对应的实例,单击实例ID或备注名称。
本示例选择地域华东2(上海)。
在左侧导航栏选择 ,单击创建产品,配置参数,单击确认。
本示例中,产品名称为MQ_test,节点类型为直连设备,其他参数使用默认值。
单击查看产品详情,在产品详情页面,单击 ,然后单击自定义Topic类,定义一个用于设备上报数据的Topic。
本示例中,定义的Topic类:
/${YourProductKey}/${YourDeviceName}/user/data
。在左侧导航栏选择MQ_test创建设备。 ,单击添加设备,为产品
本示例中,创建了一个名称为MQdevice的设备。
在消息队列RocketMQ控制台,创建Topic和消费者。
在左侧导航栏选择实例列表,单击创建实例,创建一个4.0系列的标准版实例,地域选择华东2(上海)。
具体操作,请参见创建实例。
重要RocketMQ实例所在地域必须与物联网平台实例所在地域保持一致。
仅支持将数据流转到RocketMQ 4.x版本实例的Topic中。
在实例列表页面,单击实例名称。
在实例详情页面,单击创建 Group,配置如下图所示,然后单击确定。
单击创建Topic,消息类型选择普通消息。
创建消息消费者,然后在RocketMQ控制台查看消费者状态,确保消费者处于在线状态,订阅关系一致。
本文以调用TCP协议的SDK为例,进行收发消息。SDK获取和使用的详细内容,请参见调用TCP协议的SDK收发普通消息。
Java语言的SDK代码示例如下:
说明查看AccessKey和SecretKey方法,请参见创建AccessKey。
RocketMQ详细操作指导,请参见消息队列RocketMQ文档。
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在控制台创建的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey, "${AccessKey}"); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "${SecretKey}"); // 设置 TCP 接入域名,到控制台的实例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); // 集群订阅方式 (默认) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 广播订阅方式 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("iot_to_mq", "*", new MessageListener() { //订阅多个 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
返回物联网平台控制台,在对应实例下,设置数据流转规则,将设备上报的数据转发至消息队列(RocketMQ)。
在左侧导航栏选择 。
在云产品流转页面,单击创建规则。
重要若当前页面显示新版功能,先单击右上角返回旧版,进入旧版功能页面,再单击创建规则。
输入规则名称MQ流转,数据格式选择为JSON,单击确认。
单击编写SQL,设置数据处理SQL,如下图所示,然后单击确认。
单击添加操作设置数据转发目的地,如下图所示,然后单击确认。
所有设置完成后,返回至云产品流转页面,单击MQ流转规则对应的启动。
规则启动后,物联网平台会将规则SQL中定义的设备上报消息转发至消息队列(RocketMQ)的Topic中。
使用Java SDK模拟设备接入物联网平台,并上报消息。
下载Java SDK Demo,然后解压。
在IntelliJ IDEA中,导入Demo包中的示例工程JavaLinkKitDemo。
在文件
device_id.json
中输入MQdevice的设备证书信息:productKey、deviceName和deviceSecret。在文件
src\main\java\com.aliyun.alink.devicesdk.demo\MqttSample.java
中修改MQTT Topic为设备上报数据的Topic。本示例中,使用的Topic是
/{YourProductKey}/${YourDeviceName}/user/data
。/** * 发布接口示例 */ public void publish() { MqttPublishRequest request = new MqttPublishRequest(); // topic 用户根据实际场景填写 request.topic = "/" + productKey + "/" + deviceName + "/user/data"; ...... ...... }
在文件
src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java
中修改MQTT接入域名为您设备的MQTT接入域名。本示例如下,查看接入域名方法,请参见查看实例终端节点。
public void init(final DeviceInfoData deviceInfoData) { ...... ...... /** * 设置 Mqtt 初始化参数 */ IoTMqttClientConfig config = new IoTMqttClientConfig(); config.productKey = deviceInfoData.productKey; config.deviceName = deviceInfoData.deviceName; config.deviceSecret = deviceInfoData.deviceSecret; config.channelHost = "iot-06****.mqtt.iothub.aliyuncs.com:1883"; ...... ...... }
运行
src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java
文件,启动设备。
返回物联网平台控制台,在对应的实例下,选择 ,查看该设备的日志信息,发现设备数据成功转发至RocketMQ。
在RocketMQ控制台查看消息。
在本地运行订阅消息队列RocketMQ资源的代码。
返回消息队列RocketMQ控制台的实例详情页面,单击消息查询,基于Topic或者Message ID查询消息。
您可单击详情查看并下载流转至消息队列RocketMQ中的消息详情。
消息内容如下:
{"deviceName":"MQdevice"}