本文介紹使用Python3 SDK接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。
前提條件
已擷取消費組ID,並訂閱Topic訊息。
管理AMQP消費組:您可使用物聯網平台預設消費組(DEFAULT_GROUP)或建立消費組。
配置AMQP服務端訂閱:您可通過消費組訂閱需要的Topic訊息。
準備開發環境
可使用Python 3.0及更高版本。本樣本使用了Python 3.8版本。
下載SDK
本樣本使用stomp.py和schedule,您可訪問stomp.py和schedule查看使用說明。
安裝stomp.py和schedule的操作指導,請參見Installing Packages。
程式碼範例
本文提供基於stomp.py的7.0.0版本範例程式碼。
# encoding=utf-8
import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os
def connect_and_subscribe(conn):
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
consumerGroupId = "${YourConsumerGroupId}"
# iotInstanceId:執行個體ID。
iotInstanceId = "${YourIotInstanceId}"
clientId = "${YourClientId}"
# 簽名方法:支援hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName組裝方法,請參見AMQP用戶端接入說明文檔。
# 若使用二進位傳輸,則userName需要添加encode=base64參數,服務端會將訊息體base64編碼後再推送。具體添加方法請參見下一章節“二進位訊息體說明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除歷史串連檢查任務,建立串連檢查任務
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check,conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print('received a message "%s"' % frame.body)
def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')
def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")
def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")
# 檢查串連,如果未串連則重建立連
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if (not conn.is_connected()):
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)
# 定時任務方法,檢查串連狀態
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(10)
# 接入網域名稱,請參見AMQP用戶端接入說明文檔。這裡直接填入網域名稱,不需要帶amqps://首碼
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e
# 非同步線程運行定時任務,檢查串連狀態
thread = threading.Thread(target=connection_check_timer)
thread.start()
您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明。
請確保參數值輸入正確,否則AMQP用戶端接入會失敗。
參數 | 說明 |
accessKey | 登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。 說明如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問。 |
accessSecret | |
consumerGroupId | 當前物聯網平台對應執行個體中的消費組ID。 登入物聯網平台控制台,在對應執行個體的 查看您的消費組ID。 |
iotInstanceId | 執行個體ID。您可在物聯網平台控制台的執行個體概覽頁面,查看當前執行個體的ID。
|
clientId | 表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。 AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。 |
conn | 建立AMQP用戶端與物聯網平台的TLS串連。
|
conn.set_ssl |
運行結果樣本
成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。
失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。
您可根據日誌提示,檢查代碼或網路環境,然後修正問題,重新運行代碼。
二進位訊息體說明
當您需要傳輸位元據時,由於STOMP協議為文本協議,需要使用base64編碼參數,否則訊息體可能會被截斷。
本樣本中,userName需要按以下方法添加encode=base64參數,使服務端將訊息體base64編碼後再推送。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId \
+ ",encode=base64"+"|"
相關文檔
服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼。