ApsaraMQ for RocketMQ: Sample code

Last Updated: Aug 15, 2024

ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 3.x or 4.x SDKs. You can use these SDKs to connect to an ApsaraMQ for RocketMQ 5.x instance and send and receive messages. This topic provides sample code for sending and receiving messages by using the RocketMQ 3.x or 4.x SDK for C++.

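Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more features and enhancements. For more information, see Release notes.

  • Alibaba Cloud only maintains the RocketMQ 4.x, 3.x, and TCP client SDKs. We recommend that you use them only for existing businesses.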

Send and receive normal messages

Send normal messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.       
    DefaultMQProducer producer("GID_XXX");          
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.  
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a virtual private cloud (VPC), you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send normal messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            std::cout <<"SendResult:"<<sendResult.getSendStatus()<< ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (const MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
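
The comments in the sample state that the endpoint must be the domain name and port shown in the console, without an http:// or https:// prefix and not a resolved IP address. A minimal sketch of a pre-flight check for that rule is shown below. The function name looksLikeConsoleEndpoint is hypothetical and is not part of the RocketMQ SDK, and the IPv4 test (a host made up only of digits and dots) is a simplifying assumption.

```cpp
#include <cctype>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): returns true if the
// endpoint looks like "host:port" with no scheme prefix and a non-IPv4 host.
bool looksLikeConsoleEndpoint(const std::string &endpoint) {
    // Reject "http://", "https://", or any other scheme prefix.
    if (endpoint.find("://") != std::string::npos) {
        return false;
    }
    // Split at the last colon into host and port; both parts must be non-empty.
    const std::string::size_type colon = endpoint.rfind(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == endpoint.size()) {
        return false;
    }
    const std::string host = endpoint.substr(0, colon);
    const std::string port = endpoint.substr(colon + 1);
    // The port must consist of digits only.
    for (unsigned char c : port) {
        if (!std::isdigit(c)) {
            return false;
        }
    }
    // Assumption: a host made up only of digits and dots is a resolved IPv4 address.
    bool onlyDigitsAndDots = true;
    for (unsigned char c : host) {
        if (!std::isdigit(c) && c != '.') {
            onlyDigitsAndDots = false;
            break;
        }
    }
    return !onlyDigitsAndDots;
}
```

A caller could run this check on the endpoint string before passing it to setNamesrvAddr and fail fast with a readable error instead of waiting for a connection timeout.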

Subscribe to normal messages

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Use a custom listener function to process the received messages and return the processing results. 
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("YOURTOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer. 
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive ordered messages

Send ordered messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
        // To customize partitioning, calculate the queue to which a message is routed based on the arg parameter, which carries the partition key. In this example, arg points to an int.
        int orderId = *static_cast<int *>(arg);
        // Map the key to a queue index so that messages with the same key are always routed to the same queue.
        int index = orderId % mqs.size();
        return mqs[index];
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    // You can configure the number of retries for a message to make sure that the message is sent. 
    int retryTimes = 1;
    // Use the configured custom parameter arg to calculate the queue to which a message is sent and send the message to the specified queue. The arg parameter specifies the partition ID. For information about how to select a message queue, see the MessageQueueSelector operation. 
    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send ordered messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            // Pass retryTimes as the number of automatic retries instead of a hard-coded value.
            SendResult sendResult = producer.send(msg, pSelector, &i, retryTimes, false);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << ", MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (const MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
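
The MessageQueueSelector in the sample is meant to map a partition key to a queue index so that messages with the same key land in the same queue. The following standalone sketch shows the same idea for string keys. The helper name queueIndexForKey is hypothetical and not part of the SDK, and std::hash is used only for illustration.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): maps a partition key
// to a queue index in the range [0, queueCount).
std::size_t queueIndexForKey(const std::string &partitionKey, std::size_t queueCount) {
    if (queueCount == 0) {
        // There is no valid index when no queue is available.
        throw std::invalid_argument("queueCount must be greater than 0");
    }
    // The same key always produces the same hash within one program run,
    // so it always maps to the same queue index.
    return std::hash<std::string>{}(partitionKey) % queueCount;
}
```

Inside a selector, the returned index could be used to pick an element of the queue list, for example mqs[queueIndexForKey(key, mqs.size())]. Note that std::hash is only guaranteed to give consistent results within a single execution of a program, so producers that run as separate processes would likely need a hash function that is stable across processes.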

Subscribe to ordered messages

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Use a custom listener function to process the received messages and return the processing results. 
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("YOUR ORDERLY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer. 
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive scheduled or delayed messages

Send scheduled or delayed messages

#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send scheduled or delayed messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
        chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
        chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
        // The time at which the ApsaraMQ for RocketMQ broker delivers the message to the consumer, expressed as a UNIX timestamp in milliseconds. For example, if the value corresponds to 2022-08-07 16:21:00, the broker delivers the message at 16:21:00 on August 7, 2022. The value must be later than the current time.
        // If you set the parameter to a time earlier than the current time, the message is immediately delivered to the consumer.
        // In this example, 10000 milliseconds are added to the current time, so the message is delivered 10 seconds after the current time.
        long long exp = mil.count() + 10000;
        msg.setProperty("__STARTDELIVERTIME", to_string(exp));
        std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (const MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
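
In the sample, the value of the __STARTDELIVERTIME property is an absolute time in milliseconds since the epoch, converted to a string. The following sketch isolates that calculation so it can be reused and checked on its own. The helper name deliverTimeAfter is hypothetical and is not part of the SDK.

```cpp
#include <chrono>
#include <cstdint>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): returns the absolute
// delivery time, in milliseconds since the epoch, as a decimal string.
std::string deliverTimeAfter(std::chrono::system_clock::time_point now,
                             std::chrono::milliseconds delay) {
    // Convert the current time to whole milliseconds since the epoch.
    const auto sinceEpoch =
        std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
    // Add the delay and use a 64-bit integer so the value does not overflow
    // on platforms where long is 32 bits wide.
    const std::int64_t deliverAt = static_cast<std::int64_t>((sinceEpoch + delay).count());
    return std::to_string(deliverAt);
}
```

With such a helper, the property could be set as msg.setProperty("__STARTDELIVERTIME", deliverTimeAfter(std::chrono::system_clock::now(), std::chrono::seconds(10))), which corresponds to the 10-second delay used in the sample.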

Subscribe to scheduled or delayed messages

#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;
using namespace std;

class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                      << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // Use a custom listener function to process the received messages and return the processing results.
    ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
    consumer->subscribe("YOUR DELAY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer.
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::seconds(600));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive transactional messages

Send transactional messages

#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"

using namespace std;
using namespace rocketmq;

class ExampleTransactionListener : public TransactionListener {
public:
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
        // Execute the local transaction. Return COMMIT_MESSAGE if the local transaction succeeds, ROLLBACK_MESSAGE if it fails, or UNKNOWN if its status cannot be determined yet.
        // If UNKNOWN is returned, the scheduled task that queries the status of the local transaction is triggered.
        std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", Body:"
                  << msg.getBody() << std::endl;
        return UNKNOWN;
    }

    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
        // Query the status of the local transaction. Return COMMIT_MESSAGE if the local transaction succeeded, ROLLBACK_MESSAGE if it failed, or UNKNOWN if its status is still unknown.
        // If UNKNOWN is returned, the status is queried again the next time the scheduled check is triggered.
        std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId()
                  << std::endl;
        return COMMIT_MESSAGE;
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.   
    TransactionMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page of the instance in the ApsaraMQ for RocketMQ console.
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use the RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, access fails.

    // The transaction listener. 
    ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 3;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send transactional messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message.");
        try {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (const MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    std::cout << "Wait for local transaction check..... " << std::endl;
    for (int i = 0; i < 6; ++i) {
        this_thread::sleep_for(chrono::seconds(10));
        std::cout << "Running "<< i*10 + 10 << " Seconds......"<< std::endl;
    }
    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
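
In the sample, executeLocalTransaction always returns UNKNOWN and checkLocalTransaction always returns COMMIT_MESSAGE. An application would more likely record the outcome of each local transaction and answer the status check from that record. The following standalone sketch illustrates one way to do this. The TxState enum is a local stand-in for the SDK's LocalTransactionState values, and the LocalTransactionLog class and its transaction IDs are hypothetical.

```cpp
#include <mutex>
#include <string>
#include <unordered_map>

// Local stand-in for the SDK's COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOWN
// values, so that this sketch compiles without the SDK headers.
enum class TxState { Commit, Rollback, Unknown };

// Hypothetical in-memory record of local transaction outcomes.
class LocalTransactionLog {
public:
    // Record whether the local transaction identified by txId succeeded.
    void record(const std::string &txId, bool succeeded) {
        std::lock_guard<std::mutex> lock(mutex_);
        outcomes_[txId] = succeeded;
    }

    // Answer a status check: Unknown if no outcome has been recorded yet.
    TxState check(const std::string &txId) const {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = outcomes_.find(txId);
        if (it == outcomes_.end()) {
            return TxState::Unknown;
        }
        return it->second ? TxState::Commit : TxState::Rollback;
    }

private:
    // The mutex guards the map because record and check may be called
    // from different threads.
    mutable std::mutex mutex_;
    std::unordered_map<std::string, bool> outcomes_;
};
```

A listener could call record from executeLocalTransaction and check from checkLocalTransaction, and then translate TxState to the corresponding SDK value. An in-memory map is lost when the process restarts, so a durable store would probably be needed in practice.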

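Subscribe to transactional messages

The sample code for subscribing to transactional messages is the same as the sample code for subscribing to normal messages. For more information, see "Send and receive normal messages".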