このトピックでは、TCPクライアントSDK forを使用してトランザクションメッセージを送受信する方法に関するサンプルコードを提供します。NETを使用します。 スケジュールされたメッセージは、インターネット、中国 (杭州) 、中国 (北京) 、中国 (上海) 、中国 (深セン) の各リージョンでサポートされます。
ApsaraMQ for RocketMQは、eXtended Architecture (X/Open XA) と同様の分散トランザクション処理機能を提供します。 ApsaraMQ for RocketMQはトランザクションメッセージを使用してトランザクションの一貫性を確保します。
相互作用プロセス
次の図は、トランザクションメッセージを使用するプロセスを示しています。
詳細については、「トランザクションメッセージ」をご参照ください。
前提条件
SDK for。NETがダウンロードされます。 詳細については、「リリースノート」をご参照ください。
環境を整えます。 詳細については、「環境の準備」をご参照ください。
コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
トランザクションメッセージの送信
トランザクションメッセージを送信するには、次の手順を実行します。
ハーフメッセージを送信し、ローカルトランザクションを実行します。 サンプルコード:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using ons; namespace ons { public class MyLocalTransactionExecuter : LocalTransactionExecuter { public MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } public override TransactionStatus execute(Message value) { Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser")); // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. string msgId = value.getMsgID(); // Calculate the message body by using an algorithm such as CRC32 and MD5. // The message ID and CRC32 ID are used to prevent duplicate messages. // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = The execution result of the local transaction; if (isCommit) { // Commit the message if the local transaction is executed. transactionStatus = TransactionStatus.CommitTransaction; } else { // Roll back the message if the local transaction failed to be executed. transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { // The logic to handle exceptions. } return transactionStatus; } } class onscsharp { static void Main(string[] args) { ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX"); // The topic that you created in the ApsaraMQ for RocketMQ console. factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, ""); // The message content. factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, ""); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); //create transaction producer LocalTransactionChecker myChecker = new MyLocalTransactionChecker(); TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker); // Before you send the message, call the start() method only once to start the producer. After the producer is started, messages can be concurrently sent in multiple threads. pProducer.start(); Message msg = new Message( //Message Topic factoryInfo.getPublishTopics(), //Message Tag "TagA", //Message Body factoryInfo.getMessageContent() ); // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_100"); // Send the message. If no exception is thrown, the message is sent. try { LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer.send(msg, myExecuter); } catch(ONSClientException e) { Console.WriteLine("\nexception of sendmsg:{0}",e.what() ); } // Before you exit the application, destroy the producer. Otherwise, issues such as memory leaks occur. // The producer cannot be started again after it is destroyed. pProducer.shutdown(); } } }
トランザクションメッセージのステータスをコミットします。
ローカルトランザクションの実行後、実行が成功したかどうかにかかわらず、ApsaraMQ for RocketMQブローカーに現在のメッセージのトランザクションステータスを通知する必要があります。 ApsaraMQ for RocketMQブローカーには、次のいずれかの方法で通知できます。
ローカルトランザクションの実行后にステータスをコミットします。
ApsaraMQ for RocketMQブローカーがメッセージのトランザクションステータスを確認するリクエストを送信するまで待ちます。
トランザクションは、次のいずれかの状態になります。
: トランザクションがコミットされ、コンシューマはメッセージを消費できます。
: トランザクションはロールバックされ、メッセージは破棄され、使用できません。
TransactionStatus.Unknow
: トランザクションのステータスが不明であり、システムはブローカーがメッセージに対応するローカルトランザクションのステータスを照会するのを待っています。
public class MyLocalTransactionChecker : LocalTransactionChecker { public MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } public override TransactionStatus check(Message value) { Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser")); // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. string msgId = value.getMsgID(); // Calculate the message body by using an algorithm such as CRC32 and MD5. // The message ID and CRC32 ID are used to prevent duplicate messages. // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = The execution result of the local transaction; if (isCommit) { // Commit the message if the local transaction is executed. transactionStatus = TransactionStatus.CommitTransaction; } else { // Roll back the message if the local transaction failed to be executed. transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { //exception handle } return transactionStatus; } }
トランザクション状態チェックの仕組み
トランザクションメッセージの送信時にトランザクションステータスチェックのメカニズムを実装する必要があるのはなぜですか。
手順1でハーフメッセージが送信されたが、
TransactionStatus.Unknow
が返された場合、またはアプリケーションが終了したためにステータスのローカルトランザクションがコミットされなかった場合、ハーフメッセージのステータスはApsaraMQ for RocketMQブローカーには不明です。 したがって、ブローカは、半メッセージのステータスをチェックするために、プロデューサクラスタ内のプロデューサに要求を定期的に送信する。 ステータスチェック要求が受信された後、プロデューサは、ハーフメッセージに対応するローカルトランザクションの最終ステータスをチェックしてコミットする。チェックメソッドがコールバックされると、ビジネスロジックは何をしますか?
トランザクションメッセージのチェックメソッドには、トランザクションの整合性をチェックするために使用されるロジックが含まれている必要があります。 トランザクションメッセージの送信後、ApsaraMQ for RocketMQは
LocalTransactionChecker
APIを呼び出して、ローカルトランザクションのブローカーからのステータスチェック要求に応答する必要があります。 したがって、トランザクションメッセージのチェックに使用されるメソッドは、次の目的を達成する必要があります。ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック) を確認します。
ハーフメッセージに対応するローカルトランザクションのステータスをブローカーにコミットします。
ローカルトランザクションのステータスは、ハーフメッセージにどのように影響しますか?
: トランザクションはコミットされています。 メッセージは、消費者によって消費され得る。
: トランザクションはロールバックされます。 メッセージは破棄され、使用できません。
TransactionStatus.Unknow
: トランザクションのステータスが不明であり、システムはブローカーがメッセージに対応するローカルトランザクションのステータスを照会するのを待っています。
詳細については、「
MyLocalTransactionChecker
」の実装をご参照ください。
トランザクションメッセージの購読
トランザクションメッセージをサブスクライブするためのサンプルコードは、通常のメッセージをサブスクライブするためのサンプルコードと同じです。 詳細については、『メッセージのサブスクライブ』をご参照ください。