すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:メッセージの購読

最終更新日:Jul 09, 2024

このトピックでは、ApsaraMQ for RocketMQが提供するTCPクライアントSDK for CまたはC ++ を使用してメッセージをサブスクライブする方法について説明します。

サブスクリプションモード

ApsaraMQ for RocketMQは、次のサブスクリプションモードをサポートしています。

  • サブスクリプションのクラスター化

    同じグループIDによって識別されるすべてのコンシューマは、等しい数のメッセージを消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 クラスタリング消費モードでは、各コンシューマは3つのメッセージを消費します。 次のコードは、クラスター化サブスクリプションモードを設定する方法を示しています。

    // Configure the clustering subscription mode. This is the default mode.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • 放送サブスクリプション

    同じグループIDによって識別される消費者の各々は、すべてのメッセージを1回消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 ブロードキャスト消費モードでは、各消費者は9つのメッセージを消費する。 次のコードは、ブロードキャストサブスクリプションモードを設定する方法を示しています。

    // Configure the broadcasting subscription mode.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
説明
  • 同じグループIDで識別されるすべてのコンシューマインスタンスに対して、一貫したサブスクリプションを維持する必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。

  • 前述のサブスクリプションモードにはさまざまな制限が課されます。 たとえば、ブロードキャストサブスクリプションモードでは、順序付きメッセージの送受信、消費の進行状況の維持、消費者オフセットのリセットはできません。 詳細については、「クラスタリング消費とブロードキャスト消費」をご参照ください。

サンプルコード:

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // The consumer receives the message and attempts to consume it. After the message is consumed, CommitMessage is returned. 
        // If the consumer fails to consume the message or wants to consume the message again, ReconsumeLater is returned. Then, the message is delivered to the consumer again after a predefined period of time. 
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. ApsaraMQ for RocketMQ instances use group IDs instead of producer IDs and consumer IDs. This parameter is configured to ensure the compatibility with earlier versions. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		// The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // The endpoint that is used to access the ApsaraMQ for RocketMQ instance. You can obtain the endpoint in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    const char* topic_1 = "topic-1";
    // Subscribe to the messages that are attached with the tag-1 tag in topic-1. 
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // Subscribe to all messages in topic-2. 
    const char* tag_2 = "*";


    // Use a custom listener function to process the received messages and return the results. 
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // The preparation is complete. You must invoke the startup function to start the consumer. 
    consumer->start();

    // Keep the thread running and do not terminate the consumer. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

追加情報

ApsaraMQ For RocketMQのコンシューマースロットリングのベストプラクティスについては、「RocketMQクライアントトラフィック制御デザイン」をご参照ください。