云消息队列 RocketMQ 版提供类似XA或Open XA的分布式事务功能,通过云消息队列 RocketMQ 版事务消息,能达到分布式事务的最终一致。本文提供使用HTTP协议下的C# SDK收发事务消息的示例代码。
背景信息
事务消息的交互流程如下图所示。
更多信息,请参见事务消息。
前提条件
您已完成以下操作:
安装C# SDK。更多信息,请参见准备环境。
创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源。
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送事务消息
发送事务消息的示例代码如下。
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class TransProducerSample
{
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
private const string _endpoint = "${HTTP_ENDPOINT}";
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID,阿里云身份验证标识。
private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
// AccessKey Secret,阿里云身份验证密钥。
private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
private const string _topicName = "${TOPIC}";
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
private const string _instanceId = "${INSTANCE_ID}";
// 您在消息队列RocketMQ版控制台创建的Group ID。
private const string _groupId = "${GROUP_ID}";
private static readonly MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
private static readonly MQTransProducer transProducer = _client.GetTransProdcuer(_instanceId, _topicName, _groupId);
static void ProcessAckError(Exception exception)
{
// 如果Commit或Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过10s(针对consumeHalfMessage的句柄),则Commit或Rollback会失败。
if (exception is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exception;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
static void ConsumeHalfMessage()
{
int count = 0;
while (true)
{
if (count == 3)
break;
try
{
// 检查事务半消息,类似消费普通消息。
List<Message> messages = null;
try
{
messages = transProducer.ConsumeHalfMessage(3, 3);
} catch (Exception exp1) {
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No half message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
continue;
// 处理业务逻辑。
foreach (Message message in messages)
{
Console.WriteLine(message);
int a = int.Parse(message.GetProperty("a"));
uint consumeTimes = message.ConsumedTimes;
try {
if (a == 1) {
// 确认提交事务消息。
transProducer.Commit(message.ReceiptHandle);
count++;
Console.WriteLine("Id:" + message.Id + ", commit");
} else if (a == 2 && consumeTimes > 1) {
// 确认提交事务消息。
transProducer.Commit(message.ReceiptHandle);
count++;
Console.WriteLine("Id:" + message.Id + ", commit");
} else if (a == 3) {
// 确认回滚事务消息。
transProducer.Rollback(message.ReceiptHandle);
count++;
Console.WriteLine("Id:" + message.Id + ", rollback");
} else {
// 什么都不做,下次再检查。
Console.WriteLine("Id:" + message.Id + ", unkonwn");
}
} catch (Exception ackError) {
ProcessAckError(ackError);
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
static void Main(string[] args)
{
// 客户端需要有一个线程或者进程来消费没有确认的事务消息。
// 启动一个线程来检查没有确认的事务消息。
Thread consumeHalfThread = new Thread(ConsumeHalfMessage);
consumeHalfThread.Start();
try
{
// 循环发送4条事务消息,第一条直接在发送完提交事务, 其它三条根据条件处理。
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg = new TopicMessage("trans_msg");
sendMsg.MessageTag = "a";
sendMsg.MessageKey = "MessageKey";
sendMsg.PutProperty("a", i.ToString());
// 设置事务第一次回查的时间,为相对时间。单位:秒,范围:10~300。
// 第一次事务回查后如果消息没有Commit或者Rollback,则之后每隔10s左右会回查一次,共回查24小时。
sendMsg.TransCheckImmunityTime = 10;
TopicMessage result = transProducer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
try {
if (!string.IsNullOrEmpty(result.ReceiptHandle) && i == 0)
{
// 发送完事务消息后能获取到半消息句柄,可以直接Commit或Rollback事务消息。
transProducer.Commit(result.ReceiptHandle);
Console.WriteLine("Id:" + result.Id + ", commit");
}
} catch (Exception ackError) {
ProcessAckError(ackError);
}
}
} catch (Exception ex) {
Console.Write(ex);
}
consumeHalfThread.Join();
}
}
}
订阅事务消息
订阅事务消息的示例代码如下。
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
private const string _endpoint = "${HTTP_ENDPOINT}";
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID,阿里云身份验证标识。
private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
// AccessKey Secret,阿里云身份验证密钥。
private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
private const string _topicName = "${TOPIC}";
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
private const string _instanceId = "${INSTANCE_ID}";
// 您在消息队列RocketMQ版控制台创建的Group ID。
private const string _groupId = "${GROUP_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
while (true)
{
try
{
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // 一次最多消费3条(最多可设置为16条)。
3 // 长轮询时间3秒(最多可设置为30秒)。
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// 处理业务逻辑。
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}