全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發事務訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的Java SDK收發事務訊息的範例程式碼。

雲訊息佇列 RocketMQ 版提供類似XA或Open XA的分散式交易功能,通過雲訊息佇列 RocketMQ 版事務訊息,能達到分散式交易的最終一致。

說明

對於新手使用者,建議在正式收發訊息前,閱讀Demo工程來瞭解搭建雲訊息佇列 RocketMQ 版工程的具體步驟。

互動流程

事務訊息互動流程如下圖所示。

process

更多資訊,請參見事務訊息

前提條件

您已完成以下操作:

  • 下載Java SDK。Java SDK版本說明,請參見版本說明

  • 準備環境。更多資訊,請參見準備環境

  • (可選)日誌配置。更多資訊,請參見日誌配置

發送事務訊息

具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 您在訊息佇列RocketMQ版控制台建立的Group ID。注意:事務訊息的Group ID不能與其他類型訊息的Group ID共用。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        // AccessKey ID,阿里雲身分識別驗證標識。
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // AccessKey Secret,阿里雲身分識別驗證密鑰。
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點頁簽查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");

        // 初始化事務訊息Producer時,需要註冊一個本地事務狀態的Checker。
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("執行本地事務,並根據本地事務的狀態提交TransactionStatus。");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}
// 本地事務檢查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("收到事務訊息的回查請求,MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

事務回查機制說明

  • 發送事務訊息為什麼必須要實現回查Check機制?

    當半事務訊息發送完成,但本地事務返回狀態為TransactionStatus.Unknow,或者應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條半事務訊息的狀態是未知的。因此Broker會定期向訊息發送方即訊息生產者叢集中的任意一生產者執行個體發起訊息回查,要求發送方回查該Half狀態訊息,並上報其最終狀態。

  • Check被回調時,商務邏輯都需要做些什嗎?

    事務訊息的Check方法裡面,應該寫一些檢查事務一致性的邏輯。雲訊息佇列 RocketMQ 版發送事務訊息時需要實現LocalTransactionChecker介面,用來處理Broker主動發起的本地事務狀態回查請求,因此在事務訊息的Check方法中,需要完成兩件事情:

    1. 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。

    2. 向Broker提交該半事務訊息本地事務的狀態。

訂閱事務訊息

事務訊息的訂閱與普通訊息訂閱一致,更多資訊,請參見訂閱訊息