Redis 管道技术

Redis 管道技术(Pipeline)全攻略(2025 版)

一句话定义一次网络交互发送多条命令,批量返回结果,大幅降低 RTT(往返时延),提升 吞吐量


1. 为什么需要 Pipeline?

1.1 普通请求 vs Pipeline

模式网络交互次数总耗时(假设 RTT=0.5ms)
1000 次 SET 逐个发送1000 次1000 × 0.5ms = 500ms
Pipeline 批量 1000 条1 次0.5ms + 处理时间 ≈ 50ms

性能提升 10 倍以上!


1.2 适用场景

场景是否推荐 Pipeline
批量写入(日志、埋点、缓存预热)强烈推荐
高频小操作(计数器、排行榜)推荐
单次查询不必要
事务(MULTI/EXEC自动使用 Pipeline

2. Pipeline 工作原理

客户端                     Redis 服务端
  │                            │
  │  1. 批量发送命令            │
  │───────────────────────────>│
  │                            │  依次执行命令
  │                            │  缓存所有响应
  │  2. 一次性返回所有结果      │
  │<───────────────────────────│
  │                            │

关键点

  • 不保证原子性(与 MULTI/EXEC 事务不同)
  • 顺序执行,顺序返回
  • 服务端内存缓存响应 → 避免响应过大

3. 主流客户端 Pipeline 使用方式

3.1 Java(Lettuce)

StatefulRedisConnection<String, String> conn = client.connect();
RedisCommands<String, String> sync = conn.sync();

// 开启 Pipeline
sync.setAutoFlushCommands(false);

List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    futures.add(sync.set("key:" + i, "value:" + i));
}

// 批量刷新
sync.flushCommands();

// 等待所有结果
List<KeyValue<String, String>> results = new ArrayList<>();
for (RedisFuture<?> future : futures) {
    results.add((KeyValue<String, String>) future.get());
}

sync.setAutoFlushCommands(true); // 恢复默认

Spring Data Redis 封装

redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    for (int i = 0; i < 1000; i++) {
        connection.set(("key:" + i).getBytes(), ("value:" + i).getBytes());
    }
    return null;
});

3.2 Python(redis-py)

pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()  # 一次性发送并获取结果

事务 + Pipeline(自动)

with r.pipeline() as pipe:
    pipe.multi()
    pipe.set("a", 1)
    pipe.incr("a")
    pipe.execute()  # 原子执行

3.3 Go(go-redis)

pipe := client.Pipeline()
for i := 0; i < 1000; i++ {
    pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
cmds, err := pipe.Exec(ctx)  // 批量执行

3.4 Node.js(ioredis)

const pipeline = redis.pipeline();
for (let i = 0; i < 1000; i++) {
  pipeline.set(`key:${i}`, `value:${i}`);
}
await pipeline.exec();  // 返回 [[null, 'OK'], [null, 'OK'], ...]

3.5 redis-cli(调试用)

# 手动构造 Pipeline
(printf "SET key1 val1\nSET key2 val2\nPING\n"; sleep 1) | redis-cli

4. Pipeline 性能实测(memtier_benchmark)

# 无 Pipeline
memtier_benchmark -c 50 -t 4 --ratio=1:0 -n 100000
# → 8.5W OPS

# 开启 Pipeline=16
memtier_benchmark -c 50 -t 4 --pipeline=16 --ratio=1:0 -n 100000
# → **45W OPS**(提升 5.3 倍)
Pipeline 深度OPS 提升延迟下降内存占用
1(默认)1x基准
83~5x~1/8
165~8x~1/16
32+边际递减风险增加很高

5. Pipeline 深度推荐

网络环境推荐 Pipeline 深度
同机房(<0.5ms)8 ~ 16
跨可用区(1~2ms)16 ~ 32
跨地域(>10ms)32 ~ 100
公网高延迟100+

黄金法则Pipeline 深度 × RTT ≈ 目标总延迟


6. 风险与限制

风险说明规避方案
响应过大1000 个 1KB 结果 → 1MB 响应分批 Pipeline(每批 100 条)
内存泄漏客户端未消费结果始终调用 execute()
无原子性中间失败不回滚改用 MULTI/EXEC
超时放大整批超时设置合理 socket_timeout
服务端压力突发高 CPU限流 + 熔断

7. Pipeline vs 事务 vs Lua

特性PipelineMULTI/EXECLua 脚本
减少 RTT
原子性
条件逻辑
性能最快中等
适用场景批量写事务复杂逻辑

8. 生产级 Pipeline 最佳实践

8.1 动态分批 Pipeline

def batch_execute(commands, batch_size=100):
    results = []
    for i in range(0, len(commands), batch_size):
        batch = commands[i:i+batch_size]
        pipe = r.pipeline()
        for cmd, *args in batch:
            getattr(pipe, cmd)(*args)
        results.extend(pipe.execute())
    return results

8.2 超时与重试

// Lettuce
RedisURI uri = RedisURI.create("redis://host");
uri.setTimeout(Duration.ofMillis(500));

client.setOptions(ClientOptions.create()
    .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS));

8.3 监控 Pipeline 性能

# Redis 端观察
redis-cli INFO stats | grep instantaneous_ops_per_sec
redis-cli SLOWLOG GET 10

Prometheus 指标

redis_pipeline_batch_size_avg
redis_pipeline_latency_ms

9. 一键 Pipeline 压测脚本

#!/bin/bash
# redis_pipeline_test.sh

HOST=${1:-127.0.0.1}
PORT=${2:-6379}
AUTH=${3:-}
DEPTH=${4:-16}

echo "Pipeline 压测:深度=$DEPTH"

memtier_benchmark \
  --server=$HOST --port=$PORT ${AUTH:+--authenticate=$AUTH} \
  --threads=8 --clients=100 \
  --pipeline=$DEPTH \
  --ratio=1:0 --data-size=32 \
  --test-time=60 \
  --json-out-file="pipeline_${DEPTH}_$(date +%s).json"

echo "结果已保存"

使用:

./redis_pipeline_test.sh 10.0.0.10 6379 pass 16

10. 最佳实践清单(Checklist)

项目检查
批量操作必须使用 Pipeline提升 5~10x
分批控制响应大小< 1MB/批
设置超时socket_timeout < 1s
监控 instantaneous_ops_per_sec验证提升
避免 Pipeline 内混杂慢命令KEYS *
生产环境深度 8~32平衡性能与风险
结合连接池使用避免连接频繁创建

小结:Pipeline 使用决策树

graph TD
    A[有批量命令?] -->|是| B[是否需要原子性?]
    B -->|否| C[使用 Pipeline]
    B -->|是| D[使用 MULTI/EXEC 或 Lua]
    A -->|否| E[单条命令,无需 Pipeline]

彩蛋:Pipeline + Lua 组合技

-- Redis 中执行批量逻辑
local results = {}
for i = 1, 1000 do
    local key = KEYS[i]
    local val = ARGV[i]
    redis.call('SET', key, val)
    table.insert(results, 'OK')
end
return results
pipe = r.pipeline()
pipe.eval(lua_script, 1000, *[f"key{i}" for i in range(1000)], *[f"val{i}" for i in range(1000)])
pipe.execute()

极致性能:1 次 RTT 完成 1000 次逻辑。


需要 Pipeline 自动分批库K8s 侧车批量预热Pipeline 监控告警跨语言统一 Pipeline 封装?随时告诉我!

文章已创建 2481

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部