すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージの送受信

最終更新日:Jul 09, 2024

このトピックでは、TCPクライアントSDK for Javaを使用してトランザクションメッセージを送受信する方法に関するサンプルコードを提供します。

ApsaraMQ for RocketMQは、拡張アーキテクチャ (X/Open XA) と同様の分散トランザクション処理機能を提供し、ApsaraMQ for RocketMQのトランザクションの一貫性を確保します。

説明

ApsaraMQ for RocketMQの新規ユーザーの場合、ApsaraMQ for RocketMQを使用してメッセージを送受信する前に、デモプロジェクトを参照してプロジェクトを構築することを推奨します。

相互作用プロセス

次の図は、トランザクションメッセージの対話プロセスを示しています。

process

詳細については、「トランザクションメッセージ」をご参照ください。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • Java用SDKがダウンロードされます。 SDK For Javaのリリースノートについては、「リリースノート」をご参照ください。

  • 環境がセットアップされる。 詳細については、「環境の準備」をご参照ください。

  • (オプション) ロギング設定が設定されています。 詳細については、「ロギング設定」をご参照ください。

トランザクションメッセージの送信

サンプルコードの詳細については、「ApsaraMQ For RocketMQコードライブラリ」をご参照ください。

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. Note: The ID of the consumer group that is used for transactional messages cannot be the same as the ID of the consumer group that is used for other types of messages. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");

        // Before you initialize the producer, you must register a checker to check the status of the local transaction. 
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}
// The local transaction checker. 
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("The request to check the transaction status of the message. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

トランザクション状態チェックの仕組み

  • トランザクションメッセージの送信時にトランザクションステータスチェックのメカニズムを実装する必要があるのはなぜですか。

    ハーフメッセージが送信されたが、TransactionStatus.Unknowが返された場合、またはアプリケーションの終了によりローカルトランザクションのステータスがコミットされていない場合、ハーフメッセージのステータスはApsaraMQ for RocketMQブローカーには不明です。 したがって、ブローカは、半メッセージのステータスをチェックするために、プロデューサクラスタ内のプロデューサに要求を定期的に送信する。 ステータスチェック要求が受信された後、プロデューサは、ハーフメッセージに対応するローカルトランザクションの最終ステータスをチェックしてコミットする。

  • チェックメソッドがコールバックされると、ビジネスロジックは何をしますか?

    トランザクションメッセージのチェックメソッドには、トランザクションの整合性をチェックするために使用されるロジックが含まれている必要があります。 トランザクションメッセージの送信後、ApsaraMQ for RocketMQLocalTransactionCheckerg APIを呼び出して、ブローカーからのローカルトランザクションステータスのリクエストに応答する必要があります。 したがって、トランザクションメッセージのチェックに使用されるメソッドは、次の目的を達成する必要があります。

    1. ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック) を確認します。

    2. ハーフメッセージに対応するローカルトランザクションのステータスをブローカーにコミットします。

トランザクションメッセージの購読

トランザクションメッセージをサブスクライブするためのサンプルコードは、通常のメッセージをサブスクライブするためのサンプルコードと同じです。 詳細については、『メッセージのサブスクライブ』をご参照ください。