Best Practices for RRPC Synchronous Calls on the IoT Platform
Synchronous Calls over the Pub/Sub Model in Practice
1. Synchronous call scene
1.1 Background
MQTT is an asynchronous communication protocol based on PUB/SUB. On its own it does not support the scenario in which the server sends an instruction to a device and needs the device to return a response result.
Building on MQTT, the IoT platform provides a request/response synchronization mechanism that achieves synchronous communication without changing the MQTT protocol. The application server initiates an Rrpc call through the POP API; the IoT device only needs to publish a reply in the fixed format before the Timeout expires, and the server then obtains the device's response synchronously.
The specific process is as follows:
1.2 Topic format convention
Request: /sys/${productKey}/${deviceName}/rrpc/request/${messageId}
Response: /sys/${productKey}/${deviceName}/rrpc/response/${messageId}
${...} denotes a variable whose value is different for each device.
messageId is the message ID generated by the IoT platform.
The messageId in the response topic that the device replies to must be the same as the messageId in the request topic.
Example:
The device side needs to subscribe:
/sys/${productKey}/${deviceName}/rrpc/request/+
The running device receives the Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
After receiving the message, reply to the topic within the timeout time:
/sys/PK100101/DN213452/rrpc/response/443859344534
2. Example of calling RRPC synchronously
2.1 Device-side code
const mqtt = require('aliyun-iot-mqtt');
//Device properties
const options = require("./iot-device-config.json");
// Establish the MQTT connection using the device properties loaded above.
const client = mqtt.getAliyunIotMqttClient(options);
// Subscribe to every RRPC request topic of this device; the trailing "+"
// wildcard matches the per-request messageId segment.
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`);
client.on('message', (topic, message) => {
  const rrpcRequestPath = `/sys/${options.productKey}/${options.deviceName}/rrpc/request/`;
  // Messages on any topic that does not contain the RRPC request path are ignored.
  if (!topic.includes(rrpcRequestPath)) {
    return;
  }
  handleRrpc(topic, message);
});
/**
 * Answers a plain RRPC request by publishing a fixed success payload.
 *
 * The reply topic is the request topic with its "/request/" segment swapped
 * for "/response/", so the messageId at the end of the topic is preserved.
 *
 * @param {string} topic - Topic the RRPC request arrived on.
 * @param {*} message - Request payload; not inspected by this handler.
 */
function handleRrpc(topic, message) {
  const responseTopic = topic.replace('/request/', '/response/');
  console.log(`topic=${responseTopic}`);
  // Ordinary Rrpc: the response payload is free-form and chosen by the device.
  const reply = JSON.stringify({ code: 200, msg: "handle ok" });
  client.publish(responseTopic, reply);
}
2.2 The server POP calls Rrpc
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
// 1. Initialize the POP API client with the credentials loaded from the
//    AccessKey config file.
const client = new RPCClient({
  accessKeyId: options.accessKey,
  secretAccessKey: options.accessKeySecret,
  endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
  apiVersion: '2018-01-20'
});
const payload = {
  "msg": "hello Rrpc"
};
// 2. Build the request. The payload is sent to the device as a
//    base64-encoded string.
const params = {
  ProductKey: "a1gMu82K4m2",
  DeviceName: "h5@nuwr5r9hf6l@1532088166923",
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  RequestBase64Byte: Buffer.from(JSON.stringify(payload)).toString("base64"),
  Timeout: 3000
};
co(function* () {
  try {
    // 3. Initiate the API call and print the synchronous response.
    const response = yield client.request('Rrpc', params);
    console.log(JSON.stringify(response));
  } catch (err) {
    // Without this handler a failed call would surface only as an
    // unhandled promise rejection.
    console.log(err);
  }
});
rrpc response:
{
"MessageId": "1037292594536681472",
"RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
"PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
"Success": true,
"RrpcCode": "SUCCESS"
}
// PayloadBase64Byte decoding: {"code":200,"msg":"handle ok"}
3. Thing model - service synchronous call InvokeThingService example
Note: Thing model service call interface InvokeThingService, not Rrpc
Device subscribes to subTopic
Note: The service synchronous call API is InvokeThingService
/sys/${productKey}/${deviceName}/rrpc/request/+
The payload format of IoT cloud downlink
{
"id": 3536123,
"version": "1.0",
"params": {
"Input parameter key1": "Input parameter value1",
"Input parameter key2": "Input parameter value2"
},
"method": "thing.service.{tsl.service.identifier}"
}
The device responds to replyTopic
/sys/${productKey}/${deviceName}/rrpc/response/${messageId} (where messageId is the message ID taken from the request topic)
Device response payload format
{
"id": 3536123,
"code": 200,
"data": {
"Out parameter key1": "Out parameter value1",
"Out parameter key2": "Out parameter value2"
}
}
3.1 Thing Model - Synchronization Service Definition
3.2 Implementation on the device side
const mqtt = require('aliyun-iot-mqtt');
//Device properties
const options = require("./iot-device-config.json");
// Establish the MQTT connection using the device properties loaded above.
const client = mqtt.getAliyunIotMqttClient(options);
// Subscribe to every RRPC request topic of this device; the trailing "+"
// wildcard matches the per-request messageId segment.
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`);
client.on('message', (topic, message) => {
  const rrpcRequestPath = `/sys/${options.productKey}/${options.deviceName}/rrpc/request/`;
  // Messages on any topic that does not contain the RRPC request path are ignored.
  if (!topic.includes(rrpcRequestPath)) {
    return;
  }
  handleRrpc(topic, message);
});
/**
 * Answers a thing-model synchronous service call.
 *
 * If there are multiple synchronous call services, they need to be
 * distinguished by the `method` field in the request payload.
 *
 * The reply is published on the request topic with "/request/" swapped for
 * "/response/". Its `id` echoes the `id` of the request payload, as in the
 * documented request/response payload formats; when the request cannot be
 * parsed or carries no `id`, the current timestamp is used instead.
 *
 * @param {string} topic - Topic the request arrived on.
 * @param {*} message - Request payload; expected to be JSON text
 *   (string or Buffer) containing an `id` field.
 */
function handleRrpc(topic, message) {
  const responseTopic = topic.replace('/request/', '/response/');
  console.log("topic=" + responseTopic);

  let requestId;
  try {
    requestId = JSON.parse(String(message)).id;
  } catch (err) {
    // Malformed or non-JSON request: still reply, falling back to a
    // generated id below rather than dropping the request.
    requestId = undefined;
  }

  // Thing-model synchronous service call, response payload structure:
  const payloadJson = {
    id: requestId == null ? Date.now() : requestId,
    code: 200,
    data: {
      // Demo output parameter: a random integer in the range [10, 30).
      currentMode: Math.floor((Math.random() * 20) + 10)
    }
  };
  client.publish(responseTopic, JSON.stringify(payloadJson));
}
Note: The payload returned by the device must conform to the output parameter structure defined in the thing model.
3.3 Server POP interface InvokeThingService
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
// 1. Initialize the POP API client with the credentials loaded from the
//    AccessKey config file.
const client = new RPCClient({
  accessKeyId: options.accessKey,
  secretAccessKey: options.accessKeySecret,
  endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
  apiVersion: '2018-01-20'
});
// 2. Build the request: Args carries the service input parameters as a JSON string.
const params = {
  ProductKey: "a1gMu82K4m2",
  DeviceName: "h5@nuwr5r9hf6l@1532088166923",
  Args: JSON.stringify({ "mode": "1" }),
  // The stray spaces that had crept into this string are removed.
  // NOTE(review): confirm against the InvokeThingService API reference
  // whether the bare service identifier ("setMode") is expected here.
  Identifier: "thing.service.setMode"
};
co(function* () {
  try {
    // 3. Initiate the API call and print the result.
    const response = yield client.request('InvokeThingService', params);
    console.log(JSON.stringify(response));
  } catch (err) {
    console.log(err);
  }
});
Call result:
{
"Data": {
"MessageId": "1536145625658"
},
"RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
"Success": true
}
1. Synchronous call scene
1.1 Background
MQTT is an asynchronous communication protocol based on PUB/SUB. On its own it does not support the scenario in which the server sends an instruction to a device and needs the device to return a response result.
Building on MQTT, the IoT platform provides a request/response synchronization mechanism that achieves synchronous communication without changing the MQTT protocol. The application server initiates an Rrpc call through the POP API; the IoT device only needs to publish a reply in the fixed format before the Timeout expires, and the server then obtains the device's response synchronously.
The specific process is as follows:
1.2 Topic format convention
Request: /sys/${productKey}/${deviceName}/rrpc/request/${messageId}
Response: /sys/${productKey}/${deviceName}/rrpc/response/${messageId}
${...} denotes a variable whose value is different for each device.
messageId is the message ID generated by the IoT platform.
The messageId in the response topic that the device replies to must be the same as the messageId in the request topic.
Example:
The device side needs to subscribe:
/sys/${productKey}/${deviceName}/rrpc/request/+
The running device receives the Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
After receiving the message, reply to the topic within the timeout time:
/sys/PK100101/DN213452/rrpc/response/443859344534
2. Example of calling RRPC synchronously
2.1 Device-side code
const mqtt = require('aliyun-iot-mqtt');
//Device properties
const options = require("./iot-device-config.json");
// Establish the MQTT connection using the device properties loaded above.
const client = mqtt.getAliyunIotMqttClient(options);
// Subscribe to every RRPC request topic of this device; the trailing "+"
// wildcard matches the per-request messageId segment.
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`);
client.on('message', (topic, message) => {
  const rrpcRequestPath = `/sys/${options.productKey}/${options.deviceName}/rrpc/request/`;
  // Messages on any topic that does not contain the RRPC request path are ignored.
  if (!topic.includes(rrpcRequestPath)) {
    return;
  }
  handleRrpc(topic, message);
});
/**
 * Answers a plain RRPC request by publishing a fixed success payload.
 *
 * The reply topic is the request topic with its "/request/" segment swapped
 * for "/response/", so the messageId at the end of the topic is preserved.
 *
 * @param {string} topic - Topic the RRPC request arrived on.
 * @param {*} message - Request payload; not inspected by this handler.
 */
function handleRrpc(topic, message) {
  const responseTopic = topic.replace('/request/', '/response/');
  console.log(`topic=${responseTopic}`);
  // Ordinary Rrpc: the response payload is free-form and chosen by the device.
  const reply = JSON.stringify({ code: 200, msg: "handle ok" });
  client.publish(responseTopic, reply);
}
2.2 The server POP calls Rrpc
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
// 1. Initialize the POP API client with the credentials loaded from the
//    AccessKey config file.
const client = new RPCClient({
  accessKeyId: options.accessKey,
  secretAccessKey: options.accessKeySecret,
  endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
  apiVersion: '2018-01-20'
});
const payload = {
  "msg": "hello Rrpc"
};
// 2. Build the request. The payload is sent to the device as a
//    base64-encoded string.
const params = {
  ProductKey: "a1gMu82K4m2",
  DeviceName: "h5@nuwr5r9hf6l@1532088166923",
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  RequestBase64Byte: Buffer.from(JSON.stringify(payload)).toString("base64"),
  Timeout: 3000
};
co(function* () {
  try {
    // 3. Initiate the API call and print the synchronous response.
    const response = yield client.request('Rrpc', params);
    console.log(JSON.stringify(response));
  } catch (err) {
    // Without this handler a failed call would surface only as an
    // unhandled promise rejection.
    console.log(err);
  }
});
rrpc response:
{
"MessageId": "1037292594536681472",
"RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
"PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
"Success": true,
"RrpcCode": "SUCCESS"
}
// PayloadBase64Byte decoding: {"code":200,"msg":"handle ok"}
3. Thing model - service synchronous call InvokeThingService example
Note: Thing model service call interface InvokeThingService, not Rrpc
Device subscribes to subTopic
Note: The service synchronous call API is InvokeThingService
/sys/${productKey}/${deviceName}/rrpc/request/+
The payload format of IoT cloud downlink
{
"id": 3536123,
"version": "1.0",
"params": {
"Input parameter key1": "Input parameter value1",
"Input parameter key2": "Input parameter value2"
},
"method": "thing.service.{tsl.service.identifier}"
}
The device responds to replyTopic
/sys/${productKey}/${deviceName}/rrpc/response/${messageId} (where messageId is the message ID taken from the request topic)
Device response payload format
{
"id": 3536123,
"code": 200,
"data": {
"Out parameter key1": "Out parameter value1",
"Out parameter key2": "Out parameter value2"
}
}
3.1 Thing Model - Synchronization Service Definition
3.2 Implementation on the device side
const mqtt = require('aliyun-iot-mqtt');
//Device properties
const options = require("./iot-device-config.json");
// Establish the MQTT connection using the device properties loaded above.
const client = mqtt.getAliyunIotMqttClient(options);
// Subscribe to every RRPC request topic of this device; the trailing "+"
// wildcard matches the per-request messageId segment.
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`);
client.on('message', (topic, message) => {
  const rrpcRequestPath = `/sys/${options.productKey}/${options.deviceName}/rrpc/request/`;
  // Messages on any topic that does not contain the RRPC request path are ignored.
  if (!topic.includes(rrpcRequestPath)) {
    return;
  }
  handleRrpc(topic, message);
});
/**
 * Answers a thing-model synchronous service call.
 *
 * If there are multiple synchronous call services, they need to be
 * distinguished by the `method` field in the request payload.
 *
 * The reply is published on the request topic with "/request/" swapped for
 * "/response/". Its `id` echoes the `id` of the request payload, as in the
 * documented request/response payload formats; when the request cannot be
 * parsed or carries no `id`, the current timestamp is used instead.
 *
 * @param {string} topic - Topic the request arrived on.
 * @param {*} message - Request payload; expected to be JSON text
 *   (string or Buffer) containing an `id` field.
 */
function handleRrpc(topic, message) {
  const responseTopic = topic.replace('/request/', '/response/');
  console.log("topic=" + responseTopic);

  let requestId;
  try {
    requestId = JSON.parse(String(message)).id;
  } catch (err) {
    // Malformed or non-JSON request: still reply, falling back to a
    // generated id below rather than dropping the request.
    requestId = undefined;
  }

  // Thing-model synchronous service call, response payload structure:
  const payloadJson = {
    id: requestId == null ? Date.now() : requestId,
    code: 200,
    data: {
      // Demo output parameter: a random integer in the range [10, 30).
      currentMode: Math.floor((Math.random() * 20) + 10)
    }
  };
  client.publish(responseTopic, JSON.stringify(payloadJson));
}
Note: The payload returned by the device must conform to the output parameter structure defined in the thing model.
3.3 Server POP interface InvokeThingService
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
// 1. Initialize the POP API client with the credentials loaded from the
//    AccessKey config file.
const client = new RPCClient({
  accessKeyId: options.accessKey,
  secretAccessKey: options.accessKeySecret,
  endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
  apiVersion: '2018-01-20'
});
// 2. Build the request: Args carries the service input parameters as a JSON string.
const params = {
  ProductKey: "a1gMu82K4m2",
  DeviceName: "h5@nuwr5r9hf6l@1532088166923",
  Args: JSON.stringify({ "mode": "1" }),
  // The stray spaces that had crept into this string are removed.
  // NOTE(review): confirm against the InvokeThingService API reference
  // whether the bare service identifier ("setMode") is expected here.
  Identifier: "thing.service.setMode"
};
co(function* () {
  try {
    // 3. Initiate the API call and print the result.
    const response = yield client.request('InvokeThingService', params);
    console.log(JSON.stringify(response));
  } catch (err) {
    console.log(err);
  }
});
Call result:
{
"Data": {
"MessageId": "1536145625658"
},
"RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
"Success": true
}
Related Articles
-
A detailed explanation of Hadoop core architecture HDFS
Knowledge Base Team
-
What Does IOT Mean
Knowledge Base Team
-
6 Optional Technologies for Data Storage
Knowledge Base Team
-
What Is Blockchain Technology
Knowledge Base Team
Explore More Special Offers
-
Short Message Service(SMS) & Mail Service
50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00