雲訊息佇列 RocketMQ 版5.x版本執行個體可相容.NET 1.x/2.x SDK用戶端接入,您可以使用1.x/2.x版本的SDK接入5.x執行個體進行訊息收發。本文為您介紹1.x/2.x版本下的.NET SDK訊息收發範例程式碼。
重要
- 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和雲訊息佇列 RocketMQ 版5.x服務端完全相容,提供了更全面的功能並支援更多增強特性。更多資訊,請參見5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK後續僅做功能維護,建議僅存量業務使用。
普通訊息收發樣本
發送普通訊息
using System;
using ons;
public class ProducerExampleForEx
{
public ProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 建立生產者執行個體。
// 說明:生產者執行個體是安全執行緒的,可用於發送不同Topic的訊息。基本上,您每一個線程只需要一個生產者執行個體。
Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);
// 啟動用戶端執行個體。
producer.start();
// 建立訊息對象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
msg.setKey(Guid.NewGuid().ToString());
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的線程即將退出時,關閉生產者執行個體。
producer.shutdown();
}
}
訂閱普通訊息
using System;
using System.Threading;
using System.Text;
using ons;
// 從Broker拉取訊息時要執行的回呼函數。
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 叢集消費。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// 廣播消費。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// 建立消費者執行個體。
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// 訂閱Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// 啟動用戶端執行個體。
consumer.start();
// 該設定僅供Demo使用,實際生產環境中請保證進程不退出。
Thread.Sleep(300000);
// 在進程即將退出時,關閉消費者執行個體。
consumer.shutdown();
}
}
順序訊息收發樣本
發送順序訊息
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 建立生產者執行個體。
// 說明:生產者執行個體是安全執行緒的,可用於發送不同Topic的訊息。基本上,您每一個線程只需要一個生產者執行個體。
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
// 啟動用戶端執行個體。
producer.start();
// 建立訊息對象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的線程即將退出時,關閉生產者執行個體。
producer.shutdown();
}
}
訂閱順序訊息
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 設定日誌路徑。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 建立消費者執行個體。
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
// 訂閱Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
// 啟動消費者執行個體。
consumer.start();
// 讓主線程睡眠一段時間。
Thread.Sleep(30000);
// 不再使用時,關閉消費者執行個體。
consumer.shutdown();
}
}
}
定時/延時訊息收發樣本
發送定時/延時訊息
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;
namespace ons
{
class onscsharp
{
static void Main(string[] args)
{
// Producer建立和正常工作的參數,必須輸入。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "XXX ");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "XXX");
//訊息內容。
factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 建立Producer。
Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
pProducer.start();
Message msg = new Message(
// 訊息主題。
factoryInfo.getPublishTopics(),
// 訊息標籤。
"TagA",
// 訊息主體。
factoryInfo.getMessageContent()
);
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
// deliver time,單位:ms。指定一個時刻,在這個時刻之後訊息才能被消費,這個例子表示3s後才能被消費。
long deliverTime = System.currentTimeMillis() + 3000;
msg.setStartDeliverTime(deliverTime);
// 發送訊息,只要不拋出異常,就代表發送成功。
try
{
SendResultONS sendResult = pProducer.send(msg);
}
catch(ONSClientException e)
{
// 發送失敗處理。
}
// 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
pProducer.shutdown();
}
}
}
訂閱定時/延時訊息
訂閱定時/延時訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息。
事務訊息收發樣本
發送事務訊息
發送半事務訊息(Half Message)及執行本地事務,範例程式碼如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using ons; namespace ons { public class MyLocalTransactionExecuter : LocalTransactionExecuter { public MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } public override TransactionStatus execute(Message value) { Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser")); // 訊息ID(有可能訊息體一樣,但訊息ID不一樣。當前訊息ID在訊息佇列RocketMQ版控制台無法查詢)。 string msgId = value.getMsgID(); // 訊息體內容進行crc32, 也可以使用其它的如MD5。 // 訊息ID和crc32id主要是用來防止訊息重複。 // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。 TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = 本地事務執行結果; if (isCommit) { // 本地事務成功則提交訊息。 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事務失敗則復原訊息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { // 處理異常。 } return transactionStatus; } } class onscsharp { static void Main(string[] args) { ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。 factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); // 您在訊息佇列RocketMQ版控制台建立的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, ""); // 訊息內容。 factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, ""); /** * 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。 * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。 * 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。 * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。 */ // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。 factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME"); factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" ); // 注意!!!訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。 // 建立事務生產者。 LocalTransactionChecker myChecker = new MyLocalTransactionChecker(); TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker); // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可,啟動之後可以多線程並發發送訊息。 pProducer.start(); Message msg = new Message( //Message Topic factoryInfo.getPublishTopics(), //Message Tag "TagA", //Message Body factoryInfo.getMessageContent() ); // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 // 以方便您在無法正常收到訊息情況下,可通訊息佇列RocketMQ版過控制台查詢訊息並補發。 // 注意:不設定也不會影響訊息正常收發。 msg.setKey("ORDERID_100"); // 發送訊息,只要不拋出異常,就代表發送成功。 try { LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer.send(msg, ref myExecuter); } catch(ONSClientException e) { Console.WriteLine("\nexception of sendmsg:{0}",e.what() ); } // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。 // shutdown之後不能重新start此Producer。 pProducer.shutdown(); } } }
提交事務訊息狀態,範例程式碼如下。
public class MyLocalTransactionChecker : LocalTransactionChecker { public MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } public override TransactionStatus check(Message value) { Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser")); // 訊息ID(有可能訊息體一樣,但訊息ID不一樣。當前訊息ID在訊息佇列RocketMQ版控制台無法查詢)。 string msgId = value.getMsgID(); // 訊息體內容進行crc32,也可以使用其它的如MD5。 // 訊息ID和crc32id主要是用來防止訊息重複。 // 如果業務本身是等冪的,可以忽略,否則需要利用msgId或crc32Id來做等冪。 // 如果要求訊息絕對不重複,推薦做法是對訊息體body使用crc32或MD5來防止重複訊息。 TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = 本地事務執行結果; if (isCommit) { // 本地事務成功、提交訊息。 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事務失敗、復原訊息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { //exception handle } return transactionStatus; } }
訂閱事務訊息
訂閱事務訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息。