This topic describes how to use Simple Message Queue (formerly MNS) SDK for Java to concurrently send and receive multiple messages.
Background information
A concurrency test is a performance test method. It verifies the performance and stability of a messaging system when the system processes multiple messages or requests at the same time. In a concurrency test, you specify the concurrency (the number of concurrent threads) and the test duration. Queries per second (QPS) is calculated by using the following formula: QPS = Total number of requests / Test duration (in seconds).
Prerequisites
SMQ SDK for Java is installed. For more information, see Install SDK for Java.
An endpoint and an access credential are configured. For more information, see Configure endpoints and access credentials.
Configure the properties file
mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
# The name of the queue.
mns.perf.queueName=Queue_Test
# The number of concurrent threads.
mns.perf.threadNum=2
# The duration of the test. Unit: seconds.
mns.perf.durationTime=6
Sample code
For more information about how to download the sample code, see JavaSDKPerfTest.java.
package com.aliyun.mns.sample.scenarios.perf;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
/**
 * Sample concurrency test that sends messages to a queue and then receives and deletes them.
 *
 * <p>The program runs two phases one after the other ("SendMessage", then "ReceiveAndDelMessage").
 * In each phase, {@code threadNum} workers issue requests for the configured duration, and the
 * phase prints QPS = total number of requests / test duration (seconds).
 *
 * <p>Preparations:
 * <ol>
 *   <li>Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud
 *       specifications.</li>
 *   <li>Configure the ${user.home}/.aliyun-mns.properties file based on the following content.
 *       Keep each comment on its own line so that it does not become part of a value.</li>
 * </ol>
 * <pre>
 * mns.accountendpoint=http://xxxxxxx
 * # The name of the queue.
 * mns.perf.queueName=JavaSDKPerfTestQueue
 * # The number of concurrent threads.
 * mns.perf.threadNum=200
 * # The test duration. Unit: seconds. The legacy key mns.perf.totalSeconds is also accepted.
 * mns.perf.durationTime=180
 * </pre>
 */
public class JavaSDKPerfTest {

    /** Number of requests each worker issues between two checks of the deadline. */
    private static final int BATCH_SIZE = 50;

    /** Extra time, in seconds, granted to workers to finish their last batch after the deadline. */
    private static final long SHUTDOWN_GRACE_SECONDS = 60L;

    private static MNSClient client = null;

    private static String endpoint = null;

    private static String queueName;

    private static int threadNum;

    /**
     * The test duration. Unit: seconds.
     */
    private static long durationTime;

    /**
     * Entry point: loads the configuration, recreates the queue, runs the send phase and the
     * receive-and-delete phase, and closes the client.
     *
     * @param args unused
     * @throws InterruptedException if the calling thread is interrupted while waiting for workers
     */
    public static void main(String[] args) throws InterruptedException {
        if (!parseConf()) {
            return;
        }

        // 1. init client; size the connection pool so that every worker can hold a connection
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);
        CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();

        try {
            // 2. reCreateQueue
            ReCreateUtil.reCreateQueue(client, queueName);

            // 3. SendMessage
            Function<CloudQueue, Message> sendFunction = new Function<CloudQueue, Message>() {
                @Override
                public Message apply(CloudQueue queue) {
                    Message message = new Message();
                    message.setMessageBody("BodyTest");
                    return queue.putMessage(message);
                }
            };
            actionProcess("SendMessage", sendFunction, durationTime);

            // 4. ReceiveMessage, then delete the message if a receipt handle was returned
            Function<CloudQueue, Message> receiveFunction = new Function<CloudQueue, Message>() {
                @Override
                public Message apply(CloudQueue queue) {
                    Message message = queue.popMessage();
                    String handle = message == null ? null : message.getReceiptHandle();
                    if (StringUtils.isNotBlank(handle)) {
                        queue.deleteMessage(handle);
                    }
                    return message;
                }
            };
            actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);
        } finally {
            // Release the client even if a phase fails or the main thread is interrupted.
            client.close();
        }
        System.out.println("=======end=======");
    }

    /**
     * Runs one test phase: {@code threadNum} workers repeatedly apply {@code function} to the queue
     * until the deadline has passed, then the phase QPS is printed.
     *
     * <p>Workers check the deadline only after each batch of {@link #BATCH_SIZE} requests, so a
     * phase can run slightly longer than {@code durationSeconds}.
     *
     * @param actionName      label used in the console output
     * @param function        the request to issue against the queue
     * @param durationSeconds the phase duration in seconds; must be positive
     * @throws InterruptedException if the calling thread is interrupted while waiting for workers
     */
    private static void actionProcess(String actionName, final Function<CloudQueue, Message> function,
        final long durationSeconds) throws InterruptedException {
        System.out.println(actionName +" start!");
        final AtomicLong totalCount = new AtomicLong(0);

        ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
        try {
            ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
                @Override
                public void run() {
                    long count = 0;
                    try {
                        String threadName = Thread.currentThread().getName();
                        CloudQueue queue = client.getQueueRef(queueName);

                        Date startDate = new Date();
                        System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
                        long endTime = startDate.getTime() + durationSeconds * 1000L;

                        do {
                            for (int i = 0; i < BATCH_SIZE; ++i) {
                                function.apply(queue);
                                ++count;
                            }
                        } while (System.currentTimeMillis() < endTime);

                        System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // Requests completed before a failure still count toward the total.
                        totalCount.addAndGet(count);
                    }
                }
            });
        } finally {
            executor.shutdown();
        }

        // NOTE(review): whether asyncWithReturn blocks until the workers finish is not visible here,
        // so wait for the whole phase plus a grace period instead of a fixed 60 seconds.
        if (!executor.awaitTermination(durationSeconds + SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }

        System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
    }

    /**
     * Loads the test parameters into the static fields and prints them.
     *
     * <p>The duration is read from {@code perf.durationTime}; the legacy key
     * {@code perf.totalSeconds} is used when the former is absent, and 6 is used when neither is set.
     *
     * @return {@code true} if the configuration is usable; {@code false} if the endpoint is blank,
     *         a numeric value cannot be parsed, or the thread count or duration is not positive
     */
    protected static boolean parseConf() {
        // init the member parameters
        endpoint = ServiceSettings.getMNSAccountEndpoint();
        System.out.println("Endpoint: " + endpoint);
        if (StringUtils.isBlank(endpoint)) {
            System.err.println("The endpoint is not configured.");
            return false;
        }

        queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
        System.out.println("QueueName: " + queueName);

        // NOTE(review): assumes the second argument of getMNSPropertyValue is the value returned
        // when the key is missing - confirm against ServiceSettings.
        String legacyDuration = ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6");
        String durationValue = ServiceSettings.getMNSPropertyValue("perf.durationTime", legacyDuration);
        String threadValue = ServiceSettings.getMNSPropertyValue("perf.threadNum","2");
        try {
            threadNum = Integer.parseInt(StringUtils.trim(threadValue));
            durationTime = Long.parseLong(StringUtils.trim(durationValue));
        } catch (NumberFormatException e) {
            System.err.println("perf.threadNum and perf.durationTime must be integers: " + e.getMessage());
            return false;
        }
        System.out.println("ThreadNum: " + threadNum);
        System.out.println("DurationTime: " + durationTime);

        if (threadNum <= 0 || durationTime <= 0) {
            // A non-positive duration would also cause a division by zero in the QPS calculation.
            System.err.println("perf.threadNum and perf.durationTime must be greater than 0.");
            return false;
        }
        return true;
    }

    /**
     * Formats the given time in UTC+8, independent of the default time zone of the JVM.
     *
     * @param date the time to format
     * @return the time formatted as {@code yyyy-MM-dd HH:mm:ss.SSS} in UTC+8
     */
    private static String getBjTime(Date date){
        // SimpleDateFormat is not thread-safe and this method is called from worker threads,
        // so a new formatter is created for each call.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone("GMT+08:00"));
        return sdf.format(date);
    }

    /**
     * A function that accepts one argument and produces a result.
     *
     * @param <T> the type of the function argument
     * @param <R> the type of the function result
     */
    public interface Function<T, R> {
        /**
         * Applies this function to the given argument.
         *
         * @param t the function argument
         * @return the function result
         */
        R apply(T t);
    }
}