全部產品
Search
文件中心

ApsaraMQ for RocketMQ:範例程式碼

更新時間:Jul 23, 2024

雲訊息佇列 RocketMQ 版5.x版本執行個體可相容4.x/3.x SDK用戶端接入,您可以使用4.x/3.x版本的SDK接入5.x執行個體進行訊息收發。本文為您介紹4.x/3.x版本下的C++ SDK訊息收發範例程式碼。

重要
  • 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和雲訊息佇列 RocketMQ 版5.x服務端完全相容,提供了更全面的功能並支援更多增強特性。更多資訊,請參見5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK後續僅做功能維護,建議僅存量業務使用。

普通訊息收發樣本

發送普通訊息

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。      
    DefaultMQProducer producer("GID_XXX");          
    // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。 
    producer.setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。  
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 請確保參數設定完成之後啟動Producer。
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // 設定為您在訊息佇列RocketMQ版控制台上建立的普通訊息類型的Topic。
        MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            std::cout <<"SendResult:"<<sendResult.getSendStatus()<< ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}

訂閱普通訊息

#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    consumer->setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
    
    auto start = std::chrono::system_clock::now();

    // 請註冊自訂監聽函數用來處理接收到的訊息,並返迴響應的處理結果。
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("YOURTOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // 準備工作完成,必須調用啟動函數,才可以正常工作。
    // ********************************************
    // 1.確保訂閱關係的設定在啟動之前完成。
    // 2.確保相同消費者分組內的消費者的訂閱關係一致。
    // *********************************************
    consumer->start();

    // 請保持線程常駐,不要執行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

順序訊息收發樣本

發送順序訊息

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
        // 實現自訂分區邏輯,根據業務傳入arg參數即分區鍵,計算路由到哪個隊列,這裡以arg為int型參數為例。
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        return mqs[0];
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    DefaultMQProducer producer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    producer.setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 請確保參數設定完成之後啟動Producer。
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    // 可以設定發送重試的次數,確保發送成功。
    int retryTimes = 1;
    // 參考介面MessageQueueSelector,通過設定的自訂參數arg,計算髮送到指定的路由隊列中,此處的arg便是分區ID。
    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        // 設定為您在訊息佇列RocketMQ版控制台上建立的順序訊息類型的Topic。
        MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}

訂閱順序訊息


#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    consumer->setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
 
    auto start = std::chrono::system_clock::now();

    // 請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("YOUR ORDERLY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // 準備工作完成,必須調用啟動函數,才可以正常工作。
    // ********************************************
    // 1.確保訂閱關係的設定在啟動之前完成。
    // 2.確保相同消費者分組內的消費者的訂閱關係一致。
    // *********************************************
    consumer->start();

    // 請保持線程常駐,不要執行shutdown操作。
    std::this_thread::sleep_for(std::chrono::seconds (60 ));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

定時/延時訊息收發樣本

發送定時/延時訊息

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    DefaultMQProducer producer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    producer.setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
     // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    //請確保參數設定完成之後啟動Producer。
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // 設定為您在訊息佇列RocketMQ版控制台上建立的定時/延時訊息類型的Topic。
        MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
        chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
        chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
        // 定時延時訊息,單位毫秒(ms),在指定時間戳記(目前時間之後)進行投遞,例如2022-08-07 16:21:00投遞。
        // 如果設定成目前時間戳之前的某個時刻,訊息將被立刻投遞給消費者。
        // 設定需要延時或者定時的時間,例如目前時間延遲10秒後投遞。
        long exp = mil.count() + 10000;
        msg.setProperty("__STARTDELIVERTIME", to_string(exp));
        std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}

訂閱定時/延時訊息


#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;
using namespace std;

class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                      << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
    consumer->setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
    
    auto start = std::chrono::system_clock::now();

    //register your own listener here to handle the messages received.
    // 請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
    ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
    consumer->subscribe("YOUR DELAY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // 準備工作完成,必須調用啟動函數,才可以正常工作。
    // ********************************************
    // 1.確保訂閱關係的設定在啟動之前完成。
    // 2.確保相同消費者分組內的消費者的訂閱關係一致。
    // *********************************************
    consumer->start();

    // 請保持線程常駐,不要執行shutdown操作。
    std::this_thread::sleep_for(std::chrono::seconds(600));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

事務訊息

發送事務訊息

#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"

using namespace std;
using namespace rocketmq;

class ExampleTransactionListener : public TransactionListener {
public:
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
        // 執行本地事務,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,不確定返回UNKNOWN。
        // 如果返回UNKNOWN,則會觸發定時任務回查狀態。
        std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:"
                  << msg.getBody() << std::endl;
        return UNKNOWN;
    }

    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
        // 回查本地事務執行情況,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,不確定返回UNKNOWN。
        // 如果返回UNKNOWN,則等待下次定時任務回查。
        std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId()
                  << std::endl;
        return COMMIT_MESSAGE;
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。  
    TransactionMQProducer producer("GID_XXX");
    // 設定為您從訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。producer.setNamesrvAddr("ACCESS POINT");
    // 設定執行個體使用者名稱、執行個體密碼和使用者渠道。使用者渠道預設為“ALIYUN”。     
    // 若您使用公網接入訊息佇列RocketMQ版執行個體,則必須配置執行個體的使用者名稱和密碼;若使用VPC接入,則無需配置。
    // 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
    // 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // 注意!!!訪問RocketMQ 5.0執行個體時,InstanceID屬性不需要設定,否則會導致失敗。

    // 本地事務執行和回查函數。
    ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    // 請確保參數設定完成之後啟動Producer。
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 3;
    for (int i = 0; i < count; ++i) {
        // 設定為您在訊息佇列RocketMQ版控制台上建立的事務訊息類型的Topic。
        MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message.");
        try {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    std::cout << "Wait for local transaction check..... " << std::endl;
    for (int i = 0; i < 6; ++i) {
        this_thread::sleep_for(chrono::seconds(10));
        std::cout << "Running "<< i*10 + 10 << " Seconds......"<< std::endl;
    }
    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}

訂閱事務訊息

訂閱事務訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息