輕量訊息佇列(原 MNS)支援一對多拉取訊息消費模型,以滿足一對多訂閱、主動拉取的情境。本文介紹如何高效利用該模型實現多消費者並行拉取與處理訊息。
本文以Java SDK為例介紹廣播拉取訊息流程程,其他語言SDK請參見新版SDK參考(推薦)。
前提條件
您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
背景資訊
輕量訊息佇列(原 MNS)提供隊列(Queue)和主題(Topic)兩種模型,基本能滿足大多數應用情境。
隊列提供的是一對一的共用訊息消費模型,採用用戶端主動拉取(Pull)模式。
主題模型提供一對多的廣播訊息消費模型,採用服務端主動推送(Push)模式。
推送模式的好處是即時效能較好,但需要暴露用戶端地址來接收服務端的訊息推送。有些情況下有的資訊,例如企業內網,無法暴露推送地址,希望改用拉取(Pull)的方式。雖然輕量訊息佇列(原 MNS)不直接提供這種消費模型,但可以結合主題和隊列來實現一對多的拉取訊息消費模型。
解決方案
通過建立訂閱,讓主題將訊息先推送到隊列,然後由消費者從隊列拉取訊息。這樣既可以做到一對多的廣播訊息,又可避免暴露消費者的地址。
安裝Java依賴庫
在IDEA中建立一個Java工程。
在pom.xml檔案中添加以下依賴引入Java依賴庫。
<dependency> <groupId>com.aliyun.mns</groupId> <artifactId>aliyun-sdk-mns</artifactId> <version>1.1.9.2</version> </dependency>
介面說明
最新的Java SDK(1.1.8)中的CloudPullTopic預設支援上述解決方案。其中MNSClient提供以下介面來快速建立CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
參數說明如下:
TopicMeta:建立主題的參數設定。
QueueMeta:建立隊列的參數設定。
queueNameList:指定主題訊息推送的隊列名列表。
needCreateQueue:queueNameList是否需要建立。
queueMetaTemplate:建立隊列需要的QueueMeta參數樣本。
範例程式碼
廣播拉取訊息的範例程式碼如下:
package doc;
// 引入阿里雲輕量訊息佇列(原 MNS)的相關類
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;
public class DemoTopicMessageBroadcast {
public static void main(String[] args) {
// 擷取阿里雲賬戶的AccessKey ID,AccessKey Secret和SMQ服務的endpoint。
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
// 建立消費者列表。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try{
// 建立主題。
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
// 發布訊息。
String messageBody = "broadcast message to all the consumers:hello the world.";
// 如果發送原始訊息,使用getMessageBodyAsRawString解析訊息體。
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// 接收訊息。
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
Message consumer1Msg = queueForConsumer1.popMessage(30);
if(consumer1Msg != null)
{
System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
} else{
System.out.println("the queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if(consumer2Msg != null)
{
System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if(consumer3Msg != null)
{
System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
// 刪除主題。
pullTopic.delete();
} catch(ClientException ce) {
System.out.println("Something wrong with the network connection between client and SMQ service."
+ "Please check your network and DNS availability.");
ce.printStackTrace();
} catch(ServiceException se) {
se.printStackTrace();
}
client.close();
}
}