注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 注文されたメッセージは、厳密な先入れ先出しの順序で発行および使用されます。 このトピックでは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージを送受信する方法に関するサンプルコードを提供します。
背景情報
順序付けられたメッセージは、次のタイプに分類されます。
グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。
パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。
詳細については、「注文メッセージ」をご参照ください。
前提条件
次の操作が実行されていることを確認します。
CまたはC ++ 用のSDKがダウンロードされます。 詳細については、「リリースノート」をご参照ください。
環境を整えます。 詳細については、「環境準備 (V1.x.x)」をご参照ください。
コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
順序付けられたメッセージを送信する
ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。
次のサンプルコードは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージを送信する方法の例を示しています。
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
// The parameters that are required to create and use a producer.
ONSFactoryProperty factoryInfo;
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// The message content.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Create the producer.
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
// Before you send the message, call the start() method only once to start the producer.
pProducer->start();
Message msg(
//Message Topic
factoryInfo.getPublishTopics(),
// The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages in the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and the consumer must agree on the serialization and deserialization methods.
factoryInfo.getMessageContent()
);
// The message key. A key is the business-specific attribute of a message and must be globally unique.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: You can send and receive a message even if you do not specify the key.
msg.setKey("ORDERID_100");
// The key field that is used to identify partitions for partitionally ordered messages.
// This field can be set to a non-empty string for globally ordered messages.
std::string shardingKey = "abc";
// Messages that have the same sharding key are sent in order.
try
{
// Send the message. If no exception is thrown, the message is sent.
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
// Specify the logic to handle errors.
}
// Before you exit your application, destroy the producer. If you do not destroy the producer, issues such as memory leaks may occur.
pProducer->shutdown();
return 0;
}
注文されたメッセージを購読する
次のサンプルコードは、CまたはC ++ 用のTCPクライアントSDKを使用して順序付きメッセージをサブスクライブする方法の例を示しています。
#include "ONSFactory.h"
using namespace std;
using namespace ons;
// Create the consumer instance.
//After the push consumer pulls the message, the push consumer calls the consumeMessage function of the instance.
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
// Consume messages based on your business requirements.
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// The parameters that are required to create and use the consumer.
ONSFactoryProperty factoryInfo;
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
// Create the consumer.
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
// The message topic and tag to which the consumer subscribes.
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
// Register the instance to listen to messages. After the consumer pulls the messages, the consumer calls the consumeMessage function of the message listening class.
//Start the consumer.
orderConsumer->start();
for(volatile int i = 0; i < 1000000000; ++i) {
//wait
}
// Destroy the consumer. Before you exit the application, destroy the consumer. Otherwise, issues such as memory leaks may occur.
orderConsumer->shutdown();
return 0;
}