全部產品
Search
文件中心

Simple Message Queue (formerly MNS):超大訊息傳輸

更新時間:Sep 03, 2024

當訊息內容大於64 KB以至於無法直接存入SMQ隊列時,不做訊息切片,使用輕量訊息佇列(原 MNS)Object Storage Service來傳遞大於64 KB的訊息。

背景資訊

輕量訊息佇列(原 MNS)的隊列的訊息大小最大限制為64 KB,正常情況下基本能夠滿足控制流程資訊交換的需求。在某些特殊情境下,訊息資料超過64 KB時就只能採用訊息切片的方式。如果您不想使用訊息切片,輕量訊息佇列(原 MNS)支援通過OSS實現超大訊息的傳遞。

下面為您介紹如何通過OSS來傳遞大於64 KB的訊息。

解決方案

  1. 生產者在向輕量訊息佇列(原 MNS)發送訊息前,如果發現訊息體大於64 KB,則先將訊息體資料上傳到OSS。

  2. 生產者把資料對應的Object資訊發送到輕量訊息佇列(原 MNS)

  3. 消費者從輕量訊息佇列(原 MNS)隊列裡讀取訊息,判斷訊息內容是否為OSS的Object資訊。

  4. 判斷訊息內容是OSS的Object資訊,則從OSS下載對應的Object內容,並作為訊息體返回給上層程式。

具體過程如下圖所示。

注意事項

  • 大訊息主要消費網路頻寬,用該方案發送大訊息時,生產者和消費者的網路頻寬需要滿足需求。

  • 大訊息網路傳輸時間較長,受網路波動影響的機率更大,建議在上層做必要的重試。

前提條件

範例程式碼

範例程式碼下載,請參見LargeMessageDemo

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;

public class LargeMessageDemo {

    private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
    private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
    private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
    private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
    /**
     * 本文以4 KB臨界值為例,大於4 KB即用OSS儲存。
     */
    private final static Long payloadSizeThreshold = 4L;

    public static void main(String[] args) throws ClientException {
        // 從環境變數中擷取訪問憑證。
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();

        // 建立OSSClient執行個體。
        OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);

        // 建立MNS執行個體。
        // 遵循阿里雲規範,env設定ak、sk。
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
        CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);

        //reCreate
        ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
        ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);

        // 配置超大隊列屬性。
        MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
            .setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
            .setMNSQueue(queue)
            .setMNSTopic(cloudTopic)
            .setPayloadSizeThreshold(payloadSizeThreshold);

        MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);

        // 執行常規發送。
        Message normalMessage = new Message();
        normalMessage.setMessageBodyAsRawString("1");
        mnsExtendedClient.sendMessage(normalMessage);
        Message message = mnsExtendedClient.receiveMessage(10);
        System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(message.getReceiptHandle());

        // 大檔案發送Queue模型。
        String largeMsgBody = "largeMessage";
        Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);

        Message largeMessage = new Message();
        largeMessage.setMessageBodyAsRawString(largeMsgBody);

        mnsExtendedClient.sendMessage(largeMessage);
        Message receiveMessage = mnsExtendedClient.receiveMessage(10);
        System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());


        client.close();
        ossClient.shutdown();
    }
}