すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージ

最終更新日:Aug 16, 2024

トランザクションメッセージは、ApsaraMQ for RocketMQによって提供される機能付きメッセージの一種です。 このトピックでは、トランザクションメッセージのシナリオ、作業メカニズム、制限、使用方法、および使用方法について説明します。

シナリオ

分散トランザクション

コアビジネスシステムのロジックが分散システムで実行されると、複数のダウンストリームシステムが呼び出されてロジックが同時に処理されます。 したがって、分散トランザクションで解決する必要がある主な問題は、コアビジネスシステムと下流ビジネスシステムとの間で実行結果の一貫性を確保することです。

事务消息诉求

電子商取引のシナリオでは、ユーザが注文を出すと、ダウンストリームシステムの変更もトリガされる。 例えば、物流システムにおいて出荷が開始され、ポイントシステムにおいてユーザのクレジットポイントが更新され、ショッピングカートシステムにおいてアイテムが清算される。 この場合、次のトランザクションブランチが関係します。

  • 注文システム: 注文ステータスが未払いから未払いに変更されます。

  • 物流システム: 出荷されるレコードが追加され、出荷レコードが作成されます。

  • ポイントシステム: ユーザーのクレジットポイントが更新されます。

  • ショッピングカートシステム: アイテムがクリアされ、ユーザーレコードが更新されます。

従来のXAベースのトランザクションソリューション: パフォーマンスの低下

先行するトランザクションブランチ間の結果の一貫性を保証するために使用される典型的な解決策は、拡張アーキテクチャ (XA) プロトコルに基づく分散トランザクションシステムを使用することである。 システムは、4つの独立したトランザクションブランチからなる大きなトランザクションに変更をカプセル化します。 XAベースのトランザクションソリューションは、結果の一貫性を保証できます。 ただし、処理中に多数のリソースをロックする必要があり、システムの同時実行性が低くなり、パフォーマンスが低下します。 ロックされたリソースの数は、トランザクションブランチの数とともに増加します。

通常のメッセージベースのソリューション: 結果の一貫性が悪い

XAベースのトランザクションソリューションに基づくより単純なソリューションは、注文システムの変更をローカルトランザクションと見なし、下流システムの変更を下流タスクと見なします。 トランザクションブランチは、通常のメッセージと注文テーブルのトランザクションと見なされます。 このソリューションは、メッセージを非同期に処理して処理時間を短縮し、システムの同時実行性を向上します。

普通消息方案

ただし、このソリューションは、コアトランザクションとトランザクションブランチの間で一貫性のない結果をもたらす可能性があります。 例:

  • メッセージは送信されますが、注文は完了していません。 この場合、トランザクション全体をロールバックする必要があります。

  • 注文は完了しましたが、メッセージは送信されません。 この場合、メッセージは再び消費するために送信する必要があります。

  • タイムアウトエラーを確実に検出することができず、注文をロールバックする必要があるのか、注文変更をコミットする必要があるのかを判断することが困難になります。

ApsaraMQ for RocketMQトランザクションメッセージソリューション: 結果の整合性

通常のメッセージには、スタンドアロンのデータベーストランザクションのコミット、ロールバック、および統合された調整機能がないため、通常のメッセージベースのソリューションでは結果の一貫性を保証できません。

通常のメッセージソリューションに基づいて開発されたApsaraMQ for RocketMQが提供するトランザクションメッセージソリューションは、2フェーズコミットをサポートします。 このソリューションでは、2フェーズのコミットとローカルトランザクションを組み合わせて、コミット結果のグローバルな一貫性を確保します。

事务消息

ApsaraMQ for RocketMQが提供するトランザクションメッセージソリューションは、高いパフォーマンス、高いスケーラビリティ、および簡単なビジネス開発を特徴としています。 トランザクションメッセージの作業メカニズムと処理ワークフローの詳細については、「作業メカニズム」をご参照ください。

働くメカニズム

トランザクションメッセージとは何ですか?

トランザクションメッセージは、ApsaraMQ for RocketMQが提供するメッセージの一種であり、メッセージの生成とローカルトランザクションの整合性を確保します。

処理ワークフロー

次の図は、トランザクションメッセージを使用するプロセスを示しています。事务消息

  1. プロデューサーは、ApsaraMQ for RocketMQブローカーにメッセージを送信します。

  2. ApsaraMQ for RocketMQブローカーはメッセージを保持し、メッセージの確認応答 (ACK) をプロデューサーに返します。 この場合、メッセージは「配信の準備ができていません」とマークされます。 この状態のメッセージをハーフメッセージと呼ぶ。

  3. プロデューサは、ローカルトランザクションを実行する。

  4. プロデューサは、ローカルトランザクションの実行結果をブローカにコミットする。 実行結果はコミットまたはロールバックです。 次の項目は、ブローカーが実行結果を取得した後の処理ロジックを示しています。

    • 実行結果がコミットである場合、ブローカは、ハーフメッセージを「配信準備完了」としてマークし、メッセージをコンシューマに配信する。

    • 実行結果がロールバックの場合、ブローカーはトランザクションをロールバックし、ハーフメッセージをコンシューマーに配信しません。

  5. ブローカーが実行結果を受信しない場合、またはネットワーク切断またはプロデューサーの再起動によりハーフメッセージのステータスが不明である場合、ブローカーは一定期間待機し、プロデューサークラスタ内のプロデューサーにハーフメッセージのステータスを照会する要求を送信します。

    説明

    2つのステータスクエリの間隔と最大クエリ数については、「パラメーターの制限」をご参照ください。

  6. プロデューサは、ハーフメッセージのステータスを照会する要求を受信した後、ハーフメッセージに対応するローカルトランザクションの実行結果をチェックする。

  7. プロデューサは、ローカルトランザクションの照会されたステータスに基づいて、実行結果をブローカにコミットする。 次いで、ブローカは、ステップ4の動作を実行して、ハーフメッセージを処理する。

トランザクションメッセージのライフサイクル

事务消息

  • 初期化

    半メッセージは、プロデューサによって生成され、初期化され、ブローカに送信される準備ができている。

  • コミットするトランザクション

    ハーフメッセージはブローカーに送信されます。 通常のメッセージとは異なり、ハーフメッセージはブローカーによって保持されません。 代わりに、ハーフメッセージは、トランザクション記憶システムに記憶され、ローカルトランザクションの実行結果が返されるまでコミットされない。 このフェーズでは、メッセージは下流のコンシューマには見えません。

  • メッセージロールバック

    ローカルトランザクションの実行結果がロールバックの場合、ブローカーはハーフメッセージをロールバックしてワークフローを終了します。

  • 消費のためにコミット

    ローカルトランザクションの実行結果がCommitである場合、ブローカは、ハーフメッセージをストレージシステムに格納する。 メッセージが表示され、下流のコンシューマーがすぐに使用できるようになります。

  • 消費されている

    メッセージは、消費者によって取得され、消費者によって定義されたローカル消費ロジックに基づいて処理される。

    このプロセスでは、ブローカーは、消費者が消費結果を返すのを待つ。 特定の期間内にコンシューマーから応答が受信されない場合、ApsaraMQ for RocketMQはメッセージに対して再試行を実行します。 詳細は、「消費の再試行」をご参照ください。

  • 消費結果コミット

    消費者は消費を完了し、消費結果をブローカーにコミットする。 ブローカーは、メッセージが消費されたかどうかをマークします。

    デフォルトでは、ApsaraMQ for RocketMQはすべてのメッセージを保持します。 消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。 メッセージは、保存期間が終了した場合、またはシステムのストレージ容量が不足している場合にのみ削除されます。 メッセージが削除される前に、コンシューマはメッセージを再消費することができる。

  • メッセージ削除

    メッセージの保存期間が満了した場合、またはストレージ容量が不足している場合、ApsaraMQ for RocketMQは、物理ファイルから最も早く保存されたメッセージをローリング方式で削除します。 詳細については、「メッセージの保存とクリーンアップ」をご参照ください。

制限事項

メッセージタイプの整合性

トランザクションメッセージは、MessageTypeTransactionに設定されているトピックでのみ使用できます。

トランザクション中心の消費

ApsaraMQ for RocketMQによって提供されるトランザクションメッセージは、ローカルコアトランザクションとダウンストリームトランザクションブランチ間の結果の整合性のみを保証します。 下流のビジネスシステムは、メッセージが正しく処理されるようにする必要があります。 コンシューマーは、メッセージの正常な処理を保証するために、消費の再試行を適切に実行することを推奨します。 詳細は、「消費の再試行」をご参照ください。

中間ステータスの可視性

ApsaraMQ for RocketMQによって提供されるトランザクションメッセージは、結果の一貫性のみを保証します。 メッセージがコンシューマに配信される前に、下流トランザクションブランチと上流トランザクションとの間にステータスの不一致が存在する。 したがって、トランザクションメッセージは、非同期実行を使用できるトランザクションシナリオにのみ適しています。

トランザクションのタイムアウトメカニズム

タイムアウトメカニズムは、ApsaraMQ for RocketMQが提供するトランザクションメッセージのライフサイクルで使用されます。 ブローカーがハーフメッセージを受信した後、ブローカーがトランザクションメッセージの実行結果を判断できない場合、メッセージはデフォルトでロールバックされます。 詳細については、「パラメーターの制限」をご参照ください。

複数のSendReceiptsがサポートされていません

トランザクション内のトランザクションメッセージに対して許可されるSendReceiptは1つだけです。

サンプルコード

トランザクションメッセージの送信は、次の点で通常のメッセージの送信とは異なります。

  • トランザクションメッセージを送信する前に、トランザクションチェッカーを有効にして、ローカルトランザクション実行に関連付ける必要があります。

  • トランザクションの一貫性を確保するには、トランザクションチェッカーを構成し、プロデューサーをビルドするときにメッセージを送信するトピックをバインドする必要があります。 バインドされたトピックで例外が発生した場合、組み込みのトランザクションチェッカーを使用してステータスを復元できます。

次のサンプルコードは、Javaでトランザクションメッセージを使用する方法の例を示しています。

メッセージングの完全なサンプルコードについては、「Apache RocketMQ 5.x SDK (推奨) 」をご参照ください。

サンプルコード

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {
    /**
     // The demo is used to simulate the order table query service to check whether the order transaction is submitted. 
     */
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    /**
     // The demo is used to simulate the execution result of a local transaction. 
     */
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "topic1";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless instance, you must specify the username and password of the instance, regardless of whether you access the instance over the Internet or in a VPC. 
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();
        
        MessageBuilder messageBuilder = new MessageBuilderImpl();

        // Build the transaction producer. The producer must build a transaction checker to check the intermediate status of an exceptional half message. 
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID. In this example, an order ID is used. 
                 * If this order is found in the order table, the order insertion action is correctly committed by the local transaction. If no order is found in the order table, the local transaction is rolled back. 
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // An error occurs in the message. Rollback is returned. 
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            }).setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Create a transaction branch. 
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            // If the transaction branch fails to be created, the transaction is terminated. 
            return;
        }
        Message message = messageBuilder.setTopic(topic)
            // The message key. You can use a keyword to accurately find the message. 
            .setKeys("messageKey1")
            // The message tag. The consumer can use the tag to filter messages. 
            .setTag("messageTag")
            // The unique ID that is associated with the local transaction. The ID is used to verify the query of the local transaction status. 
            .addProperty("OrderId", "xxx")
            // The message body. 
            .setBody("messageBody".getBytes())
            .build();
        // Send the half message.
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            // If the half message fails to be sent, the transaction is terminated and the message is rolled back. 
            return;
        }
        /**
         * Execute the local transaction and determine the execution result. 
         * 1. If the result is Commit, deliver the message. 
         * 2. If the result is Rollback, roll back the message. 
         * 3. If an unknown exception occurs, no action is performed until a response is obtained from the query of the local transaction status. 
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // You can determine whether to retry the message based on your business requirements. If you do not want to retry the message, you can query the local transaction status to commit the transaction status. 
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // We recommend that you record the exception information. This way, if an exception occurs during the rollback of a message, you can query the local transaction status to commit the transaction status without retrying the message. 
                e.printStackTrace();
            }
        }
    }
}

使用上の注意

結果が不明なトランザクションによるタイムアウトの防止

ApsaraMQ for RocketMQでは、トランザクションの一貫性を確保するために、トランザクションコミットフェーズで例外が発生した場合にローカルトランザクションステータスを照会するリクエストを開始できます。 ただし、生産者は、ローカルトランザクションが未知の結果を返すのを防ぐ必要があります。 多数のトランザクションチェックは、システム性能を低下させ、トランザクション処理の遅延を引き起こす可能性がある。

進行中のトランザクションを適切に処理する

ローカルトランザクションステータスのクエリ中に、実行中のトランザクションのロールバックまたはコミットを返さないでください。 代わりに、トランザクションの [不明] ステータスを保持します。

ほとんどの場合、トランザクションの実行が遅く、ローカルトランザクションステータスのクエリが早すぎるため、トランザクションは進行中です。 以下のソリューションが提供されます。

  • ローカルトランザクションステータスの最初のクエリを実行する時間に、より大きな値を指定します。 これは、クエリ結果に依存するトランザクションに大きな遅延を引き起こす可能性があります。

  • プログラムに進行中のトランザクションを正しく識別させます。