Zookeeper Java 客户端搭建

以下是 ZooKeeper Java 客户端完整搭建教程(中文),从 环境准备 → 依赖引入 → 基础连接 → 高级功能(Watcher、分布式锁、主从选举)→ 最佳实践支持原生 API + Curator 框架

基于 ZooKeeper 3.8.4,Java 11+,Maven/Gradle


一、环境准备

项目要求
JDK11 或 17(推荐)
构建工具Maven 3.6+ / Gradle 7+
ZooKeeper 服务已运行(单机或集群)
网络客户端能访问 2181 端口

二、Maven 项目创建

mvn archetype:generate \
  -DgroupId=com.example.zk \
  -DartifactId=zk-client-demo \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

进入项目:

cd zk-client-demo

三、引入依赖

方式 1:原生 ZooKeeper 客户端(轻量)

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.4</version>
    </dependency>
</dependencies>

方式 2:Curator 框架(强烈推荐

解决原生 API 痛点:重连、Watcher 重复注册、异常处理

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.5.0</version>
    </dependency>
</dependencies>

四、基础连接(原生 API)

ZkBasicClient.java

import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;

public class ZkBasicClient {
    private static final String CONNECT_STRING = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private ZooKeeper zk;
    private CountDownLatch connectedLatch = new CountDownLatch(1);

    public void connect() throws Exception {
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接成功!");
                connectedLatch.countDown();
            }
        });
        connectedLatch.await(); // 阻塞等待连接
    }

    public void createNode(String path, String data) throws Exception {
        String result = zk.create(path, data.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("创建节点: " + result);
    }

    public String getData(String path) throws Exception {
        byte[] data = zk.getData(path, false, null);
        return new String(data);
    }

    public void close() throws InterruptedException {
        if (zk != null) zk.close();
    }

    public static void main(String[] args) throws Exception {
        ZkBasicClient client = new ZkBasicClient();
        client.connect();

        client.createNode("/java_client", "hello from java");
        System.out.println("数据: " + client.getData("/java_client"));

        client.close();
    }
}

五、Watcher 监听(原生)

public void watchNode(String path) throws Exception {
    zk.getData(path, event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            System.out.println(path + " 数据被修改!");
            try {
                // 重新注册 Watcher(原生是一次性!)
                watchNode(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }, null);
}

原生 Watcher 是一次性触发,需手动重新注册


六、Curator 客户端(推荐!)

CuratorClient.java

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorClient {
    private static final String CONNECT_STRING = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    private CuratorFramework client;

    public void start() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        System.out.println("Curator 连接成功");
    }

    public void create(String path, String data) throws Exception {
        client.create().creatingParentsIfNeeded()
              .forPath(path, data.getBytes());
    }

    public String getData(String path) throws Exception {
        return new String(client.getData().forPath(path));
    }

    public void close() {
        if (client != null) client.close();
    }

    public static void main(String[] args) throws Exception {
        CuratorClient client = new CuratorClient();
        client.start();

        client.create("/curator/app", "curator data");
        System.out.println(client.getData("/curator/app"));

        client.close();
    }
}

七、分布式锁(Curator 推荐)

DistributedLock.java

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class DistributedLock {
    private final InterProcessMutex lock;
    private final String lockPath = "/locks/order_lock";

    public DistributedLock(CuratorFramework client) {
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void acquire() throws Exception {
        System.out.println("尝试获取锁...");
        lock.acquire(); // 阻塞等待
        System.out.println("锁获取成功!");
    }

    public void release() throws Exception {
        if (lock.isAcquiredInThisProcess()) {
            lock.release();
            System.out.println("锁已释放");
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "zk1:2181,zk2:2181,zk3:2181",
            new ExponentialBackoffRetry(1000, 3)
        );
        client.start();

        DistributedLock dl = new DistributedLock(client);
        dl.acquire();

        // 模拟业务
        Thread.sleep(5000);

        dl.release();
        client.close();
    }
}

八、主从选举(Leader Election)

LeaderElection.java

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

public class LeaderElection extends LeaderSelectorListenerAdapter {
    private final LeaderSelector leaderSelector;
    private final String name;

    public LeaderElection(CuratorFramework client, String name) {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, "/election/leader", this);
        leaderSelector.autoRequeue(); // 自动重新参与选举
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println(name + " 成为 Leader!");
        try {
            Thread.sleep(5000); // 模拟领导工作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        leaderSelector.start();
    }

    public void stop() {
        leaderSelector.close();
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "zk1:2181", new ExponentialBackoffRetry(1000, 3)
        );
        client.start();

        LeaderElection election = new LeaderElection(client, "Client-A");
        election.start();

        System.in.read(); // 阻塞
        election.stop();
        client.close();
    }
}

九、配置中心(实时监听)

client.getData().usingWatcher((CuratorWatcher) event -> {
    if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
        try {
            String newData = new String(client.getData().forPath("/config/db"));
            System.out.println("配置变更: " + newData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}).forPath("/config/db");

十、生产级配置建议

配置推荐值
sessionTimeoutMs10-30 秒
connectionTimeoutMs10 秒
重试策略ExponentialBackoffRetry(1000, 3)
连接串多个节点,逗号分隔
线程池避免在主线程操作
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("zk1:2181,zk2:2181,zk3:2181")
    .sessionTimeoutMs(30000)
    .connectionTimeoutMs(10000)
    .retryPolicy(new ExponentialBackoffRetry(1000, 5))
    .namespace("myapp")  // 命名空间隔离
    .build();

十一、完整项目结构

zk-client-demo/
├── pom.xml
├── src/
│   └── main/
│       └── java/
│           ├── ZkBasicClient.java
│           ├── CuratorClient.java
│           ├── DistributedLock.java
│           ├── LeaderElection.java
│           └── ConfigWatcher.java

十二、打包运行

mvn clean package
java -cp target/zk-client-demo-1.0-SNAPSHOT.jar com.example.zk.CuratorClient

十三、Gradle 支持(可选)

// build.gradle
dependencies {
    implementation 'org.apache.curator:curator-recipes:5.5.0'
}

十四、常见问题

问题解决方案
ConnectionLoss检查网络、ZK 服务、防火墙
SessionExpired增加 sessionTimeout,避免长时间阻塞
Watcher 不触发原生是一次性,改用 Curator
节点已存在使用 creatingParentsIfNeeded()

十五、推荐学习路径

  1. [x] 原生 API 基础连接
  2. [x] Curator 简化开发
  3. [x] 分布式锁实现
  4. [x] 主从选举
  5. [x] 配置中心 + 实时监听
  6. [ ] 集成 Spring Boot

十六、GitHub 示例项目

https://github.com/yourname/zk-java-client-demo

包含:Maven + Curator + 分布式锁 + Leader 选举 + 单元测试


下一步建议

  1. 运行 DistributedLock.java 测试多实例竞争
  2. 启动两个 LeaderElection.java 观察选举
  3. 集成到 Spring Boot 项目

需要我提供:

  • Spring Boot + ZooKeeper 集成
  • 单元测试(Testcontainers)
  • Docker 部署客户端
  • 完整 GitHub 项目模板

请告诉我!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注