轻量消息队列(原 MNS)支持一对多拉取消息消费模型,以满足一对多订阅、主动拉取的场景。本文介绍如何高效利用该模型实现多消费者并行拉取与处理消息。
本文以Java SDK为例介绍广播拉取消息流程,其他语言SDK请参见新版SDK参考(推荐)。
前提条件
您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
背景信息
轻量消息队列(原 MNS)提供队列(Queue)和主题(Topic)两种模型,基本能满足大多数应用场景。
队列提供的是一对一的共享消息消费模型,采用客户端主动拉取(Pull)模式。
主题模型提供一对多的广播消息消费模型,采用服务端主动推送(Push)模式。
推送模式的好处是即时性能较好,但需要暴露客户端地址来接收服务端的消息推送。有些情况下有的信息,例如企业内网,无法暴露推送地址,希望改用拉取(Pull)的方式。虽然轻量消息队列(原 MNS)不直接提供这种消费模型,但可以结合主题和队列来实现一对多的拉取消息消费模型。
解决方案
通过创建订阅,让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到一对多的广播消息,又可避免暴露消费者的地址。
安装Java依赖库
在IDEA中创建一个Java工程。
在pom.xml文件中添加以下依赖引入Java依赖库。
<dependency> <groupId>com.aliyun.mns</groupId> <artifactId>aliyun-sdk-mns</artifactId> <version>1.1.9.2</version> </dependency>
接口说明
最新的Java SDK(1.1.8)中的CloudPullTopic默认支持上述解决方案。其中MNSClient提供以下接口来快速创建CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
参数说明如下:
TopicMeta:创建主题的参数设置。
QueueMeta:创建队列的参数设置。
queueNameList:指定主题消息推送的队列名列表。
needCreateQueue:queueNameList是否需要创建。
queueMetaTemplate:创建队列需要的QueueMeta参数示例。
示例代码
广播拉取消息的示例代码如下:
package doc;
// 引入阿里云轻量消息队列(原 MNS)的相关类
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;
public class DemoTopicMessageBroadcast {
public static void main(String[] args) {
// 获取阿里云账户的AccessKey ID,AccessKey Secret和SMQ服务的endpoint。
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
// 创建消费者列表。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try{
// 创建主题。
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
// 发布消息。
String messageBody = "broadcast message to all the consumers:hello the world.";
// 如果发送原始消息,使用getMessageBodyAsRawString解析消息体。
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// 接收消息。
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
Message consumer1Msg = queueForConsumer1.popMessage(30);
if(consumer1Msg != null)
{
System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
} else{
System.out.println("the queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if(consumer2Msg != null)
{
System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if(consumer3Msg != null)
{
System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
// 删除主题。
pullTopic.delete();
} catch(ClientException ce) {
System.out.println("Something wrong with the network connection between client and SMQ service."
+ "Please check your network and DNS availability.");
ce.printStackTrace();
} catch(ServiceException se) {
se.printStackTrace();
}
client.close();
}
}