ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 1.x SDK for C++. You can use RocketMQ 1.x SDK for C++ to connect to an ApsaraMQ for RocketMQ 5.x instance to send and receive messages. This topic provides the sample code that is used to send and receive messages by using RocketMQ 1.x SDK for C++.
We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more functions and enhanced features. For more information, see Release notes.
Alibaba Cloud only maintains RocketMQ 3.x, 4.x, and TCP client SDKs. We recommend that you use them only for existing business.
Send and receive normal messages
Send normal messages
#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;
int main()
{
// The parameter that is required to create a producer and send messages.
ONSFactoryProperty factoryInfo;
// The group ID that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// The message content.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
// Create a producer.
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Before you send the message, call the start() method only once to start the producer.
pProducer->start();
Message msg(
// The topic to which the message belongs.
factoryInfo.getPublishTopics(),
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. You cannot leave this parameter empty.
factoryInfo.getMessageContent()
);
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can still be sent and received even if you do not specify the key.
msg.setKey("ORDERID_100");
// Send the message. If no exception is thrown, the message is sent.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// The method that is used to handle exceptions.
}
// Before you exit the application, shut down the producer. Otherwise, issues such as memory leaks may occur.
pProducer->shutdown();
return 0;
}
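The endpoint rules in the comments above (domain name plus port, no http:// or https:// prefix, no resolved IP address) can be checked before the value is passed to NAMESRV_ADDR. The following is a minimal sketch under stated assumptions: looksLikeEndpoint is a hypothetical helper that is not part of the SDK, and it performs only a rough syntactic check.

```cpp
#include <cctype>
#include <string>

// Hypothetical helper (not part of the SDK): returns true if `endpoint`
// looks like "domain:port" with no scheme prefix and a non-numeric host.
bool looksLikeEndpoint(const std::string& endpoint) {
    // Reject scheme prefixes such as http:// or https://.
    if (endpoint.find("://") != std::string::npos) return false;

    // Require exactly one ':' that separates a non-empty host and port.
    const std::string::size_type colon = endpoint.rfind(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == endpoint.size()) return false;
    if (endpoint.find(':') != colon) return false;

    const std::string host = endpoint.substr(0, colon);
    const std::string port = endpoint.substr(colon + 1);

    // The port must consist of digits only.
    for (char c : port) {
        if (!std::isdigit(static_cast<unsigned char>(c))) return false;
    }

    // A host without any letter is treated as a resolved IP address.
    bool hasLetter = false;
    for (char c : host) {
        if (std::isalpha(static_cast<unsigned char>(c))) hasLetter = true;
    }
    return hasLetter;
}
```

A caller could run this check on the configured endpoint and fail fast with a clear error message instead of waiting for a connection failure.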
Subscribe to normal messages
#include "ONSFactory.h"
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
// The consumer receives the message and attempts to consume it. After the message is consumed, CommitMessage is returned.
// If the consumer fails to consume the message or wants to consume the message again, ReconsumeLater is returned. Then, the message is redelivered to the consumer after a predefined period of time.
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factoryInfo;
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. ApsaraMQ for RocketMQ instances use the group ID instead of the producer ID and consumer ID. Configuring this parameter helps ensure compatibility with earlier versions.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// The topic that you created in the ApsaraMQ for RocketMQ console.
const char* topic_1 = "topic-1";
// Subscribe to messages attached with tag-1 in topic-1.
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// Subscribe to all messages in topic-2.
const char* tag_2 = "*";
// Use a custom listener function to process the received messages and return the processing results.
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
// The preparation is complete. You must invoke the start() function to start the consumer.
consumer->start();
// Keep the thread running and do not shut down the consumer.
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
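The subscriptions above use two kinds of tag expressions: a single tag (tag-1) and the wildcard (*). The following sketch illustrates how such an expression can be thought of as a predicate over message tags. It is an illustration only, not the broker's implementation: tagMatches and trim are hypothetical names, and support for the "||" separator is an assumption based on the usual RocketMQ tag-expression convention.

```cpp
#include <string>

// Removes leading and trailing spaces from s.
static std::string trim(const std::string& s) {
    const std::string::size_type b = s.find_first_not_of(' ');
    if (b == std::string::npos) return "";
    const std::string::size_type e = s.find_last_not_of(' ');
    return s.substr(b, e - b + 1);
}

// Hypothetical illustration: returns true if a message tagged `tag`
// matches the subscription expression `expr`.
bool tagMatches(const std::string& expr, const std::string& tag) {
    // "*" subscribes to all messages in the topic.
    if (trim(expr) == "*") return true;

    // Otherwise, compare the tag with each "||"-separated alternative.
    std::string::size_type start = 0;
    while (true) {
        const std::string::size_type sep = expr.find("||", start);
        const std::string::size_type len =
            (sep == std::string::npos) ? std::string::npos : sep - start;
        const std::string part = trim(expr.substr(start, len));
        if (!part.empty() && part == tag) return true;
        if (sep == std::string::npos) return false;
        start = sep + 2;
    }
}
```

With this model, the topic-1 subscription accepts only messages tagged tag-1, and the topic-2 subscription accepts every message.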
Send and receive ordered messages
Send ordered messages
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
// The parameter that is required to create and use a producer.
ONSFactoryProperty factoryInfo;
// The group ID that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// The message content.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
// Create a producer.
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
// Before you send the message, call the start() method only once to start the producer.
pProducer->start();
Message msg(
// The topic to which the message belongs.
factoryInfo.getPublishTopics(),
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
factoryInfo.getMessageContent()
);
// The message key. The key is the business-specific attribute of a message and must be globally unique.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can still be sent and received even if you do not specify the key.
msg.setKey("ORDERID_100");
// The key field that is used to identify partitions for partitionally ordered messages.
// This field can be set to a non-empty string if the messages are globally ordered messages.
std::string shardingKey = "abc";
// Messages that have the same sharding key are sent in order.
try
{
// Send the message. If no exception is thrown, the message is sent.
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
// The method that is used to handle exceptions.
}
// Before you exit the application, shut down the producer. Otherwise, issues such as memory leaks may occur.
pProducer->shutdown();
return 0;
}
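The sample above relies on the rule that messages with the same sharding key are sent in order. One way to picture this is that every key is mapped deterministically to one partition, so all messages for a key travel through the same queue. The sketch below illustrates that idea only. pickQueue is a hypothetical function, and the hash-modulo mapping is an assumption made for illustration, not the routing algorithm that the SDK or broker uses.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical illustration: map a sharding key to one of `queueCount`
// partitions. `queueCount` must be greater than 0.
std::size_t pickQueue(const std::string& shardingKey, std::size_t queueCount) {
    // The same key always hashes to the same value within one process,
    // so it always lands on the same partition index.
    return std::hash<std::string>()(shardingKey) % queueCount;
}
```

Under this model, using an order ID as the sharding key keeps all messages for one order in sequence, while different orders can be spread across partitions.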
Subscribe to ordered messages
#include "ONSFactory.h"
using namespace std;
using namespace ons;
// Define the message listener.
// After the push consumer pulls a message, the consume function of the listener is invoked.
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
// Consume the message based on your business requirements.
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// The parameter that is required to create and use an order consumer.
ONSFactoryProperty factoryInfo;
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
// Create an order consumer.
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
// The topic and tag of the messages to which the order consumer subscribes.
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
// The listener is registered by the subscribe call above. After the order consumer pulls the messages, the consume function of the listener is invoked.
// Start the order consumer.
orderConsumer->start();
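for(volatile int i = 0; i < 10; ++i) {
// Placeholder only: this loop ends almost immediately. In a real application, block the main thread here for as long as the consumer must keep running.
}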
// Before you exit the application, shut down the consumer. Otherwise, issues such as memory leaks may occur.
orderConsumer->shutdown();
return 0;
}
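A consumer receives messages only while the main thread stays alive, so a real application needs a blocking wait between start() and shutdown(). The following is a minimal sketch of such a wait that uses only the C++ standard library. ShutdownSignal is a hypothetical helper class, not an SDK type.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: blocks the calling thread until another thread
// requests shutdown or the timeout expires.
class ShutdownSignal {
public:
    ShutdownSignal() : requested_(false) {}

    // Can be called from any thread to wake up the waiting thread.
    void request() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            requested_ = true;
        }
        cv_.notify_all();
    }

    // Blocks for at most `timeout`. Returns true if shutdown was requested.
    bool waitFor(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx_);
        return cv_.wait_for(lock, timeout, [this] { return requested_; });
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    bool requested_;
};
```

The main thread could call waitFor in a loop after the consumer starts and call shutdown() on the consumer once waitFor returns true.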
Send and receive scheduled or delayed messages
Send scheduled or delayed messages
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <chrono>
using namespace ons;
int main()
{
// The parameter that is required to create a producer and send messages.
ONSFactoryProperty factoryInfo;
// The group ID that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// The topic that you created in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// The message content.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
// Create a producer.
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Before you send the message, call the start() method only once to start the producer.
pProducer->start();
Message msg(
// The topic to which the message belongs.
factoryInfo.getPublishTopics(),
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. You cannot leave this parameter empty.
factoryInfo.getMessageContent()
);
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can still be sent and received even if you do not specify the key.
msg.setKey("ORDERID_100");
// The time when the ApsaraMQ for RocketMQ broker delivers the message to the consumer, specified as a Unix timestamp. Unit: milliseconds. The message can be consumed only after the specified time. In this example, the message can be consumed 3 seconds later.
// The current time is read from the system clock. A tick count since system startup, such as the value returned by GetTickCount64(), is not a valid timestamp.
long long deliverTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + 3000;
msg.setStartDeliverTime(deliverTime);
// Send the message. If no exception is thrown, the message is sent.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// The method that is used to handle exceptions.
}
// Before you exit the application, shut down the producer. Otherwise, issues such as memory leaks may occur.
pProducer->shutdown();
return 0;
}
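A delayed message is delivered a fixed interval after it is sent, and a scheduled message is delivered at a specific point in time. In both cases the delivery time is a timestamp in milliseconds. The following sketch shows one way to compute both values with the standard library. The function names are hypothetical, the calendar conversion assumes UTC, and the sketch assumes that std::chrono::system_clock counts from the Unix epoch (guaranteed from C++20, and the case on common platforms before that).

```cpp
#include <chrono>
#include <cstdint>

// Days between 1970-01-01 and the given Gregorian calendar date.
std::int64_t daysFromCivil(std::int64_t y, unsigned m, unsigned d) {
    y -= m <= 2;  // Treat January and February as months of the previous year.
    const std::int64_t era = (y >= 0 ? y : y - 399) / 400;
    const unsigned yoe = static_cast<unsigned>(y - era * 400);             // [0, 399]
    const unsigned doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;  // [0, 365]
    const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;            // [0, 146096]
    return era * 146097 + static_cast<std::int64_t>(doe) - 719468;
}

// Scheduled delivery: a UTC date and time converted to epoch milliseconds.
std::int64_t scheduledDeliverTimeMs(int year, unsigned month, unsigned day,
                                    unsigned hour, unsigned minute, unsigned second) {
    const std::int64_t days = daysFromCivil(year, month, day);
    const std::int64_t minutes = (days * 24 + hour) * 60 + minute;
    return (minutes * 60 + second) * 1000;
}

// Delayed delivery: the current time plus a delay, in epoch milliseconds.
std::int64_t delayedDeliverTimeMs(std::int64_t delayMs) {
    using namespace std::chrono;
    const std::int64_t nowMs =
        duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    return nowMs + delayMs;
}
```

Either result could be passed to msg.setStartDeliverTime, for example delayedDeliverTimeMs(3000) for a 3-second delay.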
Subscribe to scheduled or delayed messages
The sample code that is used to subscribe to scheduled or delayed messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Send and receive normal messages.
Send and receive transactional messages
Send transactional messages
Send a half message and execute the corresponding local transaction. Sample code:
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <string>
using namespace ons;
class MyLocalTransactionExecuter : public LocalTransactionExecuter
{
public:
    MyLocalTransactionExecuter()
    {
    }
    ~MyLocalTransactionExecuter()
    {
    }
    virtual TransactionStatus execute(Message &value)
    {
        // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
        std::string msgId = value.getMsgID();
        // Compute a checksum of the message body by using an algorithm such as CRC32 or MD5.
        // The message ID and CRC32 ID are used to prevent duplicate messages.
        // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence.
        // To prevent duplicate messages, we recommend that you compute the checksum of the message body by using the CRC32 or MD5 algorithm.
        TransactionStatus transactionStatus = Unknow;
        try
        {
            // Placeholder: replace true with the execution result of your local transaction.
            bool isCommit = true;
            if (isCommit)
            {
                // Commit the message if the local transaction is executed.
                transactionStatus = CommitTransaction;
            }
            else
            {
                // Roll back the message if the local transaction fails to be executed.
                transactionStatus = RollbackTransaction;
            }
        }
        catch (...)
        {
            // Handle the exception. The status remains Unknow.
        }
        return transactionStatus;
    }
};
int main(int argc, char* argv[])
{
    // The parameter that is required to create a producer and send messages.
    ONSFactoryProperty factoryInfo;
    // The group ID that you created in the ApsaraMQ for RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
    // Note: Enter the domain name and port number that are provided in the ApsaraMQ for RocketMQ console. Do not add the http:// or https:// prefix or use a resolved IP address.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // The topic that you created in the ApsaraMQ for RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX");
    // The message content.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
    * Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
    * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
    * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
    */
    // You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD");
    // Note: If you use the RocketMQ 1.x or 2.x SDK for C++ to access an ApsaraMQ for RocketMQ 5.x instance, do not specify the instance ID. Otherwise, the access fails.
    // Create a producer. ApsaraMQ for RocketMQ does not release pChecker. You must release pChecker by yourself.
    // MyLocalTransactionChecker is the class from the sample code that commits the status of a transactional message. Define it before main().
    MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
    TransactionProducer *pProducer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, pChecker);
    // Before you send the message, call the start() method only once to start the producer.
    pProducer->start();
    Message msg(
        // The topic to which the message belongs.
        factoryInfo.getPublishTopics(),
        // The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
        "TagA",
        // The message body. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. You cannot leave this parameter empty.
        factoryInfo.getMessageContent()
    );
    // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
    // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
    // Note: Messages can still be sent and received even if you do not specify the key.
    msg.setKey("ORDERID_100");
    // ApsaraMQ for RocketMQ does not release pExecuter. You must release pExecuter by yourself.
    MyLocalTransactionExecuter *pExecuter = new MyLocalTransactionExecuter();
    // Send the message. If no exception is thrown, the message is sent.
    try
    {
        SendResultONS sendResult = pProducer->send(msg, pExecuter);
    }
    catch(ONSClientException & e)
    {
        // The method that is used to handle exceptions.
    }
    // Before you exit the application, shut down the producer. Otherwise, issues such as memory leaks may occur.
    pProducer->shutdown();
    // Release the executer and the checker after the producer is shut down.
    delete pExecuter;
    delete pChecker;
    return 0;
}
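The samples on this page repeat the rule that a producer or consumer must be shut down before the application exits. A scope guard is one way to make sure that this happens on every exit path, including early returns and exceptions. The sketch below is a generic illustration: ShutdownGuard is a hypothetical template that works with any type that has a shutdown() method, and FakeProducer is a stand-in used only to keep the example self-contained. Neither is an SDK class.

```cpp
// Calls shutdown() on the wrapped client when the guard goes out of scope.
template <typename Client>
class ShutdownGuard {
public:
    explicit ShutdownGuard(Client* client) : client_(client) {}
    ~ShutdownGuard() {
        if (client_) client_->shutdown();
    }

    // The guard owns the shutdown responsibility, so copying is disabled.
    ShutdownGuard(const ShutdownGuard&) = delete;
    ShutdownGuard& operator=(const ShutdownGuard&) = delete;

private:
    Client* client_;
};

// Stand-in type for illustration only. It is not an SDK class.
struct FakeProducer {
    bool running;
    FakeProducer() : running(false) {}
    void start() { running = true; }
    void shutdown() { running = false; }
};
```

In application code, a guard could be created right after start() is called, which would replace the explicit shutdown() call at the end of main().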
Commit the status of a transactional message. Sample code:
class MyLocalTransactionChecker : public LocalTransactionChecker
{
public:
    MyLocalTransactionChecker()
    {
    }
    ~MyLocalTransactionChecker()
    {
    }
    virtual TransactionStatus check(Message &value)
    {
        // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
        std::string msgId = value.getMsgID();
        // Compute a checksum of the message body by using an algorithm such as CRC32 or MD5.
        // The message ID and CRC32 ID are used to prevent duplicate messages.
        // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence.
        // To prevent duplicate messages, we recommend that you compute the checksum of the message body by using the CRC32 or MD5 algorithm.
        TransactionStatus transactionStatus = Unknow;
        try
        {
            // Placeholder: replace true with the execution result of your local transaction.
            bool isCommit = true;
            if (isCommit)
            {
                // Commit the message if the local transaction is executed.
                transactionStatus = CommitTransaction;
            }
            else
            {
                // Roll back the message if the local transaction fails to be executed.
                transactionStatus = RollbackTransaction;
            }
        }
        catch(...)
        {
            // Handle the exception. The status remains Unknow.
        }
        return transactionStatus;
    }
};
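The comments in the transactional samples recommend a CRC32 or MD5 checksum of the message body to detect duplicate messages. The following is a minimal sketch of that idea, assuming the standard CRC-32 variant (reflected polynomial 0xEDB88320). crc32Of and SeenBodies are hypothetical names, and the in-memory set is a simplification: a real application would typically persist the checksums and account for the small chance of CRC collisions.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>

// Bitwise CRC-32 over the bytes of `data` (reflected polynomial 0xEDB88320).
std::uint32_t crc32Of(const std::string& data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (std::size_t i = 0; i < data.size(); ++i) {
        crc ^= static_cast<unsigned char>(data[i]);
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right and apply the polynomial when the low bit is set.
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Hypothetical helper: remembers the checksums of processed message bodies.
class SeenBodies {
public:
    // Returns true the first time a body is seen and false for a repeat.
    bool markIfNew(const std::string& body) {
        return seen_.insert(crc32Of(body)).second;
    }

private:
    std::set<std::uint32_t> seen_;
};
```

A listener or transaction checker could call markIfNew with the message body and skip the business logic when the call returns false.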
Subscribe to transactional messages
The sample code that is used to subscribe to transactional messages is the same as the sample code that is used to subscribe to normal messages. For more information, see Send and receive normal messages.