すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文メッセージの送受信

最終更新日:Aug 27, 2024

注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 順序付けられたメッセージは、厳密な先入れ先出し (FIFO) 順序で消費されます。 このトピックでは、TCPクライアントSDK forを使用して順序付きメッセージを送受信する方法に関するサンプルコードを提供します。NETを使用します。

順序付けられたメッセージの分類

順序付けられたメッセージは、次のタイプに分類されます。

  • グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、厳密なFIFO順序で発行および使用されます。

  • パーティション順メッセージ: 指定されたトピック内のすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、厳密なFIFO順序で消費されます。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。

詳細については、「注文メッセージ」をご参照ください。

前提条件

  • SDK for。NETがダウンロードされます。 詳細については、「リリースノート」をご参照ください。

  • 環境を整えます。 詳細については、「環境の準備」をご参照ください。

  • コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。

順序付けられたメッセージを送信する

重要

ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

サンプルコードの詳細については、ApsaraMQ For RocketMQコードリポジトリをご参照ください。

次のサンプルコードは、TPCクライアントSDK forを使用して順序付けられたメッセージを送信する方法の例を示しています。NET:

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account. You can obtain the account information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      // The AccessKey secret that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: Producer instances are thread-safe and can be used to send messages from different topics. In most cases, each thread requires only one producer instance. 
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create a message. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // Before you exit your thread, terminate the producer instance. 
        producer.shutdown();

    }
}

注文されたメッセージを購読する

次のサンプルコードは、TCPクライアントSDK forを使用して順序付きメッセージをサブスクライブする方法の例を示しています。NET:

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // Configure your account. You can obtain the account information in the Alibaba Cloud Management Console. 
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // The AccessKey ID that is used for authentication. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // The AccessKey secret that is used for authentication. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // The log path. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // Create a consumer instance. 
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // Subscribe to a topic. 
            consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());

            // Start the consumer instance. 
            consumer.start();

            // Put the main thread to sleep for a period of time. 
            Thread.Sleep(30000);

            // If the consumer instance is no longer used, shut down the consumer instance. 
            consumer.shutdown();
        }
    }
}