全部产品
Search
文档中心

云消息队列 RocketMQ 版:收发事务消息

更新时间:Sep 23, 2024

本文提供使用TCP协议下的C/C++ SDK收发事务消息的示例代码供您参考。

云消息队列 RocketMQ 版提供类似XA或Open XA的分布式事务功能,通过云消息队列 RocketMQ 版事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。事务消息

更多信息,请参见事务消息

前提条件

您已完成以下操作:

  • 下载C/C++ SDK。更多信息,请参见版本说明

  • 准备环境。更多信息,请参见环境准备(v1.x.x)

  • 创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源

  • 获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey

发送事务消息

发送事务消息包含以下两个步骤。

  1. 发送半事务消息(Half Message)及执行本地事务。示例代码如下。

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。)
                    string msgId = value.getMsgID();
                    // 消息体内容进行crc32,也可以使用其它的如MD5。
                    // 消息ID和crc32id主要是用来防止消息重复。
                    // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
                    // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = 本地事务执行结果;
                        if (isCommit) {
                            // 本地事务成功、提交消息。
                            transactionStatus = CommitTransaction;
                        } else {
                            // 本地事务失败、回滚消息。
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            //创建Producer和发送消息所必需的信息。
            ONSFactoryProperty factoryInfo;
            //您在消息队列RocketMQ版控制台创建的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            //设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
            // 您在消息队列RocketMQ版控制台创建的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            //消息内容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
            //AccessKey ID,阿里云身份验证标识。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		    //AccessKey Secret,阿里云身份验证密钥。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
            //创建producer,消息队列RocketMQ版不负责pChecker的释放,需要业务方自行释放资源。
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            pProducer->start();
    
            Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。       
                "TagA",
                //Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
            );
    
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey("ORDERID_100");
    
            //发送消息,只要不抛出异常,就代表发送成功。   
            try
            {
                //消息队列RocketMQ版不负责pExecuter的释放,需要业务方自行释放资源。
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                //自定义处理exception的细节。
            }
            // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 提交事务消息状态,示例代码如下。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。)
             string msgId = value.getMsgID();
             // 消息体内容进行crc32,也可以使用其它的如MD5。
             // 消息ID和crc32id主要是用来防止消息重复。
             // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
             // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事务执行结果;
                 if (isCommit) {
                     // 本地事务成功、提交消息。
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事务失败、回滚消息。
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

事务回查机制说明

  • 发送事务消息为什么必须要实现回查Check机制?

    当半事务消息发送完成,但本地事务返回状态为TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条半事务消息的状态是未知的。因此Broker会定期向消息发送方即消息生产者集群中的任意一生产者实例发起消息回查,要求发送方回查该Half状态消息,并上报其最终状态。

  • Check被回调时,业务逻辑都需要做些什么?

    事务消息的Check方法里面,应该写一些检查事务一致性的逻辑。云消息队列 RocketMQ 版发送事务消息时需要实现LocalTransactionChecker接口,用来处理Broker主动发起的本地事务状态回查请求,因此在事务消息的Check方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。

    2. 向Broker提交该半事务消息本地事务的状态。

订阅事务消息

事务消息的订阅与普通消息订阅一致,更多信息,请参见订阅消息