當訊息內容大於64 KB以至於無法直接存入SMQ隊列時,不做訊息切片,使用輕量訊息佇列(原 MNS)和Object Storage Service來傳遞大於64 KB的訊息。
背景資訊
輕量訊息佇列(原 MNS)的隊列的訊息大小最大限制為64 KB,正常情況下基本能夠滿足控制流程資訊交換的需求。在某些特殊情境下,訊息資料超過64 KB時就只能採用訊息切片的方式。如果您不想使用訊息切片,輕量訊息佇列(原 MNS)支援通過OSS實現超大訊息的傳遞。
下面為您介紹如何通過OSS來傳遞大於64 KB的訊息。
解決方案
生產者在向輕量訊息佇列(原 MNS)發送訊息前,如果發現訊息體大於64 KB,則先將訊息體資料上傳到OSS。
生產者把資料對應的Object資訊發送到輕量訊息佇列(原 MNS)。
消費者從輕量訊息佇列(原 MNS)隊列裡讀取訊息,判斷訊息內容是否為OSS的Object資訊。
判斷訊息內容是OSS的Object資訊,則從OSS下載對應的Object內容,並作為訊息體返回給上層程式。
具體過程如下圖所示。
注意事項
大訊息主要消費網路頻寬,用該方案發送大訊息時,生產者和消費者的網路頻寬需要滿足需求。
大訊息網路傳輸時間較長,受網路波動影響的機率更大,建議在上層做必要的重試。
前提條件
範例程式碼
範例程式碼下載,請參見LargeMessageDemo。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;
public class LargeMessageDemo {
private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
/**
* 本文以4 KB臨界值為例,大於4 KB即用OSS儲存。
*/
private final static Long payloadSizeThreshold = 4L;
public static void main(String[] args) throws ClientException {
// 從環境變數中擷取訪問憑證。
EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
// 建立OSSClient執行個體。
OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);
// 建立MNS執行個體。
// 遵循阿里雲規範,env設定ak、sk。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);
//reCreate
ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);
// 配置超大隊列屬性。
MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
.setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
.setMNSQueue(queue)
.setMNSTopic(cloudTopic)
.setPayloadSizeThreshold(payloadSizeThreshold);
MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);
// 執行常規發送。
Message normalMessage = new Message();
normalMessage.setMessageBodyAsRawString("1");
mnsExtendedClient.sendMessage(normalMessage);
Message message = mnsExtendedClient.receiveMessage(10);
System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(message.getReceiptHandle());
// 大檔案發送Queue模型。
String largeMsgBody = "largeMessage";
Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);
Message largeMessage = new Message();
largeMessage.setMessageBodyAsRawString(largeMsgBody);
mnsExtendedClient.sendMessage(largeMessage);
Message receiveMessage = mnsExtendedClient.receiveMessage(10);
System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());
client.close();
ossClient.shutdown();
}
}