Redis 管道技术(Pipeline)全攻略(2025 版)
一句话定义:一次网络交互发送多条命令,批量返回结果,大幅降低 RTT(往返时延),提升 吞吐量。
1. 为什么需要 Pipeline?
1.1 普通请求 vs Pipeline
| 模式 | 网络交互次数 | 总耗时(假设 RTT=0.5ms) |
|---|---|---|
1000 次 SET 逐个发送 | 1000 次 | 1000 × 0.5ms = 500ms |
| Pipeline 批量 1000 条 | 1 次 | 0.5ms + 处理时间 ≈ 50ms |
性能提升 10 倍以上!
1.2 适用场景
| 场景 | 是否推荐 Pipeline |
|---|---|
| 批量写入(日志、埋点、缓存预热) | 强烈推荐 |
| 高频小操作(计数器、排行榜) | 推荐 |
| 单次查询 | 不必要 |
事务(MULTI/EXEC) | 自动使用 Pipeline |
2. Pipeline 工作原理
客户端 Redis 服务端
│ │
│ 1. 批量发送命令 │
│───────────────────────────>│
│ │ 依次执行命令
│ │ 缓存所有响应
│ 2. 一次性返回所有结果 │
│<───────────────────────────│
│ │
关键点:
- 不保证原子性(与
MULTI/EXEC事务不同)- 顺序执行,顺序返回
- 服务端内存缓存响应 → 避免响应过大
3. 主流客户端 Pipeline 使用方式
3.1 Java(Lettuce)
StatefulRedisConnection<String, String> conn = client.connect();
RedisCommands<String, String> sync = conn.sync();
// 开启 Pipeline
sync.setAutoFlushCommands(false);
List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
futures.add(sync.set("key:" + i, "value:" + i));
}
// 批量刷新
sync.flushCommands();
// 等待所有结果
List<KeyValue<String, String>> results = new ArrayList<>();
for (RedisFuture<?> future : futures) {
results.add((KeyValue<String, String>) future.get());
}
sync.setAutoFlushCommands(true); // 恢复默认
Spring Data Redis 封装:
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (int i = 0; i < 1000; i++) {
connection.set(("key:" + i).getBytes(), ("value:" + i).getBytes());
}
return null;
});
3.2 Python(redis-py)
pipe = r.pipeline()
for i in range(1000):
pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute() # 一次性发送并获取结果
事务 + Pipeline(自动):
with r.pipeline() as pipe:
pipe.multi()
pipe.set("a", 1)
pipe.incr("a")
pipe.execute() # 原子执行
3.3 Go(go-redis)
pipe := client.Pipeline()
for i := 0; i < 1000; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
cmds, err := pipe.Exec(ctx) // 批量执行
3.4 Node.js(ioredis)
const pipeline = redis.pipeline();
for (let i = 0; i < 1000; i++) {
pipeline.set(`key:${i}`, `value:${i}`);
}
await pipeline.exec(); // 返回 [[null, 'OK'], [null, 'OK'], ...]
3.5 redis-cli(调试用)
# 手动构造 Pipeline
(printf "SET key1 val1\nSET key2 val2\nPING\n"; sleep 1) | redis-cli
4. Pipeline 性能实测(memtier_benchmark)
# 无 Pipeline
memtier_benchmark -c 50 -t 4 --ratio=1:0 -n 100000
# → 8.5W OPS
# 开启 Pipeline=16
memtier_benchmark -c 50 -t 4 --pipeline=16 --ratio=1:0 -n 100000
# → **45W OPS**(提升 5.3 倍)
| Pipeline 深度 | OPS 提升 | 延迟下降 | 内存占用 |
|---|---|---|---|
| 1(默认) | 1x | 基准 | 低 |
| 8 | 3~5x | ~1/8 | 中 |
| 16 | 5~8x | ~1/16 | 高 |
| 32+ | 边际递减 | 风险增加 | 很高 |
5. Pipeline 深度推荐
| 网络环境 | 推荐 Pipeline 深度 |
|---|---|
| 同机房(<0.5ms) | 8 ~ 16 |
| 跨可用区(1~2ms) | 16 ~ 32 |
| 跨地域(>10ms) | 32 ~ 100 |
| 公网高延迟 | 100+ |
黄金法则:Pipeline 深度 × RTT ≈ 目标总延迟
6. 风险与限制
| 风险 | 说明 | 规避方案 |
|---|---|---|
| 响应过大 | 1000 个 1KB 结果 → 1MB 响应 | 分批 Pipeline(每批 100 条) |
| 内存泄漏 | 客户端未消费结果 | 始终调用 execute() |
| 无原子性 | 中间失败不回滚 | 改用 MULTI/EXEC |
| 超时放大 | 整批超时 | 设置合理 socket_timeout |
| 服务端压力 | 突发高 CPU | 限流 + 熔断 |
7. Pipeline vs 事务 vs Lua
| 特性 | Pipeline | MULTI/EXEC | Lua 脚本 |
|---|---|---|---|
| 减少 RTT | 是 | 是 | 是 |
| 原子性 | 否 | 是 | 是 |
| 条件逻辑 | 否 | 否 | 是 |
| 性能 | 最快 | 快 | 中等 |
| 适用场景 | 批量写 | 事务 | 复杂逻辑 |
8. 生产级 Pipeline 最佳实践
8.1 动态分批 Pipeline
def batch_execute(commands, batch_size=100):
results = []
for i in range(0, len(commands), batch_size):
batch = commands[i:i+batch_size]
pipe = r.pipeline()
for cmd, *args in batch:
getattr(pipe, cmd)(*args)
results.extend(pipe.execute())
return results
8.2 超时与重试
// Lettuce
RedisURI uri = RedisURI.create("redis://host");
uri.setTimeout(Duration.ofMillis(500));
client.setOptions(ClientOptions.create()
.disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS));
8.3 监控 Pipeline 性能
# Redis 端观察
redis-cli INFO stats | grep instantaneous_ops_per_sec
redis-cli SLOWLOG GET 10
Prometheus 指标:
redis_pipeline_batch_size_avg
redis_pipeline_latency_ms
9. 一键 Pipeline 压测脚本
#!/bin/bash
# redis_pipeline_test.sh
HOST=${1:-127.0.0.1}
PORT=${2:-6379}
AUTH=${3:-}
DEPTH=${4:-16}
echo "Pipeline 压测:深度=$DEPTH"
memtier_benchmark \
--server=$HOST --port=$PORT ${AUTH:+--authenticate=$AUTH} \
--threads=8 --clients=100 \
--pipeline=$DEPTH \
--ratio=1:0 --data-size=32 \
--test-time=60 \
--json-out-file="pipeline_${DEPTH}_$(date +%s).json"
echo "结果已保存"
使用:
./redis_pipeline_test.sh 10.0.0.10 6379 pass 16
10. 最佳实践清单(Checklist)
| 项目 | 检查 |
|---|---|
| 批量操作必须使用 Pipeline | 提升 5~10x |
| 分批控制响应大小 | < 1MB/批 |
| 设置超时 | socket_timeout < 1s |
监控 instantaneous_ops_per_sec | 验证提升 |
| 避免 Pipeline 内混杂慢命令 | 如 KEYS * |
生产环境深度 8~32 | 平衡性能与风险 |
| 结合连接池使用 | 避免连接频繁创建 |
小结:Pipeline 使用决策树
graph TD
A[有批量命令?] -->|是| B[是否需要原子性?]
B -->|否| C[使用 Pipeline]
B -->|是| D[使用 MULTI/EXEC 或 Lua]
A -->|否| E[单条命令,无需 Pipeline]
彩蛋:Pipeline + Lua 组合技
-- Redis 中执行批量逻辑
local results = {}
for i = 1, 1000 do
local key = KEYS[i]
local val = ARGV[i]
redis.call('SET', key, val)
table.insert(results, 'OK')
end
return results
pipe = r.pipeline()
pipe.eval(lua_script, 1000, *[f"key{i}" for i in range(1000)], *[f"val{i}" for i in range(1000)])
pipe.execute()
极致性能:1 次 RTT 完成 1000 次逻辑。
需要 Pipeline 自动分批库、K8s 侧车批量预热、Pipeline 监控告警 或 跨语言统一 Pipeline 封装?随时告诉我!