全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發順序訊息

更新時間:Aug 01, 2024

順序訊息(FIFO 訊息)是雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的.NET SDK收發順序訊息的範例程式碼。

順序訊息分類

順序訊息分為兩類:

  • 全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。

  • 分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。

更多資訊,請參見順序訊息

前提條件

  • 下載.NET SDK。更多資訊,請參見版本說明

  • 環境準備。更多資訊,請參見環境準備

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

發送順序訊息

重要

雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。

具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。

發送順序訊息的範例程式碼如下。

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置您的帳號,以下設定均可從控制台擷取。
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        //AccessKey ID,阿里雲身分識別驗證標識。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      //AccessKey Secret,阿里雲身分識別驗證密鑰。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 您在訊息佇列RocketMQ版控制台建立的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // 您在訊息佇列RocketMQ版控制台建立的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 設定日誌路徑。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // 建立生產者執行個體。
        // 說明:生產者執行個體是安全執行緒的,可用於發送不同Topic的訊息。基本上,您每一個線程只需要一個生產者執行個體。
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // 啟動用戶端執行個體。
        producer.start();

        // 建立訊息對象。
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // 在您的線程即將退出時,關閉生產者執行個體。
        producer.shutdown();

    }
}

訂閱順序訊息

訂閱順序訊息的範例程式碼如下。

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // 配置您的帳號,以下設定均可從控制台擷取。
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // AccessKey ID,阿里雲身分識別驗證標識。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // AccessKey Secret,阿里雲身分識別驗證密鑰。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // 您在訊息佇列RocketMQ版控制台建立的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // 您在訊息佇列RocketMQ版控制台建立的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // 設定日誌路徑。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // 建立消費者執行個體。
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // 訂閱Topic。
            consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());

            // 啟動消費者執行個體。
            consumer.start();

            // 讓主線程睡眠一段時間。
            Thread.Sleep(30000);

            // 不再使用時,關閉消費者執行個體。
            consumer.shutdown();
        }
    }
}