顺序消息FIFO( First In First Out)是云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的C/C++ SDK收发顺序消息的示例代码。
背景信息
顺序消息分为两类:
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
更多信息,请参见顺序消息。
前提条件
您已完成以下操作:
下载C/C++ SDK。更多信息,请参见版本说明。
准备环境。更多信息,请参见环境准备(v1.x.x)。
创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源。
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送顺序消息
重要
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。
发送顺序消息的示例代码如下。
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
//Producer创建和正常工作的参数,必须输入。
ONSFactoryProperty factoryInfo;
//您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
//设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
//您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
//消息内容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
//请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//创建Producer。
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
//在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
pProducer->start();
Message msg(
//Message Topic
factoryInfo.getPublishTopics(),
//Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
//Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
factoryInfo.getMessageContent()
);
// 设置代表消息的业务属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
// 分区顺序消息中区分不同分区的关键字段。
// 如果是全局顺序消息,该字段可以设置为任意非空字符串。
std::string shardingKey = "abc";
//带有同一Sharding Key的消息会按照顺序发送。
try
{
//发送消息,只要不抛异常就是成功。
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
//添加对exception的处理。
}
// 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
pProducer->shutdown();
return 0;
}
订阅顺序消息
订阅顺序消息的示例代码如下。
#include "ONSFactory.h"
using namespace std;
using namespace ons;
//创建消费消息的实例。
//pushConsumer拉取到消息后,会主动调用该实例的consumeMessage函数。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
//根据业务需求,消费消息。
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
//OrderConsumer创建和工作需要的参数,必须输入。
ONSFactoryProperty factoryInfo;
//您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
//您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
//请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
//设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
//创建orderConsumer。
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
//指定orderConsumer订阅的消息Topic和消息Tag。
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
//注册消息监听的处理实例,orderConsumer拉取到消息后,会调用该类的consumeMessage函数。
//启动orderConsumer。
orderConsumer->start();
for(volatile int i = 0; i < 1000000000; ++i) {
//wait
}
//销毁orderConsumer, 在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
orderConsumer->shutdown();
return 0;
}