當您有大量操作、提高命令執行效能等需求時,您可以使用Redis管道傳輸(Pipeline,後面稱為Pipeline)機制。Pipeline可以將多個命令同時發給服務端,減少網路延遲,並提高效能。雲資料庫Tair(相容 Redis)支援原生Redis Pipeline。
Pipeline簡介
通常情況下,用戶端與Redis服務端通訊時採用的是Ping-pong網路互動模式,Ping-pong模式是指用戶端(Client)發送一個命令後會等待命令的執行結果,在用戶端收到伺服器端(Server)返回的結果後,再發送下一個命令,以此類推。
Redis也支援Pipeline模式,不同於Ping-pong模式,Pipeline模式類似流水線的工作模式:用戶端發送一個命令後無需等待執行結果,會繼續發送其他命令;在全部請求發送完畢後,用戶端關閉請求,開始接收響應,收到執行結果後再與之前發送的命令按順序進行一一匹配。在Pipeline模式的具體實現中,大部分Redis用戶端採用批處理的方式,即一次發送多個命令,在接收完所有命令執行結果後再返回給上層業務。
下圖為Ping-pong模式與Pipeline模式的網路通訊示意圖。
使用Pipeline可通過降低網路往返時延(Round-trip time,簡稱RTT),減少read()
和write()
的系統調用以及進程環境切換次數,以提升程式的執行效率與效能。
Pipeline在某些情境下非常有效,例如有多個操作命令需要被迅速提交至伺服器端,但使用者並不依賴每個操作返回的響應結果,對結果響應也無需立即獲得,那麼Pipeline就可以用來作為最佳化效能的批處理工具。
使用Pipeline時用戶端將獨佔與伺服器端的串連,此期間將不能進行其他“非Pipeline”類型操作,直至Pipeline被關閉;如果要同時執行其他動作,可以為Pipeline操作單獨建立一個串連,將其與常規操作分開。
更多資訊,請參見Redis pipeline。
注意事項
Pipeline不能保證原子性。
Pipeline模式只是將用戶端發送命令的方式改為發送批量命令,而服務端在處理批量命令的資料流時,仍然是解析出多個單命令並按順序執行,各個命令相互獨立,即服務端仍有可能在該過程中執行其他用戶端的命令。如需保證原子性,請使用事務或Lua指令碼。
若Pipeline執行過程中發生錯誤,不支援復原。
Pipeline沒有事務的特性,如待執行命令的前後存在依賴關係,請勿使用Pipeline。
說明某些用戶端(例如redis-py)在實現Pipeline時使用事務命令MULTI、EXEC進行偽裝,請您在使用過程中關注Pipeline與事務的區別,否則可能會產生報錯,關於事務的限制請參見Redis transactions。
由於服務端以及部分用戶端存在緩衝區限制,建議單次Pipeline中不要使用過多的命令。
Pipeline的本質為用戶端與服務端的互動模式,與服務端的架構無關,因此叢集架構代理模式、叢集架構直連模式以及讀寫分離架構執行個體均支援Pipeline。
說明由於叢集架構本身具有一定限制,例如不支援在單個命令中訪問跨Slot的Key、當訪問到不屬於本節點的資料時會產生
-MOVED
錯誤等,請在叢集架構中使用Pipeline時確保Pipeline內部的命令符合叢集架構的可執行條件,具體限制請參見叢集架構與讀寫分離架構執行個體的命令限制。
程式碼範例
效能對比
如下代碼將示範使用Pipeline與不使用Pipeline的效能對比。
package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
//ApsaraDB for Redis的執行個體密碼
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
//連續執行多次命令操作
final int COUNT=5000;
String key = "KVStore-Tanghan";
// 1 ---不使用pipeline操作---
jedis.del(key);//初始化key
Date ts1 = new Date();
for (int i = 0; i < COUNT; i++) {
//發送一個請求,並接收一個響應(Send Request and Receive Response)
jedis.incr(key);
}
Date ts2 = new Date();
System.out.println("不用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts2.getTime() - ts1.getTime())+ "ms");
//2 ----對比使用pipeline操作---
jedis.del(key);//初始化key
Pipeline p1 = jedis.pipelined();
Date ts3 = new Date();
for (int i = 0; i < COUNT; i++) {
//發出請求 Send Request
p1.incr(key);
}
//接收響應 Receive Response
p1.sync();
Date ts4 = new Date();
System.out.println("使用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts4.getTime() - ts3.getTime())+ "ms");
jedis.close();
}
}
在輸入了正確的雲資料庫Tair(相容 Redis)執行個體訪問地址和密碼之後,運行以上Java程式,輸出結果如下。從中可以看出使用pipeline的效能要快的多。
不用Pipeline > value為:5000 > 操作用時:5844ms
使用Pipeline > value為:5000 > 操作用時:78ms
響應資料(Response)的處理方式
在Jedis中使用Pipeline時,對於響應資料(Response)的處理有兩種方式,詳情請參見以下程式碼範例。
package pipeline.kvstore.aliyun.com;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class PipelineClientTest {
static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
// 執行個體密碼
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
String key = "KVStore-Test1";
jedis.del(key);//初始化
// -------- 方法1
Pipeline p1 = jedis.pipelined();
System.out.println("-----方法1-----");
for (int i = 0; i < 5; i++) {
p1.incr(key);
System.out.println("Pipeline發送請求");
}
// 發送請求完成,開始接收響應
System.out.println("發送請求完成,開始接收響應");
List<Object> responses = p1.syncAndReturnAll();
if (responses == null || responses.isEmpty()) {
jedis.close();
throw new RuntimeException("Pipeline error: 沒有接收到響應");
}
for (Object resp : responses) {
System.out.println("Pipeline接收響應Response: " + resp.toString());
}
System.out.println();
//-------- 方法2
System.out.println("-----方法2-----");
jedis.del(key);//初始化
Pipeline p2 = jedis.pipelined();
//需要先聲明Response
Response<Long> r1 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r2 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r3 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r4 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r5 = p2.incr(key);
System.out.println("Pipeline發送請求");
try{
r1.get(); //此時還未開始接收響應,所以此操作會出錯
}catch(Exception e){
System.out.println(" <<< Pipeline error:還未開始接收響應 >>> ");
}
// 發送請求完成,開始接收響應
System.out.println("發送請求完成,開始接收響應");
p2.sync();
System.out.println("Pipeline接收響應Response: " + r1.get());
System.out.println("Pipeline接收響應Response: " + r2.get());
System.out.println("Pipeline接收響應Response: " + r3.get());
System.out.println("Pipeline接收響應Response: " + r4.get());
System.out.println("Pipeline接收響應Response: " + r5.get());
jedis.close();
}
}
在輸入了正確的雲資料庫Tair(相容 Redis)執行個體訪問地址和密碼之後,運行以上Java程式,輸出結果如下:
-----方法1-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5
-----方法2-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
<<< Pipeline error:還未開始接收響應 >>>
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5