CountDownLatch、CyclicBarrier、Semaphore:三个同步器的使用场景
CountDownLatch、CyclicBarrier、Semaphore:三个同步器的使用场景
适读人群:有Java并发基础、经常遇到多线程协调问题的后端工程师 | 阅读时长:约14分钟
开篇故事
2022年,我们做了一个数据迁移项目,要把旧系统的5000万条订单数据迁移到新库。迁移逻辑分三步:1)从老库分批读数据;2)转换数据格式;3)写入新库。
第一版代码,我用了20个线程跑步骤2的转换,结果数据库写入压力太大,新库直接被打爆,DBA过来找我谈心了。
我加了个Semaphore(5)限制并发写入,但新问题来了:我需要等所有数据写完之后,再做一个数据校验任务,但不知道什么时候所有线程都写完了。
加了CountDownLatch,又来了需求:每转换完1000万条数据,打一个进度日志,然后继续。这个"周期性等待"用CountDownLatch不合适,换成了CyclicBarrier。
三个同步器用齐了。这个项目让我对这三者的适用场景有了很清晰的认识,今天系统总结一下。
一、三个同步器的核心差异
1.1 一句话说清楚各自的职责
- CountDownLatch(倒计数门闩):一个线程(或多个)等待其他N个线程完成某件事。不可重用。
- CyclicBarrier(循环栅栏):N个线程互相等待,直到所有人都到达栅栏,然后一起出发。可重用,可携带Runnable。
- Semaphore(信号量):控制同时访问某资源的线程数量上限。不是"等待",而是"限流"。
1.2 底层实现
| 同步器 | 底层实现 | state含义 |
|---|---|---|
| CountDownLatch | AQS(共享锁) | 剩余计数,从N减到0,await()才返回 |
| CyclicBarrier | ReentrantLock + Condition | 每轮到达计数,到N触发barrier action |
| Semaphore | AQS(共享锁) | 可用permits数,acquire减,release加 |
注意:CyclicBarrier底层用的是ReentrantLock而不是直接继承AQS,这是因为它有"每代(generation)"的概念,需要更复杂的状态管理。
二、核心机制深度解析
2.1 CountDownLatch源码关键点
// CountDownLatch内部AQS子类
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // state = count
}
// await()等待state减到0
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // state==0才能获取(返回1)
}
// countDown()每次减1
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false; // 已经是0,什么也不做
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 只有减到0才返回true(触发唤醒)
}
}
}关键设计:countDown()减到0时tryReleaseShared返回true,触发doReleaseShared,唤醒所有调用了await()的线程。
不可重用的原因: state减到0后,再调用countDown()因为c==0直接返回false,不会再减。await()因为state==0立刻返回。整个对象已经"报废",不能重置。
2.2 CyclicBarrier的generation机制
CyclicBarrier可重用,靠的是generation(代)机制:
// CyclicBarrier的核心字段(简化)
private int count; // 当前代还需要几个线程到达
private Generation generation; // 当前代
// 每代到达计数重置时,generation也重置
private void nextGeneration() {
trip.signalAll(); // 唤醒所有等待的线程
count = parties; // 重置计数
generation = new Generation(); // 新的一代
}当一个CyclicBarrier被broken(某个线程在await期间被中断或超时),当前generation被标记为broken,所有等待的线程抛出BrokenBarrierException。调用reset()可以重置栅栏,创建新的generation。
2.3 Semaphore的公平性影响
Semaphore的非公平实现下,新来的线程会直接尝试CAS获取permit,不管队列里有没有等待线程。这可能导致某些线程长时间等不到permit(饥饿)。
在"限制数据库连接数"这类场景,通常不需要公平性,非公平效率更高(避免入队/出队开销)。但在"资源分配公平性"要求高的场景(如线程池管理),用公平模式。
三、完整代码实现
3.1 CountDownLatch:多服务初始化等待
package com.laozhang.concurrent.sync;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* CountDownLatch实战:应用启动时等待多个依赖服务初始化完成
*
* 场景:Spring Boot应用启动,需要等待:
* 1. 数据库连接池初始化(~800ms)
* 2. Redis连接建立(~200ms)
* 3. MQ消费者注册(~300ms)
* 4. 本地缓存预热(~1500ms)
* 全部完成后,才开始接受HTTP请求
*
* 测试环境:JDK 11
*/
public class AppStartupDemo {
private final CountDownLatch initLatch;
private final AtomicBoolean startupFailed = new AtomicBoolean(false);
public AppStartupDemo(int serviceCount) {
this.initLatch = new CountDownLatch(serviceCount);
}
/**
* 模拟一个服务的初始化
*/
private void initService(String name, long delayMs, boolean simulateFail) {
new Thread(() -> {
try {
System.out.printf("[%s] 开始初始化...%n", name);
Thread.sleep(delayMs);
if (simulateFail) {
System.out.printf("[%s] 初始化失败!%n", name);
startupFailed.set(true);
} else {
System.out.printf("[%s] 初始化完成(耗时%dms)%n", name, delayMs);
}
} catch (InterruptedException e) {
System.out.printf("[%s] 初始化被中断%n", name);
Thread.currentThread().interrupt();
startupFailed.set(true);
} finally {
initLatch.countDown(); // 无论成功还是失败,都countDown
}
}, name + "-init").start();
}
public boolean waitForStartup(long timeoutSeconds) throws InterruptedException {
boolean completed = initLatch.await(timeoutSeconds, TimeUnit.SECONDS);
if (!completed) {
System.out.println("[主线程] 等待超时!部分服务未在规定时间内初始化完成");
return false;
}
if (startupFailed.get()) {
System.out.println("[主线程] 部分服务初始化失败,启动中止");
return false;
}
return true;
}
public static void main(String[] args) throws InterruptedException {
AppStartupDemo app = new AppStartupDemo(4);
long startTime = System.currentTimeMillis();
// 并行初始化4个服务(并行!不是串行!)
app.initService("数据库连接池", 800, false);
app.initService("Redis连接", 200, false);
app.initService("MQ消费者", 300, false);
app.initService("本地缓存", 1500, false);
System.out.println("[主线程] 等待所有服务初始化完成...");
boolean success = app.waitForStartup(10);
long elapsed = System.currentTimeMillis() - startTime;
if (success) {
System.out.printf("[主线程] 所有服务初始化完成!耗时%dms(串行需要%dms)%n",
elapsed, 800 + 200 + 300 + 1500);
System.out.println("[主线程] 开始接受HTTP请求");
}
}
}3.2 CyclicBarrier:多阶段并行计算
package com.laozhang.concurrent.sync;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
/**
* CyclicBarrier实战:多线程分阶段矩阵计算
*
* 场景:将一个大矩阵的计算分4轮,每轮结束时汇总结果,
* 再开始下一轮。类似MapReduce的多轮迭代。
*
* 关键点:
* 1. barrierAction在最后一个到达的线程中执行(不是新线程)
* 2. CyclicBarrier可以重用(每轮结束后自动重置)
* 3. 任何线程的中断/超时会导致BrokenBarrierException
*/
public class MatrixComputeDemo {
private static final int WORKER_COUNT = 4;
private static final int ROUNDS = 3;
private static final int DATA_SIZE = 1000;
// 共享计算结果
private final long[] partialResults = new long[WORKER_COUNT];
private final AtomicLong roundTotal = new AtomicLong(0);
private CyclicBarrier barrier;
public void compute() throws InterruptedException {
// barrierAction:每轮结束时执行,汇总各worker的结果
Runnable barrierAction = () -> {
long total = 0;
for (long r : partialResults) total += r;
System.out.printf("[汇总] 本轮计算完成,合计:%d%n", total);
roundTotal.addAndGet(total);
// 清空各worker的中间结果
for (int i = 0; i < partialResults.length; i++) partialResults[i] = 0;
};
barrier = new CyclicBarrier(WORKER_COUNT, barrierAction);
Thread[] workers = new Thread[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) {
final int workerId = i;
workers[i] = new Thread(() -> {
Random random = new Random(workerId);
for (int round = 0; round < ROUNDS; round++) {
// 每轮计算自己负责的那部分数据
long partialSum = 0;
for (int j = 0; j < DATA_SIZE / WORKER_COUNT; j++) {
partialSum += random.nextInt(100);
}
partialResults[workerId] = partialSum;
System.out.printf("[Worker-%d] 第%d轮计算完成,partial=%d,等待其他Worker...%n",
workerId, round + 1, partialSum);
try {
barrier.await(); // 等待其他Worker完成本轮
// 所有Worker都到达后,barrierAction执行完,大家继续下一轮
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (BrokenBarrierException e) {
System.out.printf("[Worker-%d] 栅栏被破坏,退出%n", workerId);
return;
}
}
System.out.printf("[Worker-%d] 所有轮次完成%n", workerId);
}, "worker-" + i);
}
for (Thread w : workers) w.start();
for (Thread w : workers) w.join();
System.out.printf("%n全部计算完成,%d轮总计:%d%n", ROUNDS, roundTotal.get());
}
public static void main(String[] args) throws InterruptedException {
new MatrixComputeDemo().compute();
}
}3.3 Semaphore:限流访问外部API
package com.laozhang.concurrent.sync;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Semaphore实战:限制并发调用外部API的数量
*
* 场景:调用第三方风控API,该API限制并发连接数不超过5
* 有20个工作线程,需要限制同时只有5个线程在调用API
*
* 注意:acquire()要在try之外,因为如果acquire()失败(被中断),
* 没有获得permit,不能在finally里release
*/
public class ApiRateLimiterDemo {
private static final int MAX_CONCURRENT = 5; // API允许的最大并发
private static final int WORKER_COUNT = 20; // 工作线程数
private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT);
private final AtomicInteger currentConcurrent = new AtomicInteger(0);
private final AtomicInteger maxConcurrent = new AtomicInteger(0);
private final AtomicInteger totalCalls = new AtomicInteger(0);
/**
* 调用风控API(带并发限制)
*/
public String callRiskApi(String userId) throws InterruptedException {
// 注意:acquire()在try块外!如果这里中断,没有permit,不需要release
semaphore.acquire();
// 获得permit,更新并发统计
int current = currentConcurrent.incrementAndGet();
maxConcurrent.updateAndGet(max -> Math.max(max, current));
try {
// 模拟API调用耗时(50-150ms)
int delay = 50 + ThreadLocalRandom.current().nextInt(100);
Thread.sleep(delay);
int callId = totalCalls.incrementAndGet();
System.out.printf("[API调用#%d] userId=%s,并发=%d,耗时=%dms%n",
callId, userId, current, delay);
return "PASS"; // 模拟风控结果
} finally {
currentConcurrent.decrementAndGet();
semaphore.release(); // 必须在finally里release
}
}
public static void main(String[] args) throws InterruptedException {
ApiRateLimiterDemo demo = new ApiRateLimiterDemo();
Thread[] workers = new Thread[WORKER_COUNT];
long startTime = System.currentTimeMillis();
for (int i = 0; i < WORKER_COUNT; i++) {
final String userId = "user" + i;
workers[i] = new Thread(() -> {
try {
String result = demo.callRiskApi(userId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "worker-" + i);
}
for (Thread w : workers) w.start();
for (Thread w : workers) w.join();
System.out.printf("%n总耗时:%dms%n", System.currentTimeMillis() - startTime);
System.out.printf("最大并发:%d(应 <= %d)%n",
demo.maxConcurrent.get(), MAX_CONCURRENT);
System.out.printf("总调用次数:%d(应 = %d)%n",
demo.totalCalls.get(), WORKER_COUNT);
}
}四、踩坑实录
坑1:CountDownLatch的count设置错误导致永远等待
报错现象: 程序启动后主线程一直阻塞在latch.await(),所有工作线程都已完成,但await()就是不返回。
原因分析: CountDownLatch初始化的count比实际调用countDown()的次数多。比如count设置为5,但只有4个线程会调用countDown(),state永远无法减到0。
常见情况:某个线程在countDown()之前抛了异常(而且没有try-finally),导致countDown()没有执行。
// 错误:某种异常路径跳过了countDown
new Thread(() -> {
if (someCondition()) {
throw new RuntimeException("跳过了countDown!");
}
latch.countDown(); // 条件成立时,这行不执行
}).start();
// 正确:finally保证countDown一定执行
new Thread(() -> {
try {
if (someCondition()) {
throw new RuntimeException("但countDown仍然会执行");
}
} finally {
latch.countDown(); // 无论如何都执行
}
}).start();解法: 始终在finally块里调用countDown();同时给await()设置超时时间而不是无限等待。
坑2:CyclicBarrier的barrierAction抛异常导致栅栏损坏
报错现象: 某次运行时,barrierAction执行失败(抛了RuntimeException),之后所有调用barrier.await()的线程都抛出BrokenBarrierException。
原因分析: 当barrierAction抛出异常,或者某个等待的线程被中断/超时,CyclicBarrier会进入broken状态(generation.broken = true)。之后所有调用await()的线程,都会立刻抛BrokenBarrierException,无法继续参与同步。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
throw new RuntimeException("汇总操作失败"); // 这会让栅栏broken
});解法:
- barrierAction要做异常处理,不要让异常逃逸
- 捕获
BrokenBarrierException后,调用barrier.reset()重置(但要注意,reset也会让当前等待的线程抛BrokenBarrierException) - 对于不可恢复的错误,直接退出线程
坑3:Semaphore在tryAcquire超时后不需要release
报错现象: 代码逻辑里,tryAcquire(timeout)返回false后,在finally里调用了release(),导致permits数量增多,超过了初始设定值。
原因分析: tryAcquire(timeout)返回false意味着没有获得permit,semaphore的状态没有变化。如果此时调用release(),相当于"归还一个从未借出的许可",permits数量会增加。
boolean acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS);
try {
if (!acquired) return; // 没拿到permit,直接返回
doWork();
} finally {
if (acquired) { // 只有拿到了permit,才需要release
semaphore.release();
}
// 错误写法:无论是否acquired都release(),会让permits数量超限
}解法: 用一个boolean标记是否成功获取了permit,只有成功获取时才在finally里release。
坑4:CyclicBarrier与CountDownLatch的适用场景混淆
报错现象: 用CyclicBarrier实现"主线程等待多个子线程完成",但发现主线程自己也在等待(要加入barrier),逻辑不对。
原因分析: CyclicBarrier是"N个参与者互相等待",适合并行的同类线程互相同步。如果主线程也调用了barrier.await(),它也是参与者之一,这是对的;但如果主线程只是旁观者(等待所有工作线程完成),主线程不应该调用barrier.await(),应该用CountDownLatch。
场景判断:
- "我(主线程)等你们(工作线程)都完成" → CountDownLatch
- "我们(所有工作线程)互相等,等齐了一起继续" → CyclicBarrier
- "你们不能超过N个人同时进" → Semaphore
五、总结与延伸
三个同步器的本质差异:
| CountDownLatch | CyclicBarrier | Semaphore | |
|---|---|---|---|
| 方向 | 其他→等待者 | 互相等待 | 控制并发数 |
| 可重用 | 否 | 是 | 是 |
| 等待方式 | 一或多方等多方 | 多方等多方 | 阻塞或超时 |
| 典型场景 | 服务初始化、结果聚合 | 多阶段并行计算 | 连接池、限流 |
选型快速决策:
- 需要等所有子任务完成再继续 → CountDownLatch
- 多个并行线程需要周期性汇合 → CyclicBarrier(尤其是有barrierAction需求)
- 限制同时执行的线程数量 → Semaphore
JDK 7+替代方案:
CompletableFuture.allOf()可以替代很多CountDownLatch场景,更函数式Phaser比CyclicBarrier更灵活,支持动态增删参与者BlockingQueue限定容量也能实现Semaphore的限流效果
