すべてのプロダクト
Search
ドキュメントセンター

Tair (Redis® OSS-Compatible):メッセージの公開とサブスクリプション

最終更新日:Jan 10, 2025

Tair (Redis OSS-compatible) は、オープンソースのRedisと同じメッセージ発行およびサブスクリプション機能を提供します。 Tair (Redis OSS互換) では、複数のクライアントがクライアントによって公開されたメッセージをサブスクライブできます。

背景情報

Tair (Redis OSS-compatible) によって公開されたメッセージは非永続的です。 メッセージ発行者は、メッセージの発行のみを担当しますが、これらのメッセージが受信されたか、以前に送信されたメッセージが保存されたかは関係ありません。 この場合、メッセージは公開された後に失われます。 メッセージサブスクライバーは、パブリッシャーにサブスクライブした後にパブリッシャーによって送信されたメッセージのみを受信できます。 彼らはチャネルで以前のメッセージを受信しません。

加えて、メッセージ発行者 (発行者クライアント) は、必ずしもサーバに排他的に接続する必要はない。 メッセージの発行中に、同じクライアントからリスト操作などの他の操作を実行できます。 しかしながら、メッセージ加入者 (加入者クライアント) は、サーバに排他的に接続する必要がある。 サブスクリプション期間中、クライアントは他の操作を実行できません。 クライアントがチャネル内のメッセージを待っている間、操作はブロックされます。 したがって、メッセージのサブスクライバーは、専用サーバーまたは別のスレッドを使用してメッセージを受信する必要があります (次の例を参照) 。

サンプルコード

メッセージ発行者 (発行者クライアント) の場合

packag e message.kvstore.aliyun.com;
redis.clients.jedis.Jedisをインポートします。パブリッククラスKVStorePubClient {
    プライベートJedis jedis;
    public KVStorePubClient (文字列ホスト、intポート、文字列パスワード) {
        jedis=新しいJedis (ホスト、ポート);
        // ApsaraDB for Redisインスタンスへの接続に使用されるパスワード。
        String authString = jedis.auth (パスワード);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println(">>> 公開> チャンネル:" + Channel + "> 送信されたメッセージ:" + Message);
        jedis.publish (チャンネル、メッセージ);
    }
    public void close(String channel){
        System.out.println(">>> 公開終了> チャンネル:" + チャンネル + "> メッセージ: 終了");
        // メッセージ発行者は、終了メッセージを送信してプロセスを終了します。
        jedis.publish (チャンネル、"quit");
    }
}

メッセージ加入者 (加入者クライアント)

packag e message.kvstore.aliyun.com;
redis.clients.jedis.Jedisをインポートします。redis.clients.jedis.JedisPubSubをインポートします。パブリッククラスKVStoreSubClient extends Thread {
    プライベートJedis jedis;
    プライベートStringチャンネル;
    プライベートJedisPubSubリスナー;
    public KVStoreSubClient (文字列ホスト、intポート、文字列パスワード) {
        jedis=新しいJedis (ホスト、ポート);
                // ApsaraDB for Redisインスタンスへの接続に使用されるパスワード。
                String authString = jedis.auth (パスワード);// password
                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSubリスナー, String channel){
        this.listener=listener;
        this.channel=チャンネル;
    }
    private void subscribe(){
        if(listener==null | | channel==null){
            System.err.println("Error:SubClient> リスナーまたはチャネルがnullです");
        }
        System.out.println(">>> SUBSCRIBE> チャンネル:" + チャンネル);
        System.out.println();
        // 受信者がサブスクライブされたメッセージをリッスンしている場合、プロセスは、終了メッセージが (受動的に) 受信されるか、サブスクリプションがアクティブにキャンセルされるまでブロックされます。
        jedis.subscribe (リスナー、チャンネル);
    }
    public void subscribe(String channel){
        System.out.println(">>> UNSUBSCRIBE> チャンネル:" + チャンネル);
        System.out.println();
        listener. subscribe (チャンネル);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("-------- SUBSCRIBEが始まる --------");
            subscribe();
            System.out.println("--------- SUBSCRIBE終了 -------");
            System.out.println();
        } catch (例外e){
            e.printStackTrace();
        }
    }
}

メッセージリスナーの場合

packag e message.kvstore.aliyun.com;
redis.clients.jedis.JedisPubSubをインポートします。パブリッククラスKVStoreMessageListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("<<< SUBSCRIBE <チャンネル:" + Channel + "> 受信したメッセージ:" + Message);
        System.out.println();
        // 終了メッセージが受信されると、サブスクリプションは (パッシブ方式で) キャンセルされます。
        if(message.equalsIgnoreCase("quit")){
            this. subscribe (チャンネル);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subcribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe (文字列パターン, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe (文字列パターン, int subcribedChannels) {
        // TODO Auto-generated method stub
    }
}

主なコードブロック

packag e message.kvstore.aliyun.com;
java.util.UUIDをインポートします。redis.clients.jedis.JedisPubSubをインポートします。パブリッククラスKVStorePubSubTest {
    // ApsaraDB for Redisインスタンスの接続情報。 この情報はコンソールから取得できます。
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";// password
    public static void main(String[] args) throws Exception {
            KVStorePubClient pubClient = new KVStorePubClient (ホスト、ポート、パスワード);
            final String channel = "KVStore Channel-A";
            // メッセージ送信者はメッセージの送信を開始しますが、クライアントはチャネルにサブスクライブしていません。 したがって、メッセージは受信されません。
            pubClient.pub (チャンネル、"Alibaba Cloudメッセージ1: (購読者なし) このメッセージは受信されません。) ";
            // メッセージの受信者。
            KVStoreSubClientサブクライアント=新しいKVStoreSubClient (ホスト、ポート、パスワード);
            JedisPubSubリスナー=新しいKVStoreMessageListener();
            subClient.setChannelAndListener (リスナー、チャネル);
            // メッセージ受信者はメッセージをサブスクライブします。
            subClient.start();
            // メッセージ送信者はメッセージを送信し続けます。
            for (int i = 0; i < 5; i ++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub (チャンネル、メッセージ);
                Thread.sleep(1000);
            }
            // メッセージ受信者はメッセージからの購読を解除します。
            subClient. decisor (チャンネル);
            Thread.sleep(1000);
            pubClient.pub (チャンネル、"Alibaba Cloudメッセージ2 :( サブスクリプションがキャンセルされました。) このメッセージは受信されません。) ";
            // メッセージ発行者は、終了メッセージを送信してプロセスを終了します。
            // 他のメッセージ受信者がlistener.onMessage() で終了メッセージを受信すると、UNSUBSCRIBE操作が実行されます。 
            pubClient.close (チャンネル);
        }
    }

結果

正しいエンドポイントとパスワードを入力してTair (Redis OSS-compatible) インスタンスに接続し、上記のJavaコードを実行すると、次の出力が表示されます。

>>> 公開> チャンネル: KVStore Channel-A> 送信されたメッセージ: Alibaba Cloudメッセージ1 :( 加入者なし。 このメッセージは受信されません。--------- SUBSCRIBEが始まります --------
  >>> SUBSCRIBE> チャンネル: KVStore Channel-A
  >>> 公開> チャンネル: KVStore Channel-A> メッセージ送信: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< SUBSCRIBE <チャンネル: KVStore Channel-A> メッセージ受信: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> 公開> チャンネル: KVStoreチャンネル-A> メッセージ送信: ed5924a9-016b-469b-8203-7db63d06f812
  <<< SUBSCRIBE <チャンネル: KVStore Channel-A> メッセージ受信: ed5924a9-016b-469b-8203-7db63d06f812
  >>> 公開> チャンネル: KVStoreチャンネル-A> メッセージ送信: f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< SUBSCRIBE <チャンネル: KVStore Channel-A> メッセージ受信: f1f84e0f-8f35-4362-9567-25716b1531cd
  >>> 公開> チャンネル: KVStore Channel-A> メッセージ送信: 746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< SUBSCRIBE <チャンネル: KVStore Channel-A> メッセージ受信: 746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> 公開> チャンネル: KVStore Channel-A> メッセージ送信: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< SUBSCRIBE <チャンネル: KVStore Channel-A> メッセージ受信: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> UNSUBSCRIBE> チャンネル: KVStore Channel-A
--------- SUBSCRIBE終了 -------
  >>> 公開> チャンネル: KVStore Channel-A> 送信されたメッセージ: Alibaba Cloudメッセージ2 :( サブスクリプションがキャンセルされました。 このメッセージは受信されません。
  >>> 公開終了> チャンネル: KVStoreチャンネル-A> メッセージ: 終了 

上記の例は、1つのパブリッシャーと1つのサブスクライバーのみが関与するシナリオを示しています。 複数の発行者、加入者、さらには複数のメッセージチャネルが存在し得る。 このようなシナリオに合わせてコードを変更できます。