注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 順序付けられたメッセージは、厳密な先入れ先出し (FIFO) 順序で消費されます。 このトピックでは、TCPクライアントSDK forを使用して順序付きメッセージを送受信する方法に関するサンプルコードを提供します。NETを使用します。
順序付けられたメッセージの分類
順序付けられたメッセージは、次のタイプに分類されます。
グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、厳密なFIFO順序で発行および使用されます。
パーティション順メッセージ: 指定されたトピック内のすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、厳密なFIFO順序で消費されます。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。
詳細については、「注文メッセージ」をご参照ください。
前提条件
SDK for。NETがダウンロードされます。 詳細については、「リリースノート」をご参照ください。
環境を整えます。 詳細については、「環境の準備」をご参照ください。
コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
順序付けられたメッセージを送信する
ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。
サンプルコードの詳細については、ApsaraMQ For RocketMQコードリポジトリをご参照ください。
次のサンプルコードは、TPCクライアントSDK forを使用して順序付けられたメッセージを送信する方法の例を示しています。NET:
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
// Configure your account. You can obtain the account information in the Alibaba Cloud Management Console.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// The log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create a producer instance.
// Note: Producer instances are thread-safe and can be used to send messages from different topics. In most cases, each thread requires only one producer instance.
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
// Start the producer instance.
producer.start();
// Create a message.
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// Before you exit your thread, terminate the producer instance.
producer.shutdown();
}
}
注文されたメッセージを購読する
次のサンプルコードは、TCPクライアントSDK forを使用して順序付きメッセージをサブスクライブする方法の例を示しています。NET:
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
// Configure your account. You can obtain the account information in the Alibaba Cloud Management Console.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// The AccessKey ID that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// The AccessKey secret that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// The log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create a consumer instance.
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
// Subscribe to a topic.
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
// Start the consumer instance.
consumer.start();
// Put the main thread to sleep for a period of time.
Thread.Sleep(30000);
// If the consumer instance is no longer used, shut down the consumer instance.
consumer.shutdown();
}
}
}