专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Redis 发布订阅功能

Redis提供了基于“/订阅”模式的消息机制,消息者和订阅者不进行直接通信,者客户端向指定的频道(channel)消息,订阅该频道的每个客户端都可以收到该消息,有多少订阅者订阅了频道,那么者的消息,每个订阅者都能接受到信息。 消息

PUBLISH

语法:PUBLISH channel message 说明: 将信息 message 发送到指定的频道 channel 。 返回值: 接收到信息 message 的订阅者数量。 示例: 向没有订阅者的频道发送信息 coderknock> PUBLISH new_channel “test publish” (integer) 0 向有订阅者的频道发送信息 coderknock> PUBLISH channel1 “new channel1” (integer) 1

SUBSCRIBE

语法:SUBSCRIBE channel [channel …] 说明: 订阅给定的一个或多个频道的信息。 返回值: 接收到的信息(请参见下面的代码说明)。 示例: coderknock> SUBSCRIBE channel1 Reading messages… (press Ctrl-C to quit)

1、 “subscribe” # 返回值的类型:显示订阅成功
2、 “channel1” # 订阅的频道名字
3、 (integer) 1 # 目前已订阅的频道数量 接收到信息
4、 “message” # 返回值的类型:信息
5、 “channel1” # 来源(从哪个频道发送过来)
6、 “new channel1” # 信息内容

PSUBSCRIBE

语法:PSUBSCRIBE pattern [pattern …] 说明: 订阅一个或多个符合给定模式的频道。 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。 返回值: 接收到的信息(请参见下面的代码说明)。 示例: coderknock> PSUBSCRIBE news.* coderknock.* Reading messages… (press Ctrl-C to quit)

1、 “psubscribe” # 返回值的类型:显示订阅成功
2、 “news.*” # 订阅的频道名字
3、 (integer) 1 # 目前已订阅的频道数量
4、 “psubscribe”
5、 “coderknock.*”
6、 (integer) 2
7、 “pmessage” # 返回值的类型:信息
8、 “news.*” # 来源频道模式
9、 “news.123” # 具体频道(从哪个频道发送过来)
10、 “123 # 消息
11、 “pmessage”
12、 “news.*”
13、 “news.222”
14、 “222”

PUBSUB

语法:PUBSUB subcommand [argument [argument …]] 说明: PUBSUB 是一个查看订阅与系统状态的内省命令, 它由数个不同格式的子命令组成, 以下将分别对这些子命令进行介绍。 PUBSUB CHANNELS [pattern] 说明: 列出当前的活跃频道。 示例: coderknock> SUBSCRIBE coderknock sanchan news test Reading messages… (press Ctrl-C to quit)

1、 “subscribe”
2、 “coderknock”
3、 (integer) 1
4、 “subscribe”
5、 “sanchan”
6、 (integer) 2
7、 “subscribe”
8、 “news”
9、 (integer) 3
10、 “subscribe”
11、 “test”
12、 (integer) 4 coderknock> SUBSCRIBE coderknock sanchan blog oschina Reading messages… (press Ctrl-C to quit)
13、 “subscribe”
14、 “coderknock”
15、 (integer) 1
16、 “subscribe”
17、 “sanchan”
18、 (integer) 2
19、 “subscribe”
20、 “blog”
21、 (integer) 3
22、 “subscribe”
23、 “oschina”
24、 (integer) 4 统计出的有订阅的频道 coderknock> PUBSUB CHANNELS
25、 “coderknock”
26、 “sanchan”
27、 “blog”
28、 “news”
29、 “oschina”
30、 “test” 统计频道名包含 o 的频道 coderknock> PUBSUB CHANNELS o 1) “coderknock”
31、 “blog”
32、 “oschina” 我们关闭客户端1,只有客户端1订阅的 “news” “test” 频道消失 coderknock> PUBSUB CHANNELS
33、 “blog”
34、 “oschina”
35、 “coderknock”
36、 “sanchan”

重新订阅 客户端1 coderknock>PSUBSCRIBE news.* coderknock.* Reading messages… (press Ctrl-C to quit)

1、 “psubscribe”
2、 “news.*”
3、 (integer) 1
4、 “psubscribe”
5、 “coderknock.*”
6、 (integer) 2 客户端2 coderknock> PSUBSCRIBE sanchan.* coderknock.* Reading messages… (press Ctrl-C to quit)
7、 “psubscribe”
8、 “sanchan.*”
9、 (integer) 1
10、 “psubscribe”
11、 “coderknock.*”
12、 (integer) 2 客户端3 coderknock> PSUBSCRIBE sanchan.* coderknock.* news blog Reading messages… (press Ctrl-C to quit)
13、 “psubscribe”
14、 “sanchan.*”
15、 (integer) 1
16、 “psubscribe”
17、 “coderknock.*”
18、 (integer) 2
19、 “psubscribe”
20、 “news”
21、 (integer) 3
22、 “psubscribe”
23、 “blog”
24、 (integer) 4

coderknock> PUBSUB CHANNELS (empty list or set)

PUBSUB NUMSUB [channel-1 … channel-N]

说明: 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。 返回值: 一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。 格式为:频道 channel-1 , channel-1 的订阅者数量,频道 channel-2 , channel-2 的订阅者数量,诸如此类。 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。 不给定任何频道而直接调用这个命令也是可以的, 在这种情况下, 命令只返回一个空列表。 示例: 订阅的客户端 coderknock> SUBSCRIBE coderknock sanchan blog oschina Reading messages… (press Ctrl-C to quit)

1、 “subscribe”
2、 “coderknock”
3、 (integer) 1
4、 “subscribe”
5、 “sanchan”
6、 (integer) 2
7、 “subscribe”
8、 “blog”
9、 (integer) 3
10、 “subscribe”
11、 “oschina”
12、 (integer) 4 客户端2 coderknock> SUBSCRIBE coderknock sanchan news test Reading messages… (press Ctrl-C to quit)
13、 “subscribe”
14、 “coderknock”
15、 (integer) 1
16、 “subscribe”
17、 “sanchan”
18、 (integer) 2
19、 “subscribe”
20、 “news”
21、 (integer) 3
22、 “subscribe”
23、 “test”
24、 (integer) 4 #客户端3 coderknock> PSUBSCRIBE sanchan.* coderknock.* news blog Reading messages… (press Ctrl-C to quit)
25、 “psubscribe”
26、 “sanchan.*”
27、 (integer) 1
28、 “psubscribe”
29、 “coderknock.*”
30、 (integer) 2
31、 “psubscribe”
32、 “news”
33、 (integer) 3
34、 “psubscribe”
35、 “blog”
36、 (integer) 4 统计,可以看到 PSUBSCRIBE 订阅同样不会被统计而且不支持模式匹配 coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* 1) “coderknock”
37、 (integer) 2
38、 “news”
39、 (integer) 1
40、 “blog”
41、 (integer) 1
42、 “oschina”
43、 (integer) 1
44、 “test”
45、 (integer) 1
46、 “sanchan”
47、 (integer) 2
48、 “coderknock.*”
49、 (integer) 0 PUBSUB NUMPAT 说明: 返回 订阅模式 的数量。

注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。

复杂度:O(N) , N 为给定频道的数量。 返回值: 一个整数回复(Integer reply)。 示例: 采用上面示例中的订阅客户端,这里统计的 订阅模式 包含 PSUBSCRIBE 订阅 coderknock> PUBSUB NUMPAT (integer) 8 添加一个客户端4 coderknock> PSUBSCRIBE blog* Reading messages… (press Ctrl-C to quit)

1、 “psubscribe”
2、 “blog*”
3、 (integer) 1 统计会发现模式增加 1 coderknock> PUBSUB NUMPAT (integer) 9 #退订 UNSUBSCRIBE 语法:UNSUBSCRIBE [channel [channel …]] 说明: 指示客户端退订给定的频道。

如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

返回值: 这个命令在不同的客户端中有不同的表现。 基于上一个命令的示例,此时在客户端中执行 coderknock> PUBSUB CHANNELS

1、 “python”
2、 “blog”
3、 “news”
4、 “test”
5、 “oschina”
6、 “coderknock”
7、 “sanchan” 说明订阅成功 统计订阅数量 coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python
8、 “coderknock”
9、 (integer) 3
10、 “news”
11、 (integer) 1
12、 “blog”
13、 (integer) 1
14、 “oschina”
15、 (integer) 1
16、 “test”
17、 (integer) 1
18、 “sanchan”
19、 (integer) 3
20、 “coderknock.*”
21、 (integer) 0
22、 “python”
23、 (integer) 1 “”” time.sleep(10) # 休眠 10 秒 p.unsubscribe(“sanchan”) “”” 此时取消了一个 sanchan 的订阅 coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python
24、 “coderknock”
25、 (integer) 3
26、 “news”
27、 (integer) 1
28、 “blog”
29、 (integer) 1
30、 “oschina”
31、 (integer) 1
32、 “test”
33、 (integer) 1
34、 “sanchan”
35、 (integer) 2
36、 “coderknock.*”
37、 (integer) 0
38、 “python”
39、 (integer) 1 “”” time.sleep(10) # 休眠 10 秒 p.unsubscribe() “”” 此时该 python 中订阅全部退订 coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python
40、 “coderknock”
41、 (integer) 2
42、 “news”
43、 (integer) 1
44、 “blog”
45、 (integer) 1
46、 “oschina”
47、 (integer) 1
48、 “test”
49、 (integer) 1
50、 “sanchan”
51、 (integer) 2
52、 “coderknock.*”
53、 (integer) 0
54、 “python”
55、 (integer) 0 “”” #UNSUBSCRIBE 语法:PUNSUBSCRIBE [pattern [pattern …]] 说明: 指示客户端退订所有给定模式。 如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式 有关订阅命令有两点需要注意: 客户端在执行订阅命令之后进入了订阅状态,只能接收 SUBSCRIBE 、PSUBSCRIBE 、UNSUBSCRIBE 、PUNSUBSCRIBE 四个命令。

开启的订阅客户端,无法收到该频道之前的消息,因为 Redis 不会对的消息进行持久化。

和很多专业的消息队列系统(例如Kafka、RocketMQ)相比,Redis的订阅略显粗糙,例如无法实现消息堆积和回溯。但胜在足够简单,如果当前场景可以容忍的这些缺点,也不失为一个不错的选择,Kafka目前使用的比较多 kafka学习地址 http://www.jasongj.com/tags/Kafka/

消息订阅代码实例

## 实例一

public class PubSub extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {     System.out.println(Thread.currentThread().getName()+"----> "+channel+" --->"+message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
    super.onPMessage(pattern, channel, message);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
    System.out.println(Thread.currentThread().getName()+"onSubscribe");
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
    System.out.println(Thread.currentThread().getName()+"Unsubscribe");
}

@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
    super.onPUnsubscribe(pattern, subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
    super.onPSubscribe(pattern, subscribedChannels);
}

@Override
public void onPong(String pattern) {
    System.out.println("Pong");
}

@Override
public void unsubscribe() {
    super.unsubscribe();
}

@Override
public void unsubscribe(String... channels) {
    super.unsubscribe(channels);
}

@Override
public void subscribe(String... channels) {
    super.subscribe(channels);
}

@Override
public void psubscribe(String... patterns) {
    super.psubscribe(patterns);
}

@Override
public void punsubscribe() {
    super.punsubscribe();
}

@Override
public void punsubscribe(String... patterns) {
    super.punsubscribe(patterns);
}

@Override
public void ping() {
    super.ping();
}

@Override
public boolean isSubscribed() {
    return super.isSubscribed();
}

@Override
public void proceedWithPatterns(Client client, String... patterns) {
    super.proceedWithPatterns(client, patterns);
}

@Override
public void proceed(Client client, String... channels) {
    super.proceed(client, channels);
}

@Override
public int getSubscribedChannels() {
    return super.getSubscribedChannels();
}
}

实例二

     @Test
        public void test() throws InterruptedException {
            Jedis jedis = JedisFactory.getJedis();
            jedis.publish("redisChatTest", "我是天才");
            Thread.sleep(5000);
            jedis.publish("redisChatTest", "我牛逼");
            Thread.sleep(5000);
            jedis.publish("redisChatTest", "哈哈");
        }
          @Test
        public void test(){
            Jedis jedis1 = JedisFactory.getJedis();
            PushSubListener pushSubListener = new PushSubListener();
            jedis1.subscribe(pushSubListener,"redisChatTest");
        }

文章永久链接:https://tech.souyunku.com/44720

未经允许不得转载:搜云库技术团队 » Redis 发布订阅功能

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们