全部產品
Search
文件中心

Tair (Redis® OSS-Compatible):管道傳輸(Pipeline)

更新時間:Oct 30, 2024

當您有大量操作、提高命令執行效能等需求時,您可以使用Redis管道傳輸(Pipeline,後面稱為Pipeline)機制。Pipeline可以將多個命令同時發給服務端,減少網路延遲,並提高效能。雲資料庫Tair(相容 Redis)支援原生Redis Pipeline。

Pipeline簡介

通常情況下,用戶端與Redis服務端通訊時採用的是Ping-pong網路互動模式,Ping-pong模式是指用戶端(Client)發送一個命令後會等待命令的執行結果,在用戶端收到伺服器端(Server)返回的結果後,再發送下一個命令,以此類推。

Redis也支援Pipeline模式,不同於Ping-pong模式,Pipeline模式類似流水線的工作模式:用戶端發送一個命令後無需等待執行結果,會繼續發送其他命令;在全部請求發送完畢後,用戶端關閉請求,開始接收響應,收到執行結果後再與之前發送的命令按順序進行一一匹配。在Pipeline模式的具體實現中,大部分Redis用戶端採用批處理的方式,即一次發送多個命令,在接收完所有命令執行結果後再返回給上層業務。

下圖為Ping-pong模式與Pipeline模式的網路通訊示意圖。pipeline

使用Pipeline可通過降低網路往返時延(Round-trip time,簡稱RTT),減少read()write()的系統調用以及進程環境切換次數,以提升程式的執行效率與效能。

Pipeline在某些情境下非常有效,例如有多個操作命令需要被迅速提交至伺服器端,但使用者並不依賴每個操作返回的響應結果,對結果響應也無需立即獲得,那麼Pipeline就可以用來作為最佳化效能的批處理工具。

重要

使用Pipeline時用戶端將獨佔與伺服器端的串連,此期間將不能進行其他“非Pipeline”類型操作,直至Pipeline被關閉;如果要同時執行其他動作,可以為Pipeline操作單獨建立一個串連,將其與常規操作分開。

更多資訊,請參見Redis pipeline

注意事項

  • Pipeline不能保證原子性。

    Pipeline模式只是將用戶端發送命令的方式改為發送批量命令,而服務端在處理批量命令的資料流時,仍然是解析出多個單命令並按順序執行,各個命令相互獨立,即服務端仍有可能在該過程中執行其他用戶端的命令。如需保證原子性,請使用事務或Lua指令碼。

  • 若Pipeline執行過程中發生錯誤,不支援復原。

    Pipeline沒有事務的特性,如待執行命令的前後存在依賴關係,請勿使用Pipeline。

    說明

    某些用戶端(例如redis-py)在實現Pipeline時使用事務命令MULTI、EXEC進行偽裝,請您在使用過程中關注Pipeline與事務的區別,否則可能會產生報錯,關於事務的限制請參見Redis transactions

  • 由於服務端以及部分用戶端存在緩衝區限制,建議單次Pipeline中不要使用過多的命令。

  • Pipeline的本質為用戶端與服務端的互動模式,與服務端的架構無關,因此叢集架構代理模式、叢集架構直連模式以及讀寫分離架構執行個體均支援Pipeline。

    說明

    由於叢集架構本身具有一定限制,例如不支援在單個命令中訪問跨Slot的Key、當訪問到不屬於本節點的資料時會產生-MOVED錯誤等,請在叢集架構中使用Pipeline時確保Pipeline內部的命令符合叢集架構的可執行條件,具體限制請參見叢集架構與讀寫分離架構執行個體的命令限制

程式碼範例

效能對比

如下代碼將示範使用Pipeline與不使用Pipeline的效能對比。

package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
        static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
        static final int port = 6379;
        static final String password = "password";
        public static void main(String[] args) {
            Jedis jedis = new Jedis(host, port);
                //ApsaraDB for Redis的執行個體密碼
                String authString = jedis.auth(password);// password
                if (!authString.equals("OK")) {
                    System.err.println("AUTH Failed: " + authString);
                    jedis.close();
                    return;
                }
                //連續執行多次命令操作
                final int COUNT=5000;
                String key = "KVStore-Tanghan";
                // 1 ---不使用pipeline操作---
                jedis.del(key);//初始化key
                Date ts1 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //發送一個請求,並接收一個響應(Send Request and  Receive Response)
                    jedis.incr(key);
                }
                Date ts2 = new Date();
                System.out.println("不用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts2.getTime() - ts1.getTime())+ "ms");
                //2 ----對比使用pipeline操作---
                jedis.del(key);//初始化key
                Pipeline p1 = jedis.pipelined();
                Date ts3 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //發出請求 Send Request 
                    p1.incr(key);
                }
                //接收響應 Receive Response
                p1.sync();
                Date ts4 = new Date();
                System.out.println("使用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts4.getTime() - ts3.getTime())+ "ms");
                jedis.close();
        }
    }

在輸入了正確的雲資料庫Tair(相容 Redis)執行個體訪問地址和密碼之後,運行以上Java程式,輸出結果如下。從中可以看出使用pipeline的效能要快的多。

不用Pipeline > value為:5000 > 操作用時:5844ms
使用Pipeline > value為:5000 > 操作用時:78ms

響應資料(Response)的處理方式

在Jedis中使用Pipeline時,對於響應資料(Response)的處理有兩種方式,詳情請參見以下程式碼範例。

package pipeline.kvstore.aliyun.com;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
    public class PipelineClientTest {
        static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
        static final int port = 6379;
        static final String password = "password";
        public static void main(String[] args) {
            Jedis jedis = new Jedis(host, port);
                // 執行個體密碼
                String authString = jedis.auth(password);// password
                if (!authString.equals("OK")) {
                    System.err.println("AUTH Failed: " + authString);
                    jedis.close();
                    return;
                }
                String key = "KVStore-Test1";
                jedis.del(key);//初始化
                // -------- 方法1
                Pipeline p1 = jedis.pipelined();
                System.out.println("-----方法1-----");
                for (int i = 0; i < 5; i++) {
                    p1.incr(key);
                    System.out.println("Pipeline發送請求");
                }
                // 發送請求完成,開始接收響應
                System.out.println("發送請求完成,開始接收響應");
                List<Object> responses = p1.syncAndReturnAll();
                if (responses == null || responses.isEmpty()) {
                    jedis.close();
                    throw new RuntimeException("Pipeline error: 沒有接收到響應");
                }
                for (Object resp : responses) {
                    System.out.println("Pipeline接收響應Response: " + resp.toString());
                }
                System.out.println();
                //-------- 方法2
                System.out.println("-----方法2-----");
                jedis.del(key);//初始化
                Pipeline p2 = jedis.pipelined();  
                //需要先聲明Response
                Response<Long> r1 = p2.incr(key); 
                System.out.println("Pipeline發送請求");
                Response<Long> r2 = p2.incr(key);
                System.out.println("Pipeline發送請求");
                Response<Long> r3 = p2.incr(key);
                System.out.println("Pipeline發送請求");
                Response<Long> r4 = p2.incr(key);  
                System.out.println("Pipeline發送請求");
                Response<Long> r5 = p2.incr(key);
                System.out.println("Pipeline發送請求");
                try{  
                    r1.get();  //此時還未開始接收響應,所以此操作會出錯
                }catch(Exception e){  
                    System.out.println(" <<< Pipeline error:還未開始接收響應  >>> ");  
                }  
             // 發送請求完成,開始接收響應
                System.out.println("發送請求完成,開始接收響應");
                p2.sync();  
                System.out.println("Pipeline接收響應Response: " + r1.get());  
                System.out.println("Pipeline接收響應Response: " + r2.get());  
                System.out.println("Pipeline接收響應Response: " + r3.get());
                System.out.println("Pipeline接收響應Response: " + r4.get());
                System.out.println("Pipeline接收響應Response: " + r5.get());
                jedis.close();
            }
    }

在輸入了正確的雲資料庫Tair(相容 Redis)執行個體訪問地址和密碼之後,運行以上Java程式,輸出結果如下:

-----方法1-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5
-----方法2-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
 <<< Pipeline error:還未開始接收響應  >>> 
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5