すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージの送受信

最終更新日:Sep 23, 2024

このトピックでは、CまたはC ++ 用のTCPクライアントSDKを使用してトランザクションメッセージを送受信する方法に関するサンプルコードを提供します。

ApsaraMQ for RocketMQは、拡張アーキテクチャ (X/Open XA) と同様の分散トランザクション処理機能を提供し、ApsaraMQ for RocketMQのトランザクションの一貫性を確保します。

相互作用プロセス

次の図は、トランザクションメッセージの対話プロセスを示しています。 Transactional messages

詳細については、「トランザクションメッセージ」をご参照ください。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • CまたはC ++ 用のSDKがダウンロードされます。 詳細については、「リリースノート」をご参照ください。

  • 環境を整えます。 詳細については、「環境準備 (V1.x.x)」をご参照ください。

  • コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。

トランザクションメッセージの送信

トランザクションメッセージを送信するには、次の手順を実行します。

  1. ハーフメッセージを送信し、ローカルトランザクションを実行します。 サンプルコード:

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. )
                    string msgId = value.getMsgID();
                    // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                    // The message ID and CRC32 ID are used to prevent duplicate messages. 
                    // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
                    // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. 
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = The execution result of the local transaction;
                        if (isCommit) {
                            // Commit the message if the local transaction is executed. 
                            transactionStatus = CommitTransaction;
                        } else {
                            // Roll back the message if the local transaction failed to be executed. 
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            // The parameters that are required to create the producer and send messages. 
            ONSFactoryProperty factoryInfo;
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // The message content. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            // The AccessKey ID that is used for authentication. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		    // The AccessKey secret that is used for authentication. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
            // Create the producer. ApsaraMQ for RocketMQ does not release pChecker. You must release pChecker by yourself. 
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            // Before you send the message, call the start() method only once to start the producer. 
            pProducer->start();
    
            Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker.        
                "TagA",
                // The message body. You cannot leave this parameter empty. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. 
                factoryInfo.getMessageContent()
            );
    
            // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. 
            // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
            // Note: You can send and receive a message even if you do not specify the key. 
            msg.setKey("ORDERID_100");
    
            // Send the message. If no exception is thrown, the message is sent.    
            try
            {
                // ApsaraMQ for RocketMQ does not release pExecuter. You must release pExecuter by yourself. 
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                // Specify the logic to handle exceptions. 
            }
            // Before you exit the application, destroy the producer. Otherwise, issues such as memory leaks occur. 
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 次のサンプルコードは、トランザクションメッセージのステータスをコミットする方法の例を示しています。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
             string msgId = value.getMsgID();
             // Calculate the message body by using an algorithm such as CRC32 and MD5. 
             // The message ID and CRC32 ID are used to prevent duplicate messages. 
             // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
             // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. 
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = The execution result of the local transaction;
                 if (isCommit) {
                     // Commit the message if the local transaction is executed. 
                     transactionStatus = CommitTransaction;
                 } else {
                     // Roll back the message if the local transaction failed to be executed. 
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

トランザクション状態チェックの仕組み

  • トランザクションメッセージの送信時にトランザクションステータスチェックのメカニズムを実装する必要があるのはなぜですか。

    ハーフメッセージが送信されたが、TransactionStatus.Unknowが返された場合、またはアプリケーションの終了によりローカルトランザクションのステータスがコミットされていない場合、ハーフメッセージのステータスはApsaraMQ for RocketMQブローカーには不明です。 したがって、ブローカは、半メッセージのステータスをチェックするために、プロデューサクラスタ内のプロデューサに要求を定期的に送信する。 ステータスチェック要求が受信された後、プロデューサは、ハーフメッセージに対応するローカルトランザクションの最終ステータスをチェックしてコミットする。

  • チェックメソッドがコールバックされると、ビジネスロジックは何をしますか?

    トランザクションメッセージのチェックメソッドには、トランザクションの整合性をチェックするために使用されるロジックが含まれている必要があります。 トランザクションメッセージの送信後、ApsaraMQ for RocketMQLocalTransactionCheckerg APIを呼び出して、ブローカーからのローカルトランザクションステータスのリクエストに応答する必要があります。 したがって、トランザクションメッセージのチェックに使用されるメソッドは、次の目的を達成する必要があります。

    1. ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック) を確認します。

    2. ハーフメッセージに対応するローカルトランザクションのステータスをブローカーにコミットします。

トランザクションメッセージの購読

トランザクションメッセージをサブスクライブするためのサンプルコードは、通常のメッセージをサブスクライブするためのサンプルコードと同じです。 詳細については、『メッセージのサブスクライブ』をご参照ください。