ApsaraDB for Redisは、オープンソースのRedisと同じパブリッシング (pub) 機能とサブスクリプション (sub) 機能を提供します。 ApsaraDB for Redisでは、複数のクライアントがクライアントによって公開されたメッセージをサブスクライブできます。
シナリオ
ApsaraDB for Redisによって公開されたメッセージは非永続的です。 メッセージ発行者は、メッセージの発行のみを担当しますが、これらのメッセージが受信されたか、以前に送信されたメッセージが保存されたかは関係ありません。 この場合、メッセージは公開された後に失われます。 メッセージサブスクライバーは、パブリッシャーにサブスクライブした後にパブリッシャーによって送信されたメッセージのみを受信できます。 彼らはチャネルで以前のメッセージを受信しません。
加えて、メッセージ発行者 (発行者クライアント) は、必ずしもサーバに排他的に接続する必要はない。 メッセージの発行中に、同じクライアントからリスト操作などの他の操作を実行できます。 しかしながら、メッセージ加入者 (加入者クライアント) は、サーバに排他的に接続する必要がある。 サブスクリプション期間中、クライアントは他の操作を実行できません。 クライアントがチャネル内のメッセージを待っている間、操作はブロックされます。 したがって、メッセージのサブスクライバーは、専用サーバーまたは別のスレッドを使用してメッセージを受信する必要があります (次の例を参照) 。
サンプルコード
メッセージ発行者 (発行者クライアント) の場合
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
private Jedis jedis;
public KVStorePubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//The password of the ApsaraDB for Redis instance.
String authString = jedis.auth(password);
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel,String message){
System.out.println(" >>> PUBLISH > Channel:"+channel+" > Message sent: "+message);
jedis.publish(channel, message);
}
public void close(String channel){
System.out.println(" >>> PUBLISH ends > Channel: "+channel+" > Message:quit");
//The message publisher terminates the process by sending a quit message.
jedis.publish(channel, "quit");
}
}
メッセージ加入者 (加入者クライアント)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public KVStoreSubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//The password of the ApsaraDB for Redis instance.
String authString = jedis.auth(password);//password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener,String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
System.err.println("Error:SubClient> listener or channel is null");
}
System.out.println(" >>> SUBSCRIBE > Channel:"+channel);
System.out.println();
//When the receiver is listening for subscribed messages, the process is blocked until the quit message is received (in a passive manner) or the subscription is actively canceled.
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
System.out.println(" >>> UNSUBSCRIBE > Channel:"+channel);
System.out.println();
listener.unsubscribe(channel);
}
@Override
public void run() {
try{
System.out.println();
System.out.println("---------SUBSCRIBE begins-------");
subscribe();
System.out.println("----------SUBSCRIBE ends-------");
System.out.println();
}catch(Exception e){
e.printStackTrace();
}
}
}
メッセージリスナーの場合
package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println(" <<< SUBSCRIBE< Channel: " + channel + "> Message received: " + message );
System.out.println();
//When the quit message is received, the subscription is canceled (in a passive manner).
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
}
主なコードブロック
package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
//The connection information of the ApsaraDB for Redis instance. This information can be obtained from the console.
static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password="password";//password
public static void main(String[] args) throws Exception{
KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
final String channel = "KVStore Channel-A";
//The message sender starts sending messages, but no clients have subscribed to the channel. Therefore, the messages will not be received.
pubClient.pub(channel, "Alibaba Cloud message 1: (No subscribers. This message will not be received.)");
//The message receiver.
KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
JedisPubSub listener = new KVStoreMessageListener();
subClient.setChannelAndListener(listener, channel);
//The message receiver subscribes to messages.
subClient.start();
//The message sender continues sending messages.
for (int i = 0; i < 5; i++) {
String message=UUID.randomUUID().toString();
pubClient.pub(channel, message);
Thread.sleep(1000);
}
//The message receiver unsubscribes from messages.
subClient.unsubscribe(channel);
Thread.sleep(1000);
pubClient.pub(channel, "Alibaba Cloud message 2:(Subscription canceled. This message will not be received.)");
//The message publisher terminates the process by sending a quit message.
//When other message receivers receive the quit message in listener.onMessage(), the UNSUBSCRIBE operation is performed.
pubClient.close(channel);
}
}
結果
正しいエンドポイントとパスワードを入力してApsaraDB for Redisインスタンスに接続し、上記のJavaコードを実行すると、次の出力が表示されます。
>>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 1: (No subscribers. This message will not be received.)
----------SUBSCRIBE begins-------
>>> SUBSCRIBE > Channel: KVStore Channel-A
>>> PUBLISH > Channel: KVStore Channel-A> Message sent: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> PUBLISH > Channel: KVStore Channel-A> Message sent: ed5924a9-016b-469b-8203-7db63d06f812
<<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: ed5924a9-016b-469b-8203-7db63d06f812
>>> PUBLISH > Channel: KVStore Channel-A> Message sent: f1f84e0f-8f35-4362-9567-25716b1531cd
<<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: f1f84e0f-8f35-4362-9567-25716b1531cd
>>> PUBLISH > Channel: KVStore Channel-A> Message sent: 746bde54-af8f-44d7-8a49-37d1a245d21b
<<< SUBSCRIBE< Channel:KVStore Channel-A > Message received: 746bde54-af8f-44d7-8a49-37d1a245d21b
>>> PUBLISH > Channel: KVStore Channel-A> Message sent: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> UNSUBSCRIBE > Channel: KVStore Channel-A
----------SUBSCRIBE ends-------
>>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 2: (Subscription canceled. This message will not be received.)
>>> PUBLISH ends> Channel:KVStore Channel-A > Message:quit
上記の例は、1つのパブリッシャーと1つのサブスクライバーのみが関与するシナリオを示しています。 複数の発行者、加入者、さらには複数のメッセージチャネルが存在し得る。 このようなシナリオに合わせてコードを変更できます。