本文介紹如何通過雲訊息佇列 RocketMQ 版的C/C++ SDK訂閱訊息。
訂閱者式
雲訊息佇列 RocketMQ 版支援以下兩種訂閱者式:
叢集訂閱
同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,只消費其中的3條訊息。設定方式如下所示。
// 叢集訂閱者式設定(不設定的情況下,預設為叢集訂閱者式) factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
廣播訂閱
同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條訊息。設定方式如下所示。
// 廣播訂閱者式設定 factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
範例程式碼
#include "ONSFactory.h"
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
//此處為具體的訊息處理過程,確認訊息被處理成功請返回CommitMessage。
//如果有消費異常,或者期望重新消費,可以返回ReconsumeLater,訊息將會在一段時間後重新投遞。
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factoryInfo;
//請填寫在阿里雲訊息佇列RocketMQ版控制台上申請的Group ID,從執行個體化的版本開始,ProducerId和CounsumerId已經統一,此處設定是為了介面保持向前相容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
//請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
//AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//請填寫阿里雲訊息佇列RocketMQ版控制台上對應執行個體的存取點。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
//請填寫阿里雲訊息佇列RocketMQ版控制台上申請的Topic。
const char* topic_1 = "topic-1";
// 訂閱topic-1中Tag訊息屬性為tag-1的所有訊息。
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// 訂閱topic-2的所有訊息。
const char* tag_2 = "*";
//請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
//準備工作完成,必須調用啟動函數,才可以正常工作。
consumer->start();
//請保持線程常駐,不要執行shutdown操作。
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
更多資訊
雲訊息佇列 RocketMQ 版消費端流控的最佳實務,請參見用戶端流控設計。