全部產品
Search
文件中心

ApsaraMQ for RocketMQ:範例程式碼

更新時間:Jul 23, 2024

雲訊息佇列 RocketMQ 版5.x版本執行個體可相容C++ 1.x/2.x SDK用戶端接入,您可以使用1.x/2.x版本的SDK接入5.x執行個體進行訊息收發。本文為您介紹1.x/2.x版本下的C++ SDK訊息收發範例程式碼。

重要
  • 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和雲訊息佇列 RocketMQ 版5.x服務端完全相容,提供了更全面的功能並支援更多增強特性。更多資訊,請參見5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK後續僅做功能維護,建議僅存量業務使用。

普通訊息收發樣本

發送普通訊息

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // 建立Producer,並配置發送訊息所必需的資訊。
    ONSFactoryProperty factoryInfo;  
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在訊息佇列RocketMQ版控制台建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 訊息內容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
    * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
    * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
    * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    */  
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 建立Producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。       
            "TagA",
            // Message Body,不可為空,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
            factoryInfo.getMessageContent()
    );

    // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
    // 注意:不設定也不會影響訊息正常收發。
    msg.setKey("ORDERID_100");

    // 發送訊息,只要不拋出異常,就代表發送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自訂處理exception的細節。
    }
    // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
    pProducer->shutdown();

    return 0;
}

訂閱普通訊息

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // 此處為具體的訊息處理過程,確認訊息被處理成功請返回CommitMessage。
        // 如果有消費異常,或者期望重新消費,可以返回ReconsumeLater,訊息將會在一段時間後重新投遞。
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。從執行個體化的版本開始,ProducerId和CounsumerId已經統一,此處設定是為了介面保持向前相容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    /**
    * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
    * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
    * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
    * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    */
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
    const char* topic_1 = "topic-1";
    // 訂閱topic-1中Tag訊息屬性為tag-1的所有訊息。
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // 訂閱topic-2的所有訊息。
    const char* tag_2 = "*";


    // 請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // 準備工作完成,必須調用啟動函數,才可以正常工作。
    consumer->start();

    // 請保持線程常駐,不要執行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

順序訊息收發樣本

發送順序訊息

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // Producer建立和正常工作的參數,必須輸入。
    ONSFactoryProperty factoryInfo;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在訊息佇列RocketMQ版控制台建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 訊息內容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
    * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
    * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
    * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    */  
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
    
    // 建立Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
    pProducer->start();

    Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                "TagA",
                // Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                factoryInfo.getMessageContent()
    );

    // 設定代表訊息的業務屬性,請儘可能全域唯一。
    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
    // 注意:不設定也不會影響訊息正常收發。
    msg.setKey("ORDERID_100");
    // 分區順序訊息中區分不同分區的關鍵字段。
    // 如果是全域順序訊息,該欄位可以設定為任意非Null 字元串。
    std::string shardingKey = "abc";  
    // 帶有同一Sharding Key的訊息會按照順序發送。
    try
    {
        // 發送訊息,只要不拋異常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // 添加對exception的處理。
    }
    // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
    pProducer->shutdown();

    return 0;
}           

訂閱順序訊息

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// 建立消費訊息的執行個體。
// pushConsumer拉取到訊息後,會主動調用該執行個體的consumeMessage函數。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // 根據業務需求,消費訊息。
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // OrderConsumer建立和工作需要的參數,必須輸入。
    ONSFactoryProperty factoryInfo;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    // 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    /**
    * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
    * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
    * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
    * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    */  
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 建立orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // 指定orderConsumer訂閱的訊息Topic和訊息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // 註冊訊息監聽的處理執行個體,orderConsumer拉取到訊息後,會調用該類的consumeMessage函數。

    // 啟動orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 10; ++i) {
        // wait
    }

    // 銷毀orderConsumer, 在應用退出前,必須銷毀Consumer對象,否則會導致記憶體泄露等問題。
    orderConsumer->shutdown();

   return 0;
}        

定時/延時訊息收發樣本

發送定時/延時訊息

#include "ONSFactory.h"
#include "ONSClientException.h"

#include <windows.h>
using namespace ons;
int main()
{

    // 建立Producer,並配置發送訊息所必需的資訊。
    ONSFactoryProperty factoryInfo;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在訊息佇列RocketMQ版控制台建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 訊息內容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
    * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
    * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
    * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    */  
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 建立Producer。
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾       
            "TagA",
            // Message Body,不可為空,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
            factoryInfo.getMessageContent()
    );

    // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
    // 注意:不設定也不會影響訊息正常收發。
    msg.setKey("ORDERID_100");

    // deliver time單位ms,指定一個時刻,在這個時刻之後訊息才能被消費,這個例子表示3s後才能被消費。
    long deliverTime = GetTickCount64() + 3000;
    msg.setStartDeliverTime(deliverTime);

    // 發送訊息,只要不拋出異常,就代表發送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自訂處理exception的細節。
    }

    // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
    pProducer->shutdown();

    return 0;
}
            

訂閱定時/延時訊息

訂閱定時/延時訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息

事務訊息收發樣本

發送事務訊息

  1. 發送半事務訊息(Half Message)及執行本地事務,範例程式碼如下。

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // 訊息ID(有可能訊息體一樣,但訊息ID不一樣,當前訊息ID在訊息佇列RocketMQ版控制台無法查詢。)
                    string msgId = value.getMsgID();
                    // 訊息體內容進行crc32,也可以使用其它的如MD5。
                    // 訊息ID和crc32id主要是用來防止訊息重複。
                    // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。
                    // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = 本地事務執行結果;
                        if (isCommit) {
                            // 本地事務成功、提交訊息。
                            transactionStatus = CommitTransaction;
                        } else {
                            // 本地事務失敗、復原訊息。
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            //建立Producer和發送訊息所必需的資訊。
            ONSFactoryProperty factoryInfo;
            // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
            // 您在訊息佇列RocketMQ版控制台建立的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // 訊息內容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            /**
            * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
            * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
            * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
            * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
            */  
            // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
            // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
    
            // 建立producer,訊息佇列RocketMQ版不負責pChecker的釋放,需要業務方自行釋放資源。
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
            pProducer->start();
    
            Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。       
                "TagA",
                // Message Body,不可為空,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                factoryInfo.getMessageContent()
            );
    
            // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
            // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
            // 注意:不設定也不會影響訊息正常收發。
            msg.setKey("ORDERID_100");
    
            // 發送訊息,只要不拋出異常,就代表發送成功。   
            try
            {
                //訊息佇列RocketMQ版不負責pExecuter的釋放,需要業務方自行釋放資源。
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                // 自訂處理exception的細節。
            }
            // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 提交事務訊息狀態,範例程式碼如下。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 訊息ID(有可能訊息體一樣,但訊息ID不一樣,當前訊息ID在訊息佇列RocketMQ版控制台無法查詢。)
             string msgId = value.getMsgID();
             // 訊息體內容進行crc32,也可以使用其它的如MD5。
             // 訊息ID和crc32id主要是用來防止訊息重複。
             // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。
             // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事務執行結果;
                 if (isCommit) {
                     // 本地事務成功、提交訊息。
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事務失敗、復原訊息。
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

訂閱事務訊息

訂閱事務訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息