Redis Stream 完全攻略
“企业级消息队列” —— 持久化 + 消费者组 + 精确一次 + 百万级吞吐
替代 Kafka/RabbitMQ 的轻量级方案,Redis 5.0+ 原生支持
一、Stream 核心特点
| 特性 | 说明 |
|---|---|
| 底层结构 | Rax 树(Radix Tree) + 宏节点/微节点 |
| 持久化 | 支持 RDB + AOF |
| 消息 ID | timestamp-seq(如 1734028800000-0) |
| 消费者组 | 支持 多消费者负载均衡 + 精确一次 |
| 阻塞读取 | XREAD BLOCK / XREADGROUP |
| 消息确认 | XACK 手动确认 |
| 最大容量 | 无限(受磁盘限制) |
| 性能 | 10万+ QPS,单节点 |
二、Stream 命令全表(共 15 个)
| 命令 | 说明 | 示例 |
|---|---|---|
XADD key [MAXLEN ~ n] * field value | 添加消息 | XADD logs * level error msg "DB down" |
XREAD [COUNT n] [BLOCK ms] STREAMS key id | 读取消息 | XREAD COUNT 10 STREAMS logs 0 |
XREADGROUP GROUP g c [COUNT n] [BLOCK ms] STREAMS key > | 消费者组读取 | XREADGROUP GROUP workers c1 STREAMS tasks > |
XACK key group id [id ...] | 确认消息 | XACK tasks workers 1734028800000-0 |
XPENDING key group [start end count] [consumer] | 查看待确认 | XPENDING tasks workers |
XCLAIM key group consumer min-idle id [id ...] | 认领消息 | XCLAIM tasks workers c2 5000 1734028800000-0 |
XDEL key id [id ...] | 删除消息 | XDEL logs 1734028800000-0 |
XTRIM key MAXLEN ~ 1000 | 裁剪长度 | XTRIM logs MAXLEN ~ 10000 |
XLEN key | 消息数量 | XLEN logs |
XRANGE key start end [COUNT n] | 范围查询 | XRANGE logs 1734028800000-0 1734028900000-0 |
XREVRANGE key end start [COUNT n] | 倒序查询 | XREVRANGE logs + - COUNT 10 |
XGROUP CREATE key group $ MKSTREAM | 创建消费者组 | XGROUP CREATE tasks workers $ MKSTREAM |
XGROUP DESTROY key group | 销毁组 | XGROUP DESTROY tasks workers |
XGROUP SETID key group $ | 设置最后消费 ID | XGROUP SETID tasks workers $ |
XINFO STREAM key | 流信息 | XINFO STREAM logs |
XINFO GROUPS key | 组信息 | XINFO GROUPS tasks |
XINFO CONSUMERS key group | 消费者信息 | XINFO CONSUMERS tasks workers |
三、核心实战场景
1. 任务队列(消费者组)
# 创建消费者组
XGROUP CREATE tasks workers $ MKSTREAM
# 生产者添加任务
XADD tasks * action backup db prod
XADD tasks * action send_email user 1001
# 消费者1 读取
XREADGROUP GROUP workers c1 COUNT 1 BLOCK 0 STREAMS tasks >
# 消费完成后确认
XACK tasks workers 1734028800000-0
2. 日志收集与实时分析
# 应用写入日志
XADD app:logs * level error msg "NullPointer" trace "abc123"
# 日志分析服务消费
XREADGROUP GROUP log_analyzer consumer1 STREAMS app:logs >
# 裁剪旧日志
XTRIM app:logs MAXLEN ~ 100000
3. 订单处理流水线
# 订单创建
XADD orders * order_id 1001 status created amount 99.9
# 支付服务消费
XREADGROUP GROUP payment p1 STREAMS orders >
# 支付成功 → 确认
XACK orders payment 1734028800000-0
4. 消息广播(不使用消费者组)
# 所有服务都读取
XREAD COUNT 10 STREAMS notifications 0
四、消息 ID 详解
1734028800000-0
│ │ │
│ │ └─ 序列号(同毫秒内自增)
│ └────── 时间戳(毫秒)
└─────────────── Unix 时间戳
特殊 ID
| ID | 说明 |
|---|---|
0 | 流开头 |
$ | 流结尾(最新) |
> | 消费者组未消费消息 |
* | 自动生成 ID |
五、消费者组 vs 普通读取
| 对比项 | 消费者组 (XREADGROUP) | 普通读取 (XREAD) |
|---|---|---|
| 负载均衡 | 支持 | 不支持 |
| 精确一次 | 支持 XACK | 不支持 |
| 消息认领 | 支持 XCLAIM | 不支持 |
| 断线续消费 | 支持 | 不支持 |
| 推荐场景 | 任务处理 | 广播通知 |
六、持久化与高可用
# redis.conf
appendonly yes
appendfsync everysec
主从复制
- Stream 支持异步复制
- 从库可读,但不能写
# 从库查看
XINFO STREAM tasks
七、性能与内存优化
| 建议 | 说明 |
|---|---|
用 MAXLEN ~ n 裁剪 | XADD key MAXLEN ~ 10000 * ... |
| 分流存储 | logs:20251112、logs:20251113 |
消费者及时 XACK | 防止 PEL 膨胀 |
定期 XCLAIM 死信 | 处理超时消息 |
| 监控 PEL 长度 | XPENDING key group |
# 自动裁剪
XADD logs MAXLEN ~ 100000 * level info msg "ok"
八、死信处理(消息认领)
# 查看待处理消息
XPENDING tasks workers - + 10 c2
# 认领 5 分钟未确认的消息
XCLAIM tasks workers c2 300000 1734028800000-0
九、一键速查表
# 生产者
XADD s MAXLEN ~ 1000 * f v
# 创建消费者组
XGROUP CREATE s g $ MKSTREAM
# 消费者读取
XREADGROUP GROUP g c COUNT 10 BLOCK 0 STREAMS s >
# 确认
XACK s g 1734028800000-0
# 裁剪
XTRIM s MAXLEN ~ 10000
# 信息
XLEN s
XINFO STREAM s
XINFO GROUPS s
XPENDING s g
十、客户端代码示例
Python (redis-py)
import redis
r = redis.Redis()
# 生产者
msg_id = r.xadd('tasks', {'action': 'backup'}, maxlen=10000)
print(f"消息 ID: {msg_id}")
# 创建消费者组
r.xgroup_create('tasks', 'workers', id='$', mkstream=True)
# 消费者
while True:
msgs = r.xreadgroup('workers', 'c1', {'tasks': '>'}, count=10, block=0)
for stream, entries in msgs:
for msg_id, fields in entries:
print(f"处理 {msg_id}: {fields}")
r.xack('tasks', 'workers', msg_id)
Go (go-redis)
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// 生产者
id, _ := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "logs",
MaxLen: 10000,
Values: map[string]interface{}{"level": "error", "msg": "DB down"},
}).Result()
// 消费者组
rdb.XGroupCreateMkStream(ctx, "logs", "analyzers", "$")
msgs, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "analyzers",
Consumer: "worker1",
Streams: []string{"logs", ">"},
Count: 10,
Block: 0,
}).Result()
十一、Stream vs Kafka vs RabbitMQ
| 对比项 | Redis Stream | Kafka | RabbitMQ |
|---|---|---|---|
| 吞吐量 | 10万+ QPS | 百万+ | 10万 |
| 持久化 | 支持 | 支持 | 支持 |
| 延迟 | 亚毫秒 | 毫秒 | 毫秒 |
| 部署复杂度 | 低 | 高 | 中 |
| 精确一次 | 支持 | 支持 | 支持 |
| 推荐场景 | 轻量级、实时 | 海量日志 | 复杂路由 |
十二、Stream 在高并发中的终极用法
1. 异步任务 + 死信队列
-- Lua 脚本:安全添加 + 裁剪
EVAL "
local id = redis.call('xadd', KEYS[1], 'MAXLEN', '~', 10000, '*', 'data', ARGV[1])
return id
" 1 tasks "backup_job"
2. 实时大屏数据流
XADD dashboard * users 1001 orders 50 revenue 999.9
完成!你已精通 Redis Stream!
# 一行命令体验 Stream 全流程
redis-cli <<EOF
XADD s MAXLEN ~ ...
bash
XADD s MAXLEN ~ 100 * f v
XGROUP CREATE s g $ MKSTREAM
XREADGROUP GROUP g c BLOCK 0 STREAMS s >
XACK s g
XTRIM s MAXLEN ~ 10
XLEN s
EOF
“`
下一步推荐:
需要我送你:
- “高可靠任务队列(Stream + 死信 + 重试)”?
- “实时日志分析平台(Stream + ELK)”?
- “订单状态机(Stream + 消费者组)”?
回复:可靠队列 | 日志平台 | 订单状态机 即可!