This topic provides a Python script template and a sample Python script that you can use to parse Thing Specification Language (TSL) messages.
Script template
You can write a Python script based on the following script template to parse TSL messages.
def raw_data_to_protocol(rawData):
    """Convert custom device data to Alink JSON data.

    Called when a device submits data to IoT Platform.

    rawData: the input parameter. A list of integers that cannot be empty.
    jsonObj: the output parameter (return value). A dictionary that
        represents a JSON object and cannot be empty.
    """
    # Template placeholder: fill jsonObj from rawData before returning it.
    jsonObj = dict()
    return jsonObj
def protocol_to_raw_data(jsonData):
    """Convert Alink JSON data to data that a device can recognize.

    Called when IoT Platform sends data to the device.

    jsonData: the input parameter. A dictionary that represents a JSON
        object and cannot be empty.
    rawData: the output parameter (return value). A list of integers, each
        within the range [0,255]; it cannot be empty.
    """
    # Template placeholder: fill rawData from jsonData before returning it.
    rawData = list()
    return rawData
Notes for script writing
- Do not use global variables. Otherwise, results may be inconsistent.
- The script is used to process data by using the two's complement operation. The complement range for values in the range of [-128,127] is [0,255]. For example, the complement of -1 is 255 in decimal.
- The raw_data_to_protocol function parses the data that is submitted by devices. The input parameter of the function is an integer array. Perform a bitwise AND operation with 0xFF on each element to obtain its complement.
- The protocol_to_raw_data function parses the data that is sent by IoT Platform and returns the result in an array. Each element in the array must be an integer within a value range of [0,255].
Sample script
The following sample script is written based on the properties and the protocol that are defined in the Submit a script to parse TSL data topic:
For more information about the data types supported by TSL models, see Supported data types. After a device submits TSL property data or event data, IoT Platform generates a response whose data is in the Alink JSON format. Before IoT Platform sends the response to the device, IoT Platform uses the script to convert the format of the data to a format that can be recognized by the device. For more information about the Alink JSON format, see Device properties, events, and services.
# coding=UTF-8
import struct
import common_util
# Command codes carried in the first byte of a frame.
COMMAND_REPORT = 0x00  # Submit properties.
COMMAND_SET = 0x01  # Configure properties.
COMMAND_REPORT_REPLY = 0x02  # Return results of data submission.
COMMAND_SET_REPLY = 0x03  # Return results of property setting.
COMMAND_UNKNOWN = 0xff  # Unknown commands.
# Misspelled legacy name, kept as an alias so existing references still work.
COMMAD_UNKOWN = COMMAND_UNKNOWN

# Alink "method" values used in the JSON messages.
ALINK_PROP_REPORT_METHOD = 'thing.event.property.post'  # Devices submit property data to IoT Platform.
ALINK_PROP_SET_METHOD = 'thing.service.property.set'  # IoT Platform sends a property setting command to devices.

# Substrings used to recognize custom topics.
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update'  # Custom topic: /user/update.
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error'  # Custom topic: /user/update/error.
# Sample data:
# Submit property data
# Input:
# 0x000000000100320100000000
# Output:
# {"method":"thing.event.property.post","id":"1","params":{"prop_float":0.0,"prop_int16":50,"prop_bool":1},"version":"1.0"}
# Returned result of property setting
# Input:
# 0x0300223344c8
# Output:
# {"code":"200","data":{},"id":"2241348","version":"1.0"}
def raw_data_to_protocol(bytes):
    """Convert a device frame to an Alink JSON dictionary.

    Frame layout (big-endian), as used by this script:
      byte 0      command code
      bytes 1-4   message id
    followed, for COMMAND_REPORT, by
      bytes 5-6   prop_int16
      byte 7      prop_bool
      bytes 8-11  prop_float (IEEE 754 single precision, as written by
                  float_to_byte)
    or, for COMMAND_SET_REPLY, by
      bytes 5-    result code

    bytes: list of integers submitted by the device. (The parameter name
        shadows the builtin; it is kept for interface compatibility.)

    Returns the Alink JSON dictionary. An empty dictionary is returned for
    empty input or an unrecognized command code.

    Raises ValueError if a COMMAND_REPORT frame is shorter than 12 bytes.
    """
    # Mask with 0xff so signed inputs (-128..127) become 0..255.
    uint8Array = [byteValue & 0xff for byteValue in bytes]
    jsonMap = {}
    if not uint8Array:
        return jsonMap
    fHead = uint8Array[0]
    if fHead == COMMAND_REPORT:
        if len(uint8Array) < 12:
            raise ValueError(
                'property report frame needs 12 bytes, got %d' % len(uint8Array))
        jsonMap['method'] = ALINK_PROP_REPORT_METHOD
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        params = {}
        params['prop_int16'] = bytes_to_int(uint8Array[5:7])
        params['prop_bool'] = bytes_to_int(uint8Array[7:8])
        # Decode the 4 bytes as a big-endian float, not as an integer, so
        # the value round-trips with float_to_byte.
        float_bytes = struct.pack('4B', *uint8Array[8:12])
        params['prop_float'] = struct.unpack('>f', float_bytes)[0]
        jsonMap['params'] = params
    elif fHead == COMMAND_SET_REPLY:
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        jsonMap['code'] = str(bytes_to_int(uint8Array[5:]))
        jsonMap['data'] = {}
    return jsonMap
# Sample data:
# Sends downstream commands to configure device properties
# Input:
# {"method":"thing.service.property.set","id":"12345","version":"1.0",
# "params":{"prop_float":123.452, "prop_int16":333, "prop_bool":1}}
# Output:
# 0x0100003039014d0142f6e76d
# Returned result after the device submits data
# Input:
# {"method":"thing.event.property.post","id":"12345","version":"1.0","code":200,"data":{}}
# Output:
# 0x0200003039c8
def protocol_to_raw_data(json):
    """Convert an Alink JSON dictionary to a device frame.

    json: dictionary with the Alink message. (The parameter name is kept
        for interface compatibility.) The keys read are 'method', 'id',
        and, depending on the method, 'params' or 'code'.

    Returns a list of integers:
      - property set:   COMMAND_SET, id (4 bytes), prop_int16 (2 bytes),
                        prop_bool (1 byte), prop_float (4 bytes)
      - report reply:   COMMAND_REPORT_REPLY, id (4 bytes), code (1 byte)
      - anything else:  COMMAD_UNKOWN, id (4 bytes), code (1 byte)
    """
    method = json.get('method', None)
    # Named msg_id so the builtin id() is not shadowed.
    msg_id = json.get('id', None)
    if method == ALINK_PROP_SET_METHOD:
        params = json.get('params')
        prop_float = params.get('prop_float', None)
        prop_int16 = params.get('prop_int16', None)
        prop_bool = params.get('prop_bool', None)
        return (int_8_to_byte(COMMAND_SET)
                + int_32_to_byte(int(msg_id))
                + int_16_to_byte(prop_int16)
                + int_8_to_byte(prop_bool)
                + float_to_byte(prop_float))
    if method == ALINK_PROP_REPORT_METHOD:
        code = json.get('code', None)
        return (int_8_to_byte(COMMAND_REPORT_REPLY)
                + int_32_to_byte(int(msg_id))
                + int_8_to_byte(code))
    # Unrecognized method. Such messages need not carry a 'code' field, so
    # fall back to 0 rather than failing while formatting None.
    code = json.get('code', None)
    if code is None:
        code = 0
    return (int_8_to_byte(COMMAD_UNKOWN)
            + int_32_to_byte(int(msg_id))
            + int_8_to_byte(code))
# Sample data:
# Use the following custom topic to submit data:
# /user/update.
# Input:
# topic:/{productKey}/{deviceName}/user/update
# bytes: 0x000000000100320100000000
# Output:
# {
# "prop_float": 0.0,
# "prop_int16": 50,
# "prop_bool": 1,
# "topic": "/{productKey}/{deviceName}/user/update"
# }
def transform_payload(topic, bytes):
    """Convert data submitted to a custom topic into a JSON dictionary.

    topic: the topic string the data was submitted to.
    bytes: list of integers submitted by the device. (The parameter name
        shadows the builtin; it is kept for interface compatibility.)

    Returns a dictionary:
      - topic contains '/user/update/error': 'topic' and 'errorCode'
        (byte 0 of the payload)
      - topic contains '/user/update': 'topic', 'prop_int16' (bytes 5-6),
        'prop_bool' (byte 7) and 'prop_float' (bytes 8-11, big-endian
        IEEE 754 single precision)
      - otherwise: an empty dictionary

    Raises ValueError if a '/user/update' payload is shorter than 12 bytes.
    """
    # Mask with 0xff so signed inputs (-128..127) become 0..255.
    uint8Array = [byteValue & 0xff for byteValue in bytes]
    jsonMap = {}
    # The error flag must be tested first: the update flag is a substring
    # of it and would match error topics as well.
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])
    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        if len(uint8Array) < 12:
            raise ValueError(
                'update payload needs 12 bytes, got %d' % len(uint8Array))
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7:8])
        # Decode the 4 bytes as a big-endian float, not as an integer, so
        # the value round-trips with float_to_byte.
        float_bytes = struct.pack('4B', *uint8Array[8:12])
        jsonMap['prop_float'] = struct.unpack('>f', float_bytes)[0]
    return jsonMap
# Convert a byte array to an integer.
def bytes_to_int(bytes):
    """Interpret a sequence of byte values as a big-endian unsigned integer.

    bytes: iterable of integers. Each element is masked with 0xff, so
        signed values (-128..127) are treated as their two's complement.
        (The parameter name shadows the builtin; it is kept for interface
        compatibility.)

    Returns the combined integer; an empty sequence yields 0.
    """
    result = 0
    for byte_value in bytes:
        # Shift the accumulated value one byte left and append the next byte.
        result = (result << 8) | (byte_value & 0xff)
    return result
# Convert an 8-bit integer to a byte array.
def int_8_to_byte(value):
    """Convert an 8-bit integer to a one-element byte list.

    value: the number to encode. It is converted with int() and masked to
        8 bits, so negative values are emitted in two's complement
        (-1 -> 255) and the result is always exactly one element in the
        range [0,255].

    Returns a list with one integer.
    """
    return [int(value) & 0xff]
# Convert a 32-bit integer to a byte array.
def int_32_to_byte(value):
    """Convert a 32-bit integer to a four-element big-endian byte list.

    value: the number to encode. It is converted with int() and each byte
        is masked to 8 bits, so negative values are emitted in two's
        complement and the result is always exactly four elements in the
        range [0,255].

    Returns a list of four integers, most significant byte first.
    """
    number = int(value)
    return [(number >> shift) & 0xff for shift in (24, 16, 8, 0)]
# Convert a 16-bit integer to a byte array.
def int_16_to_byte(value):
    """Convert a 16-bit integer to a two-element big-endian byte list.

    value: the number to encode. It is converted with int() and each byte
        is masked to 8 bits, so negative values are emitted in two's
        complement and the result is always exactly two elements in the
        range [0,255].

    Returns a list of two integers, most significant byte first.
    """
    number = int(value)
    return [(number >> 8) & 0xff, number & 0xff]
# Convert a float to an integer array.
def float_to_byte(param):
    """Convert a float to a four-element big-endian byte list.

    param: the number to encode as an IEEE 754 single-precision value.

    Returns a list of four integers in the range [0,255].
    """
    # bytearray yields integers on both Python 2 and Python 3, unlike
    # str.encode('hex'), which exists only on Python 2.
    return list(bytearray(struct.pack(">f", param)))
# Convert a hexadecimal string to a byte array.
def hex_string_to_byte_array(str_value):
    """Convert a hexadecimal string to a list of byte values.

    str_value: string of hexadecimal digits, two per byte.

    Returns a list of integers, one per digit pair, or None if the string
    has an odd number of characters.
    """
    if len(str_value) % 2 != 0:
        return None
    # Step through the string two characters at a time. Striding avoids the
    # float that len(...) / 2 produces on Python 3, which range() rejects.
    return [int(str_value[pos:pos + 2], 16)
            for pos in range(0, len(str_value), 2)]