Java 并发工具类实战——CountDownLatch、CyclicBarrier、Semaphore 场景详解
Java 并发工具类实战——CountDownLatch、CyclicBarrier、Semaphore 场景详解
适读人群:Java后端开发,想用好JUC并发工具的工程师 | 阅读时长:约16分钟 | 核心价值:搞清楚三大并发协调工具的本质区别,在合适的场景用合适的工具
每次用并发工具都要查文档的困境
说实话,在我工作的第二年,CountDownLatch、CyclicBarrier和Semaphore这三个我总是分不清楚。
遇到并发协调的场景,我的习惯是:先想想用哪个,想不清楚就去查文档,查完还是有点模糊,最后凭感觉选一个,有时候选对了,有时候选错了再换。
后来我做了一个总结——其实记住三句话就够了:
- CountDownLatch:一个或多个线程等待其他N个线程完成特定任务(一次性)
- CyclicBarrier:N个线程相互等待,等大家都到了一起出发(可重复)
- Semaphore:控制同时访问某资源的线程数量(限流)
用一个生活比喻:
- CountDownLatch = 发令枪发令后,运动员各跑各的,裁判等所有人到终点
- CyclicBarrier = 旅游团集合,导游等所有人都到了才出发
- Semaphore = 停车场入口,有空位才能进,没空位等着
CountDownLatch:倒计时门闩
核心机制
// 创建一个初始计数为N的CountDownLatch
CountDownLatch latch = new CountDownLatch(N);
// 工作线程完成任务后调用
latch.countDown(); // 计数-1
// 等待线程调用,直到计数为0才返回
latch.await();
latch.await(timeout, unit); // 带超时版本CountDownLatch是一次性的:计数归零后,永远不会重置。所有后续的await()都立刻返回。
场景一:并行任务聚合
/**
* 并行初始化多个资源,等所有初始化完成后再启动服务
* 实际场景:Spring Boot启动时并行加载多个缓存
*/
public class ParallelInit {
public void initAll() throws InterruptedException {
List<String> resources = List.of("配置缓存", "用户缓存", "商品缓存", "权限缓存");
CountDownLatch initLatch = new CountDownLatch(resources.size());
ExecutorService pool = Executors.newFixedThreadPool(resources.size());
List<String> failedResources = Collections.synchronizedList(new ArrayList<>());
for (String resource : resources) {
pool.submit(() -> {
try {
long start = System.currentTimeMillis();
initResource(resource);
System.out.printf("[%s] 初始化完成,耗时 %dms%n",
resource, System.currentTimeMillis() - start);
} catch (Exception e) {
failedResources.add(resource);
System.err.printf("[%s] 初始化失败: %s%n", resource, e.getMessage());
} finally {
initLatch.countDown(); // 无论成功失败都要countdown
}
});
}
boolean allDone = initLatch.await(30, TimeUnit.SECONDS);
if (!allDone) {
throw new RuntimeException("初始化超时,超时资源:" + failedResources);
}
if (!failedResources.isEmpty()) {
throw new RuntimeException("部分资源初始化失败:" + failedResources);
}
System.out.println("所有资源初始化完成,服务启动");
pool.shutdown();
}
private void initResource(String name) throws InterruptedException {
Thread.sleep((long)(Math.random() * 1000 + 200)); // 模拟不同资源初始化时间
}
}场景二:并发测试启动枪
/**
* 并发压测:让所有线程在同一时刻开始执行,模拟真实并发
*/
public class ConcurrentTest {
public static void stressTest(int threadCount, Runnable task) throws InterruptedException {
CountDownLatch startGun = new CountDownLatch(1); // 发令枪
CountDownLatch finishLine = new CountDownLatch(threadCount); // 终点线
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
startGun.await(); // 所有线程在这里等待发令
task.run();
successCount.incrementAndGet();
} catch (Exception e) {
failCount.incrementAndGet();
} finally {
finishLine.countDown();
}
}).start();
}
Thread.sleep(100); // 等所有线程就位
long startTime = System.currentTimeMillis();
startGun.countDown(); // 发令!所有线程同时开跑
finishLine.await(); // 等所有线程完成
long totalTime = System.currentTimeMillis() - startTime;
System.out.printf("压测结果 | 线程数:%d 成功:%d 失败:%d 总耗时:%dms%n",
threadCount, successCount.get(), failCount.get(), totalTime);
}
}CyclicBarrier:循环屏障
核心机制
CyclicBarrier让一组线程互相等待,直到所有线程都到达某个屏障点,然后一起继续执行。
// 创建等待N个线程的屏障,可选:所有线程到达后执行的动作
CyclicBarrier barrier = new CyclicBarrier(N);
CyclicBarrier barrier = new CyclicBarrier(N, () -> System.out.println("所有人到齐了!"));
// 线程调用 await 等待,直到N个线程都到达
barrier.await(); // 等待,可被中断
barrier.await(timeout, unit); // 带超时与CountDownLatch的核心区别:
- CyclicBarrier可以重置复用,CountDownLatch是一次性的
- CyclicBarrier是双向等待(参与线程互相等),CountDownLatch是单向等待(等待线程等工作线程)
- CyclicBarrier有屏障动作(barrierAction),在所有线程到达后执行一次
场景:多阶段并行计算
/**
* 多阶段并行计算:大数据分析
* 阶段1:各分区并行加载数据
* 阶段2:各分区并行计算
* 阶段3:汇总结果
* 每个阶段必须等所有分区完成,才能进入下一阶段
*/
public class MultiPhaseComputation {
private final int partitionCount;
private final double[][] partitionData;
private final double[] partitionResults;
public MultiPhaseComputation(int partitionCount) {
this.partitionCount = partitionCount;
this.partitionData = new double[partitionCount][];
this.partitionResults = new double[partitionCount];
}
public double compute() throws Exception {
// 屏障动作:每个阶段完成后打印进度
AtomicInteger phaseNum = new AtomicInteger(1);
CyclicBarrier barrier = new CyclicBarrier(partitionCount, () -> {
System.out.printf("阶段 %d 完成,%d 个分区全部就绪%n",
phaseNum.getAndIncrement(), partitionCount);
});
ExecutorService pool = Executors.newFixedThreadPool(partitionCount);
List<Future<?>> futures = new ArrayList<>();
for (int p = 0; p < partitionCount; p++) {
final int partition = p;
futures.add(pool.submit(() -> {
try {
// --- 阶段1:加载数据 ---
partitionData[partition] = loadData(partition);
barrier.await(); // 等其他分区也加载完
// --- 阶段2:计算 ---
partitionResults[partition] = calculate(partitionData[partition]);
barrier.await(); // 等其他分区也计算完
// 所有分区结果准备好了,可以汇总了
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("计算被中断", e);
}
}));
}
// 等所有任务完成
for (Future<?> f : futures) {
f.get();
}
pool.shutdown();
// 汇总结果
double total = 0;
for (double r : partitionResults) total += r;
return total;
}
private double[] loadData(int partition) throws InterruptedException {
Thread.sleep(100 + (long)(Math.random() * 200));
return new double[]{partition * 10.0};
}
private double calculate(double[] data) throws InterruptedException {
Thread.sleep(50 + (long)(Math.random() * 100));
double sum = 0;
for (double d : data) sum += d;
return sum;
}
}Semaphore:信号量
核心机制
Semaphore维护一组许可证(permits),线程在访问资源前必须获取许可,使用完后释放。
Semaphore semaphore = new Semaphore(N); // N个许可
semaphore.acquire(); // 获取1个许可,不足则阻塞
semaphore.acquire(n); // 获取n个许可
semaphore.tryAcquire(); // 非阻塞尝试
semaphore.tryAcquire(time, unit); // 带超时尝试
semaphore.release(); // 释放1个许可
semaphore.release(n); // 释放n个许可
semaphore.availablePermits(); // 当前可用许可数
semaphore.getQueueLength(); // 等待中的线程数场景一:连接池限流
/**
* 用 Semaphore 实现数据库连接池的并发限制
* 确保同时最多只有 maxConnections 个线程在使用连接
*/
public class DatabaseConnectionPool {
private final Semaphore semaphore;
private final Queue<Connection> connectionPool;
private final int maxConnections;
public DatabaseConnectionPool(int maxConnections) {
this.maxConnections = maxConnections;
this.semaphore = new Semaphore(maxConnections, true); // 公平模式,防饥饿
this.connectionPool = new ConcurrentLinkedQueue<>();
// 初始化连接
for (int i = 0; i < maxConnections; i++) {
connectionPool.offer(createConnection(i));
}
}
public Connection getConnection(long timeoutMs) throws InterruptedException {
// 1. 获取许可(限制并发数)
boolean acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
if (!acquired) {
throw new RuntimeException("获取连接超时,连接池满载(" + maxConnections + "个连接均在使用)");
}
// 2. 从池中取出连接
Connection conn = connectionPool.poll();
if (conn == null) {
semaphore.release(); // 没拿到连接,释放许可
throw new IllegalStateException("连接池状态异常");
}
return conn;
}
public void releaseConnection(Connection conn) {
// 1. 连接归还池中
connectionPool.offer(conn);
// 2. 释放许可,让等待的线程可以获取
semaphore.release();
}
public void executeQuery(String sql) {
Connection conn = null;
try {
conn = getConnection(3000); // 最多等3秒
conn.execute(sql);
} catch (Exception e) {
System.err.println("查询失败: " + e.getMessage());
} finally {
if (conn != null) releaseConnection(conn); // 必须释放!
}
}
public int getAvailableConnections() {
return semaphore.availablePermits();
}
// 简化的Connection类
record Connection(int id) {
void execute(String sql) throws InterruptedException {
Thread.sleep(50); // 模拟查询耗时
}
}
private Connection createConnection(int id) {
return new Connection(id);
}
}场景二:API 限流
/**
* 基于 Semaphore 的API并发限流
* 限制同一时刻调用某外部API的并发数
*/
@Component
public class ApiRateLimiter {
// 对外部AI API的并发限制:最多5个并发请求
private final Semaphore aiApiSemaphore = new Semaphore(5);
public String callAiApi(String prompt) {
boolean acquired = false;
try {
// 最多等500ms,超时直接降级
acquired = aiApiSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
if (!acquired) {
// 降级:返回缓存结果或提示用户
return "系统繁忙,请稍后重试(当前并发:" +
(5 - aiApiSemaphore.availablePermits()) + "/5)";
}
return httpClient.post("/ai/generate", prompt);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "请求被取消";
} finally {
if (acquired) aiApiSemaphore.release();
}
}
}三个踩坑实录
坑一:CountDownLatch 的 countDown 放在 finally 外面
现象: 等待线程一直阻塞,任务线程都抛异常结束了,但await()永远不返回。
原因: 工作线程代码抛异常,countDown()在try块里,没有执行,计数一直不归零。
// 错误:countDown 可能因为异常不执行
latch.await();
try {
doWork();
latch.countDown(); // 如果 doWork() 抛异常,这行不执行
} catch (Exception e) { ... }
// 正确:countDown 必须在 finally 里
try {
doWork();
} catch (Exception e) {
log.error("工作线程异常", e);
} finally {
latch.countDown(); // 无论成功失败,都必须countdown
}坑二:CyclicBarrier 某线程超时,其他线程被 BrokenBarrierException 轰炸
现象: 某台机器处理速度慢,某个线程await超时,其他所有线程都抛出BrokenBarrierException。
原因: CyclicBarrier有"屏障破坏"(broken barrier)机制:任何一个线程超时、中断或异常,屏障就被标记为broken,其他等待的线程都会收到BrokenBarrierException。
// 处理BrokenBarrierException
try {
barrier.await(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.err.println("当前线程等待超时");
// 超时后,屏障被破坏
} catch (BrokenBarrierException e) {
System.err.println("屏障已破坏(某个线程超时或异常),本轮计算中止");
// 需要重置屏障或重新开始整轮计算
if (!barrier.isBroken()) barrier.reset();
}解法: 设置合理的超时,并在BrokenBarrierException时做好恢复逻辑(重置或放弃本轮)。
坑三:Semaphore.release() 多次调用,许可数超过初始值
现象: 连接池用Semaphore限制并发数为10,运行一段时间后,availablePermits()返回了12,超过初始值。
原因: 某段代码路径里,release()被调用了两次(比如在catch里和finally里各release了一次)。Semaphore不限制release的调用次数,多次release会让许可数超过初始值,最终让超过限制的线程同时进入。
// 错误:double release
boolean acquired = semaphore.tryAcquire();
try {
doWork();
} catch (Exception e) {
semaphore.release(); // 第一次release
throw e;
} finally {
semaphore.release(); // 第二次release!许可泄漏
}
// 正确:跟锁一样,用一个标志位控制
boolean acquired = false;
try {
acquired = semaphore.tryAcquire();
if (!acquired) return;
doWork();
} catch (Exception e) {
throw e;
} finally {
if (acquired) semaphore.release(); // 只release一次
}三工具横向对比
| 特性 | CountDownLatch | CyclicBarrier | Semaphore |
|---|---|---|---|
| 等待模式 | 少数等多数 | 多数互等 | 限制并发数 |
| 可重置 | 否 | 是 | 是(许可可增减) |
| 屏障动作 | 无 | 有 | 无 |
| 典型场景 | 并行任务聚合、起跑线 | 多阶段计算、团队协作 | 连接池、限流 |
| 底层实现 | AQS共享模式 | ReentrantLock+Condition | AQS共享模式 |
小结
三个工具本质上都是线程协调器,记住场景比记住API更重要:
- 需要等待多件事全部发生后才继续:CountDownLatch
- 需要所有参与者到达集合点才继续(且可能重复):CyclicBarrier
- 需要控制资源的并发访问数量:Semaphore
实际工作中,CountDownLatch和Semaphore用得比较多;CyclicBarrier多出现在大数据分区计算、游戏回合同步等场景。
