使用MQTT协议接入的设备和物联网平台,通过订阅Topic或向Topic发布消息的方式进行通信。Topic分为系统Topic、物模型Topic和自定义Topic,其中自定义Topic需要用户在控制台定义。 本文为您介绍设备使用自定义Topic与物联网平台进行上下行通信,以及物联网平台和业务服务器之间通信的步骤。
背景信息
本示例中,电子温度计通过订阅自定义Topic接收指令,通过向自定义Topic发布消息从而上报温度,物联网平台将收到的温度信息通过AMQP服务端转发到用户服务器,用户服务器调用Pub接口向自定义Topic发布消息从而实现对设备的远程精度设置。
准备开发环境
本示例中,设备端和云端均使用Java语言的SDK,需先准备Java开发环境。您可从Java官方网站下载并安装Java开发环境。
本示例使用环境如下:
操作系统:Windows 10 64位
JDK版本:JDK8
集成开发环境:IntelliJ IDEA社区版
创建产品和设备
登录物联网平台控制台。
在实例概览页签的全部环境下,找到对应的实例,单击实例卡片。
在左侧导航栏,单击 。
单击创建产品,创建温度计产品,获取productKey,例如
a1uzcH0****
。详细操作指导,请参见创建产品。
创建产品成功后,单击该产品对应的查看。
在产品详情页面的Topic类列表页签下,单击自定义Topic,增加自定义Topic类。
详细操作指导,请参见使用自定义Topic通信。
本示例中,定义了以下两个Topic类:
设备发布消息Topic:/a1uzcH0****/${deviceName}/user/devmsg,权限为发布。
设备订阅消息Topic:/a1uzcH0****/${deviceName}/user/cloudmsg,权限为订阅。
在服务端订阅页签下,单击创建订阅,设置AMQP服务端订阅,订阅设备上报消息到默认消费组。
设备上报消息包含自定义Topic消息和物模型消息。详细操作和说明,请参见配置AMQP服务端订阅。
在左侧导航栏,选择 ,然后在刚创建的温度计产品下,添加设备device1,获取设备证书ProductKey、DeviceName和DeviceSecret。
详细操作指导,请参见单个创建设备。
设备发送消息给服务器
流程图:
在整个流程中:
服务器通过AMQP客户端接收消息,需配置AMQP客户端接入物联网平台,监听设备消息。具体操作,请参见Java SDK接入示例。
重要AMQP订阅消息的消费组,必须与设备在相同的物联网平台实例下。
配置设备端SDK接入物联网平台,并发送消息。
配置设备认证信息。
final String productKey = "a1uzcH0****"; final String deviceName = "device1"; final String deviceSecret = "uwMTmVAMnxB****"; final String region = "cn-shanghai"; final String iotInstanceId = "iot-2w****";
实际业务场景中,您需修改以下参数值。
参数
示例
说明
productKey
a1uzcH0****
设备证书信息。您可在物联网平台控制台的设备详情页面查看。具体操作,请参见查看具体设备信息。
deviceName
device1
deviceSecret
uwMTmVAMnxB****
region
cn-shanghai
您物联网平台设备所在地域的Region ID。Region ID表达方法,请参见地域列表。
iotInstanceId
iot-2w****
设备所属实例的ID。
您可在控制台的实例概览页面查看。
若有ID值,必须传入该ID值。
若无实例概览页面或ID值,传入空值,即
iotInstanceId = ""
。
设置初始化连接参数,包括MQTT连接配置、设备信息和初始物模型属性。
LinkKitInitParams params = new LinkKitInitParams(); //LinkKit底层是MQTT协议,设置MQTT的配置。 IoTMqttClientConfig config = new IoTMqttClientConfig(); config.productKey = productKey; config.deviceName = deviceName; config.deviceSecret = deviceSecret; config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883"; //设备的信息。 DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = productKey; deviceInfo.deviceName = deviceName; deviceInfo.deviceSecret = deviceSecret; //报备的设备初始状态。 Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>(); params.mqttClientConfig = config; params.deviceInfo = deviceInfo; params.propertyValues = propertyValues;
实际业务场景中,您需修改以下参数值。
参数
示例
说明
config.channelHost
config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
MQTT设备接入域名。
旧版公共实例:
config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
。新版公共实例和企业版实例:
config.channelHost = iotInstanceId + ".mqtt.iothub.aliyuncs.com:1883";
。
初始化连接。
//连接并设置连接成功以后的回调函数。 LinkKit.getInstance().init(params, new ILinkKitConnectListener() { @Override public void onError(AError aError) { System.out.println("Init error:" + aError); } //初始化成功以后的回调。 @Override public void onInitDone(InitResult initResult) { System.out.println("Init done:" + initResult); } });
设备发送消息。
设备端连接物联网平台后,向自定义的Topic发送消息。您需将onInitDone函数内容替换为以下内容:
@Override public void onInitDone(InitResult initResult) { //设置Pub消息的Topic和内容。 MqttPublishRequest request = new MqttPublishRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg"; request.qos = 0; request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}"; //发送消息并设置成功以后的回调。 LinkKit.getInstance().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { System.out.println("onResponse:" + aResponse.getData()); } @Override public void onFailure(ARequest aRequest, AError aError) { System.out.println("onFailure:" + aError.getCode() + aError.getMsg()); } }); }
实际业务场景中,您需修改以下参数值。
参数
示例
说明
request.topic
"/" + productKey + "/" + deviceName + "/user/devmsg"
具有发布权限的自定义Topic。
request.payloadObj
"{\"temperature\":35.0, \"time\":\"sometime\"}"
自定义的消息内容。
服务器收到消息如下:
Message {payload={"temperature":35.0, "time":"sometime"}, topic='/a1uzcH0****/device1/user/devmsg', messageId='1131755639450642944', qos=0, generateTime=1558666546105}
服务器发送消息给设备
流程图:
配置设备端SDK订阅Topic。
配置设备认证信息、设置初始化连接参数、初始化连接,请参见设备发送消息给服务器中的相应示例代码。
设备要接收服务器发送的消息,还需订阅消息Topic。
配置设备端订阅Topic示例如下:
//初始化成功以后的回调。 @Override public void onInitDone(InitResult initResult) { //设置订阅的Topic。 MqttSubscribeRequest request = new MqttSubscribeRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg"; request.isSubscribe = true; //发出订阅请求并设置订阅成功或者失败的回调函数。 LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() { @Override public void onSuccess() { System.out.println(""); } @Override public void onFailure(AError aError) { } }); //设置订阅的下行消息到来时的回调函数。 IConnectNotifyListener notifyListener = new IConnectNotifyListener() { //此处定义收到下行消息以后的回调函数。 @Override public void onNotify(String connectId, String topic, AMessage aMessage) { System.out.println( "received message from " + topic + ":" + new String((byte[])aMessage.getData())); } @Override public boolean shouldHandle(String s, String s1) { return false; } @Override public void onConnectStateChange(String s, ConnectState connectState) { } }; LinkKit.getInstance().registerOnNotifyListener(notifyListener); }
其中,
request.topic
值需修改为具有订阅权限的自定义Topic。配置云端SDK调用物联网平台接口Pub发布消息。参数说明,请参见Pub,使用说明,请参见Java SDK使用说明。
设置身份认证信息。
String regionId = "cn-shanghai"; String accessKey = "LTAI4GFGQvKuqHJhFaj****"; String accessSecret = "iMS8ZhCDdfJbCMeA005sieKe****"; final String productKey = "a1uzcH0****"; final String deviceName = "device1"; final String iotInstanceId = "iot-2w****";
实际业务场景中,您需修改以下参数值。
参数
示例
说明
accessKey
LTAI4GFGQvKuqHJhFaj****
您的阿里云账号的AccessKey ID和AccessKey Secret。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。
说明如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台。
accessSecret
iMS8ZhCDdfJbCMeA005sieKe****
productKey
a1uzcH0****
设备证书信息。您可在物联网平台控制台的设备详情页面查看。具体操作,请参见查看具体设备信息。
deviceName
device1
region
cn-shanghai
您物联网平台设备所在地域的Region ID。Region ID表达方法,请参见地域列表。
iotInstanceId
iot-2w****
设备所属实例的ID。
您可在控制台的实例概览页面查看。
若有ID值,必须传入该ID值。
若无实例概览页面或ID值,传入空值,即
iotInstanceId = ""
。
设置连接参数。
//设置client的参数。 DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret); IAcsClient client = new DefaultAcsClient(profile);
设置消息发布参数。
PubRequest request = new PubRequest(); request.setIotInstanceId(iotInstanceId); request.setQos(0); //设置发布消息的Topic。 request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg"); request.setProductKey(productKey); //设置消息的内容,一定要用Base64编码,否则乱码。 request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));
实际业务场景中,您需修改调用接口的请求参数。详细说明,请参见Pub。
发送消息。
try { PubResponse response = client.getAcsResponse(request); System.out.println("pub success?:" + response.getSuccess()); } catch (Exception e) { System.out.println(e); }
设备端接收到的消息如下:
msg = [{"accuracy":0.001,"time":now}]
附录:代码示例
实际业务场景中,请按照上文描述,修改相关代码及相关参数的值。
下载Pub/Sub demo,包含本示例的云端SDK和设备端SDK配置代码Demo。
AMQP客户端接入物联网平台示例,请参见: