全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發順序訊息

更新時間:Sep 23, 2024

順序訊息FIFO( First In First Out)是雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的C/C++ SDK收發順序訊息的範例程式碼。

背景資訊

順序訊息分為兩類:

  • 全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。

  • 分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。

更多資訊,請參見順序訊息

前提條件

您已完成以下操作:

  • 下載C/C++ SDK。更多資訊,請參見版本說明

  • 準備環境。更多資訊,請參見環境準備(v1.x.x)

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

發送順序訊息

重要

雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。

發送順序訊息的範例程式碼如下。

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    //Producer建立和正常工作的參數,必須輸入。
    ONSFactoryProperty factoryInfo;
    //您在訊息佇列RocketMQ版控制台建立的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    //設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); 
    //您在訊息佇列RocketMQ版控制台建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    //訊息內容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    //AccessKey ID,阿里雲身分識別驗證標識。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里雲身分識別驗證密鑰。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

    //建立Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    //在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
    pProducer->start();

    Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                //Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                "TagA",
                //Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                factoryInfo.getMessageContent()
    );

    // 設定代表訊息的業務屬性,請儘可能全域唯一。
    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
    // 注意:不設定也不會影響訊息正常收發。
    msg.setKey("ORDERID_100");
    // 分區順序訊息中區分不同分區的關鍵字段。
    // 如果是全域順序訊息,該欄位可以設定為任意非Null 字元串。
    std::string shardingKey = "abc";  
    //帶有同一Sharding Key的訊息會按照順序發送。
    try
    {
        //發送訊息,只要不拋異常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        //添加對exception的處理。
    }
    // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
    pProducer->shutdown();

    return 0;
}           

訂閱順序訊息

訂閱順序訊息的範例程式碼如下。

#include "ONSFactory.h"
using namespace std;
using namespace ons;

//建立消費訊息的執行個體。
//pushConsumer拉取到訊息後,會主動調用該執行個體的consumeMessage函數。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        //根據業務需求,消費訊息。
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    //OrderConsumer建立和工作需要的參數,必須輸入。
    ONSFactoryProperty factoryInfo;
    //您在訊息佇列RocketMQ版控制台建立的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    //您在訊息佇列RocketMQ版控制台建立的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    //AccessKey ID,阿里雲身分識別驗證標識。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		//AccessKey Secret,阿里雲身分識別驗證密鑰。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    //設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");


    //建立orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    //指定orderConsumer訂閱的訊息Topic和訊息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //註冊訊息監聽的處理執行個體,orderConsumer拉取到訊息後,會調用該類的consumeMessage函數。

    //啟動orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 1000000000; ++i) {
        //wait
    }

    //銷毀orderConsumer, 在應用退出前,必須銷毀Consumer對象,否則會導致記憶體泄露等問題。
    orderConsumer->shutdown();

   return 0;
}