当消息内容大于64 KB以至于无法直接存入SMQ队列时,不做消息切片,使用轻量消息队列(原 MNS)和对象存储OSS来传递大于64 KB的消息。
背景信息
轻量消息队列(原 MNS)的队列的消息大小最大限制为64 KB,正常情况下基本能够满足控制流信息交换的需求。在某些特殊场景下,消息数据超过64 KB时就只能采用消息切片的方式。如果您不想使用消息切片,轻量消息队列(原 MNS)支持通过OSS实现超大消息的传递。
下面为您介绍如何通过OSS来传递大于64 KB的消息。
解决方案
生产者在向轻量消息队列(原 MNS)发送消息前,如果发现消息体大于64 KB,则先将消息体数据上传到OSS。
生产者把数据对应的Object信息发送到轻量消息队列(原 MNS)。
消费者从轻量消息队列(原 MNS)队列里读取消息,判断消息内容是否为OSS的Object信息。
判断消息内容是OSS的Object信息,则从OSS下载对应的Object内容,并作为消息体返回给上层程序。
具体过程如下图所示。
注意事项
大消息主要消费网络带宽,用该方案发送大消息时,生产者和消费者的网络带宽需要满足需求。
大消息网络传输时间较长,受网络波动影响的概率更大,建议在上层做必要的重试。
前提条件
示例代码
示例代码下载,请参见LargeMessageDemo。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;
public class LargeMessageDemo {
private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
/**
* 本文以4 KB临界值为例,大于4 KB即用OSS存储。
*/
private final static Long payloadSizeThreshold = 4L;
public static void main(String[] args) throws ClientException {
// 从环境变量中获取访问凭证。
EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
// 创建OSSClient实例。
OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);
// 创建MNS实例。
// 遵循阿里云规范,env设置ak、sk。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);
//reCreate
ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);
// 配置超大队列属性。
MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
.setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
.setMNSQueue(queue)
.setMNSTopic(cloudTopic)
.setPayloadSizeThreshold(payloadSizeThreshold);
MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);
// 执行常规发送。
Message normalMessage = new Message();
normalMessage.setMessageBodyAsRawString("1");
mnsExtendedClient.sendMessage(normalMessage);
Message message = mnsExtendedClient.receiveMessage(10);
System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(message.getReceiptHandle());
// 大文件发送Queue模型。
String largeMsgBody = "largeMessage";
Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);
Message largeMessage = new Message();
largeMessage.setMessageBodyAsRawString(largeMsgBody);
mnsExtendedClient.sendMessage(largeMessage);
Message receiveMessage = mnsExtendedClient.receiveMessage(10);
System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());
client.close();
ossClient.shutdown();
}
}