本文介紹基於Java SDK提供的隊列訊息發送以及消費的並發測試案例。
並發測試說明
並發測試是一種效能測試方法,用於驗證訊息傳遞系統在同時處理多個訊息或多個使用者請求時的效能和穩定性,在並發測試中,您可以指定並發度、已耗用時間;通過發送總請求數除以已耗用時間計算得到QPS。
前提條件
配置properties檔案
mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
mns.perf.queueName=Queue_Test # queue名稱
mns.perf.threadNum=2 # 並發線程數
mns.perf.durationTime=6 # 測試期間(秒)
範例程式碼
範例程式碼下載,請參見JavaSDKPerfTest。
package com.aliyun.mns.sample.scenarios.perf;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
/**
* 並發測試範例程式碼
* 前置要求
* 1. 遵循阿里雲規範,env設定ak、sk。
* 2. ${"user.home"}/.aliyun-mns.properties 檔案配置如下:
* mns.endpoint=http://xxxxxxx
* mns.perf.queueName=JavaSDKPerfTestQueue # queue名稱
* mns.perf.threadNum=200 # 並發線程數
* mns.perf.durationTime=180 # 測試期間(秒)
*/
public class JavaSDKPerfTest {
private static MNSClient client = null;
private static String endpoint = null;
private static String queueName;
private static int threadNum;
/**
* 測試期間(秒)
*/
private static long durationTime;
public static void main(String[] args) throws InterruptedException {
if (!parseConf()) {
return;
}
// 1. init client
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(threadNum);
clientConfiguration.setMaxConnectionsPerRoute(threadNum);
CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
client = cloudAccount.getMNSClient();
// 2. reCreateQueue
ReCreateUtil.reCreateQueue(client,queueName);
// 3. SendMessage
Function<CloudQueue,Message> sendFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = new Message();
message.setMessageBody("BodyTest");
return queue.putMessage(message);
}
};
actionProcess("SendMessage", sendFunction , durationTime);
// 4. Now is the ReceiveMessage
Function<CloudQueue,Message> receiveFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = queue.popMessage();
String handle = message == null?null:message.getReceiptHandle();
if (StringUtils.isNotBlank(handle)) {
queue.deleteMessage(handle);
}
return message;
}
};
actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);
client.close();
System.out.println("=======end=======");
}
private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
System.out.println(actionName +" start!");
final AtomicLong totalCount = new AtomicLong(0);
ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
CloudQueue queue = client.getQueueRef(queueName);
Message message = new Message();
message.setMessageBody("BodyTest");
long count = 0;
Date startDate = new Date();
long startTime = startDate.getTime();
System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
long endTime = startTime + durationSeconds * 1000L;
while (true) {
for (int i = 0; i < 50; ++i) {
function.apply(queue);
}
count += 50;
if (System.currentTimeMillis() >= endTime) {
break;
}
}
System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);
totalCount.addAndGet(count);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
}
protected static boolean parseConf() {
// init the member parameters
endpoint = ServiceSettings.getMNSAccountEndpoint();
System.out.println("Endpoint: " + endpoint);
queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
System.out.println("QueueName: " + queueName);
threadNum = Integer.parseInt(ServiceSettings.getMNSPropertyValue("perf.threadNum","2"));
System.out.println("ThreadNum: " + threadNum);
durationTime = Long.parseLong(ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6"));
System.out.println("DurationTime: " + durationTime);
return true;
}
/**
* 擷取北京時間
*/
private static String getBjTime(Date date){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return sdf.format(date);
}
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
}