本文介紹如何通過雲訊息佇列 RocketMQ 版的SDK使用.NET語言進行訊息訂閱。
說明
請確保同一個Group ID下所有Consumer執行個體的訂閱關係保持一致。更多資訊,請參見訂閱關係一致。
雲訊息佇列 RocketMQ 版支援以下兩種訂閱者式:
叢集訂閱
同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,只消費其中的3條訊息。
// 叢集訂閱者式設定(不設定的情況下,預設為叢集訂閱者式) factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
廣播訂閱
同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條訊息。
// 廣播訂閱者式設定 factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
範例程式碼
using System;
using System.Threading;
using System.Text;
using ons;
// 從Broker拉取訊息時要執行的回呼函數。
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
//請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
//AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 叢集消費。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// 廣播消費。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// 建立消費者執行個體。
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// 訂閱Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// 啟動用戶端執行個體。
consumer.start();
// 該設定僅供Demo使用,實際生產環境中請保證進程不退出。
Thread.Sleep(300000);
// 在進程即將退出時,關閉消費者執行個體。
consumer.shutdown();
}
}