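Setting Up a ZooKeeper Java Client
This tutorial walks through setting up a ZooKeeper Java client end to end: environment preparation → dependencies → basic connection → advanced features (Watchers, distributed locks, leader election) → best practices. It covers both the native API and the Curator framework.
Based on ZooKeeper 3.8.4, Java 11+, and Maven/Gradle.
1. Environment Preparation
| Item | Requirement |
|---|---|
| JDK | 11 or 17 (recommended) |
| Build tool | Maven 3.6+ / Gradle 7+ |
| ZooKeeper service | Already running (standalone or cluster) |
| Network | The client can reach port 2181 |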
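Before writing any client code, it can help to confirm the network requirement from the table. The following is a minimal sketch using only the JDK; the class name `ZkPortCheck` and the default host are assumptions for illustration, and a successful TCP connection only shows that the port is open, not that ZooKeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ZkPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        // Hypothetical default host; pass one of your ZooKeeper servers as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(host + ":2181 reachable = " + isReachable(host, 2181, 3000));
    }
}
```

Run it once per server in the connection string; a `false` result usually points to a firewall rule or a ZooKeeper process that is not listening.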
2. Creating the Maven Project
mvn archetype:generate \
-DgroupId=com.example.zk \
-DartifactId=zk-client-demo \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
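Enter the project directory:
cd zk-client-demo
The examples below use lambdas and target Java 11. If the generated pom.xml sets maven.compiler.source and maven.compiler.target to a lower version, raise both to 11.
3. Adding Dependencies
Option 1: native ZooKeeper client (lightweight)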
<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.4</version>
    </dependency>
</dependencies>
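Option 2: Curator framework (strongly recommended)
Curator addresses the main pain points of the native API: reconnection, re-registering Watchers, and exception handling.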
<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.5.0</version>
    </dependency>
</dependencies>
4. Basic Connection (Native API)
ZkBasicClient.java
package com.example.zk;

import org.apache.zookeeper.*;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class ZkBasicClient {
    private static final String CONNECT_STRING = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    private static final int SESSION_TIMEOUT = 5000; // milliseconds
    private ZooKeeper zk;
    private final CountDownLatch connectedLatch = new CountDownLatch(1);

    public void connect() throws Exception {
        // The constructor returns immediately; the Watcher is called once the session is established.
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("Connected!");
                connectedLatch.countDown();
            }
        });
        connectedLatch.await(); // block until connected
    }

    public void createNode(String path, String data) throws Exception {
        // Skip creation if the node already exists, so the demo can be run more than once.
        if (zk.exists(path, false) != null) {
            System.out.println("Node already exists: " + path);
            return;
        }
        String result = zk.create(path, data.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created node: " + result);
    }

    public String getData(String path) throws Exception {
        byte[] data = zk.getData(path, false, null);
        return new String(data, StandardCharsets.UTF_8);
    }

    public void close() throws InterruptedException {
        if (zk != null) zk.close();
    }

    public static void main(String[] args) throws Exception {
        ZkBasicClient client = new ZkBasicClient();
        client.connect();
        client.createNode("/java_client", "hello from java");
        System.out.println("Data: " + client.getData("/java_client"));
        client.close();
    }
}
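Waiting on a `CountDownLatch` without a timeout blocks forever if no server in the connection string is reachable. The sketch below shows the bounded variant of that wait using only the JDK; `ConnectionGate` and its method names are hypothetical helpers, not part of the ZooKeeper API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConnectionGate {
    private final CountDownLatch connected = new CountDownLatch(1);

    /** Call this from the Watcher when KeeperState.SyncConnected arrives. */
    public void markConnected() {
        connected.countDown();
    }

    /** Waits up to timeoutMs; returns false if no connection was signalled in time. */
    public boolean awaitConnected(long timeoutMs) throws InterruptedException {
        return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionGate gate = new ConnectionGate();
        System.out.println(gate.awaitConnected(100)); // nothing signalled yet
        gate.markConnected();                          // what the Watcher would do
        System.out.println(gate.awaitConnected(100));
    }
}
```

In a client like the one above, the caller could treat a `false` result as a startup failure, close the `ZooKeeper` handle, and report the error instead of hanging.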
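5. Watchers (Native API)
public void watchNode(String path) throws Exception {
    zk.getData(path, event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            System.out.println(path + " data was modified!");
            try {
                // Re-register the Watcher (native Watchers are one-shot)
                watchNode(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }, null);
}
Native Watchers fire only once and must be re-registered manually. A change made between the trigger and the re-registration produces no notification of its own.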
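The one-shot behaviour is easier to see in isolation. The following toy model runs without ZooKeeper: `ToyNode` is a hypothetical in-memory stand-in for a znode, not the ZooKeeper API, and it only imitates the rule that a watcher is consumed when it fires.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OneShotWatchDemo {
    /** Toy in-memory stand-in for a znode; NOT the ZooKeeper API. */
    static class ToyNode {
        private String data = "";
        private Runnable watcher; // at most one pending one-shot watcher

        String getData(Runnable newWatcher) {
            this.watcher = newWatcher; // register (or replace) the watcher
            return data;
        }

        void setData(String newData) {
            data = newData;
            Runnable w = watcher;
            watcher = null;            // the watcher is consumed before it runs
            if (w != null) w.run();
        }
    }

    /** Counts notifications for two writes, with or without re-registration. */
    public static int countNotifications(boolean reRegister) {
        ToyNode node = new ToyNode();
        AtomicInteger fired = new AtomicInteger();
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            fired.incrementAndGet();
            if (reRegister) node.getData(self[0]); // re-arm for the next change
        };
        node.getData(self[0]);
        node.setData("v1");
        node.setData("v2");
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println("without re-registration: " + countNotifications(false));
        System.out.println("with re-registration:    " + countNotifications(true));
    }
}
```

Without re-registration the second write goes unnoticed, which is the usual cause of a Watcher that appears to stop working after its first event.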
6. Curator Client (Recommended)
CuratorClient.java
package com.example.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class CuratorClient {
    private static final String CONNECT_STRING = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    private CuratorFramework client;

    public void start() throws InterruptedException {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start(); // returns immediately; the connection is established in the background
        if (client.blockUntilConnected(10, TimeUnit.SECONDS)) {
            System.out.println("Curator connected");
        } else {
            System.out.println("Curator could not connect within 10 seconds");
        }
    }

    public void create(String path, String data) throws Exception {
        // Skip creation if the node already exists, so the demo can be run more than once.
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentsIfNeeded()
                    .forPath(path, data.getBytes(StandardCharsets.UTF_8));
        }
    }

    public String getData(String path) throws Exception {
        return new String(client.getData().forPath(path), StandardCharsets.UTF_8);
    }

    public void close() {
        if (client != null) client.close();
    }

    public static void main(String[] args) throws Exception {
        CuratorClient client = new CuratorClient();
        client.start();
        client.create("/curator/app", "curator data");
        System.out.println(client.getData("/curator/app"));
        client.close();
    }
}
7. Distributed Lock (Curator)
DistributedLock.java
package com.example.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedLock {
    private final InterProcessMutex lock;
    private final String lockPath = "/locks/order_lock";

    public DistributedLock(CuratorFramework client) {
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void acquire() throws Exception {
        System.out.println("Trying to acquire the lock...");
        lock.acquire(); // blocks until the lock is available
        System.out.println("Lock acquired!");
    }

    public void release() throws Exception {
        // Only the thread that holds the lock may release it.
        if (lock.isOwnedByCurrentThread()) {
            lock.release();
            System.out.println("Lock released");
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",
                new ExponentialBackoffRetry(1000, 3)
        );
        client.start();
        DistributedLock dl = new DistributedLock(client);
        try {
            dl.acquire();
            // Simulate business logic
            Thread.sleep(5000);
        } finally {
            // Release the lock even if the business logic throws
            dl.release();
            client.close();
        }
    }
}
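A blocking `acquire()` can stall a worker indefinitely when another instance holds the lock for a long time. `InterProcessMutex` also provides `acquire(long, TimeUnit)`, which returns `false` on timeout. The sketch below shows the same "bounded wait, always release in `finally`" pattern with a local `ReentrantLock` as a stand-in so that it runs without ZooKeeper; `BoundedLockDemo` and `runWithLock` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedLockDemo {
    /** Runs the task only if the lock is obtained within timeoutMs; always releases it afterwards. */
    public static boolean runWithLock(ReentrantLock lock, long timeoutMs, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // gave up waiting; the task did not run
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(); // released even if the task throws
        }
    }

    public static void main(String[] args) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(runWithLock(lock, 100, () -> System.out.println("working")));

        // Hold the lock in another thread to exercise the timeout path.
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await(); // the other thread now owns the lock
        System.out.println(runWithLock(lock, 100, () -> System.out.println("not reached")));
        done.countDown();
        holder.join();
    }
}
```

With Curator, `tryLock(timeout, unit)` corresponds to `InterProcessMutex.acquire(timeout, unit)` and `unlock()` to `release()`; the control flow stays the same.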
8. Leader Election
LeaderElection.java
package com.example.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElection extends LeaderSelectorListenerAdapter {
    private final LeaderSelector leaderSelector;
    private final String name;

    public LeaderElection(CuratorFramework client, String name) {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, "/election/leader", this);
        leaderSelector.autoRequeue(); // rejoin the election after giving up leadership
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // Leadership is held only while this method runs; returning gives it up.
        System.out.println(name + " is now the leader!");
        try {
            Thread.sleep(5000); // simulate leader work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        leaderSelector.start();
    }

    public void stop() {
        leaderSelector.close();
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181", new ExponentialBackoffRetry(1000, 3)
        );
        client.start();
        LeaderElection election = new LeaderElection(client, "Client-A");
        election.start();
        System.in.read(); // block until Enter is pressed
        election.stop();
        client.close();
    }
}
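9. Configuration Center (Watching for Changes)
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.Watcher;
import java.nio.charset.StandardCharsets;

// usingWatcher registers a one-shot watcher, so the callback re-registers itself after each change.
public void watchConfig() throws Exception {
    client.getData().usingWatcher((CuratorWatcher) event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            try {
                String newData = new String(client.getData().forPath("/config/db"), StandardCharsets.UTF_8);
                System.out.println("Config changed: " + newData);
                watchConfig(); // re-register for the next change
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }).forPath("/config/db");
}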
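The watcher above only delivers raw bytes; how they are interpreted is up to the application. As one possible approach, the sketch below assumes the `/config/db` node stores UTF-8 `key=value` lines and parses them with `java.util.Properties`. The class name `ConfigParser`, the payload, and the keys are all hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ConfigParser {
    /** Decodes znode bytes as UTF-8 and parses them as key=value lines. */
    public static Properties parse(byte[] znodeData) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(new String(znodeData, StandardCharsets.UTF_8)));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical payload for /config/db; the real format is an application choice.
        byte[] data = "url=jdbc:mysql://db1:3306/app\npool.size=10\n"
                .getBytes(StandardCharsets.UTF_8);
        Properties props = parse(data);
        System.out.println(props.getProperty("url"));
        System.out.println(Integer.parseInt(props.getProperty("pool.size")));
    }
}
```

Parsing into a fresh `Properties` object on every change, then swapping a single reference, keeps readers from seeing a half-updated configuration.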
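10. Recommended Production Settings
| Setting | Recommended value |
|---|---|
| sessionTimeoutMs | 10 to 30 seconds |
| connectionTimeoutMs | 10 seconds |
| Retry policy | ExponentialBackoffRetry with a 1000 ms base sleep and 3 to 5 retries |
| Connection string | Multiple nodes, comma-separated |
| Threading | Avoid blocking ZooKeeper calls on the main thread |
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("zk1:2181,zk2:2181,zk3:2181")
        .sessionTimeoutMs(30000)
        .connectionTimeoutMs(10000)
        .retryPolicy(new ExponentialBackoffRetry(1000, 5))
        .namespace("myapp") // all paths are resolved under /myapp
        .build();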
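When choosing the retry count, it helps to know roughly how long a failing operation can keep retrying. The sketch below estimates the worst case for an exponential backoff with jitter. It assumes each sleep is `base * max(1, r)` with `r` drawn from `[0, 2^(retryCount+1))` and a 0-based retry counter, which is my understanding of Curator's `ExponentialBackoffRetry`; verify this against the source of your Curator version. `BackoffEstimate` is a hypothetical helper.

```java
public class BackoffEstimate {
    /** Upper bound in ms of the sleep before retry number retryCount (0-based), under the assumed formula. */
    public static long maxSleepMs(long baseSleepMs, int retryCount) {
        // Largest possible random factor is 2^(retryCount+1) - 1.
        return baseSleepMs * ((1L << (retryCount + 1)) - 1);
    }

    /** Worst-case total time in ms spent sleeping across maxRetries retries. */
    public static long worstCaseTotalMs(long baseSleepMs, int maxRetries) {
        long total = 0;
        for (int retry = 0; retry < maxRetries; retry++) {
            total += maxSleepMs(baseSleepMs, retry);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("3 retries: " + worstCaseTotalMs(1000, 3) + " ms");
        System.out.println("5 retries: " + worstCaseTotalMs(1000, 5) + " ms");
    }
}
```

Under these assumptions, going from 3 to 5 retries raises the worst-case wait from about 11 seconds to about 57 seconds, which is worth weighing against the session timeout.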
11. Full Project Layout
zk-client-demo/
├── pom.xml
└── src/
    └── main/
        └── java/
            └── com/example/zk/
                ├── ZkBasicClient.java
                ├── CuratorClient.java
                ├── DistributedLock.java
                ├── LeaderElection.java
                └── ConfigWatcher.java
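12. Build and Run
mvn clean package dependency:copy-dependencies
java -cp "target/zk-client-demo-1.0-SNAPSHOT.jar:target/dependency/*" com.example.zk.CuratorClient
The packaged jar does not bundle ZooKeeper or Curator, so the dependencies are copied to target/dependency and added to the classpath (use ; instead of : as the separator on Windows).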
13. Gradle Support (Optional)
// build.gradle
dependencies {
    implementation 'org.apache.curator:curator-recipes:5.5.0'
}
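14. Common Problems
| Problem | Solution |
|---|---|
| ConnectionLoss | Check the network, the ZooKeeper service, and the firewall |
| SessionExpired | Increase sessionTimeout and avoid blocking for long periods |
| Watcher does not fire | Native Watchers are one-shot; re-register them or switch to Curator |
| Node already exists | Check with checkExists() first or use create().orSetData(); creatingParentsIfNeeded() only creates missing parent nodes |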
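Increasing the session timeout on the client has a limit: the server negotiates the value into its allowed range. With default server settings, that range is 2 × tickTime to 20 × tickTime. The sketch below models that clamping; it assumes minSessionTimeout and maxSessionTimeout are not overridden in the server configuration, and `SessionTimeoutEstimate` is a hypothetical helper.

```java
public class SessionTimeoutEstimate {
    /** Session timeout the server would grant, assuming default min/max settings. */
    public static int negotiatedTimeoutMs(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;   // default minSessionTimeout
        int max = 20 * tickTimeMs;  // default maxSessionTimeout
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTimeMs = 2000; // assumed server tickTime
        System.out.println(negotiatedTimeoutMs(5000, tickTimeMs));
        System.out.println(negotiatedTimeoutMs(60000, tickTimeMs));
        System.out.println(negotiatedTimeoutMs(1000, tickTimeMs));
    }
}
```

If a requested timeout seems to have no effect, compare it with the server's tickTime before raising it further.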
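15. Suggested Learning Path
- [x] Basic connection with the native API
- [x] Simplified development with Curator
- [x] Distributed lock implementation
- [x] Leader election
- [x] Configuration center with change watching
- [ ] Spring Boot integration
16. GitHub Example Project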
https://github.com/yourname/zk-java-client-demo
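Includes: Maven + Curator + distributed lock + leader election + unit tests
Suggested next steps:
- Run several instances of DistributedLock.java to test lock contention
- Start two instances of LeaderElection.java to observe the election
- Integrate the client into a Spring Boot project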