Redis 发布订阅

Redis 发布/订阅(Pub/Sub)完全攻略

“轻量级实时消息系统”

无持久化、广播式、支持模式匹配 —— 适合 实时通知、聊天、日志推送、配置刷新


一、Pub/Sub 核心特点

特性说明
模式发布者 → 频道 → 订阅者(一对多广播)
无持久化消息不落盘,掉线 = 丢失
实时性毫秒级延迟,基于内存
支持模式订阅PSUBSCRIBE pattern*
轻量高效单机支持 10万+ 并发连接
不保证送达不支持 ACK、重试

不是消息队列,而是 实时广播系统


二、Pub/Sub 命令全表

命令说明示例
PUBLISH channel message发布消息PUBLISH chat:room1 "Hello!"
SUBSCRIBE channel [channel ...]订阅频道SUBSCRIBE chat:room1 news
UNSUBSCRIBE [channel ...]取消订阅UNSUBSCRIBE chat:room1
PSUBSCRIBE pattern [pattern ...]模式订阅PSUBSCRIBE news:*
PUNSUBSCRIBE [pattern ...]取消模式订阅PUNSUBSCRIBE news:*
PUBSUB CHANNELS [pattern]查看活跃频道PUBSUB CHANNELS chat:*
PUBSUB NUMSUB [channel ...]查看订阅数PUBSUB NUMSUB chat:room1
PUBSUB NUMPAT模式订阅总数PUBSUB NUMPAT

三、核心实战场景


1. 实时聊天室

# 用户 A 订阅
SUBSCRIBE chat:room1

# 用户 B 发布
PUBLISH chat:room1 "你好!"
# 用户 A 收到: 1) "message" 2) "chat:room1" 3) "你好!"

2. 系统通知广播

# 订阅所有系统通知
SUBSCRIBE sys:notify

# 发布维护通知
PUBLISH sys:notify "系统将于 02:00 维护 30 分钟"

3. 配置热更新

# 所有服务订阅配置频道
SUBSCRIBE config:api

# 配置中心推送更新
PUBLISH config:api '{"rate_limit": 1000}'

4. 日志实时推送

# 日志服务订阅
SUBSCRIBE logs:error

# 应用发布错误日志
PUBLISH logs:error "DB connection failed"

5. 模式订阅(批量监听)

# 订阅所有用户动态
PSUBSCRIBE user:*:feed

# 用户1001发布动态
PUBLISH user:1001:feed "我发布了一篇文章"
# 订阅者收到: 1) "pmessage" 2) "user:*:feed" 3) "user:1001:feed" 4) "消息"

四、Pub/Sub vs 消息队列 对比

对比项Pub/SubList/Stream
持久化
消息送达不保证保证
消费模式广播点对点/消费者组
延迟毫秒级毫秒级
适用场景实时通知任务处理

用 Pub/Sub 做通知,用 Stream 做队列


五、客户端连接行为

订阅后:
├─ 客户端进入“订阅模式”
├─ 只能执行 SUBSCRIBE / UNSUBSCRIBE / PING / QUIT
└─ 断线 = 自动取消所有订阅

六、性能与优化建议

建议说明
频道命名规范module:event:typechat:room:1001
避免超大消息> 10KB 建议用 Stream
客户端长连接保持订阅,避免频繁重连
监控订阅数PUBSUB NUMSUB 防止泄漏
结合 Lua 原子发布保证业务一致性
# 查看活跃频道
PUBSUB CHANNELS

# 查看每个频道的订阅者数
PUBSUB NUMSUB chat:room1 sys:notify

七、常见问题与解决方案

问题原因解决方案
消息丢失订阅者掉线业务允许丢失 or 用 Stream
客户端卡死订阅太多频道限制订阅数
内存泄漏无人订阅的频道Redis 自动清理
广播风暴频繁发布大消息限流 + 分频道

八、一键速查表

# 发布
PUBLISH chat:room1 "Hello"

# 订阅
SUBSCRIBE chat:room1
UNSUBSCRIBE chat:room1

# 模式订阅
PSUBSCRIBE logs:*
PUNSUBSCRIBE logs:*

# 监控
PUBSUB CHANNELS chat:*
PUBSUB NUMSUB chat:room1
PUBSUB NUMPAT

九、客户端代码示例

Python (redis-py)

import redis
import threading

# 订阅者
def subscriber():
    r = redis.Redis()
    pubsub = r.pubsub()
    pubsub.subscribe('chat:room1')
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"收到: {message['data'].decode()}")

# 启动订阅线程
t = threading.Thread(target=subscriber)
t.daemon = True
t.start()

# 发布者
r = redis.Redis()
r.publish('chat:room1', '你好,Redis!')

Node.js (ioredis)

const Redis = require('ioredis');
const redis = new Redis();

// 订阅
const sub = new Redis();
sub.subscribe('news', (err, count) => {
  console.log(`已订阅 ${count} 个频道`);
});
sub.on('message', (channel, message) => {
  console.log(`[${channel}] ${message}`);
});

// 发布
redis.publish('news', 'Redis 7.4 发布!');

十、高并发 Pub/Sub 架构设计

graph LR
    A[应用服务] -->|PUBLISH| R[Redis]
    B[WebSocket 服务] -->|SUBSCRIBE| R
    C[配置中心] -->|PUBLISH| R
    D[微服务 A] -->|PSUBSCRIBE config:*| R
    E[微服务 B] -->|PSUBSCRIBE config:*| R

最佳实践:

  1. 订阅者用独立连接
  2. 发布者用连接池
  3. 频道分层module:instance:event
  4. 消息 JSON 序列化

十一、Redis 7.0+ 新特性:分片 Pub/Sub

# 集群模式下支持跨槽位广播
PUBLISH __sentinel__:shard1 "维护通知"

普通 Pub/Sub 在集群中只广播到同槽位,7.0+ 解决此问题


完成!你已精通 Redis Pub/Sub!

# 一行命令体验 Pub/Sub
redis-cli <<EOF
SUBSCRIBE demo:channel
# 新终端
PUBLISH demo:channel "Hello Pub/Sub!"
# 收到: 1) "message" 2) "demo:channel" 3) "Hello Pub/Sub!"
EOF

下一步推荐

  1. Redis Stream 高级消息队列
  2. Pub/Sub + WebSocket 实时聊天系统
  3. 配置中心热更新架构

需要我送你

  • “百万用户实时通知系统(Pub/Sub + 分片)”
  • “企业级配置中心(Pub/Sub + 加密)”
  • “日志实时推送 + 告警平台”

回复:通知系统 | 配置中心 | 日志告警 即可!

文章已创建 2481

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部