AQS源码精读:独占锁acquire和共享锁acquireShared的实现
AQS源码精读:独占锁acquire和共享锁acquireShared的实现
适读人群:有ReentrantLock使用经验、想理解Java并发框架底层原理的工程师 | 阅读时长:约20分钟
开篇故事
2020年秋天,我们团队新来了个985研究生,叫林峰。入职第一个月,他在代码评审里挑战了我一次:
"老张,我看你这里用的是ReentrantLock,但查了一下JDK源码,底层就是AbstractQueuedSynchronizer,感觉AQS的代码写得挺复杂的,你能给我讲讲acquire方法是怎么工作的吗?"
我当时胸有成竹地打开源码,翻到acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}然后我就卡壳了。addWaiter、acquireQueued、selfInterrupt,每个方法我都能说几句,但要把整个流程讲清楚,我发现自己只是"用过",并没有真正"读懂"。
那次之后,我花了一周时间,把AQS源码从头到尾读了两遍,画了流程图,写了测试代码。这一读,让我对整个Java并发框架豁然开朗——ReentrantLock、Semaphore、CountDownLatch、ReadWriteLock,全都是在AQS基础上搭积木。
今天就把这周的学习精华分享出来,重点讲acquire(独占锁)和acquireShared(共享锁)两条核心路径。
一、AQS是什么
1.1 AQS的核心设计
AbstractQueuedSynchronizer(AQS)是Java并发包的核心框架,由Doug Lea在JDK 1.5设计。
它的核心思路是模板方法模式:AQS提供了线程排队、等待、唤醒的通用机制,子类只需要实现"如何判断锁是否可用"和"如何修改锁状态"这两件事。
AQS的三个核心组件:
state变量:
volatile int state,表示同步状态。对于ReentrantLock,state=0表示未锁定,state=n表示被重入了n次。CLH等待队列:一个虚拟的双向链表(FIFO队列),存放等待获取锁的线程节点。
Condition队列:每个
ConditionObject维护一个独立的等待队列,用于实现await/signal。
1.2 子类需要重写的方法
AQS定义了5个需要子类重写的方法(不重写默认抛UnsupportedOperationException):
| 方法 | 用于独占/共享 | 含义 |
|---|---|---|
tryAcquire(int arg) | 独占 | 尝试获取锁,成功返回true |
tryRelease(int arg) | 独占 | 尝试释放锁,完全释放返回true |
tryAcquireShared(int arg) | 共享 | 尝试获取共享锁,<0失败,=0成功但不传播,>0成功且传播 |
tryReleaseShared(int arg) | 共享 | 尝试释放共享锁,成功返回true |
isHeldExclusively() | 独占 | 当前线程是否独占持有锁 |
二、独占锁acquire全流程解析
2.1 addWaiter:入队
// AQS源码(JDK 11),有省略
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS将自己设为尾节点
pred.next = node;
return node;
}
}
enq(node); // CAS失败或队列为空,走enq(自旋直到成功)
return node;
}Node的waitStatus字段的几个状态:
0:初始状态SIGNAL(-1):后继节点需要被唤醒(当前节点释放锁时要unpark后继)CANCELLED(1):该节点已取消(超时或中断)CONDITION(-2):节点在Condition队列中PROPAGATE(-3):共享锁传播标志
2.2 acquireQueued:自旋等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 只有前驱是head节点时,才有资格尝试获取锁
// head节点是虚拟节点,或者是正在执行的持锁线程的节点
if (p == head && tryAcquire(arg)) {
setHead(node); // 出队:把自己设为head
p.next = null; // 帮助GC
failed = false;
return interrupted;
}
// 判断是否需要park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 记录曾经被中断,但不立即响应
}
} finally {
if (failed)
cancelAcquire(node); // 异常(超时/中断退出)时取消节点
}
}shouldParkAfterFailedAcquire的逻辑:
- 如果前驱的
waitStatus == SIGNAL:可以安全park,返回true - 如果前驱的
waitStatus > 0(CANCELLED):跳过所有已取消的前驱,找到有效前驱 - 其他情况(0或PROPAGATE):CAS设置前驱
waitStatus为SIGNAL,返回false(再循环一次再决定是否park)
2.3 为什么只有前驱是head才尝试tryAcquire?
这是AQS保证公平性的关键设计(非公平锁在AQS外面做了插队,但AQS内部是FIFO的)。
只允许队列中第一个等待节点(前驱是head的节点)尝试获取锁,避免了队列中所有线程同时竞争导致的"惊群效应"(thundering herd)。锁释放时只唤醒head的后继,其他线程继续park。
三、共享锁acquireShared全流程解析
共享锁允许多个线程同时持有,典型的是ReadWriteLock的读锁和Semaphore。
3.1 setHeadAndPropagate:共享锁传播
共享锁和独占锁的关键区别在于这个传播机制:
private void setHeadAndPropagate(Node node, long propagate) {
Node h = head;
setHead(node);
// propagate > 0 表示还有空闲资源
// h.waitStatus < 0 表示head被标记为需要传播(PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 后继节点是共享模式,则继续唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}这个传播机制解决了什么问题?
假设Semaphore有3个permits,现在有4个线程同时等待。当permits被释放时,需要唤醒多个线程。如果只唤醒head的后继(像独占锁那样),第2、3个等待者要等第1个等待者获取到锁后再唤醒它们,效率低。
通过PROPAGATE机制,第1个线程获取成功后,发现还有剩余permits,就主动唤醒第2个,第2个唤醒第3个,形成级联唤醒。
四、完整代码实现
4.1 基于AQS实现一个自定义互斥锁
package com.laozhang.concurrent.aqs;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 基于AQS实现一个不可重入的互斥锁(MutexLock)
*
* 参考自AQS文档中的示例,加以注释和扩展
* 测试环境:JDK 11
*/
public class MutexLock implements Lock {
/**
* AQS子类:同步器核心实现
* state=0: 未锁定
* state=1: 已锁定
*/
private static final class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
assert acquires == 1;
// CAS将state从0改为1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
// 注意:不支持重入!如果当前线程已持有锁再次acquire,
// 因为state已经是1,CAS失败,会死锁!
}
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0)
throw new IllegalMonitorStateException("锁未被持有,不能释放");
setExclusiveOwnerThread(null);
setState(0); // volatile写,保证可见性
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 提供Condition支持
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 测试:验证MutexLock的基本功能
*/
public static void main(String[] args) throws InterruptedException {
MutexLock lock = new MutexLock();
int[] sharedData = {0};
// 10个线程并发递增,期望结果是10000
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
lock.lock();
try {
sharedData[0]++;
} finally {
lock.unlock();
}
}
});
threads[i].start();
}
for (Thread t : threads) t.join();
System.out.println("最终结果:" + sharedData[0] + "(期望:10000)");
System.out.println("锁状态:" + lock.isLocked());
}
}4.2 基于AQS实现共享信号量
package com.laozhang.concurrent.aqs;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 基于AQS实现一个简化版Semaphore(非公平)
*
* 演示acquireShared/releaseShared路径
* 对比JDK内置Semaphore的NonFairSync实现
*/
public class SimpleSemaphore {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // state = permits数量
}
/**
* 尝试获取共享锁(非公平版本)
* 返回值:>=0 成功(剩余permits),<0 失败
*/
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// remaining < 0 直接返回(失败),让调用者进入等待队列
// remaining >= 0 尝试CAS,失败则重试(自旋)
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
/**
* 尝试释放共享锁
*/
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
// CAS失败,自旋重试
}
}
int availablePermits() {
return getState();
}
}
private final Sync sync;
public SimpleSemaphore(int permits) {
if (permits < 0)
throw new IllegalArgumentException("permits must be >= 0");
this.sync = new Sync(permits);
}
/**
* 获取一个permit,不可用时阻塞
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 释放一个permit
*/
public void release() {
sync.releaseShared(1);
}
public int availablePermits() {
return sync.availablePermits();
}
/**
* 测试:连接池场景模拟
* 场景:10个连接的池,20个线程并发访问
*/
public static void main(String[] args) throws InterruptedException {
final int POOL_SIZE = 10;
final int THREAD_COUNT = 20;
SimpleSemaphore semaphore = new SimpleSemaphore(POOL_SIZE);
int[] activeConnections = {0};
int[] maxActiveConnections = {0};
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
try {
semaphore.acquire();
synchronized (activeConnections) {
activeConnections[0]++;
maxActiveConnections[0] = Math.max(maxActiveConnections[0], activeConnections[0]);
System.out.printf("线程%d 获得连接,当前活跃:%d,剩余permits:%d%n",
threadId, activeConnections[0], semaphore.availablePermits());
}
// 模拟数据库操作
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
synchronized (activeConnections) {
activeConnections[0]--;
}
semaphore.release();
}
}, "worker-" + i);
}
long start = System.currentTimeMillis();
for (Thread t : threads) t.start();
for (Thread t : threads) t.join();
System.out.printf("%n总耗时:%dms(理论最少:%dms)%n",
System.currentTimeMillis() - start,
(THREAD_COUNT / POOL_SIZE) * 100);
System.out.println("最大并发连接数:" + maxActiveConnections[0]
+ "(应 <= " + POOL_SIZE + ")");
}
}五、踩坑实录
坑1:tryAcquire返回false后必须进队列,不能直接自旋
报错现象: 自己写AQS子类,在acquire外层加了个自旋重试(不进队列),高并发下CPU飙满,其他线程饿死(starvation)。
原因分析: AQS的排队机制是保证公平等待的基础。如果在tryAcquire失败后不进队列、在外层自旋,就破坏了FIFO保证。等待时间长的线程不会得到优先处理,可能永远等不到锁。
而且无限自旋会消耗大量CPU,在高竞争场景下比park/unpark还要糟糕。
解法: 使用acquire(AQS的公开API),不要绕过排队机制。只有tryLock()的非阻塞版本可以不进队列。
坑2:ConditionObject.await()要求调用者持有独占锁
报错现象: 调用condition.await()时抛出IllegalMonitorStateException。
原因分析: ConditionObject.await()的源码里有这样的检查:
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node); // fullyRelease里检查isHeldExclusively()
...
}fullyRelease调用release(savedState),这要求当前线程是锁的持有者。如果没有持锁就调用await,release失败,抛IllegalMonitorStateException。
解法: 必须在持有对应Lock的情况下调用condition.await()和condition.signal()。
lock.lock();
try {
while (!conditionMet()) {
condition.await(); // 正确:持有lock后才能await
}
// 执行业务逻辑
} finally {
lock.unlock();
}坑3:共享锁的PROPAGATE状态处理不当导致线程无法唤醒
报错现象: 使用自定义共享锁时,Semaphore批量释放后,等待队列里的线程没有全部被唤醒,部分线程永远pending。
原因分析: 这是AQS历史上有记录的一个bug(JDK 6早期版本),后来通过引入PROPAGATE状态修复。
问题出在setHeadAndPropagate和doReleaseShared之间有竞争窗口:
- T1释放,唤醒T2,T2拿到锁,调用
setHeadAndPropagate - 在T2调用
setHeadAndPropagate之前,T3也释放了,doReleaseShared时发现head的后继(T4)还是SIGNAL,唤醒T4 - T2调用
setHeadAndPropagate时,此时的head是T2自己,h.waitStatus是0(T3/T4已经修改),propagate>0条件不触发 - T4被唤醒后,tryAcquireShared失败,又回去park了,但没有人再次唤醒它
引入PROPAGATE状态后,doReleaseShared在没有后继需要唤醒时,会将head的waitStatus设为PROPAGATE(-3),让后续的setHeadAndPropagate能感知到有传播需求。
解法: 不要自己重新实现AQS的doReleaseShared和setHeadAndPropagate,直接继承AQS调用父类实现。这部分逻辑非常精妙,手写几乎必然有bug。
坑4:cancelAcquire中的前驱跳过可能导致队列结构问题
报错现象: 超时的tryLock调用后,队列中偶发出现orphaned节点(队列里有节点但对应线程早已退出),导致唤醒链路断裂。
原因分析: 当节点超时或中断取消时,cancelAcquire需要把该节点从队列中摘除。这个过程涉及对prev和next指针的修改,有多线程安全性的微妙之处:
- AQS对
prev链是强一致的(通过CAS保证),节点取消后会从prev链中去除 - 但
next链是"最终一致"的,节点取消后,next指针不会立即更新,需要后来的节点在shouldParkAfterFailedAcquire里通过pred.waitStatus > 0跳过取消节点
如果代码逻辑没有正确处理CANCELLED状态的跳过,就可能卡在一个已取消的节点前面等待唤醒。
解法: 直接使用JDK提供的ReentrantLock.tryLock(time, unit),不要自己重新实现超时机制。如果需要自定义,务必阅读并理解cancelAcquire的完整源码。
五、总结与延伸
AQS是整个java.util.concurrent包的基石。理解了acquire和acquireShared,就理解了:
ReentrantLock.lock()→sync.acquire(1)(独占)Semaphore.acquire()→sync.acquireSharedInterruptibly(1)(共享)CountDownLatch.await()→sync.acquireSharedInterruptibly(1)(共享,state减到0才能获取)ReentrantReadWriteLock.ReadLock.lock()→sync.acquireShared(1)(共享)ReentrantReadWriteLock.WriteLock.lock()→sync.acquire(1)(独占)
AQS设计的精妙之处:
- CLH队列变体:原始CLH用自旋等前驱,AQS改为park(节省CPU),但保留了"只看前驱"的设计
- 虚拟head节点:简化了头部操作的边界条件处理
- PROPAGATE状态:解决了共享锁释放时的竞争窗口问题
- 中断不立即响应:
acquireQueued记录中断但继续抢锁,保证了锁的获取不会因为中断而跳过,但会在获取锁后补发中断信号
JDK版本变化:
- JDK 5:AQS初版,
AbstractQueuedSynchronizer发布 - JDK 6:修复共享锁的PROPAGATE bug
- JDK 9:引入
AbstractQueuedLongSynchronizer(state是long类型) - JDK 14:VarHandle替换Unsafe用于CAS操作,性能更好、更安全
