Redisパイプラインメカニズムを使用して、バッチ操作を実行したり、コマンド実行のパフォーマンスを向上させたりできます。 パイプラインを使用すると、複数のコマンドを同時にサーバーに送信できるため、ネットワークの待ち時間が短縮され、パフォーマンスが向上します。 Tair (Redis OSS互換) は、オープンソースRedisのパイプライン技術をサポートしています。
パイプライン化の概要
通常、ピンポンモードは、クライアントとRedisサーバー間の通信で使用されます。 このモードでは、クライアントが最後のコマンドに対する応答をサーバから受信するまで、クライアントはコマンドを発行しません。
さらに、Redisは、クライアントバッチが応答を待たずにコマンドを発行するパイプラインモードを提供します。 クライアントは、応答を受信した後、それらを順番にコマンドと照合し、結果をフロントエンドに返す。
次の図は、ピンポンモードとパイプラインモードの動作を示しています。
パイプライン化は、往復時間 (RTT) ならびにread()
およびwrite()
システムコールの数に関連するネットワーク待ち時間を低減することによって、システム効率および性能を改善する。
パイプラインは、複数のコマンドをサーバーにすばやく送信する必要があり、応答がすぐに必要ないシナリオで便利です。 したがって、パイプライン処理は、システム性能を最適化するためのバッチ処理ツールとして使用することができる。
パイプラインモードでは、パイプラインはクライアントとサーバー間の接続のみを使用し、パイプラインが閉じられるまで非パイプライン操作を実行できません。 他の操作を同時に実行するには、パイプラインを他の操作から分離するための専用接続を確立します。
詳細については、「Redisパイプライン化」をご参照ください。
注意事項
パイプライン化は原子性を保証しない。
パイプラインモードでは、クライアントバッチはコマンドを発行しますが、サーバーは個々のコマンドを解決して順番に実行します。 その結果、サーバは、他のクライアントからのコマンドを実行することもできる。 アトミック性を確保するために、トランザクションスクリプトまたはLuaスクリプトを使用できます。
エラーが発生した場合、パイプラインはロールバックをサポートしません。
実行するコマンドが互いに依存している場合は、パイプラインを使用しないでください。
説明redis-pyなどの特定のクライアントは、MULTIやEXECなどのトランザクションコマンドを使用してパイプラインをシミュレートします。 パイプラインとトランザクションを区別し、特定の要件に基づいて適切に使用することが重要です。 設定すると、エラーが発生します。 トランザクションの制限の詳細については、「トランザクション」をご参照ください。
多数のコマンドが実行される場合、サーバおよび特定のクライアントのバッファ制限のために、パイプライン処理がうまく実行されない可能性がある。
パイプライン化は、基本的に、サーバアーキテクチャから独立したクライアント − サーバ対話パターンである。 プロキシモードのクラスターインスタンス、直接接続モードのクラスターインスタンス、および読み書き分離インスタンスはすべてパイプライン化をサポートしています。
説明クラスタアーキテクチャには固有の限界がある。 例えば、クラスタアーキテクチャは、コマンド内のクロススロット鍵アクセスをサポートしない。 クライアントが接続されている現在のノードに属していないキーにクライアントがアクセスしようとすると、
-MOVED
エラーが発生します。 クラスターアーキテクチャでパイプラインを使用する場合は、パイプライン内のコマンドがクラスターアーキテクチャの要件を満たしていることを確認してください。 詳細については、「クラスターインスタンスおよび読み書き分離インスタンスでサポートされているコマンドの制限」をご参照ください。
サンプルコード
性能比較
次のコードは、パイプラインの有無による操作のパフォーマンス比較を示しています。
package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
// Specify the password used to connect to the instance.
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
//Run several commands consecutively.
final int COUNT=5000;
String key = "KVStore-Tanghan";
//1 ---Without pipelines---
jedis.del(key);//Initialize the key.
Date ts1 = new Date();
for (int i = 0; i < COUNT; i++) {
//Send a request and receive a response.
jedis.incr(key);
}
Date ts2 = new Date();
System.out.println("Without pipelines > value is: "+jedis.get(key)+" > Time elapsed: " + (ts2.getTime() - ts1.getTime())+ "ms");
//2 ----With pipelines---
jedis.del(key);//Initialize the key.
Pipeline p1 = jedis.pipelined();
Date ts3 = new Date();
for (int i = 0; i < COUNT; i++) {
//Send the request.
p1.incr(key);
}
//Receive the response.
p1.sync();
Date ts4 = new Date();
System.out.println("With pipelines > value is:"+jedis.get(key)+" > Time elapsed:" + (ts4.getTime() - ts3.getTime())+ "ms");
jedis.close();
}
}
正しいエンドポイントとパスワードを入力してTair (Redis-compatible) インスタンスに接続し、上記のJavaコードを実行すると、次の出力が表示されます。 出力は、パイプラインによってパフォーマンスが向上することを示しています。
Without pipelines > value: 5,000 > Time elapsed: 5,844 ms
With pipelines > value: 5000 > Time elapsed: 78 ms
応答を処理するメソッド
Jedisでパイプラインを使用する場合、2つのメソッドを使用して応答を処理できます。 詳細については、次のサンプルコードをご参照ください。
package pipeline.kvstore.aliyun.com;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class PipelineClientTest {
static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
// Specify the password used to connect to the instance.
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
String key = "KVStore-Test1";
jedis.del(key);// Initialize the key.
//-------- Method 1
Pipeline p1 = jedis.pipelined();
System.out.println("-----Method 1-----");
for (int i = 0; i < 5; i++) {
p1.incr(key);
System.out.println("Pipeline sends requests");
}
// After the pipeline sends all requests, the client starts to receive responses.
System.out.println("Requests sent. Starting to receive responses");
List<Object> responses = p1.syncAndReturnAll();
if (responses == null || responses.isEmpty()) {
jedis.close();
throw new RuntimeException("Pipeline error: no responses received");
}
for (Object resp : responses) {
System.out.println("Pipeline receives responses: " + resp.toString());
}
System.out.println();
//-------- Method 2
System.out.println("-----Method 2-----");
jedis.del(key);// Initialize the key.
Pipeline p2 = jedis.pipelined();
// Declare the responses first.
Response<Long> r1 = p2.incr(key);
System.out.println("Pipeline sends requests");
Response<Long> r2 = p2.incr(key);
System.out.println("Pipeline sends requests");
Response<Long> r3 = p2.incr(key);
System.out.println("Pipeline sends requests");
Response<Long> r4 = p2.incr(key);
System.out.println("Pipeline sends requests");
Response<Long> r5 = p2.incr(key);
System.out.println("Pipeline sends requests");
try{
r1.get(); // Errors occur because the client has not started to receive responses.
}catch(Exception e){
System.out.println(" <<< Pipeline error: The client has not started to receive responses. >>> ");
}
// After the pipeline sends all requests, the client starts to receive responses.
System.out.println("Requests sent. Starting to receive responses");
p2.sync();
System.out.println("Pipeline receives responses: " + r1.get());
System. out. println ("Pipeline receives responses: " + r2.get ());
System. out. println ("Pipeline receives responses: " + r3.get ());
System. out. println ("Pipeline receives responses: " + r4.get ());
System. out. println ("Pipeline receives responses: " + r5.get ());
jedis.close();
}
}
正しいエンドポイントとパスワードを入力してTair (Redis OSS互換) インスタンスに接続し、上記のJavaコードを実行すると、次の出力が表示されます。
----- Method 1 -----
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
After the pipeline sends all requests, the client starts to receive responses.
Pipeline receives responses: 1
Pipeline receives responses: 2
Pipeline receives responses: 3
Pipeline receives responses: 4
Pipeline receives responses: 5
----- Method 2 -----
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
Pipeline sends requests
<Pipeline error: The client has not started to receive responses>
After the pipeline sends all requests, the client starts to receive responses.
Pipeline receives responses: 1
Pipeline receives responses: 2
Pipeline receives responses: 3
Pipeline receives responses: 4
Pipeline receives responses: 5