全部產品
Search
文件中心

ApsaraMQ for RocketMQ:訂閱訊息

更新時間:Jul 01, 2024

本文介紹如何通過雲訊息佇列 RocketMQ 版的SDK使用.NET語言進行訊息訂閱。

說明

請確保同一個Group ID下所有Consumer執行個體的訂閱關係保持一致。更多資訊,請參見訂閱關係一致

雲訊息佇列 RocketMQ 版支援以下兩種訂閱者式:

  • 叢集訂閱

    同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,只消費其中的3條訊息。

        // 叢集訂閱者式設定(不設定的情況下,預設為叢集訂閱者式)
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • 廣播訂閱

    同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條訊息。

        // 廣播訂閱者式設定
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

範例程式碼

using System;
using System.Threading;
using System.Text;
using ons;

// 從Broker拉取訊息時要執行的回呼函數。
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        //AccessKey ID,阿里雲身分識別驗證標識。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      //AccessKey Secret,阿里雲身分識別驗證密鑰。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 您在訊息佇列RocketMQ版控制台建立的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // 您在訊息佇列RocketMQ版控制台建立的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 設定日誌路徑。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // 叢集消費。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // 廣播消費。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // 建立消費者執行個體。
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // 訂閱Topic。
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // 啟動用戶端執行個體。
        consumer.start();

        // 該設定僅供Demo使用,實際生產環境中請保證進程不退出。
        Thread.Sleep(300000);

        // 在進程即將退出時,關閉消費者執行個體。
        consumer.shutdown();
    }
}