全部产品
Search
文档中心

云消息队列 RocketMQ 版:订阅消息

更新时间:Jul 25, 2023

本文介绍如何通过云消息队列 RocketMQ 版的C/C++ SDK订阅消息。

订阅方式

云消息队列 RocketMQ 版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。设置方式如下所示。

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。设置方式如下所示。

    // 广播订阅方式设置
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
说明
  • 请确保同一个Group ID下所有Consumer实例的订阅关系保持一致,更多信息,请参见订阅关系一致
  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,更多信息,请参见集群消费和广播消费

示例代码

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        //此处为具体的消息处理过程,确认消息被处理成功请返回CommitMessage。
        //如果有消费异常,或者期望重新消费,可以返回ReconsumeLater,消息将会在一段时间后重新投递。
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    //请填写在阿里云消息队列RocketMQ版控制台上申请的Group ID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    //AccessKey ID,阿里云身份验证标识。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里云身份验证密钥。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    //请填写阿里云消息队列RocketMQ版控制台上对应实例的接入点。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    //请填写阿里云消息队列RocketMQ版控制台上申请的Topic。
    const char* topic_1 = "topic-1";
    // 订阅topic-1中Tag消息属性为tag-1的所有消息。
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // 订阅topic-2的所有消息。
    const char* tag_2 = "*";


    //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    //准备工作完成,必须调用启动函数,才可以正常工作。
    consumer->start();

    //请保持线程常驻,不要执行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

更多信息

云消息队列 RocketMQ 版消费端流控的最佳实践,请参见客户端流控设计