本文介紹使用Python 2.7 SDK接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。
前提條件
已擷取消費組ID,並訂閱Topic訊息。
管理AMQP消費組:您可使用物聯網平台預設消費組(DEFAULT_GROUP)或建立消費組。
配置AMQP服務端訂閱:您可通過消費組訂閱需要的Topic訊息。
準備開發環境
本樣本所使用的開發環境為Python 2.7版。
下載SDK
Python語言的AMQP SDK,推薦使用Apache Qpid Proton 0.29.0,該庫中已封裝了Python API。請訪問Qpid Proton 0.29.0下載庫和查看使用說明。
安裝Proton。安裝操作指導,請參見Installing Qpid Proton。
安裝完成後,通過以下Python命令查看SSL庫是否安裝成功。
import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')
程式碼範例
# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
import os
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest())
class AmqpClient(MessagingHandler):
def __init__(self):
super(AmqpClient, self).__init__()
def on_start(self, event):
# 接入網域名稱,請參見AMQP用戶端接入說明文檔。
url = "amqps://${YourHost}:5671"
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
consumerGroupId = "${YourConsumerGroupId}"
clientId = "${YourClientId}"
# iotInstanceId:執行個體ID。
iotInstanceId = "${YourIotInstanceId}"
# 簽名方法:支援hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName組裝方法,請參見AMQP用戶端接入說明文檔。
userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
self.receiver = event.container.create_receiver(conn)
# 當串連成功建立時被調用。
def on_connection_opened(self, event):
logger.info("Connection established, remoteUrl: %s", event.connection.hostname)
# 當串連關閉時被調用。
def on_connection_closed(self, event):
logger.info("Connection closed: %s", self)
# 當遠端因錯誤而關閉串連時被調用。
def on_connection_error(self, event):
logger.info("Connection error")
# 當建立AMQP串連錯誤時被調用,包括身分識別驗證錯誤和通訊端錯誤。
def on_transport_error(self, event):
if event.transport.condition:
if event.transport.condition.info:
logger.error("%s: %s: %s" % (
event.transport.condition.name, event.transport.condition.description,
event.transport.condition.info))
else:
logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
else:
logging.error("Unspecified transport error")
# 當收到訊息時被調用。
def on_message(self, event):
message = event.message
content = message.body.decode('utf-8')
topic = message.properties.get("topic")
message_id = message.properties.get("messageId")
print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
event.receiver.flow(1)
Container(AmqpClient()).run()
您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明。
請確保參數值輸入正確,否則AMQP用戶端接入會失敗。
參數 | 說明 |
url | AMQP用戶端接入物聯網平台的串連地址。格式:
|
accessKey | 登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。 說明 如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問。 |
accessSecret | |
consumerGroupId | 當前物聯網平台對應執行個體中的消費組ID。 登入物聯網平台控制台,在對應執行個體的 查看您的消費組ID。 |
iotInstanceId | 執行個體ID。您可在物聯網平台控制台的執行個體概覽頁面,查看當前執行個體的ID。
|
clientId | 表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。 AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。 |
運行結果樣本
成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。
參數
樣本
說明
message_id
2**************7
訊息的ID。
topic
/***********/******/thing/event/property/post
裝置屬性上報的Topic。
content
{"deviceType":"CustomCategory","iotId":"qPi***","requestId":"161***","checkFailedData":{},"productKey":"g4***","gmtCreate":1613635594038,"deviceName":"de***","items":{"Temperature":{"value":24,"time":1613635594036},"Humidity":{"value":26,"time":1613635594036}}}
訊息的內容。
失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。
您可根據日誌提示,檢查代碼或網路環境,然後修正問題,重新運行代碼。
相關文檔
服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼。