本文介绍使用Python3 SDK接入阿里云物联网平台,接收服务端订阅消息的示例。
前提条件
已获取消费组ID,并订阅Topic消息。
管理消费组:您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。
配置AMQP服务端订阅:您可通过消费组订阅需要的Topic消息。
准备开发环境
可使用Python 3.0及更高版本。本示例使用了Python 3.8版本。
下载SDK
本示例使用stomp.py和schedule,您可访问stomp.py和schedule查看使用说明。
安装stomp.py和schedule的操作指导,请参见Installing Packages。
代码示例
本文提供基于stomp.py的7.0.0版本示例代码。
# encoding=utf-8
import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os
def connect_and_subscribe(conn):
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
consumerGroupId = "${YourConsumerGroupId}"
# iotInstanceId:实例ID。
iotInstanceId = "${YourIotInstanceId}"
clientId = "${YourClientId}"
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
# 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除历史连接检查任务,新建连接检查任务
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check,conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print('received a message "%s"' % frame.body)
def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')
def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")
def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")
# 检查连接,如果未连接则重新建连
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if (not conn.is_connected()):
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)
# 定时任务方法,检查连接状态
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(10)
# 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e
# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()
您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明。
请确保参数值输入正确,否则AMQP客户端接入会失败。
参数 | 说明 |
accessKey | 登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。 说明如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见RAM用户访问。 |
accessSecret | |
consumerGroupId | 当前物联网平台对应实例中的消费组ID。 登录物联网平台控制台,在对应实例的 查看您的消费组ID。 |
iotInstanceId | 实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。
|
clientId | 表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。 AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。 |
conn | 创建AMQP客户端与物联网平台的TLS连接。
|
conn.set_ssl |
运行结果示例
成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。
失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。
您可根据日志提示,检查代码或网络环境,然后修正问题,重新运行代码。
二进制消息体说明
当您需要传输二进制数据时,由于STOMP协议为文本协议,需要使用base64编码参数,否则消息体可能会被截断。
本示例中,userName需要按以下方法添加encode=base64参数,使服务端将消息体base64编码后再推送。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId \
+ ",encode=base64"+"|"
相关文档
服务端订阅消息相关错误码,请参见消息相关错误码。