Zookeeper 分布式锁实现原理

以下是 ZooKeeper 分布式锁实现原理完整剖析(中文),从 核心思想 → 临时顺序节点 → 锁竞争流程 → 公平性保证 → 羊群效应优化 → Java 实现(原生 + Curator)→ 图解 + 时序图 + 类比 + 最佳实践,让你 彻底掌握 ZK 锁的“排队抢号”机制


一、分布式锁核心需求

需求说明
互斥性同一时刻只能一个线程持有锁
高可用锁服务 crash 不影响已获取的锁
可重入支持同一线程多次加锁
公平性先来先得,避免饥饿
自动释放客户端崩溃,锁自动释放

ZooKeeper 分布式锁 = 临时顺序节点 + Watcher 监听


二、核心实现原理:临时顺序节点(EPHEMERAL_SEQUENTIAL)

# 客户端尝试加锁
create -e -s /locks/order_lock ""
# → 创建节点:/locks/order_lock-00000001
特性作用
临时(-e)客户端断开 → 节点自动删除 → 锁释放
顺序(-s)自动追加 10 位递增序号 → 实现公平排队

三、分布式锁竞争流程(5 步)

sequenceDiagram
    participant C1 as 客户端1
    participant C2 as 客户端2
    participant ZK as ZooKeeper

    C1->>ZK: create -e -s /locks/lock ""
    ZK-->>C1: /locks/lock-00000001
    C2->>ZK: create -e -s /locks/lock ""
    ZK-->>C2: /locks/lock-00000002

    C1->>ZK: ls /locks
    ZK-->>C1: [lock-00000001, lock-00000002]
    Note right of C1: 自己是最小 → 获取锁

    C2->>ZK: ls /locks + watch
    Note right of C2: 监听前一个节点 lock-00000001

    C1->>ZK: delete /locks/lock-00000001
    ZK->>C2: NodeDeleted 事件
    C2->>ZK: ls /locks
    ZK-->>C2: [lock-00000002]
    Note right of C2: 自己是最小 → 获取锁

四、锁状态机

stateDiagram-v2
    [*] --> TRYING
    TRYING --> LOCKED: 是最小节点
    TRYING --> WAITING: 监听前一个节点
    WAITING --> LOCKED: 前节点删除
    LOCKED --> [*]: 释放锁

五、公平性保证(FIFO)

客户端创建顺序节点名获取锁顺序
A1lock-00011
B2lock-00022
C3lock-00033

先到先得,绝对公平


六、羊群效应(Herd Effect)与优化

问题:所有客户端都监听 /locks → 节点删除 → 所有 Watcher 触发

# 100 个客户端都监听 /locks
ls /locks true
# lock-0001 删除 → 100 个客户端都被唤醒

优化:只监听前一个节点

String myNode = "/locks/lock-00000005";
List<String> children = zk.getChildren("/locks", false);
String prevNode = getPrevNode(children, myNode);
zk.exists(prevNode, true);  // 只监听前一个

七、Java 原生实现(完整代码)

public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath = "/locks/order_lock";
    private String myNode;
    private CountDownLatch lockLatch = new CountDownLatch(1);

    public DistributedLock(ZooKeeper zk) throws Exception {
        this.zk = zk;
        ensurePath(lockPath);
    }

    public boolean tryLock() throws Exception {
        // 1. 创建临时顺序节点
        myNode = zk.create(lockPath + "/lock-", null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // 2. 获取所有子节点
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children);

        // 3. 判断是否最小
        if (myNode.endsWith(children.get(0))) {
            return true; // 获取锁
        }

        // 4. 监听前一个节点
        String prev = getPrevNode(children);
        Stat stat = zk.exists(lockPath + "/" + prev, event -> {
            if (event.getType() == Event.EventType.NodeDeleted) {
                lockLatch.countDown(); // 唤醒等待
            }
        });

        if (stat == null) {
            return tryLock(); // 防止竞态
        }

        // 5. 阻塞等待
        lockLatch.await();
        return tryLock(); // 重新检查
    }

    public void unlock() throws Exception {
        zk.delete(myNode, -1);
    }

    private String getPrevNode(List<String> children) {
        String mySeq = myNode.substring(lockPath.length() + 1);
        int idx = children.indexOf(mySeq);
        return children.get(idx - 1);
    }

    private void ensurePath(String path) throws Exception {
        String[] parts = path.split("/");
        String curr = "";
        for (String p : parts) {
            if (p.isEmpty()) continue;
            curr += "/" + p;
            if (zk.exists(curr, false) == null) {
                zk.create(curr, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
    }
}

八、Curator 实现(推荐!一行代码)

InterProcessMutex lock = new InterProcessMutex(client, "/locks/order_lock");
lock.acquire(); // 阻塞获取
// ... 业务逻辑
lock.release(); // 释放

Curator 自动处理:重连、Watcher 注册、羊群效应、异常重试


九、可重入锁实现(Curator)

InterProcessMutex lock = new InterProcessMutex(client, "/locks/order_lock");
lock.acquire();
lock.acquire(); // 同一线程可重入
lock.release();
lock.release();

内部用 ThreadLocal 记录重入次数


十、锁超时与死锁预防

// Curator 超时获取
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        // 业务
    } finally {
        lock.release();
    }
}

ZK 锁无死锁:临时节点 + 会话超时自动释放


十一、生产最佳实践

建议操作
使用 Curator避免原生 API 复杂性
锁路径隔离/locks/service-a, /locks/service-b
短锁业务快速完成
异常处理finally { release() }
监控锁数量echo mntr | grep zk_znode_count

十二、性能对比

实现QPS延迟复杂度
ZK 锁~100010ms
Redis 锁~100001ms
DB 锁~10050ms

ZK 适合高一致性、低并发场景


十三、常见问题

问题原因解决
锁泄露客户端崩溃未释放临时节点自动删
羊群效应所有客户端监听根节点监听前一个节点
频繁重试获取锁失败轮询acquire(timeout)
锁失效会话超时增大 sessionTimeout

十四、分布式锁记忆口诀

“临序抢最小,听前被唤醒,断开自动删,公平不饥饿”

  • 临序:临时顺序节点
  • 抢最小:最小序号获取锁
  • 听前:监听前一个节点
  • 被唤醒:前节点删除 → 事件通知
  • 断开删:会话断开 → 锁释放
  • 公平:FIFO

十五、zkCli 手动测试

# 客户端1
create -e -s /locks/lock ""
# → /locks/lock00000001 → 获取锁

# 客户端2
create -e -s /locks/lock ""
# → /locks/lock00000002
exists /locks/lock00000001 true  # 监听前一个

# 客户端1 释放
delete /locks/lock00000001
# → 客户端2 收到 NodeDeleted → 获取锁

总结:分布式锁流程图

graph TD
    A[尝试加锁] --> B[创建临时顺序节点]
    B --> C{是最小节点?}
    C -->|是| D[获取锁]
    C -->|否| E[监听前一个节点]
    E --> F{前节点删除?}
    F -->|是| D
    D --> G[执行业务]
    G --> H[删除自己的节点]
    H --> I[锁释放]

动手练习

  1. [x] 用 zkCli 模拟 2 个客户端争锁
  2. [x] Java 实现 可重入锁
  3. [x] 用 Curator 实现 超时获取
  4. [ ] 压测 100 客户端并发加锁

需要我提供:

  • Curator 完整可重入锁源码
  • 高并发锁压测脚本
  • 锁监控(Prometheus + Grafana)
  • 与 Redis 锁对比分析

请继续提问!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注