このトピックでは、TCPクライアントSDK forを使用してメッセージをサブスクライブする方法について説明します。ApsaraMQ for RocketMQが提供するNET。
説明
同じグループIDで識別されるすべてのコンシューマインスタンスに対して、一貫したサブスクリプションを維持する必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。
ApsaraMQ for RocketMQは、次のサブスクリプションモードをサポートしています。
サブスクリプションのクラスター化
同じグループIDによって識別されるすべてのコンシューマは、等しい数のメッセージを消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 クラスタリング消費モードでは、各コンシューマは3つのメッセージを消費します。
// Configure clustering subscription, which is the default mode. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
放送サブスクリプション
同じグループIDによって識別される消費者の各々は、すべてのメッセージを1回消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 ブロードキャスト消費モードでは、各消費者は9つのメッセージすべてを消費する。
// Configure broadcasting subscription. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
サンプルコード:
using System;
using System.Threading;
using System.Text;
using ons;
// The callback function that is executed when a message is pulled from the ApsaraMQ for RocketMQ broker.
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// The log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Clustering consumption.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// Broadcasting consumption.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// Create the consumer instance.
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// Subscribe to the topic.
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// Start the consumer instance.
consumer.start();
// This setting is used only in this demo. In actual production environments, you cannot exit the process.
Thread.Sleep(300000);
// Before you exit the process, terminate the consumer instance.
consumer.shutdown();
}
}