雲訊息佇列 RocketMQ 版提供類似XA或Open XA的分散式交易功能,通過雲訊息佇列 RocketMQ 版事務訊息,能達到分散式交易的最終一致。本文提供使用HTTP協議下的C++ SDK收發事務訊息的範例程式碼。
背景資訊
事務訊息的互動流程如下圖所示。
更多資訊,請參見事務訊息。
前提條件
您已完成以下操作:
安裝C++ SDK。更多資訊,請參見準備環境。
建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
發送事務訊息
發送事務訊息的範例程式碼如下。
//#include <iostream>
#include <fstream>
#ifdef _WIN32
#include <windows.h>
#include <process.h>
#else
#include "pthread.h"
#endif
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
const int32_t pubMsgCount = 4;
const int32_t halfCheckCount = 3;
void processCommitRollError(AckMessageResponse& bdmResp, const std::string& messageId) {
if (bdmResp.isSuccess()) {
cout << "Commit/Roll Transaction Suc: " << messageId << endl;
return;
}
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "Commit/Roll Transaction ERROR: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
}
#ifdef WIN32
unsigned __stdcall consumeHalfMessageThread(void *arg)
#else
void* consumeHalfMessageThread(void *arg)
#endif
{
MQTransProducerPtr transProducer = *(MQTransProducerPtr*)(arg);
int count = 0;
do {
std::vector<Message> halfMsgs;
try {
// 長輪詢消費訊息。
// 長輪詢表示如果Topic沒有訊息,則請求會在服務端掛起3s,3s內如果有訊息可以消費則立即返回用戶端。
transProducer->consumeHalfMessage(
1, // 一次最多消費1條(最多可設定為16條)。
3, // 長輪詢時間3秒(最多可設定為30秒)。
halfMsgs
);
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No half message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
}
if (halfMsgs.size() == 0) {
continue;
}
cout << "Consume Half: " << halfMsgs.size() << " Messages!" << endl;
// 處理事務半訊息。
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = halfMsgs.begin();
iter != halfMsgs.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
int32_t consumedTimes = iter->getConsumedTimes();
const std::string propA = iter->getProperty("a");
const std::string handle = iter->getReceiptHandle();
AckMessageResponse bdmResp;
if (propA == "1") {
cout << "Commit msg.." << endl;
transProducer->commit(handle, bdmResp);
count++;
} else if(propA == "2") {
if (consumedTimes > 1) {
cout << "Commit msg.." << endl;
transProducer->commit(handle, bdmResp);
count++;
} else {
cout << "Commit Later!!!" << endl;
}
} else if(propA == "3") {
cout << "Rollback msg.." << endl;
transProducer->rollback(handle, bdmResp);
count++;
} else {
transProducer->commit(handle, bdmResp);
cout << "Unkown msg.." << endl;
}
// 如果Commit或Rollback時超過了NextConsumeTime的時間,則Commit或Rollback會失敗。
processCommitRollError(bdmResp, iter->getMessageId());
}
} while(count < halfCheckCount);
#ifdef WIN32
return 0;
#else
return NULL;
#endif
}
int main() {
MQClient mqClient(
// 設定HTTP協議用戶端存取點,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
"${HTTP_ENDPOINT}",
// 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
// AccessKey ID,阿里雲身分識別驗證標識。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey Secret,阿里雲身分識別驗證密鑰。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// 訊息所屬的Topic,在訊息佇列RocketMQ版控制台建立。
string topic = "${TOPIC}";
// Topic所屬的執行個體ID,在訊息佇列RocketMQ版控制台建立。
// 若執行個體有命名空間,則執行個體ID必須傳入;若執行個體無命名空間,則執行個體ID傳入null空值或字串空值。執行個體的命名空間可以在訊息佇列RocketMQ版控制台的執行個體詳情頁面查看。
string instanceId = "${INSTANCE_ID}";
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
string groupId = "${GROUP_ID}";
MQTransProducerPtr transProducer;
if (instanceId == "") {
transProducer = mqClient.getTransProducerRef(topic, groupId);
} else {
transProducer = mqClient.getTransProducerRef(instanceId, topic, groupId);
}
// 用戶端需要有一個線程或者進程來消費沒有確認的事務訊息。
// 啟動一個線程來檢查沒有確認的事務訊息。
#ifdef WIN32
HANDLE thread;
unsigned int threadId;
thread = (HANDLE)_beginthreadex(NULL, 0, consumeHalfMessageThread, &transProducer, 0, &threadId);
#else
pthread_t thread;
pthread_create(&thread, NULL, consumeHalfMessageThread, static_cast<void *>(&transProducer));
#endif
try {
for (int i = 0; i < pubMsgCount; i++)
{
PublishMessageResponse pmResp;
TopicMessage pubMsg("Hello, mq, trans_msg!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("ImKey");
pubMsg.setTransCheckImmunityTime(10);
transProducer->publishMessage(pubMsg, pmResp);
cout << "Publish mq message success. Topic:" << topic
<< ", msgId:" << pmResp.getMessageId()
<< ", bodyMD5:" << pmResp.getMessageBodyMD5()
<< ", Handle:" << pmResp.getReceiptHandle() << endl;
if (i == 0) {
// 發送完事務訊息後能擷取到半訊息控制代碼,可以直接Commit或Rollback事務訊息。
// 如果Commit或Rollback時超過了TransCheckImmunityTime的時間,則Commit或Rollback會失敗。
AckMessageResponse bdmResp;
transProducer->commit(pmResp.getReceiptHandle(), bdmResp);
processCommitRollError(bdmResp, pmResp.getMessageId());
}
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
}
#ifdef WIN32
WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
#else
pthread_join(thread, NULL);
#endif
return 0;
}
訂閱事務訊息
訂閱事務訊息的範例程式碼如下。
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// 設定HTTP協議用戶端存取點,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
"${HTTP_ENDPOINT}",
// 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
// AccessKey ID,阿里雲身分識別驗證標識。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey Secret,阿里雲身分識別驗證密鑰。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// 訊息所屬的Topic,在訊息佇列RocketMQ版控制台建立。
string topic = "${TOPIC}";
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
string groupId = "${GROUP_ID}";
// Topic所屬的執行個體ID,在訊息佇列RocketMQ版控制台建立。
// 若執行個體有命名空間,則執行個體ID必須傳入;若執行個體無命名空間,則執行個體ID傳入null空值或字串空值。執行個體的命名空間可以在訊息佇列RocketMQ版控制台的執行個體詳情頁面查看。
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// 長輪詢消費訊息。
// 長輪詢表示如果Topic沒有訊息,則請求會在服務端掛起3s,3s內如果有訊息可以消費則立即返回用戶端。
consumer->consumeMessage(
3, // 一次最多消費3條(最多可設定為16條)。
3, // 長輪詢時間3秒(最多可設定為30秒)。
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// 處理商務邏輯。
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// 確認訊息消費成功。
// Message.NextConsumeTime前若不確認訊息消費成功,則訊息會被重複消費。
// 訊息控制代碼有時間戳記,同一條訊息每次消費拿到的都不一樣。
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// 某些訊息的控制代碼可能逾時,會導致訊息消費狀態確認不成功。
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}