順序訊息(FIFO 訊息)是雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的.NET SDK收發順序訊息的範例程式碼。
順序訊息分類
順序訊息分為兩類:
全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。
分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。
更多資訊,請參見順序訊息。
前提條件
下載.NET SDK。更多資訊,請參見版本說明。
環境準備。更多資訊,請參見環境準備。
建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
發送順序訊息
重要
雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。
具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。
發送順序訊息的範例程式碼如下。
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
// 配置您的帳號,以下設定均可從控制台擷取。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
//請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
//AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 建立生產者執行個體。
// 說明:生產者執行個體是安全執行緒的,可用於發送不同Topic的訊息。基本上,您每一個線程只需要一個生產者執行個體。
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
// 啟動用戶端執行個體。
producer.start();
// 建立訊息對象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的線程即將退出時,關閉生產者執行個體。
producer.shutdown();
}
}
訂閱順序訊息
訂閱順序訊息的範例程式碼如下。
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
// 配置您的帳號,以下設定均可從控制台擷取。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 建立消費者執行個體。
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
// 訂閱Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
// 啟動消費者執行個體。
consumer.start();
// 讓主線程睡眠一段時間。
Thread.Sleep(30000);
// 不再使用時,關閉消費者執行個體。
consumer.shutdown();
}
}
}