全部产品
Search
文档中心

云消息队列 RocketMQ 版:收发事务消息

更新时间:Jul 25, 2023

本文提供使用TCP协议下的.NET SDK收发事务消息的示例代码。目前支持的地域包括公网、华东1(杭州)、华北2(北京)、华东2(上海)、华南1(深圳)。

云消息队列 RocketMQ 版提供类似XA或Open XA的分布式事务功能,通过云消息队列 RocketMQ 版事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。事务消息交互流程

更多信息,请参见事务消息

前提条件

  • 下载.NET SDK。更多信息,请参见版本说明

  • 环境准备。更多信息,请参见环境准备

  • 创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源

  • 获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey

发送事务消息

发送事务消息包含以下两个步骤:

  1. 发送半事务消息(Half Message)及执行本地事务。示例代码如下。

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
    
                 // 消息ID(有可能消息体一样,但消息ID不一样。当前消息ID在消息队列RocketMQ版控制台无法查询)。
                 string msgId = value.getMsgID();
                 // 消息体内容进行crc32, 也可以使用其它的如MD5。
                 // 消息ID和crc32id主要是用来防止消息重复。
                 // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = 本地事务执行结果;
                     if (isCommit) {
                         // 本地事务成功则提交消息。
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // 本地事务失败则回滚消息。
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // 处理异常。
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");
             // 您在消息队列RocketMQ版控制台创建的Topic。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "");
             // 消息内容。
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "");
             //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
             //AccessKey ID,阿里云身份验证标识。
             factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    	       //AccessKey Secret,阿里云身份验证密钥。
             factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            
             //create transaction producer                
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker);
    
             // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可,启动之后可以多线程并发发送消息。
             pProducer.start();
    
                 Message msg = new Message(
                 //Message Topic
                 factoryInfo.getPublishTopics(),
                 //Message Tag
                 "TagA",
                 //Message Body
                 factoryInfo.getMessageContent()
             );
    
             // 设置代表消息的业务关键属性,请尽可能全局唯一。
             // 以方便您在无法正常收到消息情况下,可通消息队列RocketMQ版过控制台查询消息并补发。
             // 注意:不设置也不会影响消息正常收发。
             msg.setKey("ORDERID_100");
    
             // 发送消息,只要不抛出异常,就代表发送成功。
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
             // shutdown之后不能重新start此producer。
             pProducer.shutdown();
         }
     }
     }
  2. 提交事务消息状态。

    当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。 通知方式有以下两种:

    • 执行本地事务完成后提交。

    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

    事务状态有以下三种:

    • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。

    • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。

    • TransactionStatus.Unknow 无法判断状态,期待Broker向发送方再次询问该消息对应的本地事务的状态。

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
                // 消息ID(有可能消息体一样,但消息ID不一样。当前消息ID在消息队列RocketMQ版控制台无法查询)。
                string msgId = value.getMsgID();
                // 消息体内容进行crc32,也可以使用其它的如MD5。
                // 消息ID和crc32id主要是用来防止消息重复。
                // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
                // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = 本地事务执行结果;
                    if (isCommit) {
                        // 本地事务成功、提交消息。
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // 本地事务失败、回滚消息。
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }

    事务回查机制说明

    • 发送事务消息为什么必须要实现Check机制?

      当步骤1半事务消息发送完成,但本地事务返回状态为TransactionStatus.Unknow时,亦或是应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条半状态的消息的状态是未知的,因此Broker会定期向消息发送方即消息生产者集群中的任意一生产者实例发起消息回查,要求发送方回查该Half状态消息,并上报其最终状态。

    • Check被回调时,业务逻辑都需要做些什么?

      事务消息的Check方法里面,应该写一些检查事务一致性的逻辑。云消息队列 RocketMQ 版发送事务消息时需要实现LocalTransactionChecker接口,用来处理Broker主动发起的本地事务状态回查请求;因此在事务消息的Check方法中,需要完成两件事情:

      1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。

      2. 向Broker提交该半事务消息本地事务的状态。

    • 本地事务的不同状态对半事务消息的影响:

      • TransactionStatus.CommitTransaction提交事务,允许订阅方消费该消息。

      • TransactionStatus.RollbackTransaction回滚事务,消息将被丢弃不允许消费。

      • TransactionStatus.Unknow无法判断状态,期待Broker向发送方再次询问该消息对应的本地事务的状态。

      具体代码的更多信息,请参见MyLocalTransactionChecker的实现。

订阅事务消息

事务消息的订阅与普通消息订阅一致,更多信息,请参见订阅消息