全部產品
Search
文件中心

IoT Platform:Python3 SDK接入樣本

更新時間:Jun 30, 2024

本文介紹使用Python3 SDK接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。

前提條件

已擷取消費組ID,並訂閱Topic訊息。

準備開發環境

可使用Python 3.0及更高版本。本樣本使用了Python 3.8版本。

下載SDK

本樣本使用stomp.py和schedule,您可訪問stomp.pyschedule查看使用說明。

安裝stomp.py和schedule的操作指導,請參見Installing Packages

程式碼範例

本文提供基於stomp.py的7.0.0版本範例程式碼。

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os

def connect_and_subscribe(conn):
    # 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
    accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    consumerGroupId = "${YourConsumerGroupId}"
    # iotInstanceId:執行個體ID。
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # 簽名方法:支援hmacmd5,hmacsha1和hmacsha256。
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # userName組裝方法,請參見AMQP用戶端接入說明文檔。
    # 若使用二進位傳輸,則userName需要添加encode=base64參數,服務端會將訊息體base64編碼後再推送。具體添加方法請參見下一章節“二進位訊息體說明”。
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # 計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # 清除歷史串連檢查任務,建立串連檢查任務
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')

class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_connected(self, headers):
        print("successfully connected")
        conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")

    def on_disconnected(self):
        print('disconnected')
        connect_and_subscribe(self.conn)

def current_time_millis():
    return str(int(round(time.time() * 1000)))

def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")

# 檢查串連,如果未串連則重建立連
def do_check(conn):
    print('check connection, is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)

# 定時任務方法,檢查串連狀態
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)

#  接入網域名稱,請參見AMQP用戶端接入說明文檔。這裡直接填入網域名稱,不需要帶amqps://首碼
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception as e:
    print('connecting failed')
    raise e
    
# 非同步線程運行定時任務,檢查串連狀態
thread = threading.Thread(target=connection_check_timer)
thread.start()

您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明

重要

請確保參數值輸入正確,否則AMQP用戶端接入會失敗。

參數

說明

accessKey

登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。

說明如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問

accessSecret

consumerGroupId

當前物聯網平台對應執行個體中的消費組ID。

登入物聯網平台控制台,在對應執行個體的訊息轉寄 > 服務端訂閱 > 消費組列表查看您的消費組ID。

iotInstanceId

執行個體ID。您可在物聯網平台控制台執行個體概覽頁面,查看當前執行個體的ID。

  • 若有ID值,必須傳入該ID值。

  • 若無執行個體概覽頁面或ID值,傳入空值,即iotInstanceId = ""

clientId

表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。

AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的訊息轉寄 > 服務端訂閱 > 消費組列表頁簽,單擊消費組對應的查看消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。

conn

建立AMQP用戶端與物聯網平台的TLS串連。

${YourHost}對應的AMQP接入網域名稱資訊,請參見查看和配置執行個體終端節點資訊(Endpoint)

conn.set_ssl

運行結果樣本

  • 成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。成功

  • 失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。

    您可根據日誌提示,檢查代碼或網路環境,然後修正問題,重新運行代碼。

    失敗

二進位訊息體說明

當您需要傳輸位元據時,由於STOMP協議為文本協議,需要使用base64編碼參數,否則訊息體可能會被截斷。

本樣本中,userName需要按以下方法添加encode=base64參數,使服務端將訊息體base64編碼後再推送。

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \ 
                    + ",encode=base64"+"|"

相關文檔

服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼