Redis Stream

Redis Stream 完全攻略

“企业级消息队列” —— 持久化 + 消费者组 + 精确一次 + 百万级吞吐

替代 Kafka/RabbitMQ 的轻量级方案,Redis 5.0+ 原生支持


一、Stream 核心特点

特性说明
底层结构Rax 树(Radix Tree) + 宏节点/微节点
持久化支持 RDB + AOF
消息 IDtimestamp-seq(如 1734028800000-0
消费者组支持 多消费者负载均衡 + 精确一次
阻塞读取XREAD BLOCK / XREADGROUP
消息确认XACK 手动确认
最大容量无限(受磁盘限制)
性能10万+ QPS,单节点

二、Stream 命令全表(共 15 个)

命令说明示例
XADD key [MAXLEN ~ n] * field value添加消息XADD logs * level error msg "DB down"
XREAD [COUNT n] [BLOCK ms] STREAMS key id读取消息XREAD COUNT 10 STREAMS logs 0
XREADGROUP GROUP g c [COUNT n] [BLOCK ms] STREAMS key >消费者组读取XREADGROUP GROUP workers c1 STREAMS tasks >
XACK key group id [id ...]确认消息XACK tasks workers 1734028800000-0
XPENDING key group [start end count] [consumer]查看待确认XPENDING tasks workers
XCLAIM key group consumer min-idle id [id ...]认领消息XCLAIM tasks workers c2 5000 1734028800000-0
XDEL key id [id ...]删除消息XDEL logs 1734028800000-0
XTRIM key MAXLEN ~ 1000裁剪长度XTRIM logs MAXLEN ~ 10000
XLEN key消息数量XLEN logs
XRANGE key start end [COUNT n]范围查询XRANGE logs 1734028800000-0 1734028900000-0
XREVRANGE key end start [COUNT n]倒序查询XREVRANGE logs + - COUNT 10
XGROUP CREATE key group $ MKSTREAM创建消费者组XGROUP CREATE tasks workers $ MKSTREAM
XGROUP DESTROY key group销毁组XGROUP DESTROY tasks workers
XGROUP SETID key group $设置最后消费 IDXGROUP SETID tasks workers $
XINFO STREAM key流信息XINFO STREAM logs
XINFO GROUPS key组信息XINFO GROUPS tasks
XINFO CONSUMERS key group消费者信息XINFO CONSUMERS tasks workers

三、核心实战场景


1. 任务队列(消费者组)

# 创建消费者组
XGROUP CREATE tasks workers $ MKSTREAM

# 生产者添加任务
XADD tasks * action backup db prod
XADD tasks * action send_email user 1001

# 消费者1 读取
XREADGROUP GROUP workers c1 COUNT 1 BLOCK 0 STREAMS tasks >

# 消费完成后确认
XACK tasks workers 1734028800000-0

2. 日志收集与实时分析

# 应用写入日志
XADD app:logs * level error msg "NullPointer" trace "abc123"

# 日志分析服务消费
XREADGROUP GROUP log_analyzer consumer1 STREAMS app:logs >

# 裁剪旧日志
XTRIM app:logs MAXLEN ~ 100000

3. 订单处理流水线

# 订单创建
XADD orders * order_id 1001 status created amount 99.9

# 支付服务消费
XREADGROUP GROUP payment p1 STREAMS orders >

# 支付成功 → 确认
XACK orders payment 1734028800000-0

4. 消息广播(不使用消费者组)

# 所有服务都读取
XREAD COUNT 10 STREAMS notifications 0

四、消息 ID 详解

1734028800000-0
│        │    │
│        │    └─ 序列号(同毫秒内自增)
│        └────── 时间戳(毫秒)
└─────────────── Unix 时间戳

特殊 ID

ID说明
0流开头
$流结尾(最新)
>消费者组未消费消息
*自动生成 ID

五、消费者组 vs 普通读取

对比项消费者组 (XREADGROUP)普通读取 (XREAD)
负载均衡支持不支持
精确一次支持 XACK不支持
消息认领支持 XCLAIM不支持
断线续消费支持不支持
推荐场景任务处理广播通知

六、持久化与高可用

# redis.conf
appendonly yes
appendfsync everysec

主从复制

  • Stream 支持异步复制
  • 从库可读,但不能写
# 从库查看
XINFO STREAM tasks

七、性能与内存优化

建议说明
MAXLEN ~ n 裁剪XADD key MAXLEN ~ 10000 * ...
分流存储logs:20251112logs:20251113
消费者及时 XACK防止 PEL 膨胀
定期 XCLAIM 死信处理超时消息
监控 PEL 长度XPENDING key group
# 自动裁剪
XADD logs MAXLEN ~ 100000 * level info msg "ok"

八、死信处理(消息认领)

# 查看待处理消息
XPENDING tasks workers - + 10 c2

# 认领 5 分钟未确认的消息
XCLAIM tasks workers c2 300000 1734028800000-0

九、一键速查表

# 生产者
XADD s MAXLEN ~ 1000 * f v

# 创建消费者组
XGROUP CREATE s g $ MKSTREAM

# 消费者读取
XREADGROUP GROUP g c COUNT 10 BLOCK 0 STREAMS s >

# 确认
XACK s g 1734028800000-0

# 裁剪
XTRIM s MAXLEN ~ 10000

# 信息
XLEN s
XINFO STREAM s
XINFO GROUPS s
XPENDING s g

十、客户端代码示例

Python (redis-py)

import redis
r = redis.Redis()

# 生产者
msg_id = r.xadd('tasks', {'action': 'backup'}, maxlen=10000)
print(f"消息 ID: {msg_id}")

# 创建消费者组
r.xgroup_create('tasks', 'workers', id='$', mkstream=True)

# 消费者
while True:
    msgs = r.xreadgroup('workers', 'c1', {'tasks': '>'}, count=10, block=0)
    for stream, entries in msgs:
        for msg_id, fields in entries:
            print(f"处理 {msg_id}: {fields}")
            r.xack('tasks', 'workers', msg_id)

Go (go-redis)

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// 生产者
id, _ := rdb.XAdd(ctx, &redis.XAddArgs{
    Stream: "logs",
    MaxLen: 10000,
    Values: map[string]interface{}{"level": "error", "msg": "DB down"},
}).Result()

// 消费者组
rdb.XGroupCreateMkStream(ctx, "logs", "analyzers", "$")
msgs, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
    Group:    "analyzers",
    Consumer: "worker1",
    Streams:  []string{"logs", ">"},
    Count:    10,
    Block:    0,
}).Result()

十一、Stream vs Kafka vs RabbitMQ

对比项Redis StreamKafkaRabbitMQ
吞吐量10万+ QPS百万+10万
持久化支持支持支持
延迟亚毫秒毫秒毫秒
部署复杂度
精确一次支持支持支持
推荐场景轻量级、实时海量日志复杂路由

十二、Stream 在高并发中的终极用法

1. 异步任务 + 死信队列

-- Lua 脚本:安全添加 + 裁剪
EVAL "
  local id = redis.call('xadd', KEYS[1], 'MAXLEN', '~', 10000, '*', 'data', ARGV[1])
  return id
" 1 tasks "backup_job"

2. 实时大屏数据流

XADD dashboard * users 1001 orders 50 revenue 999.9

完成!你已精通 Redis Stream!

# 一行命令体验 Stream 全流程
redis-cli <<EOF
XADD s MAXLEN ~ ...

bash
XADD s MAXLEN ~ 100 * f v
XGROUP CREATE s g $ MKSTREAM
XREADGROUP GROUP g c BLOCK 0 STREAMS s >
XACK s g
XTRIM s MAXLEN ~ 10
XLEN s
EOF
“`


下一步推荐

  1. Redis Stream + Lua 实现可靠任务队列
  2. Stream + 消费者组 实现订单处理流水线
  3. 亿级消息系统架构(Stream + 分片 + 备份)

需要我送你

  • “高可靠任务队列(Stream + 死信 + 重试)”
  • “实时日志分析平台(Stream + ELK)”
  • “订单状态机(Stream + 消费者组)”

回复:可靠队列 | 日志平台 | 订单状态机 即可!

文章已创建 2481

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部