本文介紹使用AMQP協議的Go用戶端接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。
前提條件
已擷取消費組ID,並訂閱Topic訊息。
管理AMQP消費組:您可使用物聯網平台預設消費組(DEFAULT_GROUP)或建立消費組。
配置AMQP服務端訂閱:您可通過消費組訂閱需要的Topic訊息。
準備開發環境
本樣本的測試環境為Go 1.12.7。
下載SDK
可使用以下命令匯入Go語言AMQP SDK。
import "pack.ag/amqp"
SDK使用說明,請參見package amqp。
程式碼範例
package main
import (
"os"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
//參數說明,請參見AMQP用戶端接入說明文檔。
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
//iotInstanceId:執行個體ID。
const iotInstanceId = "${YourIotInstanceId}"
//接入網域名稱,請參見AMQP用戶端接入說明文檔。
const host = "${YourHost}"
func main() {
//工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
//userName組裝方法,請參見AMQP用戶端接入說明文檔。
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
//計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
//如果需要做接受訊息通訊或者取消操作,從Background衍生context。
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
//業務函數。使用者自訂實現,該函數被非同步執行,請考慮系統資源消耗情況。
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
//阻塞接受訊息,如果ctx是background則不會被打斷。
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp receive data error:", err)
//如果是主動取消,則退出程式。
select {
case <- childCtx.Done(): return
default:
}
//非主動取消,則重建立立串連。
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
//退避重連,從10ms依次x2,直到20s。
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
//異常情況,退避重連。
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
times ++
} else {
fmt.Println("amqp connect init success")
return nil
}
}
}
//由於包不可見,無法判斷Connection和Session狀態,重啟串連擷取。
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
//如果斷網等行為發生,Connection會關閉導致Session建立失敗,未關閉串連則建立成功。
if err == nil {
am.receiver = receiver
return nil
}
}
//清理上一個串連。
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}
您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明。
請確保參數值輸入正確,否則AMQP用戶端接入會失敗。
參數 | 說明 |
accessKey | 登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。 說明 如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問。 |
accessSecret | |
consumerGroupId | 當前物聯網平台對應執行個體中的消費組ID。 登入物聯網平台控制台,在對應執行個體的 查看您的消費組ID。 |
iotInstanceId | 執行個體ID。您可在物聯網平台控制台的執行個體概覽頁面,查看當前執行個體的ID。
|
clientId | 表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。 AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。 |
host | AMQP接入網域名稱。
|
運行結果樣本
成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。
失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。
您可根據日誌提示,檢查代碼或網路環境,然後修正問題,重新運行代碼。
相關文檔
服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼。