本文介紹使用.NET語言的AMQP SDK接入阿里雲物聯網平台,接收服務端訂閱訊息的樣本。
前提條件
已擷取消費組ID,並訂閱Topic訊息。
管理AMQP消費組:您可使用物聯網平台預設消費組(DEFAULT_GROUP)或建立消費組。
配置AMQP服務端訂閱:您可通過消費組訂閱需要的Topic訊息。
開發環境
本樣本使用的開發環境要求如下表。
Framework | 支援版本 |
.NET Framework | 3.5、4.0、4.5及以上版本 |
.NET Micro Framework | 4.2及以上版本 |
.NET nanoFramework | 1.0及以上版本 |
.NET Compact Framework | 3.9及以上版本 |
.Net Core on Windows 10 and Ubuntu 14.04 | 1.0及以上版本 |
Mono | 4.2.1及以上版本 |
下載SDK
.NET版本AMQP SDK,推薦使用AMQP.Net Lite庫。請訪問AMQP.Net Lite下載庫和查看使用說明。
添加依賴
在packages.config中添加以下依賴。
<packages>
<package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>
程式碼範例
using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;
namespace amqp
{
class MainClass
{
//接入網域名稱,請參見AMQP用戶端接入說明文檔。
static string Host = "${YourHost}";
static int Port = 5671;
// 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性。以下程式碼範例使用環境變數擷取 AccessKey 的方式進行調用,僅供參考
static string AccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
static string AccessSecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
static string consumerGroupId = "${YourConsumerGroupId}";
static string clientId = "${YourClientId}";
//iotInstanceId:執行個體ID。
static string iotInstanceId = "${YourIotInstanceId}";
static int Count = 0;
static int IntervalTime = 10000;
static Address address;
public static void Main(string[] args)
{
long timestamp = GetCurrentMilliseconds();
string param = "authId=" + AccessKey + "×tamp=" + timestamp;
//userName組裝方法,請參見AMQP用戶端接入說明文檔。
string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
+ ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
//計算簽名,password組裝方法,請參見AMQP用戶端接入說明文檔。
string password = doSign(param, AccessSecret, "HmacMD5");
DoConnectAmqp(userName, password);
ManualResetEvent resetEvent = new ManualResetEvent(false);
resetEvent.WaitOne();
}
static void DoConnectAmqp(string userName, string password)
{
address = new Address(Host, Port, userName, password);
//建立Connection。
ConnectionFactory cf = new ConnectionFactory();
//如果需要,使用本地TLS。
//cf.SSL.ClientCertificates.Add(GetCert());
//cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
cf.SASL.Profile = SaslProfile.External;
cf.AMQP.IdleTimeout = 120000;
//cf.AMQP.ContainerId、cf.AMQP.HostName請自訂。
cf.AMQP.ContainerId = "client.1.2";
cf.AMQP.HostName = "contoso.com";
cf.AMQP.MaxFrameSize = 8 * 1024;
var connection = cf.CreateAsync(address).Result;
//Connection Exception已關閉。
connection.AddClosedCallback(ConnClosed);
//接收訊息。
DoReceive(connection);
}
static void DoReceive(Connection connection)
{
//建立Session。
var session = new Session(connection);
//建立ReceiverLink並接收訊息。
var receiver = new ReceiverLink(session, "queueName", null);
receiver.Start(20, (link, message) =>
{
object messageId = message.ApplicationProperties["messageId"];
object topic = message.ApplicationProperties["topic"];
string body = Encoding.UTF8.GetString((Byte[])message.Body);
//注意:此處不要有耗時的邏輯,如果這裡要進行業務處理,請另開線程,否則會堵塞消費。如果消費一直延時,會增加訊息重發的機率。
Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);
//ACK訊息。
link.Accept(message);
});
}
//串連發生異常後,進入重連模式。
//這裡只是一個簡單重試的樣本,您可以採用指數退避方式,來完善異常情境,重連策略。
static void ConnClosed(IAmqpObject _, Error e)
{
Console.WriteLine("ocurr error: " + e);
if(Count < 3)
{
Count += 1;
Thread.Sleep(IntervalTime * Count);
}
else
{
Thread.Sleep(120000);
}
//重連。
DoConnectAmqp(address.User, address.Password);
}
static X509Certificate GetCert()
{
string certPath = Environment.CurrentDirectory + "/root.crt";
X509Certificate crt = new X509Certificate(certPath);
return crt;
}
static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
static long GetCurrentMilliseconds()
{
DateTime dt1970 = new DateTime(1970, 1, 1);
DateTime current = DateTime.Now;
return (long)(current - dt1970).TotalMilliseconds;
}
//簽名方法:支援hmacmd5,hmacsha1和hmacsha256。
static string doSign(string param, string accessSecret, string signMethod)
{
//signMethod = HmacMD5
byte[] key = Encoding.UTF8.GetBytes(accessSecret);
byte[] signContent = Encoding.UTF8.GetBytes(param);
var hmac = new HMACMD5(key);
byte[] hashBytes = hmac.ComputeHash(signContent);
return Convert.ToBase64String(hashBytes);
}
}
}
您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP用戶端接入說明。
請確保參數值輸入正確,否則AMQP用戶端接入會失敗。
參數 | 說明 |
Host | AMQP接入網域名稱。
|
AccessKey | 登入物聯網平台控制台,將滑鼠移至帳號頭像上,然後單擊AccessKey管理,擷取AccessKey ID和AccessKey Secret。 說明 如果使用RAM使用者,您需授予該RAM使用者管理物聯網平台的許可權(AliyunIOTFullAccess),否則將串連失敗。授權方法請參見RAM使用者訪問。 |
AccessSecret | |
consumerGroupId | 當前物聯網平台對應執行個體中的消費組ID。 登入物聯網平台控制台,在對應執行個體的 查看您的消費組ID。 |
iotInstanceId | 執行個體ID。您可在物聯網平台控制台的執行個體概覽頁面,查看當前執行個體的ID。
|
clientId | 表示用戶端ID,需您自訂,長度不可超過64個字元。建議使用您的AMQP用戶端所在伺服器UUID、MAC地址、IP等唯一標識。 AMQP用戶端接入並啟動成功後,登入物聯網平台控制台,在對應執行個體的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的用戶端。 |
運行結果樣本
成功:返回類似如下日誌資訊,表示AMQP用戶端已接入物聯網平台並成功接收訊息。
參數
樣本
說明
topic
/***********/******/thing/event/property/post
裝置屬性上報的Topic。
messageId
2**************7
訊息的ID。
body
{"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}}
訊息的內容。
失敗:返回類似如下日誌資訊,表示AMQP用戶端串連物聯網平台失敗。
相關文檔
服務端訂閱訊息相關錯誤碼,請參見訊息相關錯誤碼。