全部產品
Search
文件中心

ApsaraMQ for RocketMQ:訂閱訊息

更新時間:Jul 01, 2024

本文介紹如何通過雲訊息佇列 RocketMQ 版的C/C++ SDK訂閱訊息。

訂閱者式

雲訊息佇列 RocketMQ 版支援以下兩種訂閱者式:

  • 叢集訂閱

    同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,只消費其中的3條訊息。設定方式如下所示。

    // 叢集訂閱者式設定(不設定的情況下,預設為叢集訂閱者式)
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • 廣播訂閱

    同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條訊息。設定方式如下所示。

    // 廣播訂閱者式設定
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
說明
  • 請確保同一個Group ID下所有Consumer執行個體的訂閱關係保持一致,更多資訊,請參見訂閱關係一致

  • 兩種不同的訂閱者式有著不同的功能限制,例如,廣播模式不支援順序訊息、不維護消費進度、不支援重設消費位點等,更多資訊,請參見叢集消費和廣播消費

範例程式碼

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        //此處為具體的訊息處理過程,確認訊息被處理成功請返回CommitMessage。
        //如果有消費異常,或者期望重新消費,可以返回ReconsumeLater,訊息將會在一段時間後重新投遞。
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    //請填寫在阿里雲訊息佇列RocketMQ版控制台上申請的Group ID,從執行個體化的版本開始,ProducerId和CounsumerId已經統一,此處設定是為了介面保持向前相容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    //AccessKey ID,阿里雲身分識別驗證標識。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里雲身分識別驗證密鑰。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    //請填寫阿里雲訊息佇列RocketMQ版控制台上對應執行個體的存取點。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    //請填寫阿里雲訊息佇列RocketMQ版控制台上申請的Topic。
    const char* topic_1 = "topic-1";
    // 訂閱topic-1中Tag訊息屬性為tag-1的所有訊息。
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // 訂閱topic-2的所有訊息。
    const char* tag_2 = "*";


    //請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    //準備工作完成,必須調用啟動函數,才可以正常工作。
    consumer->start();

    //請保持線程常駐,不要執行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

更多資訊

雲訊息佇列 RocketMQ 版消費端流控的最佳實務,請參見用戶端流控設計