wait/notify vs Condition:两种线程通信方式的对比与选型
wait/notify vs Condition:两种线程通信方式的对比与选型
适读人群:Java后端开发者、实现过生产者消费者模式的工程师 | 阅读时长:约15分钟
开篇故事
2019年,我刚入职某互联网公司,接手了一个老系统的代码。里面有个用wait()/notify()实现的任务调度器。
代码大概是这样:有多种任务(优先任务、普通任务、批量任务),几个线程在等待不同类型的任务。通知的时候用notifyAll()唤醒所有线程。
问题:每次有新任务到来,notifyAll()把所有等待的线程全都唤醒,但只有一个线程能拿到任务,其他线程发现不满足自己的条件后又回去等待。这种"惊群效应"在高并发下导致大量不必要的线程切换,CPU利用率虚高。
用jstack观察,大量线程处于BLOCKED状态(等待锁)→ WAITING状态(wait())→ BLOCKED(被notifyAll唤醒后竞争锁)→ WAITING(发现条件不满足再次wait)的循环中。
改用ReentrantLock + 多个Condition后,不同类型的任务在不同的条件变量上等待,唤醒时精确到对应的条件队列,惊群效应消失,CPU使用率下降了约30%。
今天把这两种线程通信方式的原理和选型讲清楚。
一、wait/notify的核心机制
1.1 与synchronized的强绑定
wait()/notify()/notifyAll()是Object类的方法,必须在持有该对象的synchronized锁的情况下调用,否则抛IllegalMonitorStateException。
这个设计是有意为之的:wait让当前线程释放锁并等待,只有持有锁才能安全地检查条件和等待。
synchronized (obj) {
while (!conditionMet()) {
obj.wait(); // 释放obj的锁,线程进入WAITING状态
}
// 条件满足,继续执行
}wait()做了三件事:
- 释放当前持有的锁
- 线程进入WAITING状态(挂起)
- 被唤醒后,重新竞争获取锁
1.2 notify vs notifyAll
notify():从对象的等待集合(Wait Set)中随机唤醒一个线程。
notifyAll():唤醒所有在等待集合中的线程,让它们竞争锁(但一次只有一个能获得锁)。
何时用notify(),何时用notifyAll():
- 所有等待线程都在等待同一个条件,且每次只能有一个线程被满足 →
notify()(精准唤醒) - 等待线程等待不同条件,或需要所有等待者重新检查 →
notifyAll()(广播)
用notify()的风险:如果等待集合里有多类线程,notify()可能唤醒了"错误类型"的线程(该线程发现条件不满足再wait),而真正应该被唤醒的线程一直等待。
二、Condition的核心机制
2.1 Condition与Lock的关系
Condition对象必须从Lock.newCondition()获取,每个Lock可以有多个Condition。
Condition.await()对应Object.wait(),但:
- 持有的是Lock(不是synchronized内置锁)
- await()释放Lock,线程进入WAITING状态
- 被signal()或signalAll()唤醒后,重新竞争Lock
Condition.signal()对应Object.notify(),Condition.signalAll()对应Object.notifyAll()。
关键优势:多个Condition对象 = 多个独立的条件队列,精确唤醒特定类型的等待线程。
2.2 功能对比
| 特性 | wait/notify | Condition |
|---|---|---|
| 依赖的锁 | synchronized内置锁 | ReentrantLock(或其他Lock) |
| 条件队列数 | 每个对象只有一个Wait Set | 每个Lock可以有多个Condition |
| 中断响应 | wait()响应中断 | await()响应中断,awaitUninterruptibly()不响应 |
| 超时等待 | wait(timeout) | await(time, unit),还有awaitNanos |
| 绝对时间等待 | 不支持 | awaitUntil(Date) |
| 精确唤醒 | 不支持(notify随机) | 支持(signal指定Condition) |
三、完整代码实现
3.1 wait/notify实现的经典生产者消费者
package com.laozhang.concurrent.waitnotify;
import java.util.LinkedList;
import java.util.Queue;
/**
* wait/notify实现经典生产者消费者
*
* 注意事项:
* 1. 必须在synchronized块内调用wait/notify
* 2. 等待条件必须用while(不是if),防虚假唤醒
* 3. notify()可能唤醒错误的线程(生产者唤醒生产者),
* 因此建议用notifyAll()(代价是惊群)
*
* 测试环境:JDK 11
*/
public class WaitNotifyDemo {
private final Queue<Integer> buffer = new LinkedList<>();
private final int capacity;
private int totalProduced = 0;
private int totalConsumed = 0;
public WaitNotifyDemo(int capacity) {
this.capacity = capacity;
}
/**
* 生产者:缓冲区满时等待
*/
public void produce(int item) throws InterruptedException {
synchronized (this) {
// 用while防虚假唤醒
while (buffer.size() >= capacity) {
System.out.printf("[生产者] 缓冲区满(%d),等待消费...%n", capacity);
wait();
}
buffer.offer(item);
totalProduced++;
System.out.printf("[生产者] 生产: %d,缓冲区: %d/%d%n",
item, buffer.size(), capacity);
notifyAll(); // 唤醒所有等待线程(消费者和其他生产者)
}
}
/**
* 消费者:缓冲区空时等待
*/
public int consume() throws InterruptedException {
synchronized (this) {
while (buffer.isEmpty()) {
System.out.printf("[消费者] 缓冲区为空,等待生产...%n");
wait();
}
int item = buffer.poll();
totalConsumed++;
System.out.printf("[消费者] 消费: %d,缓冲区: %d/%d%n",
item, buffer.size(), capacity);
notifyAll(); // 唤醒所有等待线程
return item;
}
}
public static void main(String[] args) throws InterruptedException {
WaitNotifyDemo demo = new WaitNotifyDemo(3);
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
demo.produce(i);
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Producer");
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
demo.consume();
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Consumer");
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.printf("总计:生产%d,消费%d%n",
demo.totalProduced, demo.totalConsumed);
}
}3.2 Condition多条件队列:精确唤醒
package com.laozhang.concurrent.waitnotify;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Condition多条件队列:多类型任务精确唤醒
*
* 场景:优先任务队列
* - 高优任务:插到队头,signal等待高优任务的消费者
* - 普通任务:追加到队尾,signal等待普通任务的消费者
* - 消费者分为高优消费者和普通消费者,各自在独立Condition上等待
*
* 测试环境:JDK 11
*/
public class ConditionPreciseWakeup {
private final Queue<String> highPriorityQueue = new LinkedList<>();
private final Queue<String> normalQueue = new LinkedList<>();
private final int maxSize = 10;
private final ReentrantLock lock = new ReentrantLock();
// 独立的条件变量
private final Condition notHighFull = lock.newCondition(); // 高优队列不满(生产者等待)
private final Condition notNormalFull = lock.newCondition(); // 普通队列不满
private final Condition hasHighTask = lock.newCondition(); // 有高优任务(消费者等待)
private final Condition hasNormalTask = lock.newCondition(); // 有普通任务
/**
* 提交高优任务
*/
public void submitHigh(String task) throws InterruptedException {
lock.lock();
try {
while (highPriorityQueue.size() >= maxSize) {
System.out.println("[高优生产者] 队列满,等待...");
notHighFull.await();
}
highPriorityQueue.offer(task);
System.out.println("[高优生产者] 提交任务:" + task
+ ",队列大小:" + highPriorityQueue.size());
hasHighTask.signal(); // 精确唤醒等待高优任务的消费者,不影响普通消费者
} finally {
lock.unlock();
}
}
/**
* 提交普通任务
*/
public void submitNormal(String task) throws InterruptedException {
lock.lock();
try {
while (normalQueue.size() >= maxSize) {
notNormalFull.await();
}
normalQueue.offer(task);
hasNormalTask.signal(); // 精确唤醒等待普通任务的消费者
} finally {
lock.unlock();
}
}
/**
* 高优消费者:专门消费高优任务
*/
public String consumeHigh() throws InterruptedException {
lock.lock();
try {
while (highPriorityQueue.isEmpty()) {
System.out.println("[高优消费者] 等待高优任务...");
hasHighTask.await(); // 只在hasHighTask条件上等待
}
String task = highPriorityQueue.poll();
notHighFull.signal(); // 通知高优生产者可以继续
System.out.println("[高优消费者] 消费:" + task);
return task;
} finally {
lock.unlock();
}
}
/**
* 普通消费者:专门消费普通任务
*/
public String consumeNormal() throws InterruptedException {
lock.lock();
try {
while (normalQueue.isEmpty()) {
hasNormalTask.await(); // 只在hasNormalTask条件上等待
}
String task = normalQueue.poll();
notNormalFull.signal();
return task;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionPreciseWakeup scheduler = new ConditionPreciseWakeup();
// 高优生产者
Thread highProducer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
scheduler.submitHigh("HIGH-TASK-" + i);
Thread.sleep(100);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}, "HighProducer");
// 普通生产者
Thread normalProducer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
scheduler.submitNormal("NORMAL-TASK-" + i);
Thread.sleep(150);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}, "NormalProducer");
// 高优消费者
Thread highConsumer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
scheduler.consumeHigh();
Thread.sleep(200);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}, "HighConsumer");
// 普通消费者
Thread normalConsumer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
String task = scheduler.consumeNormal();
System.out.println("[普通消费者] 消费:" + task);
Thread.sleep(300);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}, "NormalConsumer");
highProducer.start(); normalProducer.start();
highConsumer.start(); normalConsumer.start();
highProducer.join(); normalProducer.join();
highConsumer.join(); normalConsumer.join();
System.out.println("所有任务处理完毕");
}
}四、踩坑实录
坑1:notify()唤醒了错误类型的线程(错误的notify)
报错现象: 用单个wait()/notify()实现多类线程等待,系统偶发"活锁"——生产者唤醒了生产者,消费者唤醒了消费者,大家都在等待,系统进展缓慢。
原因分析: 当生产和消费者都在同一个对象的Wait Set里等待时,notify()是随机的,可能唤醒同类线程:
- 所有消费者在等(缓冲区空),生产者生产后
notify(),唤醒的是另一个消费者(不是自己需要唤醒的),消费者发现缓冲区还是空,又wait - 系统卡住
解法:
- 用
notifyAll()替代notify()(代价是惊群) - 用Condition,分开消费者队列和生产者队列
坑2:wait()在synchronized外调用抛IllegalMonitorStateException
报错现象: java.lang.IllegalMonitorStateException。
原因分析:
// 错误:wait()在synchronized外
if (condition) {
obj.wait(); // IllegalMonitorStateException!
}
// 正确:必须在持有obj锁的synchronized块内
synchronized (obj) {
while (condition) {
obj.wait();
}
}解法: wait()/notify()/notifyAll()必须在持有对应对象锁的synchronized代码块内调用。
坑3:Condition的await()不能在没有持有对应Lock的情况下调用
报错现象: 调用condition.await()抛IllegalMonitorStateException。
原因分析: 同wait(),await()必须在持有对应ReentrantLock的情况下调用。从lock.newCondition()获取的Condition,必须在lock.lock()之后调用condition.await()。
注意:如果从lock1.newCondition()获取了condition,然后在lock2.lock()里调用condition.await(),也会抛异常(锁不匹配)。
坑4:awaitUninterruptibly()可能导致线程无法停止
报错现象: 使用了Condition.awaitUninterruptibly()的线程,在需要停机时无法被中断,一直阻塞。
原因分析: awaitUninterruptibly()顾名思义——不响应中断的await。即使对线程调用interrupt(),它也不会退出等待,会继续等待signal()。
在优雅停机场景中,如果工作线程用了awaitUninterruptibly(),ExecutorService.shutdownNow()发出的interrupt无法唤醒这些线程,它们会一直挂在那里直到有人signal(),或者JVM强制退出。
解法: 在需要优雅停机的线程中,避免使用awaitUninterruptibly();或者在停机时主动调用condition.signalAll()唤醒所有等待线程。
五、总结与延伸
选型决策:
| 场景 | 推荐方案 |
|---|---|
| 简单的单条件等待 | synchronized + wait/notify |
| 需要多个独立条件队列 | ReentrantLock + 多个Condition |
| 需要超时等待精确到纳秒 | Condition.awaitNanos() |
| 需要等待到某个绝对时间 | Condition.awaitUntil(Date) |
| 公平等待(先等先得) | ReentrantLock(true) + Condition |
| 需要不可中断等待 | Condition.awaitUninterruptibly() |
标准库的使用参考:
LinkedBlockingQueue:用ReentrantLock+ 两个Condition(notEmpty、notFull),精确唤醒ArrayBlockingQueue:用ReentrantLock(单锁)+ 两个ConditionTreeMap/HashMap:基本不用到线程通信,自己加锁
