All Products
Search
Document Center

ApsaraMQ for RocketMQ:Sample code

Last Updated:Aug 15, 2024

ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 1.x or 2.x SDK for .NET. This topic provides the sample code that is used to send and receive messages by using RocketMQ 1.x or 2.x SDK for .NET.

Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more functions and enhanced features. For more information, see Release notes.

  • Alibaba Cloud only maintains RocketMQ 3.x, 4.x, and TCP client SDKs. We recommend that you use them only for existing business.

Send and receive normal messages

Send normal messages

using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) { 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        /**
        * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
        * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information. 
        * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
        */  
        // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
        // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 
        
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in a format similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: Producer instances are thread-safe and can be used to send messages from different topics. In most cases, each thread requires only one producer instance. 
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create a message. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}    

Subscribe to normal messages

using System;
using System.Threading;
using System.Text;
using ons;

// The callback function that is executed when a message is pulled in the ApsaraMQ for RocketMQ broker. 
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        /**
        * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
        * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The broker automatically obtains the username and password based on the VPC information. 
        * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
        */  
        // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
        // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 
        
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Clustering consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // Broadcasting consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create a consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to a topic. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance. 
        consumer.start();

        // This setting is used only in this demo. In actual production environments, you cannot exit the process. 
        Thread.Sleep(300000);

        // Before you exit the process, shut down the consumer instance. 
        consumer.shutdown();
    }
}

Send and receive ordered messages

Send ordered messages

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        /**
        * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
        * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The broker automatically obtains the username and password based on the VPC information. 
        * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
        */  
        // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
        // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 
        
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in a format similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: Producer instances are thread-safe and can be used to send messages from different topics. In most cases, each thread requires only one producer instance. 
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create a message. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}

Subscribe to ordered messages

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            /**
            * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
            * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
            * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The broker automatically obtains the username and password based on the VPC information. 
            * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
            */  
            // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
            // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 
            
            // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in a format similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
            // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
            // The log path. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // Create a consumer instance. 
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // Subscribe to a topic. 
            consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());

            // Start the consumer instance. 
            consumer.start();

            // Enable the main thread to sleep for a period of time. 
            Thread.Sleep(30000);

            // If the consumer instance is no longer used, shut down the consumer instance. 
            consumer.shutdown();
        }
    }
}

Send and receive scheduled or delayed messages

Send scheduled or delayed messages

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;

namespace ons
{
    class onscsharp
    {
        static void Main(string[] args)
        {
            // Set the parameters that are required to create and use a producer. 
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "XXX ");
            // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in a format similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
            // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "XXX");
            // The message content. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "XXX");
            /**
            * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
            * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
            * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The broker automatically obtains the username and password based on the VPC information. 
            * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
            */  
            // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
            // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 

            //Create a producer. 
            Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);

            // Before you send the message, call the start() method only once to start the producer. 
            pProducer.start();

            Message msg = new Message(
                // The message topic. 
                factoryInfo.getPublishTopics(),
                // The message tag. 
                "TagA",
                // The message body. 
                factoryInfo.getMessageContent()
            );

            // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. 
            // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
            // Note: Messages can still be sent and received even if you do not specify the key. 
            msg.setKey("ORDERID_100");

            // The delivery time. Unit: milliseconds. A message is consumed only after the specified time. In this example, the message is consumed after 3 seconds. 
            long deliverTime = System.currentTimeMillis() + 3000;
            msg.setStartDeliverTime(deliverTime);

            // Send the message. If no exception is thrown, the message is sent. 
            try
            {
                SendResultONS sendResult = pProducer.send(msg);
            }
            catch(ONSClientException e)
            {
                // The logic to handle failures. 
            }

            // Before you exit your application, shut down the producer. If you do not shut down the producer, issues such as memory leaks may occur. 
            pProducer.shutdown();

        }
 }
}           

Subscribe to scheduled or delayed messages

The sample code that is used subscribe to scheduled or delayed messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Subscribe to normal messages.

Send and receive transactional messages

Send transactional messages

  1. Send a half message and execute the corresponding local transaction. Sample code:

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
    
                 // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. 
                 string msgId = value.getMsgID();
                 // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                 // The message ID and CRC32 ID are used to prevent duplicate messages. 
                 // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = Execution result of the local transaction;
                     if (isCommit) {
                         // Commit the message if the local transaction is successful. 
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // Roll back the message if the local transaction fails. 
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // The logic to handle exceptions. 
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in a format similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
             // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
             // The topic that you created in the ApsaraMQ for RocketMQ console. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "");
             // The message content. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "");
             /**
              * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
              * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
              * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The broker automatically obtains the username and password based on the VPC information. 
              * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
              */  
              // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
              factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
              factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
              // Note: If you use RocketMQ 1.x or 2.x SDK for .NET to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails. 
    
             // Create the transaction producer.        
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
    
             // Before you use the producer to send a message, call the start() method once to start the producer. After the producer is started, messages can be concurrently sent in multiple threads. 
             pProducer.start();
    
                 Message msg = new Message(
                 //Message Topic
                 factoryInfo.getPublishTopics(),
                 //Message Tag
                 "TagA",
                 //Message Body
                 factoryInfo.getMessageContent()
             );
    
             // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. 
             // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
             // Note: Messages can still be sent and received even if you do not specify the key. 
             msg.setKey("ORDERID_100");
    
             // Send the message. If no exception is thrown, the message is sent. 
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // Before you exit your application, shut down the producer. If you do not shut down the producer, issues such as memory leaks may occur. 
             // The producer cannot be restarted after it is shut down. 
             pProducer.shutdown();
         }
     }
     }
  2. Commit the status of a transactional message. Sample code:

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. 
                string msgId = value.getMsgID();
                // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                // The message ID and CRC32 ID are used to prevent duplicate messages. 
                // If your business is idempotent, you do not need to specify the message ID or CRC32 ID. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
                // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = Execution result of the local transaction;
                    if (isCommit) {
                        // Commit the message if the local transaction is successful. 
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // Roll back the message if the local transaction fails. 
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }

Subscribe to transactional messages

The sample code that is used to subscribe to transactional messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Subscribe to normal messages.