全部產品
Search
文件中心

ApsaraMQ for RocketMQ:事務訊息

更新時間:Jul 23, 2024

事務訊息為雲訊息佇列 RocketMQ 版中的進階特性訊息,本文為您介紹事務訊息的應用情境、功能原理、使用限制、使用方法和使用建議。

應用情境

分散式交易的訴求

分布式系統調用的特點為一個核心商務邏輯的執行,同時需要調用多個下遊業務進行處理。因此,如何保證核心業務和多個下遊業務的執行結果完全一致,是分散式交易需要解決的主要問題。

事務訊息訴求

以電商交易情境為例,使用者支付訂單這一核心操作的同時會涉及到下遊物流發貨、積分變更、購物車狀態清空等多個子系統的變更。當前業務的處理分支包括:

  • 主分支訂單系統狀態更新:由未支付變更為支付成功。

  • 物流系統狀態新增:新增待發貨物流記錄,建立訂單物流記錄。

  • 積分系統狀態變更:變更使用者積分,更新使用者積分表。

  • 購物車系統狀態變更:清空購物車,更新使用者購物車記錄。

傳統XA事務方案:效能不足

為了保證上述四個分支的執行結果一致性,典型方案是基於XA協議的分散式交易系統來實現。將四個調用分支封裝成包含四個獨立事務分支的大事務。基於XA分散式交易的方案可以滿足業務處理結果的正確性,但最大的缺點是多分支環境下資源鎖定範圍大,並發度低,隨著下遊分支的增加,系統效能會越來越差。

基於普通訊息方案:一致性保障困難

將上述基於XA事務的方案進行簡化,將訂單系統變更作為本地事務,剩下的系統變更作為普通訊息的下遊來執行,事務分支簡化成普通訊息+訂單表事務,充分利用訊息非同步化的能力縮短鏈路,提高並發度。

普通訊息方案

該方案中訊息下遊分支和訂單系統變更的主分支很容易出現不一致的現象,例如:

  • 訊息發送成功,訂單沒有執行成功,需要復原整個事務。

  • 訂單執行成功,訊息沒有發送成功,需要額外補償才能發現不一致。

  • 訊息發送逾時未知,此時無法判斷需要復原訂單還是提交訂單變更。

基於雲訊息佇列 RocketMQ 版分散式交易訊息:支援最終一致性

上述普通訊息方案中,普通訊息和訂單事務無法保證一致的原因,本質上是由於普通訊息無法像單機資料庫事務一樣,具備提交、復原和統一協調的能力。

而基於雲訊息佇列 RocketMQ 版實現的分散式交易訊息功能,在普通訊息基礎上,支援二階段的提交能力。將二階段提交和本地事務綁定,實現全域提交結果的一致性。

事務訊息

雲訊息佇列 RocketMQ 版事務訊息的方案,具備高效能、可擴充、業務開發簡單的優勢。具體事務訊息的原理和流程,請參見下文的功能原理

功能原理

什麼是事務訊息

事務訊息是雲訊息佇列 RocketMQ 版提供的一種進階訊息類型,支援在分布式情境下保障訊息生產和本地事務的最終一致性。

事務訊息處理流程

事務訊息互動流程如下圖所示。事務訊息

  1. 生產者將訊息發送至雲訊息佇列 RocketMQ 版服務端。

  2. 雲訊息佇列 RocketMQ 版服務端將訊息持久化成功之後,向生產者返回Ack確認訊息已經發送成功,此時訊息被標記為“暫不能投遞”,這種狀態下的訊息即為半事務訊息。

  3. 生產者開始執行本地事務邏輯。

  4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果後處理邏輯如下:

    • 二次確認結果為Commit:服務端將半事務訊息標記為可投遞,並投遞給消費者。

    • 二次確認結果為Rollback:服務端將復原事務,不會將半事務訊息投遞給消費者。

  5. 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到寄件者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間後,服務端將對訊息生產者即生產者叢集中任一生產者執行個體發起訊息回查。

    說明

    服務端回查的間隔時間和最大回查次數,請參見參數限制

  6. 生產者收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。

  7. 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務訊息進行處理。

事務訊息生命週期

事務訊息

  • 初始化

    半事務訊息被生產者構建並完成初始化,待發送到服務端的狀態。

  • 事務待提交

    半事務訊息被發送到服務端,和普通訊息不同,並不會直接被服務端持久化,而是會被單獨儲存到事務儲存系統中,等待第二階段本地事務返回執行結果後再提交。此時訊息對下遊消費者不可見。

  • 訊息復原

    第二階段如果事務執行結果明確為復原,服務端會將半事務訊息復原,該事務訊息流程程終止。

  • 提交待消費

    第二階段如果事務執行結果明確為提交,服務端會將半事務訊息重新儲存到普通儲存系統中,此時訊息對下遊消費者可見,等待被消費者擷取並消費。

  • 消費中

    訊息被消費者擷取,並按照消費者本地的商務邏輯進行處理的過程。

    此時服務端會等待消費者完成消費並提交消費結果,如果一定時間後沒有收到消費者的響應,雲訊息佇列 RocketMQ 版會對訊息進行重試處理。具體資訊,請參見消費重試

  • 消費提交

    消費者完成消費處理,並向服務端提交消費結果,服務端標記當前訊息已經被處理(包括消費成功和失敗)。

    雲訊息佇列 RocketMQ 版預設支援保留所有訊息,此時訊息資料並不會立即被刪除,只是邏輯標記已消費。訊息在儲存時間到期或儲存空間不足被刪除前,消費者仍然可以回溯訊息重新消費。

  • 訊息刪除

    雲訊息佇列 RocketMQ 版按照訊息儲存機制滾動清理最早的訊息資料,將訊息從物理檔案中刪除。更多資訊,請參見訊息儲存和清理機制

使用限制

訊息類型一致性

事務訊息僅支援在MessageTypeTransaction的主題內使用,即事務訊息只能發送至類型為事務訊息的主題中,發送的訊息的類型必須和主題的類型一致。

消費事務性

雲訊息佇列 RocketMQ 版事務訊息保證本地主分支事務和下遊訊息發送事務的一致性,但不保證訊息消費結果和上遊事務的一致性。因此需要下遊業務分支自行保證訊息正確處理,建議消費端做好消費重試,如果有短暫失敗可以利用重試機制保證最終處理成功。

中間狀態可見度

雲訊息佇列 RocketMQ 版事務訊息為最終一致性,即在訊息提交到下遊消費端處理完成之前,下遊分支和上遊事務之間的狀態會不一致。因此,事務訊息僅適合接受非同步執行的事務情境。

事務逾時機制

雲訊息佇列 RocketMQ 版事務訊息的生命週期存在逾時機制,即半事務訊息被生產者發送服務端後,如果在指定時間內服務端無法確認提交或者復原狀態,則訊息預設會被復原。事務逾時時間,請參見參數限制

不支援多個sendReceipt

事務訊息在一個事務中僅允許一個sendReceipt,不支援多個sendReceipt。

使用樣本

事務訊息相比普通訊息發送時需要修改以下幾點:

  • 發送事務訊息前,需要開啟事務並關聯本地的事務執行。

  • 為保證事務一致性,在構建生產者時,必須設定事務檢查器和預綁定事務訊息發送的主題列表,用戶端內建的事務檢查器會對綁定的事務主題做異常狀態恢複。

以Java語言為例,使用事務訊息樣本參考如下:

完整的訊息收發範例程式碼請參見RocketMQ 5.x系列SDK(推薦)

樣本代碼

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {
    /**
     * 示範demo,類比訂單表查詢服務,用來確認訂單事務是否提交成功。
     */
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    /**
     * 示範demo,類比本地事務的執行結果。
     */
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        /**
         * 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
         * 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
         */
        String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
        //訊息發送的目標Topic名稱,需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String topic = "topic1";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
         * 如果執行個體類型為Serverlesss執行個體,則不管是公網訪問還是內網訪問都必須設定執行個體的使用者名稱密碼。
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();
        
        //構造事務生產者:事務訊息需要生產者構建一個事務檢查器,用於檢查確認異常半事務的中間狀態。
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事務檢查器一般是根據業務的ID去檢查本地事務是否正確提交還是復原,此處以訂單ID屬性為例。
                 * 在訂單表找到了這個訂單,說明本地事務插入訂單的操作已經正確提交;如果訂單表沒有訂單,說明本地事務已經復原。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 錯誤的訊息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            }).setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        //開啟事務分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事務分支開啟失敗,直接退出。
            return;
        }
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            //設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
            .setKeys("messageKey1")
            //設定訊息Tag,用於消費端根據指定Tag過濾訊息。
            .setTag("messageTag")
            //一般事務訊息都會設定一個本地事務關聯的唯一ID,用來做本地事務回查的校正。
            .addProperty("OrderId", "xxx")
            //訊息體。
            .setBody("messageBody".getBytes())
            .build();
        //發送半事務訊息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事務訊息發送失敗,事務可以直接退出並復原。
            return;
        }
        /**
         * 執行本地事務,並確定本地事務結果。
         * 1. 如果本地事務提交成功,則提交訊息事務。
         * 2. 如果本地事務提交失敗,則復原訊息事務。
         * 3. 如果本地事務未知異常,則不處理,等待事務訊息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 業務可以自身對即時性的要求選擇是否重試,如果放棄重試,可以依賴事務訊息回查機制進行事務狀態的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建議記錄異常資訊,復原異常時可以無需重試,依賴事務訊息回查機制進行事務狀態的提交。
                e.printStackTrace();
            }
        }
    }
}

使用建議

避免大量未決交易導致逾時

雲訊息佇列 RocketMQ 版支援在事務提交階段異常的情況下發起事務回查,保證事務一致性。但生產者應該盡量避免本地事務返回未知結果。大量的事務檢查會導致系統效能受損,容易導致交易處理延遲。

正確處理“進行中”的事務

訊息回查時,對於進行中中的事務不要返回Rollback或Commit結果,應繼續保持Unknown的狀態。

一般出現訊息回查時事務正在處理的原因為:事務執行較慢,訊息回查太快。解決方案如下:

  • 將第一次事務回查時間設定較大一些,但可能導致依賴回查的事務提交延遲較大。

  • 程式能正確識別進行中中的事務。