ApsaraMQ for RocketMQ: Sample code

Last Updated:Aug 14, 2024

ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 3.x or 4.x SDKs. You can use the SDKs to connect to ApsaraMQ for RocketMQ 5.x instances to send and receive messages. This topic provides the sample code that is used to send and receive messages by using RocketMQ 3.x or 4.x SDK for C++.

Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more functions and enhanced features. For more information, see Release notes.

  • Alibaba Cloud only maintains RocketMQ 4.x, 3.x, and TCP client SDKs. We recommend that you use them only for existing business.

Send and receive normal messages

Send normal messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.       
    DefaultMQProducer producer("GID_XXX");          
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.  
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a virtual private cloud (VPC), you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send normal messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
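
The endpoint rules stated in the comments above (domain name plus port, no http:// or https:// prefix, no resolved IP address) can be checked before the value is passed to setNamesrvAddr. The following is a minimal sketch under those assumptions. The helper isPlausibleEndpoint is hypothetical and is not part of the RocketMQ SDK, and the check for a resolved address covers only IPv4-style hosts.

```cpp
#include <cctype>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): returns true when the
// endpoint looks like "host:port" with no scheme and a non-numeric host.
bool isPlausibleEndpoint(const std::string &endpoint) {
    // Reject values that carry a scheme such as "http://" or "https://".
    if (endpoint.find("://") != std::string::npos) {
        return false;
    }
    // Require exactly one ':' that separates a non-empty host from a non-empty port.
    const std::string::size_type colon = endpoint.find(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 >= endpoint.size()) {
        return false;
    }
    if (endpoint.find(':', colon + 1) != std::string::npos) {
        return false;
    }
    const std::string host = endpoint.substr(0, colon);
    const std::string port = endpoint.substr(colon + 1);
    // The port must consist of digits only.
    for (unsigned char c : port) {
        if (!std::isdigit(c)) {
            return false;
        }
    }
    // A host made only of digits and dots is treated as a resolved IPv4 address.
    for (unsigned char c : host) {
        if (!std::isdigit(c) && c != '.') {
            return true;
        }
    }
    return false;
}
```

For example, isPlausibleEndpoint("rmq-cn-XXXX.rmq.aliyuncs.com:8080") returns true, whereas the same value with an http:// prefix, or a value such as 192.168.0.1:8080, returns false.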

Subscribe to normal messages

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.
    
    auto start = std::chrono::system_clock::now();

    // Use a custom listener function to process the received messages and return the processing results. 
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("YOURTOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer. 
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive ordered messages

Send ordered messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
        // To customize partitions, calculate the queue to which a message is routed based on the arg parameter that specifies the partition key. In this example, the arg parameter of the INT type is used. 
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        return mqs[index];
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    // You can configure the number of retries for a message to make sure that the message is sent. 
    int retryTimes = 1;
    // Use the configured custom parameter arg to calculate the queue to which a message is sent and send the message to the specified queue. The arg parameter specifies the partition ID. For information about how to select a message queue, see the MessageQueueSelector operation. 
    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send ordered messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            SendResult sendResult = producer.send(msg, pSelector, &i, retryTimes, false);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << ", MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
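
The selector above routes each message by taking the partition key modulo the number of queues, so messages that share a key always land in the same queue. The following sketch isolates that calculation without the SDK headers. The function selectQueueIndex is a hypothetical helper, and the handling of negative keys and of an empty queue list is an assumption added for illustration.

```cpp
#include <cstddef>

// Hypothetical helper: maps a sharding key to a queue index in [0, queueCount).
// Returns 0 when queueCount is 0; callers must check for an empty queue list
// before they index into it.
std::size_t selectQueueIndex(long long shardingKey, std::size_t queueCount) {
    if (queueCount == 0) {
        return 0;
    }
    const long long n = static_cast<long long>(queueCount);
    // In C++, '%' keeps the sign of the dividend, so shift negative remainders into range.
    const long long remainder = ((shardingKey % n) + n) % n;
    return static_cast<std::size_t>(remainder);
}
```

With four queues, key 10 maps to index 2 and key -1 maps to index 3. Because the result depends only on the key and the queue count, the mapping stays stable only while the number of queues does not change.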

Subscribe to ordered messages


#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.
 
    auto start = std::chrono::system_clock::now();

    // Use a custom listener function to process the received messages and return the processing results. 
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("YOUR ORDERLY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer. 
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive scheduled or delayed messages

Send scheduled or delayed messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.

    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send scheduled or delayed messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
        chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
        chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
        // The time when the ApsaraMQ for RocketMQ broker delivers the message to the consumer, specified as a timestamp in milliseconds. For example, if the timestamp corresponds to 2022-08-07 16:21:00, the broker delivers the message at 16:21:00 on August 7, 2022.
        // The value must be later than the current time. If you set the parameter to a time earlier than the current time, the message is immediately delivered to the consumer.
        // In this example, 10000 milliseconds are added to the current time, so the message is delivered 10 seconds after the current time.
        long long exp = mil.count() + 10000;
        msg.setProperty("__STARTDELIVERTIME", to_string(exp));
        std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
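
The value of the __STARTDELIVERTIME property in the sample above is an absolute timestamp in milliseconds, passed as a string. The following sketch isolates that calculation so that it can be tested with a fixed clock value. The function deliverTimeMillis is a hypothetical helper, and the sketch assumes that std::chrono::system_clock counts from the Unix epoch, which is the case on common platforms and is guaranteed from C++20.

```cpp
#include <chrono>
#include <string>

// Hypothetical helper: returns the absolute delivery time, in milliseconds
// since the clock's epoch, formatted as the string that the property expects.
std::string deliverTimeMillis(std::chrono::system_clock::time_point now,
                              std::chrono::milliseconds delay) {
    // Convert the current time point to whole milliseconds since the epoch.
    const auto sinceEpoch =
        std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
    // Use a 64-bit integer so that the timestamp does not overflow a 32-bit long.
    const long long deliverAt = static_cast<long long>((sinceEpoch + delay).count());
    return std::to_string(deliverAt);
}
```

A call such as msg.setProperty("__STARTDELIVERTIME", deliverTimeMillis(std::chrono::system_clock::now(), std::chrono::milliseconds(10000))) would then request delivery about 10 seconds after the current time, as in the sample.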

Subscribe to scheduled or delayed messages


#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;
using namespace std;

class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                      << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN. 
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.
    
    auto start = std::chrono::system_clock::now();

    // Use a custom listener function to process the received messages and return the processing results.
    ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
    consumer->subscribe("YOUR DELAY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the start() function to start the consumer.
    // ********************************************
    // 1. Make sure that the subscription is configured before you start the consumer. 
    // 2. Make sure that the subscriptions of consumers in the same consumer group are the same. 
    // *********************************************
    consumer->start();

    // Keep the thread running and do not shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::seconds(600));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Send and receive transactional messages

Send transactional messages

#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"

using namespace std;
using namespace rocketmq;

class ExampleTransactionListener : public TransactionListener {
public:
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
        // Execute the local transaction. If the local transaction is executed, COMMIT_MESSAGE is returned. If the local transaction fails to be executed, ROLLBACK_MESSAGE is returned. If the execution status of the local transaction is unknown, UNKNOWN is returned. 
        // If UNKNOWN is returned, the scheduled task to query the status of the local transaction is triggered. 
        std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", Body:"
                  << msg.getBody() << std::endl;
        return UNKNOWN;
    }

    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
        // Query the execution status of the local transaction. If the local transaction is executed, COMMIT_MESSAGE is returned. If the local transaction fails to be executed, ROLLBACK_MESSAGE is returned. If the execution status of the local transaction is unknown, UNKNOWN is returned. 
        // If UNKNOWN is returned, wait until the next scheduled task to query the status of the local transaction is triggered. 
        std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId()
                  << std::endl;
        return COMMIT_MESSAGE;
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.   
    TransactionMQProducer producer("GID_XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
    // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
    producer.setNamesrvAddr("ACCESS POINT");
    // Specify the instance username, instance password, and user channel. The default user channel is ALIYUN.      
    // If you connect to the ApsaraMQ for RocketMQ instance over the Internet, you must specify the username and password. If you connect to the ApsaraMQ for RocketMQ instance in a VPC, you do not need to specify the username or password.
    // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Note: If you use RocketMQ 3.x or 4.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. If you specify the instance ID, access fails.

    // The transaction listener. 
    ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    // Start the producer after you configure the required parameters. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 3;
    for (int i = 0; i < count; ++i) {
        // The topic that is used to send transactional messages. You must create the topic in the ApsaraMQ for RocketMQ console in advance. 
        MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message.");
        try {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    std::cout << "Wait for local transaction check..... " << std::endl;
    for (int i = 0; i < 6; ++i) {
        this_thread::sleep_for(chrono::seconds(10));
        std::cout << "Running "<< i*10 + 10 << " Seconds......"<< std::endl;
    }
    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
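
The listener in the sample above returns fixed states for demonstration. In practice, the check callback usually looks up the recorded result of the local transaction and maps it to one of the three states that the comments describe. The following sketch shows one possible mapping without the SDK headers. All names in it (TxState, LocalOutcome, LocalTransactionLog) are hypothetical, the in-memory map stands in for whatever store the application uses, and the class is not thread-safe.

```cpp
#include <map>
#include <string>

// Mirrors the three states named in the sample. Defined locally so that the
// sketch compiles without the SDK headers.
enum class TxState { Commit, Rollback, Unknown };

// Hypothetical outcome of the business operation that is tied to a message.
enum class LocalOutcome { Succeeded, Failed, InProgress };

// Hypothetical in-memory record of local transaction outcomes, keyed by a business ID.
class LocalTransactionLog {
public:
    // Stores or overwrites the outcome for a business ID.
    void record(const std::string &businessId, LocalOutcome outcome) {
        outcomes_[businessId] = outcome;
    }

    // Maps the recorded outcome to a transaction state. An ID that has no
    // record yet resolves to Unknown so that the status is queried again later.
    TxState resolve(const std::string &businessId) const {
        const auto it = outcomes_.find(businessId);
        if (it == outcomes_.end()) {
            return TxState::Unknown;
        }
        switch (it->second) {
            case LocalOutcome::Succeeded:
                return TxState::Commit;
            case LocalOutcome::Failed:
                return TxState::Rollback;
            case LocalOutcome::InProgress:
                return TxState::Unknown;
        }
        return TxState::Unknown;
    }

private:
    std::map<std::string, LocalOutcome> outcomes_;
};
```

Returning Unknown for a missing or in-progress record defers the decision to the next status query instead of committing or rolling back a message whose local result is not yet known.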

Subscribe to transactional messages

The sample code that is used to subscribe to transactional messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Subscribe to normal messages in the Send and receive normal messages section.