Tair (Redis® OSS-Compatible): Message publishing and subscription

Last Updated: Oct 24, 2024

Tair (Redis OSS-compatible) provides the same message publishing and subscription (pub/sub) feature as open source Redis. Multiple clients can subscribe to a channel and receive the messages that another client publishes to it.

Background information

Messages published through Tair (Redis OSS-compatible) are not persisted. A message publisher only publishes messages: it does not track whether they are received, and it does not save previously sent messages. A message that is published while no client is subscribed to the channel is therefore lost. A message subscriber receives only the messages that are published to a channel after it has subscribed to that channel. It does not receive earlier messages in the channel.
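
The delivery rule above can be illustrated without a server. The following is a minimal sketch, assuming a hypothetical in-memory class named FireAndForgetBroker. It is not part of Jedis or Tair and does not reproduce how the server is implemented. It only models the rule that a message reaches the clients subscribed at the moment of publishing and is then discarded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of non-persistent pub/sub.
// It is not part of Jedis or Tair; it only mirrors the delivery rule described above.
class FireAndForgetBroker {
    // Channel name -> handlers of the clients currently subscribed to it.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Registers a handler. It sees only messages published after this call.
    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to the current subscribers and returns how many received it.
    // Nothing is stored, so a message with no subscribers is simply dropped.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        FireAndForgetBroker broker = new FireAndForgetBroker();
        List<String> received = new ArrayList<>();

        int early = broker.publish("demo-channel", "message 1"); // no subscriber yet
        broker.subscribe("demo-channel", received::add);
        int late = broker.publish("demo-channel", "message 2");

        System.out.println("receivers of message 1: " + early);
        System.out.println("receivers of message 2: " + late);
        System.out.println("received: " + received);
    }
}
```

Run as written, the sketch reports 0 receivers for message 1 and 1 receiver for message 2, and only message 2 appears in the received list. The Redis PUBLISH command likewise returns the number of clients that received the message, so a publisher can inspect that return value to notice that nobody was listening.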

In addition, a message publisher (publisher client) does not need a dedicated connection to the server. The same client can publish messages and perform other operations, such as List operations. A message subscriber (subscriber client), however, needs a dedicated connection. While the subscription is active, the connection cannot be used for other operations, and the subscribing call blocks while the client waits for messages in the channel. Therefore, a message subscriber must use a dedicated connection and a separate thread to receive messages (see the following example).

Sample code

For the message publisher (publisher client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
    private Jedis jedis;
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //The password used to connect to the ApsaraDB for Redis instance.
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> PUBLISH > Channel:"+channel+" > Message sent: "+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> PUBLISH ends > Channel: "+channel+" > Message:quit");
        //The message publisher terminates the process by sending a quit message.
        jedis.publish(channel, "quit");
    }
}

For the message subscriber (subscriber client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //The password used to connect to the ApsaraDB for Redis instance.
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
            //Do not call subscribe() with a null listener or channel.
            return;
        }
        System.out.println("  >>> SUBSCRIBE > Channel:"+channel);
        System.out.println();
        //While the receiver is listening for subscribed messages, this call blocks until the quit message is received (in a passive manner) or the subscription is actively canceled.
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println(" >>> UNSUBSCRIBE > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("---------SUBSCRIBE begins-------");
            subscribe();
            System.out.println("----------SUBSCRIBE ends-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

For the message listener

package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< SUBSCRIBE < Channel: " + channel + "> Message received: " + message );
        System.out.println();
        //When the quit message is received, the subscription is canceled (in a passive manner).
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}

Main code block

package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
    //The connection information of the ApsaraDB for Redis instance. This information can be obtained from the console.
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";//password
    public static void main(String[] args) throws Exception{
        KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
        final String channel = "KVStore Channel-A";
        //The message sender starts sending messages, but no clients have subscribed to the channel. Therefore, the messages will not be received.
        pubClient.pub(channel, "Alibaba Cloud message 1: (No subscribers. This message will not be received.)");
        //The message receiver.
        KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
        JedisPubSub listener = new KVStoreMessageListener();
        subClient.setChannelAndListener(listener, channel);
        //The message receiver subscribes to messages.
        subClient.start();
        //The message sender continues sending messages.
        for (int i = 0; i < 5; i++) {
            String message=UUID.randomUUID().toString();
            pubClient.pub(channel, message);
            Thread.sleep(1000);
        }
        //The message receiver unsubscribes from messages.
        subClient.unsubscribe(channel);
        Thread.sleep(1000);
        pubClient.pub(channel, "Alibaba Cloud message 2:(Subscription canceled. This message will not be received.)");
        //The message publisher terminates the process by sending a quit message.
        //When other message receivers receive the quit message in listener.onMessage(), the UNSUBSCRIBE operation is performed.
        pubClient.close(channel);
    }
}

Result

After you enter the correct endpoint and password to connect to the Tair (Redis OSS-compatible) instance and run the preceding Java code, output similar to the following is displayed. The message bodies are random UUIDs and differ on each run.

  >>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 1: (No subscribers. This message will not be received.)
----------SUBSCRIBE begins-------
  >>> SUBSCRIBE > Channel: KVStore Channel-A
  >>> PUBLISH > Channel: KVStore Channel-A > Message sent: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> PUBLISH > Channel: KVStore Channel-A > Message sent: ed5924a9-016b-469b-8203-7db63d06f812
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: ed5924a9-016b-469b-8203-7db63d06f812
  >>> PUBLISH > Channel: KVStore Channel-A > Message sent: f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: f1f84e0f-8f35-4362-9567-25716b1531cd
  >>> PUBLISH > Channel: KVStore Channel-A > Message sent: 746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> PUBLISH > Channel: KVStore Channel-A > Message sent: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< SUBSCRIBE < Channel:KVStore Channel-A > Message received: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> UNSUBSCRIBE > Channel: KVStore Channel-A
----------SUBSCRIBE ends-------
  >>> PUBLISH > Channel:KVStore Channel-A > Message sent: Alibaba Cloud message 2: (Subscription canceled. This message will not be received.)
  >>> PUBLISH ends > Channel:KVStore Channel-A > Message:quit

The preceding example involves only one publisher and one subscriber. Pub/sub also supports multiple publishers, multiple subscribers, and multiple message channels. You can adapt the code to fit such scenarios.