このページは機械翻訳によるものです。内容の正確さは保証しておりません。 人力翻訳を依頼する

同時実行テストを実行する

更新日時2025-01-12 21:45

このトピックでは、Simple Message Queue(旧称 MNS) SDK for Java を使用して、複数のメッセージを同時に送受信する方法について説明します。

背景情報

同時実行テストは、パフォーマンステスト手法の 1 つです。メッセージングシステムが複数のメッセージまたはリクエストを同時に処理する場合の、メッセージングシステムのパフォーマンスと安定性を検証するために使用されます。同時実行テストでは、同時実行数とテスト期間を指定できます。1 秒あたりのクエリ数(QPS)は、次の式に基づいて計算されます。QPS = リクエストの総数 / テスト期間。

前提条件

プロパティファイルを構成するファイル

mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
mns.perf.queueName=Queue_Test // キューの名前。
mns.perf.threadNum=2 // 同時スレッドの数。
mns.perf.durationTime=6 // テストの期間。単位:秒。

サンプルコード

サンプルコードのダウンロード方法の詳細については、「JavaSDKPerfTest.java」をご参照ください。

package com.aliyun.mns.sample.scenarios.perf;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;

/**
 サンプルコードを使用して、同時実行テストを実行します。
 * 準備
 * 1. Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey Secret を構成します。
 * 2. 次の内容に基づいて、${"user.home"}/.aliyun-mns.properties ファイルを構成します。
 *          mns.endpoint=http://xxxxxxx
 *          mns.perf.queueName=JavaSDKPerfTestQueue // キューの名前。
 *          mns.perf.threadNum=200 // 同時スレッドの数。
 *          mns.perf.durationTime=180 // テスト期間。単位:秒。
 */
public class JavaSDKPerfTest {
    private static MNSClient client = null;

    private static String endpoint = null;

    private static String queueName;
    private static int threadNum;

    /**
     * テスト期間。単位:秒。
     */
    private static long durationTime;


    public static void main(String[] args) throws InterruptedException {
        if (!parseConf()) {
            return;
        }

        // 1. クライアントを初期化する
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);
        CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();

        // 2. キューを再作成する
        ReCreateUtil.reCreateQueue(client,queueName);
        // 3. メッセージを送信する
        Function<CloudQueue,Message> sendFunction = new Function<CloudQueue, Message>() {
            @Override
            public Message apply(CloudQueue queue) {
                Message message = new Message();
                message.setMessageBody("BodyTest");
                return queue.putMessage(message);
            }
        };
        actionProcess("SendMessage", sendFunction , durationTime);
        // 4. メッセージを受信する
        Function<CloudQueue,Message> receiveFunction = new Function<CloudQueue, Message>() {
            @Override
            public Message apply(CloudQueue queue) {
                Message message = queue.popMessage();
                String handle = message == null?null:message.getReceiptHandle();
                if (StringUtils.isNotBlank(handle)) {
                    queue.deleteMessage(handle);
                }
                return message;
            }
        };
        actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);



        client.close();
        System.out.println("=======end=======");
    }

    private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
        System.out.println(actionName +" start!");

        final AtomicLong totalCount = new AtomicLong(0);

        ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
        ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();

                    CloudQueue queue = client.getQueueRef(queueName);
                    Message message = new Message();
                    message.setMessageBody("BodyTest");
                    long count = 0;

                    Date startDate = new Date();
                    long startTime = startDate.getTime();

                    System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
                    long endTime = startTime + durationSeconds * 1000L;
                    while (true) {
                        for (int i = 0; i < 50; ++i) {
                            function.apply(queue);
                        }
                        count += 50;

                        if (System.currentTimeMillis() >= endTime) {
                            break;
                        }
                    }

                    System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);

                    totalCount.addAndGet(count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });


        executor.shutdown();
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }

        System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
    }


    protected static boolean parseConf() {

        // メンバーパラメータを初期化する
        endpoint = ServiceSettings.getMNSAccountEndpoint();
        System.out.println("Endpoint: " + endpoint);

        queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
        System.out.println("QueueName: " + queueName);
        threadNum = Integer.parseInt(ServiceSettings.getMNSPropertyValue("perf.threadNum","2"));
        System.out.println("ThreadNum: " + threadNum);
        durationTime = Long.parseLong(ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6"));
        System.out.println("DurationTime: " + durationTime);

        return true;
    }

    /**
     * UTC+8 の時刻をクエリします。
     */
    private static String getBjTime(Date date){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        return sdf.format(date);
    }


    public interface Function<T, R> {

        /**
         * この関数を指定された引数に適用します。
         *
         * @param t 関数引数
         * @return 関数の結果
         */
        R apply(T t);

    }
}
  • 目次 (1, M)
  • 背景情報
  • 前提条件
  • プロパティファイルを構成するファイル
  • サンプルコード
フィードバック
phone お問い合わせ

Chat now with Alibaba Cloud Customer Service to assist you in finding the right products and services to meet your needs.

alicare alicarealicarealicare