Classification of ordered messages
Ordered messages are classified into the following types:
Globally ordered message: All messages in the specified topic are published and consumed in strict FIFO order.
Partitionally ordered messages: All messages in the specified topic are distributed to different partitions by using sharding keys. The messages in each partition are consumed in strict FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. Sharding keys are different from the keys of normal messages.
For more information, see Ordered messages.
Prerequisites
The SDK for .NET is downloaded. For more information, see Release notes.
The environment is prepared. For more information, see Prepare the environment.
The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.
Send ordered messages
Important
An ApsaraMQ for RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.
For information about the detailed sample code, see the ApsaraMQ for RocketMQ code repository.
The following sample code provides an example on how to send ordered messages by using the TPC client SDK for .NET:
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
producer.start();
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
producer.shutdown();
}
}
Subscribe to ordered messages
The following sample code provides an example on how to subscribe to ordered messages by using the TCP client SDK for .NET:
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
consumer.start();
Thread.Sleep(30000);
consumer.shutdown();
}
}
}