All Products
Search
Document Center

Simple Message Queue (formerly MNS): Perform a concurrency test

Last Updated:Sep 02, 2024

This topic describes how to use Simple Message Queue (formerly MNS) SDK for Java to concurrently send and receive multiple messages.

Background information

A concurrency test is a performance testing method that is used to verify the performance and stability of a messaging system when the system processes multiple messages or requests at the same time. In a concurrency test, you specify the concurrency (the number of concurrent threads) and the test duration. Queries per second (QPS) is calculated by using the following formula: QPS = Total number of requests / Test duration (in seconds).

Prerequisites

Configure the properties file

mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
# The name of the queue.
mns.perf.queueName=Queue_Test
# The number of concurrent threads.
mns.perf.threadNum=2
# The duration of the test. Unit: seconds.
mns.perf.durationTime=6

Sample code

For more information about how to download the sample code, see JavaSDKPerfTest.java.

package com.aliyun.mns.sample.scenarios.perf;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;

/**
 * Use sample code to perform a concurrency test.
 * Preparations
 * 1. Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications.
 * 2. Configure the ${user.home}/.aliyun-mns.properties file based on the following content.
 *    Keep comments on their own lines: in the Java properties format a trailing "# ..." is
 *    part of the value, not a comment.
 *          # The account endpoint (the value returned by ServiceSettings.getMNSAccountEndpoint();
 *          # presumably the mns.accountendpoint key - confirm against ServiceSettings).
 *          mns.accountendpoint=http://xxxxxxx
 *          # The name of the queue.
 *          mns.perf.queueName=JavaSDKPerfTestQueue
 *          # The number of concurrent threads.
 *          mns.perf.threadNum=200
 *          # The test duration. Unit: seconds.
 *          mns.perf.durationTime=180
 *
 * The test first sends messages from threadNum threads for durationTime seconds, then receives
 * and deletes messages in the same way, and prints the QPS of each phase.
 */
public class JavaSDKPerfTest {
    /**
     * Number of requests a worker issues between two deadline checks.
     */
    private static final int BATCH_SIZE = 50;

    /**
     * Extra time, in seconds, that workers are given beyond the test duration before they are interrupted.
     */
    private static final long SHUTDOWN_GRACE_SECONDS = 60L;

    private static MNSClient client = null;

    private static String endpoint = null;

    private static String queueName;
    private static int threadNum;

    /**
     * The test duration. Unit: seconds.
     */
    private static long durationTime;


    /**
     * Runs the send phase and then the receive-and-delete phase against the configured queue.
     *
     * @param args not used
     * @throws InterruptedException if the calling thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) throws InterruptedException {
        if (!parseConf()) {
            return;
        }

        // 1. init client; one connection per worker thread so workers do not wait on the connection pool
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);
        CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();

        try {
            // 2. reCreateQueue
            ReCreateUtil.reCreateQueue(client, queueName);

            // 3. SendMessage
            Function<CloudQueue, Message> sendFunction = new Function<CloudQueue, Message>() {
                @Override
                public Message apply(CloudQueue queue) {
                    Message message = new Message();
                    message.setMessageBody("BodyTest");
                    return queue.putMessage(message);
                }
            };
            actionProcess("SendMessage", sendFunction, durationTime);

            // 4. ReceiveMessage, deleting each message that was actually received
            Function<CloudQueue, Message> receiveFunction = new Function<CloudQueue, Message>() {
                @Override
                public Message apply(CloudQueue queue) {
                    Message message = queue.popMessage();
                    String handle = message == null ? null : message.getReceiptHandle();
                    if (StringUtils.isNotBlank(handle)) {
                        queue.deleteMessage(handle);
                    }
                    return message;
                }
            };
            actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);
        } finally {
            // Release the client even when a phase fails.
            client.close();
        }
        System.out.println("=======end=======");
    }

    /**
     * Runs {@code function} repeatedly from {@code threadNum} worker threads for roughly
     * {@code durationSeconds} seconds and prints the resulting QPS
     * (total number of requests / {@code durationSeconds}).
     *
     * <p>Each worker checks the deadline only after every {@value #BATCH_SIZE} requests, so the real
     * run time can slightly exceed {@code durationSeconds}.
     *
     * @param actionName      label printed with the start message and the QPS result
     * @param function        the request to issue against the queue
     * @param durationSeconds the nominal test duration in seconds; must be positive
     * @throws InterruptedException     if the calling thread is interrupted while waiting for the workers
     * @throws IllegalArgumentException if {@code durationSeconds} is not positive
     */
    private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
        if (durationSeconds <= 0) {
            // Also protects the QPS division below from dividing by zero.
            throw new IllegalArgumentException("durationSeconds must be positive: " + durationSeconds);
        }
        System.out.println(actionName +" start!");

        final AtomicLong totalCount = new AtomicLong(0);

        ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
        try {
            ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
                @Override
                public void run() {
                    String threadName = Thread.currentThread().getName();
                    long count = 0;
                    try {
                        CloudQueue queue = client.getQueueRef(queueName);

                        Date startDate = new Date();
                        long endTime = startDate.getTime() + durationSeconds * 1000L;
                        System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));

                        // Issue requests in batches so the clock is not read on every request.
                        do {
                            for (int i = 0; i < BATCH_SIZE; ++i) {
                                function.apply(queue);
                                ++count;
                            }
                        } while (System.currentTimeMillis() < endTime);

                        System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // Requests completed before a failure still count toward the total.
                        totalCount.addAndGet(count);
                    }
                }
            });
        } finally {
            executor.shutdown();
        }

        // NOTE(review): whether asyncWithReturn already blocks until the workers finish is not
        // visible here; the wait therefore covers the whole test duration plus a grace period.
        if (!executor.awaitTermination(durationSeconds + SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }

        System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
    }


    /**
     * Loads the endpoint, queue name, thread count and test duration from the MNS settings and
     * prints them.
     *
     * <p>The duration is read from {@code perf.durationTime}; the older {@code perf.totalSeconds}
     * key is still honored as a fallback.
     *
     * @return {@code true} if all settings are usable; {@code false} if the endpoint is missing,
     *         a numeric setting cannot be parsed, or the thread count or duration is not positive
     */
    protected static boolean parseConf() {

        // init the member parameters
        endpoint = ServiceSettings.getMNSAccountEndpoint();
        System.out.println("Endpoint: " + endpoint);
        if (StringUtils.isBlank(endpoint)) {
            System.err.println("The MNS account endpoint is not configured.");
            return false;
        }

        queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
        System.out.println("QueueName: " + queueName);

        try {
            threadNum = Integer.parseInt(StringUtils.trim(ServiceSettings.getMNSPropertyValue("perf.threadNum","2")));
            System.out.println("ThreadNum: " + threadNum);

            // Prefer the documented key; fall back to the legacy key, then to the default of 6 seconds.
            String legacyDuration = ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6");
            String durationText = ServiceSettings.getMNSPropertyValue("perf.durationTime", legacyDuration);
            durationTime = Long.parseLong(StringUtils.trim(durationText));
            System.out.println("DurationTime: " + durationTime);
        } catch (NumberFormatException e) {
            System.err.println("perf.threadNum and perf.durationTime must be integers: " + e.getMessage());
            return false;
        }

        if (threadNum <= 0 || durationTime <= 0) {
            System.err.println("perf.threadNum and perf.durationTime must be greater than 0.");
            return false;
        }

        return true;
    }

    /**
     * Formats a date as {@code yyyy-MM-dd HH:mm:ss.SSS} in UTC+8, regardless of the JVM default time zone.
     *
     * @param date the instant to format
     * @return the formatted time in UTC+8
     */
    private static String getBjTime(Date date){
        // SimpleDateFormat is not thread-safe and this method is called from worker threads,
        // so a new formatter is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone("GMT+08:00"));
        return sdf.format(date);
    }


    /**
     * A single-argument function, declared locally so the sample does not depend on
     * {@code java.util.function}.
     *
     * @param <T> the argument type
     * @param <R> the result type
     */
    public interface Function<T, R> {

        /**
         * Applies this function to the given argument.
         *
         * @param t the function argument
         * @return the function result
         */
        R apply(T t);

    }
}