Simple Message Queue (formerly MNS): Transmit oversized messages

Last Updated: Sep 02, 2024

If a message is larger than 64 KB and therefore cannot be stored directly in a Simple Message Queue (SMQ, formerly MNS) queue, you can use SMQ together with Object Storage Service (OSS) to transmit the message without slicing it.

Background information

You can send a message of up to 64 KB to an SMQ queue, which meets the requirements of most message transmission scenarios. To send a larger message, you would otherwise have to slice it into parts that each fit within the limit. If you do not want to slice the message, you can use SMQ together with OSS to transmit it.

The following example shows how to use OSS to transmit a message that is larger than 64 KB without slicing it.

Solution

  1. Before the message producer sends a message to an SMQ queue, it checks the size of the message body. If the body is larger than 64 KB, the producer uploads the body to OSS as an object.

  2. The message producer sends a message that contains a reference to the OSS object, instead of the original body, to the SMQ queue.

  3. The message consumer reads the message from the SMQ queue and checks whether the message content is a reference to an OSS object.

  4. If the message content is a reference to an OSS object, the message consumer downloads the object from OSS and returns its content to the upper-layer application as the message body.

The following figure shows the message transmission process.

[Figure: message transmission process]
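The four steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the SDK's implementation: an in-memory map stands in for the OSS bucket, an in-memory queue stands in for the SMQ queue, and the class name, the POINTER_PREFIX marker, and the send and receive methods are hypothetical names chosen for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

final class LargeMessageFlowSketch {
    // Queue limit described above: 64 KB per message body.
    static final int MAX_BODY_BYTES = 64 * 1024;
    // Hypothetical marker that tells the consumer the body is a reference, not the payload.
    static final String POINTER_PREFIX = "oss-object:";

    // In-memory stand-ins for the OSS bucket and the SMQ queue (illustration only).
    final Map<String, byte[]> objectStore = new HashMap<>();
    final Queue<String> queue = new ArrayDeque<>();

    // Steps 1 and 2: upload an oversized body, then enqueue a reference to it.
    void send(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_BODY_BYTES) {
            String key = UUID.randomUUID().toString();
            objectStore.put(key, bytes);
            queue.add(POINTER_PREFIX + key);
        } else {
            queue.add(body);
        }
    }

    // Steps 3 and 4: dequeue, and resolve the reference if the body is one.
    // Returns null when the queue is empty.
    String receive() {
        String body = queue.poll();
        if (body != null && body.startsWith(POINTER_PREFIX)) {
            byte[] bytes = objectStore.get(body.substring(POINTER_PREFIX.length()));
            return new String(bytes, StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        LargeMessageFlowSketch flow = new LargeMessageFlowSketch();
        flow.send("small");

        // Build a body that is one byte over the limit.
        char[] chars = new char[MAX_BODY_BYTES + 1];
        Arrays.fill(chars, 'x');
        flow.send(new String(chars));

        System.out.println(flow.receive().length()); // prints 5
        System.out.println(flow.receive().length()); // prints 65537
    }
}
```

A production implementation would more likely carry the reference in a structured envelope than in a string prefix, so that a regular message that happens to start with the marker is not mistaken for a reference. In the sample code later in this topic, MNSExtendedClient appears to play the role of this class against the real services.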

Usage notes

  • Transmitting oversized messages mainly consumes network bandwidth. When you use this solution, make sure that both the message producer and the message consumer have sufficient network bandwidth.

  • Transmitting oversized messages is time-consuming and may be affected by network jitter. We recommend that you perform retries at the upper layer if errors occur.
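The retry recommendation above can be sketched as a small helper that the upper-layer application wraps around a send or receive call. This is only one possible approach, assuming exponential backoff is acceptable for your workload; RetrySketch, withRetry, and the parameter names are hypothetical and are not part of the SMQ or OSS SDKs.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class RetrySketch {
    // Runs the action up to maxAttempts times and doubles the pause after each failure.
    // Every exception is treated as retryable here; real code should retry only
    // transient errors such as timeouts.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        // All attempts failed: surface the most recent error to the caller.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // The simulated operation fails twice, then succeeds on the third call.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated network jitter");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints: sent after 3 attempts
    }
}
```

Retrying a send can produce duplicate messages if the first attempt succeeded but its response was lost, so the consumer side should tolerate duplicates.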

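Prerequisites

Based on the sample code below, the following must be in place before you run it:

  • An OSS bucket exists. The sample code refers to it through OSS_BUCKET_NAME and OSS_ENDPOINT.

  • The AccessKey ID and AccessKey secret are configured as environment variables. The sample code reads the credentials from the environment.

  • The SMQ endpoint is configured for the SDK. The sample code reads it through ServiceSettings.getMNSAccountEndpoint().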

Sample code

To download the complete sample code, see LargeMessageDemo.java.

package com.aliyun.mns.sample.scenarios.largeMessage;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;

public class LargeMessageDemo {

    private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
    private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
    private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
    private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
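    /**
     * Message bodies larger than this threshold are stored in OSS instead of being sent directly.
     * The value is deliberately small so that the short demo string in main() counts as oversized;
     * main() compares the threshold against the body length in bytes.
     */
    private final static Long payloadSizeThreshold = 4L;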

    public static void main(String[] args) throws ClientException {
        // Obtain access credentials from environment variables. 
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();

        // Create an OSSClient instance. 
        OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);

        // Create an MNS instance. 
        // Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications. 
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
        CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);

        // Recreate the queue and topic used by this demo.
        ReCreateUtil.reCreateQueue(client, MNS_QUEUE_NAME);
        ReCreateUtil.reCreateTopic(client, MNS_TOPIC_NAME);

        // Configure the attributes of a queue for oversized messages. 
        MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
            .setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
            .setMNSQueue(queue)
            .setMNSTopic(cloudTopic)
            .setPayloadSizeThreshold(payloadSizeThreshold);

        MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);

        // Send a regular-size message, then receive and delete it.
        Message normalMessage = new Message();
        normalMessage.setMessageBodyAsRawString("1");
        mnsExtendedClient.sendMessage(normalMessage);
        Message message = mnsExtendedClient.receiveMessage(10);
        System.out.println("[normal]ReceiveMsg:" + message.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(message.getReceiptHandle());

        // Send an oversized message, then receive and delete it.
        // The body must exceed payloadSizeThreshold so that it is treated as oversized.
        String largeMsgBody = "largeMessage";
        Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);

        Message largeMessage = new Message();
        largeMessage.setMessageBodyAsRawString(largeMsgBody);

        mnsExtendedClient.sendMessage(largeMessage);
        Message receiveMessage = mnsExtendedClient.receiveMessage(10);
        System.out.println("[large]ReceiveMsg:" + receiveMessage.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());


        // Release the SMQ and OSS clients.
        client.close();
        ossClient.shutdown();
    }
}