順序訊息FIFO( First In First Out)是雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的C/C++ SDK收發順序訊息的範例程式碼。
背景資訊
順序訊息分為兩類:
全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。
分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。
更多資訊,請參見順序訊息。
前提條件
您已完成以下操作:
下載C/C++ SDK。更多資訊,請參見版本說明。
準備環境。更多資訊,請參見環境準備(v1.x.x)。
建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
發送順序訊息
重要
雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。
發送順序訊息的範例程式碼如下。
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
//Producer建立和正常工作的參數,必須輸入。
ONSFactoryProperty factoryInfo;
//您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
//設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
//您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
//訊息內容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
//請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
//AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//建立Producer。
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
//在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
pProducer->start();
Message msg(
//Message Topic
factoryInfo.getPublishTopics(),
//Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"TagA",
//Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
factoryInfo.getMessageContent()
);
// 設定代表訊息的業務屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
// 分區順序訊息中區分不同分區的關鍵字段。
// 如果是全域順序訊息,該欄位可以設定為任意非Null 字元串。
std::string shardingKey = "abc";
//帶有同一Sharding Key的訊息會按照順序發送。
try
{
//發送訊息,只要不拋異常就是成功。
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
//添加對exception的處理。
}
// 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
pProducer->shutdown();
return 0;
}
訂閱順序訊息
訂閱順序訊息的範例程式碼如下。
#include "ONSFactory.h"
using namespace std;
using namespace ons;
//建立消費訊息的執行個體。
//pushConsumer拉取到訊息後,會主動調用該執行個體的consumeMessage函數。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
//根據業務需求,消費訊息。
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
//OrderConsumer建立和工作需要的參數,必須輸入。
ONSFactoryProperty factoryInfo;
//您在訊息佇列RocketMQ版控制台建立的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
//您在訊息佇列RocketMQ版控制台建立的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
//請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
//AccessKey ID,阿里雲身分識別驗證標識。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里雲身分識別驗證密鑰。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
//建立orderConsumer。
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
//指定orderConsumer訂閱的訊息Topic和訊息Tag。
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
//註冊訊息監聽的處理執行個體,orderConsumer拉取到訊息後,會調用該類的consumeMessage函數。
//啟動orderConsumer。
orderConsumer->start();
for(volatile int i = 0; i < 1000000000; ++i) {
//wait
}
//銷毀orderConsumer, 在應用退出前,必須銷毀Consumer對象,否則會導致記憶體泄露等問題。
orderConsumer->shutdown();
return 0;
}