redis 发布订阅实例

2017年08月07日 14:54 | 2551次浏览 作者原创 版权保护

一个 client 发布消息,其他多个 client 订阅消息。

普通的即时聊天,群聊,关注等等场景。

需要注意的是,ApsaraDB for Redis 发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。

此外,消息发布者(即 publish 客户端)无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如 List 操作等)。但是,消息订阅者(即 subscribe 客户端)需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。

消息发布者 (即 publish client)

public class KVStorePubClient {
    private Jedis jedis;//
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //KVStore的实例密码
        String authString = jedis.auth(host+":"+password);//password
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");
        //消息发布者结束发送,即发送一个“quit”消息;
        jedis.publish(channel, "quit");
    }
}

消息订阅者 (即 subscribe client)

public class KVStoreSubClient extends Thread {
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //ApsaraDB for Redis的实例密码
        String authString = jedis.auth(host+":"+password);//password
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> 订阅(SUBSCRIBE) > Channel:"+channel);
        System.out.println();
        //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println("  >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("----------订阅消息SUBSCRIBE 开始-------");
            subscribe();
            System.out.println("----------订阅消息SUBSCRIBE 结束-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

消息监听者

public class KVStoreMessageListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
        System.out.println();
        //当接收到的message为quit时,取消订阅(被动方式)
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}

示例主程序

public class KVStorePubSubDemo {
    public static void main(String[] args) throws Exception{
        KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
        final String channel = "KVStore频道-A";
        //消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收
        pubClient.pub(channel, "QianXun消息1:(此时还无人订阅,所以此消息不会被接收)");
        //消息接收者
        KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
        JedisPubSub listener = new KVStoreMessageListener();
        subClient.setChannelAndListener(listener, channel);
        //消息接收者开始订阅
        subClient.start();
        //消息发送者继续发消息
        for (int i = 0; i < 5; i++) {
            String message= UUID.randomUUID().toString();
            pubClient.pub(channel, message);
            Thread.sleep(1000);
        }
        //消息接收者主动取消订阅
        subClient.unsubscribe(channel);
        Thread.sleep(1000);
        pubClient.pub(channel, "QianXun消息2:(此时订阅取消,所以此消息不会被接收)");
        //消息发布者结束发送,即发送一个“quit”消息;
        //此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
        pubClient.close(channel);
    }
}

运行结果

>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:QianXun消息1:(此时还无人订阅,所以此消息不会被接收)

----------订阅消息SUBSCRIBE 开始-------
  >>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:104960af-2b7f-42e5-988e-540027a1c758
  <<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:104960af-2b7f-42e5-988e-540027a1c758

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:5443b677-ef52-4f6b-8297-99c3ebd50a8f
  <<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:5443b677-ef52-4f6b-8297-99c3ebd50a8f

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:4f2badc5-0eb6-4148-9258-26ddb079afe9
  <<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:4f2badc5-0eb6-4148-9258-26ddb079afe9

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:20296b21-d3a8-4cbf-9366-75395abb41ce
  <<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:20296b21-d3a8-4cbf-9366-75395abb41ce

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:664ab473-d854-40c1-99f4-f78f55f6f5c2
  <<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:664ab473-d854-40c1-99f4-f78f55f6f5c2

  >>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A

----------订阅消息SUBSCRIBE 结束-------

  >>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:QianXun消息2:(此时订阅取消,所以此消息不会被接收)
  >>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit

命令模式

1.client1 和 client2都订阅同一channel (qx_channel

client1:

4f4e8f36170d4f0f.m.cnbja.kvstore.aliyuncs.com:6379> subscribe qx_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "qx_channel"
3) (integer) 1

client2:

4f4e8f36170d4f0f.m.cnbja.kvstore.aliyuncs.com:6379> subscribe qx_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "qx_channel"
3) (integer) 1

2.publish message 发送到指定的channel

4f4e8f36170d4f0f.m.cnbja.kvstore.aliyuncs.com:6379> publish qx_channel 'hello everybody'
(integer) 2

3.publish在message这个频道上面发送消息后,被subscribe监视到了,然后就被分别打印输出了



小说《我是全球混乱的源头》
此文章本站原创,地址 https://www.vxzsk.com/1268.html   转载请注明出处!谢谢!

感觉本站内容不错,读后有收获?小额赞助,鼓励网站分享出更好的教程