全部產品
Search
文件中心

IoT Platform:Python 2.7 SDK接入樣本

更新時間:Jun 30, 2024

本文介紹使用Python 2.7 SDK接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。

前提條件

已擷取消費組ID,並訂閱Topic訊息。

準備開發環境

本樣本所使用的開發環境為Python 2.7版。

下載SDK

Python語言的AMQP SDK,推薦使用Apache Qpid Proton 0.29.0,該庫中已封裝了Python API。請訪問Qpid Proton 0.29.0下載庫和查看使用說明。

安裝Proton。安裝操作指導,請參見Installing Qpid Proton

安裝完成後,通過以下Python命令查看SSL庫是否安裝成功。

import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')

程式碼範例

# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
import os

reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)


def current_time_millis():
    return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest())


class AmqpClient(MessagingHandler):
    def __init__(self):
        super(AmqpClient, self).__init__()

    def on_start(self, event):
        # 接入網域名稱,請參見AMQP用戶端接入說明文檔。
        url = "amqps://${YourHost}:5671"
        # 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
        accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
        accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        consumerGroupId = "${YourConsumerGroupId}"
        clientId = "${YourClientId}"
        # iotInstanceId:執行個體ID。
        iotInstanceId = "${YourIotInstanceId}"
        # 簽名方法:支援hmacmd5,hmacsha1和hmacsha256。
        signMethod = "hmacsha1"
        timestamp = current_time_millis()
        # userName組裝方法,請參見AMQP用戶端接入說明文檔。
        userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                        + ",timestamp=" + timestamp + ",authId=" + accessKey \
                        + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
        signContent = "authId=" + accessKey + "&timestamp=" + timestamp
        # 計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
        passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
        conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
        self.receiver = event.container.create_receiver(conn)

    # 當串連成功建立時被調用。
    def on_connection_opened(self, event):
        logger.info("Connection established, remoteUrl: %s", event.connection.hostname)

    # 當串連關閉時被調用。
    def on_connection_closed(self, event):
        logger.info("Connection closed: %s", self)

    # 當遠端因錯誤而關閉串連時被調用。
    def on_connection_error(self, event):
        logger.info("Connection error")

    # 當建立AMQP串連錯誤時被調用,包括身分識別驗證錯誤和通訊端錯誤。
    def on_transport_error(self, event):
        if event.transport.condition:
            if event.transport.condition.info:
                logger.error("%s: %s: %s" % (
                    event.transport.condition.name, event.transport.condition.description,
                    event.transport.condition.info))
            else:
                logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
        else:
            logging.error("Unspecified transport error")

    # 當收到訊息時被調用。
    def on_message(self, event):
        message = event.message
        content = message.body.decode('utf-8')
        topic = message.properties.get("topic")
        message_id = message.properties.get("messageId")
        print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
        event.receiver.flow(1)


Container(AmqpClient()).run()

您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明

重要

請確保參數值輸入正確,否則AMQP用戶端接入會失敗。

參數

說明

url

AMQP用戶端接入物聯網平台的串連地址。格式:"amqps://${YourHost}:5671"

${YourHost}對應的AMQP接入網域名稱資訊,請參見查看和配置執行個體終端節點資訊(Endpoint)

accessKey

登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。

說明

如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問

accessSecret

consumerGroupId

當前物聯網平台對應執行個體中的消費組ID。

登入物聯網平台控制台,在對應執行個體的訊息轉寄 > 服務端訂閱 > 消費組列表查看您的消費組ID。

iotInstanceId

執行個體ID。您可在物聯網平台控制台執行個體概覽頁面,查看當前執行個體的ID。

  • 若有ID值,必須傳入該ID值。

  • 若無執行個體概覽頁面或ID值,傳入空值,即iotInstanceId = ""

clientId

表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。

AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的訊息轉寄 > 服務端訂閱 > 消費組列表頁簽,單擊消費組對應的查看消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。

運行結果樣本

  • 成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。成功

    參數

    樣本

    說明

    message_id

    2**************7

    訊息的ID。

    topic

    /***********/******/thing/event/property/post

    裝置屬性上報的Topic。

    content

    {"deviceType":"CustomCategory","iotId":"qPi***","requestId":"161***","checkFailedData":{},"productKey":"g4***","gmtCreate":1613635594038,"deviceName":"de***","items":{"Temperature":{"value":24,"time":1613635594036},"Humidity":{"value":26,"time":1613635594036}}}

    訊息的內容。

  • 失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。

    您可根據日誌提示,檢查代碼或網路環境,然後修正問題,重新運行代碼。

    失敗

相關文檔

服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼