全部产品
Search
文档中心

轻量消息队列(原 MNS):超大消息传输

更新时间:Sep 02, 2024

当消息内容大于64 KB以至于无法直接存入SMQ队列时,不做消息切片,使用轻量消息队列(原 MNS)对象存储OSS来传递大于64 KB的消息。

背景信息

轻量消息队列(原 MNS)的队列的消息大小最大限制为64 KB,正常情况下基本能够满足控制流信息交换的需求。在某些特殊场景下,消息数据超过64 KB时就只能采用消息切片的方式。如果您不想使用消息切片,轻量消息队列(原 MNS)支持通过OSS实现超大消息的传递。

下面为您介绍如何通过OSS来传递大于64 KB的消息。

解决方案

  1. 生产者在向轻量消息队列(原 MNS)发送消息前,如果发现消息体大于64 KB,则先将消息体数据上传到OSS。

  2. 生产者把数据对应的Object信息发送到轻量消息队列(原 MNS)

  3. 消费者从轻量消息队列(原 MNS)队列里读取消息,判断消息内容是否为OSS的Object信息。

  4. 判断消息内容是OSS的Object信息,则从OSS下载对应的Object内容,并作为消息体返回给上层程序。

具体过程如下图所示。

image

注意事项

  • 大消息主要消费网络带宽,用该方案发送大消息时,生产者和消费者的网络带宽需要满足需求。

  • 大消息网络传输时间较长,受网络波动影响的概率更大,建议在上层做必要的重试。

前提条件

示例代码

示例代码下载,请参见LargeMessageDemo

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;

public class LargeMessageDemo {

    private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
    private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
    private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
    private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
    /**
     * 本文以4 KB临界值为例,大于4 KB即用OSS存储。
     */
    private final static Long payloadSizeThreshold = 4L;

    public static void main(String[] args) throws ClientException {
        // 从环境变量中获取访问凭证。
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();

        // 创建OSSClient实例。
        OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);

        // 创建MNS实例。
        // 遵循阿里云规范,env设置ak、sk。
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
        CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);

        //reCreate
        ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
        ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);

        // 配置超大队列属性。
        MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
            .setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
            .setMNSQueue(queue)
            .setMNSTopic(cloudTopic)
            .setPayloadSizeThreshold(payloadSizeThreshold);

        MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);

        // 执行常规发送。
        Message normalMessage = new Message();
        normalMessage.setMessageBodyAsRawString("1");
        mnsExtendedClient.sendMessage(normalMessage);
        Message message = mnsExtendedClient.receiveMessage(10);
        System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(message.getReceiptHandle());

        // 大文件发送Queue模型。
        String largeMsgBody = "largeMessage";
        Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);

        Message largeMessage = new Message();
        largeMessage.setMessageBodyAsRawString(largeMsgBody);

        mnsExtendedClient.sendMessage(largeMessage);
        Message receiveMessage = mnsExtendedClient.receiveMessage(10);
        System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());


        client.close();
        ossClient.shutdown();
    }
}