すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文メッセージの送受信

最終更新日:Sep 23, 2024

注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 注文されたメッセージは、厳密な先入れ先出しの順序で発行および使用されます。 このトピックでは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージを送受信する方法に関するサンプルコードを提供します。

背景情報

順序付けられたメッセージは、次のタイプに分類されます。

  • グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。

  • パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。

詳細については、「注文メッセージ」をご参照ください。

前提条件

次の操作が実行されていることを確認します。

  • CまたはC ++ 用のSDKがダウンロードされます。 詳細については、「リリースノート」をご参照ください。

  • 環境を整えます。 詳細については、「環境準備 (V1.x.x)」をご参照ください。

  • コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。

順序付けられたメッセージを送信する

重要

ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

次のサンプルコードは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージを送信する方法の例を示しています。

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // The parameters that are required to create and use a producer. 
    ONSFactoryProperty factoryInfo;
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); 
    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // The message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		// The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

    // Create the producer. 
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // Before you send the message, call the start() method only once to start the producer. 
    pProducer->start();

    Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages in the ApsaraMQ for RocketMQ broker. 
                "TagA",
                // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and the consumer must agree on the serialization and deserialization methods. 
                factoryInfo.getMessageContent()
    );

    // The message key. A key is the business-specific attribute of a message and must be globally unique. 
    // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
    // Note: You can send and receive a message even if you do not specify the key. 
    msg.setKey("ORDERID_100");
    // The key field that is used to identify partitions for partitionally ordered messages. 
    // This field can be set to a non-empty string for globally ordered messages. 
    std::string shardingKey = "abc";  
    // Messages that have the same sharding key are sent in order. 
    try
    {
        // Send the message. If no exception is thrown, the message is sent. 
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // Specify the logic to handle errors. 
    }
    // Before you exit your application, destroy the producer. If you do not destroy the producer, issues such as memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}           

注文されたメッセージを購読する

次のサンプルコードは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージをサブスクライブする方法の例を示しています。

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// Create the consumer instance. 
//After the push consumer pulls the message, the push consumer calls the consumeMessage function of the instance. 
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // Consume messages based on your business requirements. 
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // The parameters that are required to create and use the consumer. 
    ONSFactoryProperty factoryInfo;
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		// The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");


    // Create the consumer. 
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // The message topic and tag to which the consumer subscribes. 
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // Register the instance to listen to messages. After the consumer pulls the messages, the consumer calls the consumeMessage function of the message listening class. 

    //Start the consumer. 
    orderConsumer->start();

    for(volatile int i = 0; i < 1000000000; ++i) {
        //wait
    }

    // Destroy the consumer. Before you exit the application, destroy the consumer. Otherwise, issues such as memory leaks may occur. 
    orderConsumer->shutdown();

   return 0;
}