すべてのプロダクト
Search
ドキュメントセンター

Tair (Redis® OSS-Compatible):メッセージの公開とサブスクライブ

最終更新日:Sep 12, 2024

ApsaraDB for Redisは、オープンソースのRedisと同じパブリッシング (pub) 機能とサブスクリプション (sub) 機能を提供します。 ApsaraDB for Redisでは、複数のクライアントがクライアントによって公開されたメッセージをサブスクライブできます。

シナリオ

ApsaraDB for Redisによって公開されたメッセージは非永続的です。 メッセージ発行者は、メッセージの発行のみを担当しますが、これらのメッセージが受信されたか、以前に送信されたメッセージが保存されたかは関係ありません。 この場合、メッセージは公開された後に失われます。 メッセージサブスクライバーは、パブリッシャーにサブスクライブした後にパブリッシャーによって送信されたメッセージのみを受信できます。 彼らはチャネルで以前のメッセージを受信しません。

加えて、メッセージ発行者 (発行者クライアント) は、必ずしもサーバに排他的に接続する必要はない。 メッセージの発行中に、同じクライアントからリスト操作などの他の操作を実行できます。 しかしながら、メッセージ加入者 (加入者クライアント) は、サーバに排他的に接続する必要がある。 サブスクリプション期間中、クライアントは他の操作を実行できません。 クライアントがチャネル内のメッセージを待っている間、操作はブロックされます。 したがって、メッセージのサブスクライバーは、専用サーバーまたは別のスレッドを使用してメッセージを受信する必要があります (次の例を参照) 。

サンプルコード

メッセージ発行者 (発行者クライアント) の場合

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
    private Jedis jedis;
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //The password of the ApsaraDB for Redis instance.
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> PUBLISH > Channel:"+channel+" > Message sent: "+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> PUBLISH ends > Channel: "+channel+" > Message:quit");
        //The message publisher terminates the process by sending a quit message.
        jedis.publish(channel, "quit");
    }
}

メッセージ加入者 (加入者クライアント)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
                //The password of the ApsaraDB for Redis instance.
                String authString = jedis.auth(password);//password
                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> SUBSCRIBE > Channel:"+channel);
        System.out.println();
        //When the receiver is listening for subscribed messages, the process is blocked until the quit message is received (in a passive manner) or the subscription is actively canceled.
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println(" >>> UNSUBSCRIBE > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("---------SUBSCRIBE begins-------");
            subscribe();
            System.out.println("----------SUBSCRIBE ends-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

メッセージリスナーの場合

package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< SUBSCRIBE< Channel: " + channel + "> Message received: " + message );
        System.out.println();
        //When the quit message is received, the subscription is canceled (in a passive manner).
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}

主なコードブロック

package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
    //The connection information of the ApsaraDB for Redis instance. This information can be obtained from the console.
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";//password
    public static void main(String[] args) throws Exception{
            KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
            final String channel = "KVStore Channel-A";
            //The message sender starts sending messages, but no clients have subscribed to the channel. Therefore, the messages will not be received.
            pubClient.pub(channel, "Alibaba Cloud message 1: (No subscribers. This message will not be received.)");
            //The message receiver.
            KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
            JedisPubSub listener = new KVStoreMessageListener();
            subClient.setChannelAndListener(listener, channel);
            //The message receiver subscribes to messages.
            subClient.start();
            //The message sender continues sending messages.
            for (int i = 0; i < 5; i++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }
            //The message receiver unsubscribes from messages.
            subClient.unsubscribe(channel);
            Thread.sleep(1000);
            pubClient.pub(channel, "Alibaba Cloud message 2:(Subscription canceled. This message will not be received.)");
            //The message publisher terminates the process by sending a quit message.
            //When other message receivers receive the quit message in listener.onMessage(), the UNSUBSCRIBE operation is performed. 
            pubClient.close(channel);
        }
    }

結果

正しいエンドポイントとパスワードを入力してApsaraDB for Redisインスタンスに接続し、上記のJavaコードを実行すると、次の出力が表示されます。

  >>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 1: (No subscribers. This message will not be received.)
----------SUBSCRIBE begins-------
  >>> SUBSCRIBE > Channel: KVStore Channel-A
  >>> PUBLISH > Channel: KVStore Channel-A> Message sent: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> PUBLISH > Channel: KVStore Channel-A> Message sent: ed5924a9-016b-469b-8203-7db63d06f812
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: ed5924a9-016b-469b-8203-7db63d06f812
  >>> PUBLISH > Channel: KVStore Channel-A> Message sent: f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: f1f84e0f-8f35-4362-9567-25716b1531cd
  >>> PUBLISH > Channel: KVStore Channel-A> Message sent: 746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< SUBSCRIBE< Channel:KVStore Channel-A > Message received: 746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> PUBLISH > Channel: KVStore Channel-A> Message sent: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> UNSUBSCRIBE > Channel: KVStore Channel-A
----------SUBSCRIBE ends-------
  >>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 2: (Subscription canceled. This message will not be received.)
  >>> PUBLISH ends> Channel:KVStore Channel-A > Message:quit

上記の例は、1つのパブリッシャーと1つのサブスクライバーのみが関与するシナリオを示しています。 複数の発行者、加入者、さらには複数のメッセージチャネルが存在し得る。 このようなシナリオに合わせてコードを変更できます。