全部產品
Search
文件中心

ApsaraMQ for RocketMQ:事務訊息

更新時間:Jul 01, 2024

雲訊息佇列 RocketMQ 版提供的分散式交易訊息適用於所有對資料最終一致性有強需求的情境。本文介紹雲訊息佇列 RocketMQ 版事務訊息的概念、優勢、典型情境、互動流程、使用規則以及範例程式碼。

概念介紹

  • 事務訊息:雲訊息佇列 RocketMQ 版提供類似XA或Open XA的分散式交易功能,通過雲訊息佇列 RocketMQ 版事務訊息能達到分散式交易的最終一致。
  • 半事務訊息:暫不能投遞的訊息,生產者已經成功地將訊息發送到了雲訊息佇列 RocketMQ 版服務端,但是雲訊息佇列 RocketMQ 版服務端未收到生產者對該訊息的二次確認,此時該訊息被標記成“暫不能投遞”狀態,處於該種狀態下的訊息即半事務訊息。
  • 訊息回查:由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,雲訊息佇列 RocketMQ 版服務端通過掃描發現某條訊息長期處於“半事務訊息”時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit或是Rollback),該詢問過程即訊息回查。

分散式交易訊息的優勢

雲訊息佇列 RocketMQ 版分散式交易訊息不僅可以實現應用之間的解耦,又能保證資料的最終一致性。同時,傳統的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關聯應用的不可用導致整體復原,從而最大限度保證核心系統的可用性。在極端情況下,如果關聯的某一個應用始終無法處理成功,也只需對當前應用進行補償或資料訂正處理,而無需對整體業務進行復原。

典型情境

在淘寶購物車下單時,涉及到購物車系統和交易系統,這兩個系統之間的資料最終一致性可以通過分散式交易訊息的非同步處理實現。在這種情境下,交易系統是最為核心的系統,需要最大限度地保證下單成功。而購物車系統只需要訂閱雲訊息佇列 RocketMQ 版的交易訂單訊息,做相應的業務處理,即可保證最終的資料一致性。

互動流程

事務訊息互動流程如下圖所示。事務訊息

事務訊息發送步驟如下:

  1. 生產者將半事務訊息發送至雲訊息佇列 RocketMQ 版服務端。
  2. 雲訊息佇列 RocketMQ 版服務端將訊息持久化成功之後,向生產者返回Ack確認訊息已經發送成功,此時訊息為半事務訊息。
  3. 生產者開始執行本地事務邏輯。
  4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果後處理邏輯如下:
    • 二次確認結果為Commit:服務端將半事務訊息標記為可投遞,並投遞給消費者。
    • 二次確認結果為Rollback:服務端將復原事務,不會將半事務訊息投遞給消費者。
  5. 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到寄件者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間後,服務端將對訊息生產者即生產者叢集中任一生產者執行個體發起訊息回查。

事務訊息回查步驟如下:

  1. 生產者收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
  2. 生產者根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務訊息進行處理。

使用規則

生產郵件規則

  • 事務訊息發送完成本地事務後,可在execute方法中返回以下三種狀態:
    • TransactionStatus.CommitTransaction:提交事務,允許消費者消費該訊息。
    • TransactionStatus.RollbackTransaction:復原事務,訊息將被丟棄不允許消費。
    • TransactionStatus.Unknow:暫時無法判斷狀態,等待固定時間以後雲訊息佇列 RocketMQ 版服務端根據回查規則向生產者進行訊息回查。
  • 通過ONSFactory.createTransactionProducer建立事務訊息的Producer時必須指定LocalTransactionChecker的實作類別,處理異常情況下事務訊息的回查。
  • 回查規則:本地事務執行完成後,若雲訊息佇列 RocketMQ 版服務端收到的本地事務返回狀態為TransactionStatus.Unknow,或生產者應用退出導致本地事務未提交任何狀態。則雲訊息佇列 RocketMQ 版服務端會向訊息生產者發起事務回查,第一次回查後仍未擷取到事務狀態,則之後每隔一段時間會再次回查。
    • 回查間隔時間:系統預設每隔30秒發起一次定時任務,對未提交的半事務訊息進行回查,共持續12小時。
    • 第一次訊息回查最快時間:該參數支援自訂設定。若指定訊息未達到設定的最快回查時間前,系統預設每隔30秒一次的回查任務不會檢查該訊息。
      以Java為例,以下設定表示:第一次回查的最快時間為60秒。
      Message message = new Message();
      message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"60");
      說明

      因為系統預設的回查間隔,第一次訊息回查的實際時間會向後有0秒~30秒的浮動。

      例如:指定訊息的第一次訊息最快回查時間設定為60秒,系統在第58秒時達到定時的回查時間,但設定的60秒未到,所以該訊息不在本次回查範圍內。等待間隔30秒後,下一次的系統回查時間在第88秒,該訊息才符合條件進行第一次回查,距設定的最快回查時間延後了28秒。

消費郵件規則

  • 事務訊息的Group ID不能與其他類型訊息的Group ID共用。與其他類型的訊息不同,事務訊息有回查機制,回查時雲訊息佇列 RocketMQ 版服務端會根據Group ID去查詢生產者用戶端。

範例程式碼

收發事務訊息的範例程式碼如下: