Simple Message Queue (formerly MNS): Send messages from one producer client to multiple consumer clients

Last Updated: Sep 02, 2024

Simple Message Queue (formerly MNS), or SMQ, provides a message model in which multiple consumer clients pull the messages that are sent by one producer client. This topic describes how to use this model to send messages from one producer client to multiple consumer clients.

Note

In this example, an SDK for Java is used. For more information about SDKs for other programming languages, see Reference for New Version SDKs (Recommended).

Prerequisites

  • IntelliJ IDEA is installed. For more information, see IntelliJ IDEA.

    You can use IntelliJ IDEA or Eclipse. In this example, IntelliJ IDEA is used.

  • JDK 1.8 or later is installed. For more information, see Java Downloads.

  • Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.

Background information

SMQ provides queue-based and topic-based message models that can meet your requirements in most business scenarios.

  • Queue-based message model: One client sends messages to an SMQ queue. Multiple clients consume messages by pulling messages from the queue.

  • Topic-based message model: One client sends messages to an SMQ topic. Messages are automatically pushed from the SMQ server to multiple clients.

SMQ allows clients to consume messages in real time by pushing messages from an SMQ server to the clients. However, push delivery requires each consumer client to expose an endpoint that receives the messages. In some scenarios, such as in the private network of an enterprise, the endpoints of consumer clients cannot be exposed. In this case, the consumer clients can consume messages by pulling them from the SMQ server. You can combine SMQ queues and topics to send messages from one client to multiple consumer clients without exposing the endpoints of the consumer clients.

Solution

Create a subscription to a topic for each consumer queue and specify the queue as the subscription endpoint. Messages that are published to the topic are then pushed to the queues, and the consumer clients pull the messages from the queues. In this way, you can send messages from one client to multiple clients without exposing the endpoints of the consumer clients. The following figure shows the process.

[Figure: message flow]
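The flow described above can be modeled in memory with plain Java. The following is a minimal sketch, not SMQ SDK code: the FanOutSketch and Topic classes and the subscribeQueue, publish, and pull methods are hypothetical names introduced only for illustration. The sketch assumes that each consumer has its own queue and that the topic copies every published message into every subscribed queue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model only. These classes are hypothetical and are not part of the SMQ SDK.
class FanOutSketch {

    // Stands in for a topic whose subscriptions use queue endpoints.
    static final class Topic {
        private final Map<String, Queue<String>> subscribedQueues = new LinkedHashMap<>();

        // Registers one queue per consumer. Subscribing the same name twice has no effect.
        void subscribeQueue(String queueName) {
            subscribedQueues.putIfAbsent(queueName, new ArrayDeque<>());
        }

        // Pushes a copy of the message into every subscribed queue.
        void publish(String body) {
            for (Queue<String> queue : subscribedQueues.values()) {
                queue.add(body);
            }
        }

        // A consumer pulls from its own queue. Returns null if the queue is empty or unknown.
        String pull(String queueName) {
            Queue<String> queue = subscribedQueues.get(queueName);
            return queue == null ? null : queue.poll();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        for (String name : Arrays.asList("consumer001", "consumer002", "consumer003")) {
            topic.subscribeQueue(name);
        }
        topic.publish("hello");
        // Each consumer pulls its own copy. No consumer endpoint is involved.
        for (String name : Arrays.asList("consumer001", "consumer002", "consumer003")) {
            System.out.println(name + " pulled: " + topic.pull(name));
        }
    }
}
```

The producer only interacts with the topic and each consumer only interacts with its own queue, which is why consumers never need a publicly reachable endpoint in this arrangement.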

Install the Java dependency library

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependency to the pom.xml file to import the Java dependency library:

    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>aliyun-sdk-mns</artifactId>
        <version>1.1.9.2</version>
    </dependency>

API operations

SDK for Java 1.1.8 and later provides the CloudPullTopic class to support the preceding solution. You can call either of the following methods of MNSClient to create a CloudPullTopic object:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

The following list describes the parameters of the preceding methods.

  • topicMeta: the metadata of the topic, specified as a TopicMeta object.

  • queueNameList: the names of the queues to which the messages in the topic are pushed.

  • needCreateQueue: specifies whether to create the queues that are specified by the queueNameList parameter.

  • queueMetaTemplate: the metadata template of the queues, specified as a QueueMeta object.

Sample code

package doc;

// Import the SMQ classes.
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.utils.ServiceSettings;
import java.util.Vector;

public class DemoTopicMessageBroadcast {
    public static void main(String[] args) {

        // Obtain the AccessKey ID and AccessKey secret of the Alibaba Cloud account and the endpoint of SMQ.
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        // Specify the names of the queues from which the consumers pull messages.
        Vector<String> consumerNameList = new Vector<String>();
        String consumerName1 = "consumer001";
        String consumerName2 = "consumer002";
        String consumerName3 = "consumer003";
        consumerNameList.add(consumerName1);
        consumerNameList.add(consumerName2);
        consumerNameList.add(consumerName3);

        // Use one metadata template for all queues. The long polling wait time is 30 seconds.
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(30);

        try {
            // Create a topic and the queues that receive its messages.
            String topicName = "demo-topic-for-pull";
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);
            CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

            // Publish a message to the topic.
            String messageBody = "broadcast message to all the consumers:hello the world.";
            // A raw message is published, so use getMessageBodyAsRawString to read the message body.
            TopicMessage tMessage = new RawTopicMessage();
            tMessage.setBaseMessageBody(messageBody);
            pullTopic.publishMessage(tMessage);

            // Receive the message from each consumer queue.
            CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
            CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
            CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

            Message consumer1Msg = queueForConsumer1.popMessage(30);
            if (consumer1Msg != null) {
                System.out.println("consumer1 received message:" + consumer1Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("the queue is empty");
            }

            Message consumer2Msg = queueForConsumer2.popMessage(30);
            if (consumer2Msg != null) {
                System.out.println("consumer2 received message:" + consumer2Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("the queue is empty");
            }

            Message consumer3Msg = queueForConsumer3.popMessage(30);
            if (consumer3Msg != null) {
                System.out.println("consumer3 received message:" + consumer3Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("the queue is empty");
            }

            // Delete the topic.
            pullTopic.delete();
        } catch (ClientException ce) {
            System.out.println("Something wrong with the network connection between client and SMQ service. "
                    + "Please check your network and DNS availability.");
            ce.printStackTrace();
        } catch (ServiceException se) {
            System.out.println("The SMQ service returned an error. See the stack trace for details.");
            se.printStackTrace();
        } finally {
            // Release the client resources even if an error occurs.
            client.close();
        }
    }
}
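The sample above receives one message per queue and never deletes it. In SMQ queues, a message that is received but not deleted becomes visible again after its visibility timeout, so a long-running consumer normally deletes each message after it is processed. The following is a minimal sketch of that receive, process, delete loop. It does not use the SDK: Msg, PullQueue, InMemoryQueue, and drain are hypothetical names, and the pop and delete methods are only meant to mirror CloudQueue.popMessage and CloudQueue.deleteMessage. Check the SDK reference for the exact signatures before you adapt the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration. These are not SMQ SDK types.
class ConsumeLoopSketch {

    static final class Msg {
        final String receiptHandle;
        final String body;

        Msg(String receiptHandle, String body) {
            this.receiptHandle = receiptHandle;
            this.body = body;
        }
    }

    // Assumed shape of a pull-style queue: pop a message, then delete it by its receipt handle.
    interface PullQueue {
        Msg pop(int waitSeconds);            // returns null when no message is available
        void delete(String receiptHandle);
    }

    // In-memory stub. It never actually waits and does not model the visibility timeout.
    static final class InMemoryQueue implements PullQueue {
        private final Deque<Msg> visible = new ArrayDeque<>();
        private final Map<String, Msg> inFlight = new HashMap<>();
        private int nextHandle = 0;

        void send(String body) {
            visible.add(new Msg("rh-" + (nextHandle++), body));
        }

        @Override
        public Msg pop(int waitSeconds) {
            Msg m = visible.poll();
            if (m != null) {
                inFlight.put(m.receiptHandle, m); // received but not yet deleted
            }
            return m;
        }

        @Override
        public void delete(String receiptHandle) {
            inFlight.remove(receiptHandle);
        }

        int inFlightCount() {
            return inFlight.size();
        }
    }

    // Pulls until the queue is empty. Each message is deleted only after it is processed.
    static List<String> drain(PullQueue queue, int waitSeconds) {
        List<String> processed = new ArrayList<>();
        Msg m;
        while ((m = queue.pop(waitSeconds)) != null) {
            processed.add(m.body);          // placeholder for real processing
            queue.delete(m.receiptHandle);  // acknowledge after successful processing
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue();
        queue.send("first");
        queue.send("second");
        System.out.println(drain(queue, 30));
    }
}
```

Deleting after processing, rather than before, means that a consumer that fails partway through leaves the message in the queue to be received again.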