本文介绍如何通过云消息队列 RocketMQ 版的SDK使用.NET语言进行消息订阅。
说明
请确保同一个Group ID下所有Consumer实例的订阅关系保持一致。更多信息,请参见订阅关系一致。
云消息队列 RocketMQ 版支持以下两种订阅方式:
集群订阅
同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式) factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
广播订阅
同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
// 广播订阅方式设置 factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
示例代码
using System;
using System.Threading;
using System.Text;
using ons;
// 从Broker拉取消息时要执行的回调函数。
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
//请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 集群消费。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// 广播消费。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// 创建消费者实例。
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// 订阅Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// 启动客户端实例。
consumer.start();
// 该设置仅供Demo使用,实际生产环境中请保证进程不退出。
Thread.Sleep(300000);
// 在进程即将退出时,关闭消费者实例。
consumer.shutdown();
}
}