全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發事務訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的C/C++ SDK收發事務訊息的範例程式碼供您參考。

雲訊息佇列 RocketMQ 版提供類似XA或Open XA的分散式交易功能,通過雲訊息佇列 RocketMQ 版事務訊息,能達到分散式交易的最終一致。

互動流程

事務訊息互動流程如下圖所示。事務訊息

更多資訊,請參見事務訊息

前提條件

您已完成以下操作:

  • 下載C/C++ SDK。更多資訊,請參見版本說明

  • 準備環境。請根據您使用的SDK版本做好環境準備工作:

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

發送事務訊息

發送事務訊息包含以下兩個步驟。

  1. 發送半事務訊息(Half Message)及執行本地事務。範例程式碼如下。

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // 訊息ID(有可能訊息體一樣,但訊息ID不一樣,當前訊息ID在訊息佇列RocketMQ版控制台無法查詢。)
                    string msgId = value.getMsgID();
                    // 訊息體內容進行crc32,也可以使用其它的如MD5。
                    // 訊息ID和crc32id主要是用來防止訊息重複。
                    // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。
                    // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = 本地事務執行結果;
                        if (isCommit) {
                            // 本地事務成功、提交訊息。
                            transactionStatus = CommitTransaction;
                        } else {
                            // 本地事務失敗、復原訊息。
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            //建立Producer和發送訊息所必需的資訊。
            ONSFactoryProperty factoryInfo;
            //您在訊息佇列RocketMQ版控制台建立的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            //設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
            // 您在訊息佇列RocketMQ版控制台建立的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            //訊息內容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
            //AccessKey ID,阿里雲身分識別驗證標識。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		    //AccessKey Secret,阿里雲身分識別驗證密鑰。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
            //建立producer,訊息佇列RocketMQ版不負責pChecker的釋放,需要業務方自行釋放資源。
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            //在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
            pProducer->start();
    
            Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                //Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。       
                "TagA",
                //Message Body,不可為空,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                factoryInfo.getMessageContent()
            );
    
            // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
            // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
            // 注意:不設定也不會影響訊息正常收發。
            msg.setKey("ORDERID_100");
    
            //發送訊息,只要不拋出異常,就代表發送成功。   
            try
            {
                //訊息佇列RocketMQ版不負責pExecuter的釋放,需要業務方自行釋放資源。
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                //自訂處理exception的細節。
            }
            // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 提交事務訊息狀態,範例程式碼如下。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 訊息ID(有可能訊息體一樣,但訊息ID不一樣,當前訊息ID在訊息佇列RocketMQ版控制台無法查詢。)
             string msgId = value.getMsgID();
             // 訊息體內容進行crc32,也可以使用其它的如MD5。
             // 訊息ID和crc32id主要是用來防止訊息重複。
             // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。
             // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事務執行結果;
                 if (isCommit) {
                     // 本地事務成功、提交訊息。
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事務失敗、復原訊息。
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

事務回查機制說明

  • 發送事務訊息為什麼必須要實現回查Check機制?

    當半事務訊息發送完成,但本地事務返回狀態為TransactionStatus.Unknow,或者應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條半事務訊息的狀態是未知的。因此Broker會定期向訊息發送方即訊息生產者叢集中的任意一生產者執行個體發起訊息回查,要求發送方回查該Half狀態訊息,並上報其最終狀態。

  • Check被回調時,商務邏輯都需要做些什嗎?

    事務訊息的Check方法裡面,應該寫一些檢查事務一致性的邏輯。雲訊息佇列 RocketMQ 版發送事務訊息時需要實現LocalTransactionChecker介面,用來處理Broker主動發起的本地事務狀態回查請求,因此在事務訊息的Check方法中,需要完成兩件事情:

    1. 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。

    2. 向Broker提交該半事務訊息本地事務的狀態。

訂閱事務訊息

事務訊息的訂閱與普通訊息訂閱一致,更多資訊,請參見訂閱訊息