全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發事務訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的.NET SDK收發事務訊息的範例程式碼。目前支援的地區包括公網、華東1(杭州)、華北2(北京)、華東2(上海)、華南1(深圳)。

雲訊息佇列 RocketMQ 版提供類似XA或Open XA的分散式交易功能,通過雲訊息佇列 RocketMQ 版事務訊息,能達到分散式交易的最終一致。

互動流程

事務訊息互動流程如下圖所示。事務訊息互動流程

更多資訊,請參見事務訊息

前提條件

  • 下載.NET SDK。更多資訊,請參見版本說明

  • 環境準備。更多資訊,請參見環境準備

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

發送事務訊息

發送事務訊息包含以下兩個步驟:

  1. 發送半事務訊息(Half Message)及執行本地事務。範例程式碼如下。

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
    
                 // 訊息ID(有可能訊息體一樣,但訊息ID不一樣。當前訊息ID在訊息佇列RocketMQ版控制台無法查詢)。
                 string msgId = value.getMsgID();
                 // 訊息體內容進行crc32, 也可以使用其它的如MD5。
                 // 訊息ID和crc32id主要是用來防止訊息重複。
                 // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = 本地事務執行結果;
                     if (isCommit) {
                         // 本地事務成功則提交訊息。
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // 本地事務失敗則復原訊息。
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // 處理異常。
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");
             // 您在訊息佇列RocketMQ版控制台建立的Topic。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "");
             // 訊息內容。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "");
             //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
             //AccessKey ID,阿里雲身分識別驗證標識。
             factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    	       //AccessKey Secret,阿里雲身分識別驗證密鑰。
             factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            
             //create transaction producer                
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker);
    
             // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可,啟動之後可以多線程並發發送訊息。
             pProducer.start();
    
                 Message msg = new Message(
                 //Message Topic
                 factoryInfo.getPublishTopics(),
                 //Message Tag
                 "TagA",
                 //Message Body
                 factoryInfo.getMessageContent()
             );
    
             // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
             // 以方便您在無法正常收到訊息情況下,可通訊息佇列RocketMQ版過控制台查詢訊息並補發。
             // 注意:不設定也不會影響訊息正常收發。
             msg.setKey("ORDERID_100");
    
             // 發送訊息,只要不拋出異常,就代表發送成功。
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
             // shutdown之後不能重新start此producer。
             pProducer.shutdown();
         }
     }
     }
  2. 提交事務訊息狀態。

    當本地事務執行完成(執行成功或執行失敗),需要通知伺服器當前訊息的事務狀態。 通知方式有以下兩種:

    • 執行本地事務完成後提交。

    • 執行本地事務一直沒提交狀態,等待伺服器回查訊息的事務狀態。

    事務狀態有以下三種:

    • TransactionStatus.CommitTransaction 提交事務,允許訂閱者消費該訊息。

    • TransactionStatus.RollbackTransaction 復原事務,訊息將被丟棄不允許消費。

    • TransactionStatus.Unknow 無法判斷狀態,期待Broker向發送方再次詢問該訊息對應的本地事務的狀態。

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
                // 訊息ID(有可能訊息體一樣,但訊息ID不一樣。當前訊息ID在訊息佇列RocketMQ版控制台無法查詢)。
                string msgId = value.getMsgID();
                // 訊息體內容進行crc32,也可以使用其它的如MD5。
                // 訊息ID和crc32id主要是用來防止訊息重複。
                // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。
                // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = 本地事務執行結果;
                    if (isCommit) {
                        // 本地事務成功、提交訊息。
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // 本地事務失敗、復原訊息。
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }

    事務回查機制說明

    • 發送事務訊息為什麼必須要實現Check機制?

      當步驟1半事務訊息發送完成,但本地事務返回狀態為TransactionStatus.Unknow時,亦或是應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條半狀態的訊息的狀態是未知的,因此Broker會定期向訊息發送方即訊息生產者叢集中的任意一生產者執行個體發起訊息回查,要求發送方回查該Half狀態訊息,並上報其最終狀態。

    • Check被回調時,商務邏輯都需要做些什嗎?

      事務訊息的Check方法裡面,應該寫一些檢查事務一致性的邏輯。雲訊息佇列 RocketMQ 版發送事務訊息時需要實現LocalTransactionChecker介面,用來處理Broker主動發起的本地事務狀態回查請求;因此在事務訊息的Check方法中,需要完成兩件事情:

      1. 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。

      2. 向Broker提交該半事務訊息本地事務的狀態。

    • 本地事務的不同狀態對半事務訊息的影響:

      • TransactionStatus.CommitTransaction提交事務,允許訂閱者消費該訊息。

      • TransactionStatus.RollbackTransaction復原事務,訊息將被丟棄不允許消費。

      • TransactionStatus.Unknow無法判斷狀態,期待Broker向發送方再次詢問該訊息對應的本地事務的狀態。

      具體代碼的更多資訊,請參見MyLocalTransactionChecker的實現。

訂閱事務訊息

事務訊息的訂閱與普通訊息訂閱一致,更多資訊,請參見訂閱訊息