ZooKeeper 在微服务中的实战——服务注册、Leader 选举、分布式锁
ZooKeeper 在微服务中的实战——服务注册、Leader 选举、分布式锁
适读人群:需要深入理解 ZooKeeper 并在微服务中使用它的 Java 后端开发者 | 阅读时长:约17分钟 | 核心价值:掌握 ZooKeeper 三大核心场景的完整实现,理解底层原理而不只是会用 API
ZooKeeper 为什么还值得学
很多人问我:现在有了 Nacos、Consul,ZooKeeper 还值得学吗?
我的回答是:如果你只是想用服务注册发现,用 Nacos 就好;但如果你想真正理解分布式协调的原理,ZooKeeper 是绕不开的。
Nacos 的服务注册,底层的一致性思想来自 ZooKeeper;Kafka 早期的 Broker 选举也依赖 ZooKeeper(3.0 后用 KRaft 替代);Hadoop/HBase 的分布式协调还在用 ZooKeeper。
更重要的是,ZooKeeper 的 Watch 机制和临时节点设计,是许多分布式协调场景的基础模式。理解了这些,再去学其他系统,触类旁通。
ZooKeeper 核心数据模型
ZooKeeper 用树状的数据结构(ZNode 树)来存储和管理数据,类似文件系统:
/
├── services/
│ ├── order-service/
│ │ ├── 192.168.1.1:8080 (临时节点)
│ │ └── 192.168.1.2:8080 (临时节点)
│ └── payment-service/
│ └── 192.168.1.3:8080 (临时节点)
├── locks/
│ └── stock-lock/
│ ├── lock-0000000001 (临时顺序节点)
│ └── lock-0000000002 (临时顺序节点)
└── election/
└── leader-00000000001 (临时顺序节点)节点类型:
- 持久节点(PERSISTENT):客户端断开后依然存在
- 临时节点(EPHEMERAL):客户端断开后自动删除(分布式锁、服务注册的基础)
- 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时 + 自动追加递增序号(Leader 选举、公平锁的基础)
场景一:服务注册与发现
基于 Curator 实现服务注册
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.4.0</version>
</dependency>@Configuration
public class ZookeeperConfig {
@Value("${zookeeper.connect-string:localhost:2181}")
private String connectString;
@Bean(destroyMethod = "close")
public CuratorFramework curatorFramework() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(30000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("microservices") // 所有节点都在这个命名空间下
.build();
client.start();
return client;
}
@Bean
public ServiceDiscovery<ServiceInstance> serviceDiscovery(CuratorFramework client)
throws Exception {
ServiceDiscovery<ServiceInstance> discovery = ServiceDiscoveryBuilder
.builder(ServiceInstance.class)
.client(client)
.basePath("/services")
.serializer(new JsonInstanceSerializer<>(ServiceInstance.class))
.build();
discovery.start();
return discovery;
}
}@Component
public class ServiceRegistry {
@Autowired
private ServiceDiscovery<ServiceInstance> serviceDiscovery;
@Value("${spring.application.name}")
private String serviceName;
@Value("${server.port}")
private int serverPort;
/**
* 服务注册(创建临时节点,服务停止后节点自动删除)
*/
@PostConstruct
public void register() throws Exception {
ServiceInstance<ServiceInstance> instance = ServiceInstance
.<ServiceInstance>builder()
.name(serviceName)
.address(getLocalIP())
.port(serverPort)
.id(serviceName + ":" + getLocalIP() + ":" + serverPort)
.build();
serviceDiscovery.registerService(instance);
log.info("服务注册成功:{}", instance.getId());
}
/**
* 发现服务实例(查询 ZK,带客户端缓存)
*/
public List<ServiceInstance<ServiceInstance>> discover(String targetServiceName)
throws Exception {
return new ArrayList<>(serviceDiscovery.queryForInstances(targetServiceName));
}
}手动实现服务发现(带 Watch,实时感知节点变化)
@Component
public class ServiceWatcher {
@Autowired
private CuratorFramework zkClient;
private final Map<String, List<String>> serviceCache = new ConcurrentHashMap<>();
/**
* 监听服务节点变化,实时更新本地缓存
*/
public void watchService(String serviceName) throws Exception {
String path = "/services/" + serviceName;
PathChildrenCache watcher = new PathChildrenCache(zkClient, path, true);
watcher.getListenable().addListener((client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
log.info("服务节点上线: {}", event.getData().getPath());
refreshServiceCache(serviceName);
break;
case CHILD_REMOVED:
log.warn("服务节点下线: {}", event.getData().getPath());
refreshServiceCache(serviceName);
break;
case CHILD_UPDATED:
log.info("服务节点更新: {}", event.getData().getPath());
refreshServiceCache(serviceName);
break;
}
});
watcher.start();
// 初始化缓存
refreshServiceCache(serviceName);
}
private void refreshServiceCache(String serviceName) throws Exception {
String path = "/services/" + serviceName;
List<String> children = zkClient.getChildren().forPath(path);
serviceCache.put(serviceName, children);
log.info("服务缓存已更新,serviceName={}, instances={}", serviceName, children.size());
}
public List<String> getServiceInstances(String serviceName) {
return serviceCache.getOrDefault(serviceName, Collections.emptyList());
}
}场景二:Leader 选举
Leader 选举是 ZooKeeper 最经典的应用场景之一,常用于:
- 分布式任务调度(只让 Leader 节点执行定时任务)
- 数据库主从切换
- 集群协调
@Component
public class LeaderElection {
@Autowired
private CuratorFramework zkClient;
private LeaderLatch leaderLatch;
private volatile boolean isLeader = false;
@PostConstruct
public void startElection() throws Exception {
String applicationId = getLocalIP() + ":" + serverPort;
// Curator 的 LeaderLatch 基于临时顺序节点实现
leaderLatch = new LeaderLatch(
zkClient,
"/election/scheduler-leader", // 选举路径
applicationId // 当选 Leader 时的标识
);
// 添加监听器
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
isLeader = true;
log.info("本节点当选 Leader!applicationId={}", applicationId);
onBecomeLeader();
}
@Override
public void notLeader() {
isLeader = false;
log.info("本节点失去 Leader 身份。applicationId={}", applicationId);
onLoseLeadership();
}
});
leaderLatch.start();
log.info("加入 Leader 选举,等待结果...");
}
/**
* 当选 Leader 后执行
*/
private void onBecomeLeader() {
// 只有 Leader 才执行的逻辑
// 比如:开始接受任务分配、启动定时任务等
schedulerService.startLeaderTasks();
}
/**
* 失去 Leader 后执行
*/
private void onLoseLeadership() {
// 停止 Leader 专属任务
schedulerService.stopLeaderTasks();
}
/**
* 执行任务前先检查是否是 Leader
*/
public void executeIfLeader(Runnable task) {
if (isLeader) {
task.run();
} else {
log.debug("当前节点不是 Leader,跳过任务执行");
}
}
/**
* 主动查询当前 Leader 是谁
*/
public String getCurrentLeader() throws Exception {
Participant leader = leaderLatch.getLeader();
return leader.getId();
}
@PreDestroy
public void close() throws Exception {
if (leaderLatch != null) {
leaderLatch.close();
}
}
}基于 Leader 选举的分布式定时任务
@Component
public class DistributedScheduler {
@Autowired
private LeaderElection leaderElection;
/**
* 每天凌晨执行数据归档
* 多个实例启动,但只有 Leader 实际执行任务
*/
@Scheduled(cron = "0 0 2 * * ?")
public void dailyArchiveTask() {
leaderElection.executeIfLeader(() -> {
log.info("Leader 开始执行每日数据归档任务");
archiveService.archiveYesterdayData();
});
}
}场景三:分布式锁(详细实现)
ZooKeeper 分布式锁基于临时顺序节点,实现公平锁:
@Component
public class ZkDistributedLock {
@Autowired
private CuratorFramework zkClient;
private final String LOCK_BASE_PATH = "/locks";
/**
* 获取分布式锁(非阻塞)
* 返回锁路径(用于解锁),为 null 表示获取失败
*/
public String tryLock(String resourceName) throws Exception {
String lockPath = LOCK_BASE_PATH + "/" + resourceName;
// 确保父路径存在(持久节点)
if (zkClient.checkExists().forPath(lockPath) == null) {
zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(lockPath);
}
// 创建临时顺序节点(代表当前客户端的锁请求)
String nodePath = zkClient.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(lockPath + "/lock-");
// 获取所有子节点,排序
List<String> children = zkClient.getChildren().forPath(lockPath);
Collections.sort(children);
// 获取当前节点的序号
String currentNode = nodePath.substring(lockPath.length() + 1);
// 如果当前节点是序号最小的,获取锁成功
if (currentNode.equals(children.get(0))) {
log.info("获取锁成功,nodePath={}", nodePath);
return nodePath;
}
// 不是最小节点,释放刚创建的节点(非阻塞模式直接失败)
zkClient.delete().forPath(nodePath);
return null;
}
/**
* 获取分布式锁(阻塞,等待直到获取锁)
*/
public String lock(String resourceName, long timeoutMs) throws Exception {
String lockPath = LOCK_BASE_PATH + "/" + resourceName;
if (zkClient.checkExists().forPath(lockPath) == null) {
zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(lockPath);
}
String nodePath = zkClient.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(lockPath + "/lock-");
long deadline = System.currentTimeMillis() + timeoutMs;
while (true) {
List<String> children = zkClient.getChildren().forPath(lockPath);
Collections.sort(children);
String currentNode = nodePath.substring(lockPath.length() + 1);
int myIndex = children.indexOf(currentNode);
if (myIndex == 0) {
// 当前是最小节点,获取锁成功
return nodePath;
}
// 监听前一个节点
String prevNode = lockPath + "/" + children.get(myIndex - 1);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zkClient.checkExists()
.usingWatcher((Watcher) event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown(); // 前一个节点删除,重新尝试
}
})
.forPath(prevNode);
if (stat == null) {
// 前一个节点已经删除,直接重试
continue;
}
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
// 超时,释放自己的节点
zkClient.delete().forPath(nodePath);
throw new TimeoutException("获取分布式锁超时,resource=" + resourceName);
}
latch.await(remaining, TimeUnit.MILLISECONDS);
}
}
/**
* 释放锁
*/
public void unlock(String nodePath) throws Exception {
if (nodePath != null && zkClient.checkExists().forPath(nodePath) != null) {
zkClient.delete().forPath(nodePath);
log.info("释放锁,nodePath={}", nodePath);
}
}
}三大踩坑实录
坑一:Watch 是一次性的,容易漏事件
现象: 服务注册后,第一次下线能感知到(节点删除触发 Watch),但如果这个服务在短时间内上线又下线了两次,第二次下线没有触发任何 Watch,服务缓存里一直有这个已经下线的实例。
原因: ZooKeeper 的 Watch 是一次性的——触发一次后就失效了,需要重新注册。如果两次事件之间没有重新注册 Watch,第二次事件就收不到。
解法: 使用 Curator 的 PathChildrenCache 或 TreeCache,它们内部会自动重新注册 Watch,实现持续监听。
坑二:ZooKeeper 会话过期导致临时节点消失,服务假下线
现象: 服务还在正常运行,但 ZooKeeper 里的临时节点消失了,其他服务认为它下线了,不再路由请求给它,导致部分请求失败。
原因: 服务器 GC Pause 超过 ZooKeeper session timeout(默认 30 秒),ZK 认为客户端宕机,删除了临时节点。GC 结束后服务还在,但 ZK 已经删了注册信息。
解法:
- 增大 sessionTimeout(比如 60 秒)
- 优化 JVM,减少 Full GC 时间
- 重新注册逻辑:Watch 到 SESSION_EXPIRED 事件时,自动重新注册服务
坑三:Leader 选举后没有检测 Leader 宕机
现象: Leader 节点宕机后,其他节点确实感知到了(临时节点删除),但新的 Leader 选举出来了,旧的定时任务还有一个正在执行中(宕机前已经开始执行),新 Leader 的相同任务也开始执行,同一个任务跑了两遍。
原因: Leader 宕机和旧任务完全停止之间有一个时间窗口。
解法: 定时任务的执行加幂等保护,任务开始时在数据库记录执行标记(唯一索引),任务完成后更新状态。即使两个实例同时尝试执行,唯一索引保证只有一个成功。
总结
ZooKeeper 的核心价值在于:用强一致性的数据模型(Zab 协议保证)+ 临时节点 + Watch 机制,构建出了一套强大的分布式协调原语。
理解了这三个机制,ZooKeeper 的所有高级应用(服务注册、Leader 选举、分布式锁、配置管理)都是自然推导出来的,而不是死记硬背的 API。
