Redis 发布/订阅(Pub/Sub)完全攻略
“轻量级实时消息系统”
无持久化、广播式、支持模式匹配 —— 适合 实时通知、聊天、日志推送、配置刷新
一、Pub/Sub 核心特点
| 特性 | 说明 |
|---|---|
| 模式 | 发布者 → 频道 → 订阅者(一对多广播) |
| 无持久化 | 消息不落盘,掉线 = 丢失 |
| 实时性 | 毫秒级延迟,基于内存 |
| 支持模式订阅 | PSUBSCRIBE pattern* |
| 轻量高效 | 单机支持 10万+ 并发连接 |
| 不保证送达 | 不支持 ACK、重试 |
不是消息队列,而是 实时广播系统
二、Pub/Sub 命令全表
| 命令 | 说明 | 示例 |
|---|---|---|
PUBLISH channel message | 发布消息 | PUBLISH chat:room1 "Hello!" |
SUBSCRIBE channel [channel ...] | 订阅频道 | SUBSCRIBE chat:room1 news |
UNSUBSCRIBE [channel ...] | 取消订阅 | UNSUBSCRIBE chat:room1 |
PSUBSCRIBE pattern [pattern ...] | 模式订阅 | PSUBSCRIBE news:* |
PUNSUBSCRIBE [pattern ...] | 取消模式订阅 | PUNSUBSCRIBE news:* |
PUBSUB CHANNELS [pattern] | 查看活跃频道 | PUBSUB CHANNELS chat:* |
PUBSUB NUMSUB [channel ...] | 查看订阅数 | PUBSUB NUMSUB chat:room1 |
PUBSUB NUMPAT | 模式订阅总数 | PUBSUB NUMPAT |
三、核心实战场景
1. 实时聊天室
# 用户 A 订阅
SUBSCRIBE chat:room1
# 用户 B 发布
PUBLISH chat:room1 "你好!"
# 用户 A 收到: 1) "message" 2) "chat:room1" 3) "你好!"
2. 系统通知广播
# 订阅所有系统通知
SUBSCRIBE sys:notify
# 发布维护通知
PUBLISH sys:notify "系统将于 02:00 维护 30 分钟"
3. 配置热更新
# 所有服务订阅配置频道
SUBSCRIBE config:api
# 配置中心推送更新
PUBLISH config:api '{"rate_limit": 1000}'
4. 日志实时推送
# 日志服务订阅
SUBSCRIBE logs:error
# 应用发布错误日志
PUBLISH logs:error "DB connection failed"
5. 模式订阅(批量监听)
# 订阅所有用户动态
PSUBSCRIBE user:*:feed
# 用户1001发布动态
PUBLISH user:1001:feed "我发布了一篇文章"
# 订阅者收到: 1) "pmessage" 2) "user:*:feed" 3) "user:1001:feed" 4) "消息"
四、Pub/Sub vs 消息队列 对比
| 对比项 | Pub/Sub | List/Stream |
|---|---|---|
| 持久化 | 无 | 有 |
| 消息送达 | 不保证 | 保证 |
| 消费模式 | 广播 | 点对点/消费者组 |
| 延迟 | 毫秒级 | 毫秒级 |
| 适用场景 | 实时通知 | 任务处理 |
用 Pub/Sub 做通知,用 Stream 做队列
五、客户端连接行为
订阅后:
├─ 客户端进入“订阅模式”
├─ 只能执行 SUBSCRIBE / UNSUBSCRIBE / PING / QUIT
└─ 断线 = 自动取消所有订阅
六、性能与优化建议
| 建议 | 说明 |
|---|---|
| 频道命名规范 | module:event:type 如 chat:room:1001 |
| 避免超大消息 | > 10KB 建议用 Stream |
| 客户端长连接 | 保持订阅,避免频繁重连 |
| 监控订阅数 | PUBSUB NUMSUB 防止泄漏 |
| 结合 Lua 原子发布 | 保证业务一致性 |
# 查看活跃频道
PUBSUB CHANNELS
# 查看每个频道的订阅者数
PUBSUB NUMSUB chat:room1 sys:notify
七、常见问题与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 消息丢失 | 订阅者掉线 | 业务允许丢失 or 用 Stream |
| 客户端卡死 | 订阅太多频道 | 限制订阅数 |
| 内存泄漏 | 无人订阅的频道 | Redis 自动清理 |
| 广播风暴 | 频繁发布大消息 | 限流 + 分频道 |
八、一键速查表
# 发布
PUBLISH chat:room1 "Hello"
# 订阅
SUBSCRIBE chat:room1
UNSUBSCRIBE chat:room1
# 模式订阅
PSUBSCRIBE logs:*
PUNSUBSCRIBE logs:*
# 监控
PUBSUB CHANNELS chat:*
PUBSUB NUMSUB chat:room1
PUBSUB NUMPAT
九、客户端代码示例
Python (redis-py)
import redis
import threading
# 订阅者
def subscriber():
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('chat:room1')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到: {message['data'].decode()}")
# 启动订阅线程
t = threading.Thread(target=subscriber)
t.daemon = True
t.start()
# 发布者
r = redis.Redis()
r.publish('chat:room1', '你好,Redis!')
Node.js (ioredis)
const Redis = require('ioredis');
const redis = new Redis();
// 订阅
const sub = new Redis();
sub.subscribe('news', (err, count) => {
console.log(`已订阅 ${count} 个频道`);
});
sub.on('message', (channel, message) => {
console.log(`[${channel}] ${message}`);
});
// 发布
redis.publish('news', 'Redis 7.4 发布!');
十、高并发 Pub/Sub 架构设计
graph LR
A[应用服务] -->|PUBLISH| R[Redis]
B[WebSocket 服务] -->|SUBSCRIBE| R
C[配置中心] -->|PUBLISH| R
D[微服务 A] -->|PSUBSCRIBE config:*| R
E[微服务 B] -->|PSUBSCRIBE config:*| R
最佳实践:
- 订阅者用独立连接
- 发布者用连接池
- 频道分层:
module:instance:event - 消息 JSON 序列化
十一、Redis 7.0+ 新特性:分片 Pub/Sub
# 集群模式下支持跨槽位广播
PUBLISH __sentinel__:shard1 "维护通知"
普通 Pub/Sub 在集群中只广播到同槽位,7.0+ 解决此问题
完成!你已精通 Redis Pub/Sub!
# 一行命令体验 Pub/Sub
redis-cli <<EOF
SUBSCRIBE demo:channel
# 新终端
PUBLISH demo:channel "Hello Pub/Sub!"
# 收到: 1) "message" 2) "demo:channel" 3) "Hello Pub/Sub!"
EOF
下一步推荐:
需要我送你:
- “百万用户实时通知系统(Pub/Sub + 分片)”?
- “企业级配置中心(Pub/Sub + 加密)”?
- “日志实时推送 + 告警平台”?
回复:通知系统 | 配置中心 | 日志告警 即可!