Tair(Redis OSS-compatible)也提供了與Redis相同的訊息發布(publish)與訂閱(subscribe)功能。即一個用戶端發布訊息,其他多個客訂閱訊息。
情境介紹
Tair(Redis OSS-compatible)發布的訊息是“非持久”的,即訊息發行者只負責發送訊息,而不管訊息是否有接收方,也不會儲存之前發送的訊息,即發布的訊息“即發即失”;訊息訂閱者也只能得到訂閱之後的訊息,頻道(channel)中此前的訊息將無從獲得。
此外,訊息發行者(即publish用戶端)無需獨佔與伺服器端的串連,您可以在發布訊息的同時,使用同一個用戶端串連進行其他動作(例如List操作等)。但是,訊息訂閱者(即subscribe用戶端)需要獨佔與伺服器端的串連,即進行 subscribe 期間,該用戶端無法執行其他動作,而是以阻塞的方式等待頻道(channel)中的訊息;因此訊息訂閱者需要使用單獨的伺服器串連,或者需要在單獨的線程中使用(參見如下樣本)。
程式碼範例
訊息發行者 (即publish client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
private Jedis jedis;
public KVStorePubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//KVStore的執行個體密碼
String authString = jedis.auth(password);
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel,String message){
System.out.println(" >>> 發布(PUBLISH) > Channel:"+channel+" > 發送出的Message:"+message);
jedis.publish(channel, message);
}
public void close(String channel){
System.out.println(" >>> 發布(PUBLISH)結束 > Channel:"+channel+" > Message:quit");
//訊息發行者結束髮送,即發送一個“quit”訊息;
jedis.publish(channel, "quit");
}
}
訊息訂閱者 (即subscribe client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public KVStoreSubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//ApsaraDB for Redis的執行個體密碼
String authString = jedis.auth(password);//password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener,String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
System.err.println("Error:SubClient> listener or channel is null");
}
System.out.println(" >>> 訂閱(SUBSCRIBE) > Channel:"+channel);
System.out.println();
//接收者在偵聽訂閱的訊息時,將會阻塞進程,直至接收到quit訊息(被動方式),或主動取消訂閱
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
System.out.println(" >>> 取消訂閱(UNSUBSCRIBE) > Channel:"+channel);
System.out.println();
listener.unsubscribe(channel);
}
@Override
public void run() {
try{
System.out.println();
System.out.println("----------訂閱訊息SUBSCRIBE 開始-------");
subscribe();
System.out.println("----------訂閱訊息SUBSCRIBE 結束-------");
System.out.println();
}catch(Exception e){
e.printStackTrace();
}
}
}
訊息監聽者
package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println(" <<< 訂閱(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
System.out.println();
//當接收到的message為quit時,取消訂閱(被動方式)
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
}
樣本主程式
package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
//ApsaraDB for Redis的串連資訊,從控制台可以獲得
static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password="password";//password
public static void main(String[] args) throws Exception{
KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
final String channel = "KVStore頻道-A";
//訊息寄件者開始發訊息,此時還無人訂閱,所以此訊息不會被接收
pubClient.pub(channel, "Aliyun訊息1:(此時還無人訂閱,所以此訊息不會被接收)");
//訊息接收者
KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
JedisPubSub listener = new KVStoreMessageListener();
subClient.setChannelAndListener(listener, channel);
//訊息接收者開始訂閱
subClient.start();
//訊息寄件者繼續發訊息
for (int i = 0; i < 5; i++) {
String message=UUID.randomUUID().toString();
pubClient.pub(channel, message);
Thread.sleep(1000);
}
//訊息接收者主動取消訂閱
subClient.unsubscribe(channel);
Thread.sleep(1000);
pubClient.pub(channel, "Aliyun訊息2:(此時訂閱取消,所以此訊息不會被接收)");
//訊息發行者結束髮送,即發送一個“quit”訊息;
//此時如果有其他的訊息接收者,那麼在listener.onMessage()中接收到“quit”時,將執行“unsubscribe”操作。
pubClient.close(channel);
}
}
運行結果
在輸入了正確的Tair(Redis OSS-compatible)執行個體訪問地址和密碼之後,運行以上Java程式,輸出結果如下。
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun訊息1:(此時還無人訂閱,所以此訊息不會被接收)
----------訂閱訊息SUBSCRIBE 開始-------
>>> 訂閱(SUBSCRIBE) > Channel:KVStore頻道-A
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消訂閱(UNSUBSCRIBE) > Channel:KVStore頻道-A
----------訂閱訊息SUBSCRIBE 結束-------
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun訊息2:(此時訂閱取消,所以此訊息不會被接收)
>>> 發布(PUBLISH)結束 > Channel:KVStore頻道-A > Message:quit
以上樣本中僅示範了一個發行者與一個訂閱者的情況,實際上發行者與訂閱者都可以為多個,發送訊息的頻道(channel)也可以是多個,對以上代碼稍作修改即可。