本文介绍如何通过云消息队列 RocketMQ 版的C/C++ SDK订阅消息。
订阅方式
云消息队列 RocketMQ 版支持以下两种订阅方式:
集群订阅
同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。设置方式如下所示。
// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式) factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
广播订阅
同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。设置方式如下所示。
// 广播订阅方式设置 factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
示例代码
#include "ONSFactory.h"
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
//此处为具体的消息处理过程,确认消息被处理成功请返回CommitMessage。
//如果有消费异常,或者期望重新消费,可以返回ReconsumeLater,消息将会在一段时间后重新投递。
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factoryInfo;
//请填写在阿里云消息队列RocketMQ版控制台上申请的Group ID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
//请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//请填写阿里云消息队列RocketMQ版控制台上对应实例的接入点。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
//请填写阿里云消息队列RocketMQ版控制台上申请的Topic。
const char* topic_1 = "topic-1";
// 订阅topic-1中Tag消息属性为tag-1的所有消息。
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// 订阅topic-2的所有消息。
const char* tag_2 = "*";
//请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
//准备工作完成,必须调用启动函数,才可以正常工作。
consumer->start();
//请保持线程常驻,不要执行shutdown操作。
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
更多信息
云消息队列 RocketMQ 版消费端流控的最佳实践,请参见客户端流控设计。