并发编程深度解析
并发编程深度解析
AQS 原理、ThreadPoolExecutor 参数调优、CAS/volatile 内存语义、锁升级全流程——字节/腾讯并发模块必考。
核心原理
AQS(AbstractQueuedSynchronizer)
AQS 是 ReentrantLock、Semaphore、CountDownLatch、ReadWriteLock 等并发工具的核心骨架,基于 CLH 队列变体 + volatile state + CAS 实现。
// AQS 核心数据结构
public abstract class AbstractQueuedSynchronizer {
// 同步状态:ReentrantLock 中表示重入次数,Semaphore 表示许可数
private volatile int state;
// CLH 双向等待队列(虚拟头节点 + 实际等待线程节点)
private transient volatile Node head;
private transient volatile Node tail;
// 节点状态:CANCELLED=1, SIGNAL=-1, CONDITION=-2, PROPAGATE=-3
static final class Node {
volatile int waitStatus;
volatile Node prev, next;
volatile Thread thread;
}
}ReentrantLock 加锁流程(非公平锁):
// 1. 尝试 CAS 直接抢锁(非公平:不管队列是否有等待者)
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 2. 已持有锁(可重入):state 累加
if (current == getExclusiveOwnerThread()) {
setState(state + 1);
return true;
}
// 3. 抢锁失败:封装为 Node 加入 CLH 队列,LockSupport.park() 挂起
// 4. 解锁时 state-- 直到 0,LockSupport.unpark() 唤醒队列头部节点公平锁 vs 非公平锁: 非公平锁在 tryAcquire 时不检查队列是否有等待者,直接 CAS 抢锁;公平锁先检查 hasQueuedPredecessors(),保证严格 FIFO。非公平锁吞吐量更高(减少线程切换),但可能导致线程饥饿。
下图展示了 AQS CLH 队列的结构:Thread1 已持锁,Thread2/Thread3 依次入队等待:
ThreadPoolExecutor 七大参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // corePoolSize:核心线程数,常驻不回收
8, // maximumPoolSize:最大线程数
60L, TimeUnit.SECONDS, // keepAliveTime:非核心线程空闲超时时间
new LinkedBlockingQueue<>(1000), // workQueue:任务队列
new ThreadFactoryBuilder() // threadFactory:线程工厂(设置线程名)
.setNameFormat("order-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // handler:拒绝策略
);任务提交流程(面试必答):
- 当前线程数 < corePoolSize → 创建新核心线程执行
- 当前线程数 >= corePoolSize → 放入 workQueue
- workQueue 满 && 线程数 < maximumPoolSize → 创建非核心线程执行
- workQueue 满 && 线程数 >= maximumPoolSize → 执行 RejectedExecutionHandler
四种拒绝策略:
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy(默认) | 抛出 RejectedExecutionException | 需要感知拒绝的场景 |
| CallerRunsPolicy | 调用者线程执行任务 | 减缓提交速率,不丢任务 |
| DiscardPolicy | 直接丢弃 | 允许丢失的监控统计任务 |
| DiscardOldestPolicy | 丢弃队列最老任务 | 实时性要求高,接受旧数据丢失 |
线程池大小如何设置:
- CPU 密集型:
N+1(N = CPU 核数,+1 防止意外停顿) - IO 密集型:
N * 2或N / (1 - 阻塞系数)(阻塞系数 = IO 时间 / 总时间) - 混合型:通过压测 + 监控动态调整,推荐使用动态线程池(美团 DynamicTp)
下图描述了任务提交到线程池后的完整决策流程:
CAS 与 volatile 内存语义
CAS(Compare-And-Swap): 处理器提供的原子指令,compareAndSet(expect, update) 只有当内存值等于 expect 时才更新为 update,是无锁并发的基础。
// AtomicInteger 底层
public final boolean compareAndSet(int expect, int update) {
return U.compareAndSetInt(this, VALUE, expect, update);
// 对应 CPU 指令:CMPXCHG(x86),需要 LOCK 前缀保证原子性
}
// ABA 问题:值从 A 变 B 再变 A,CAS 无法感知中间变化
// 解决:AtomicStampedReference(版本号)或 AtomicMarkableReference(布尔标记)
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);
ref.compareAndSet(100, 200, 0, 1); // 带版本号的 CASvolatile 内存语义(JSR-133):
- 可见性: 写 volatile 变量时,将线程工作内存刷新到主内存;读 volatile 时,从主内存重新加载。
- 有序性: 写 volatile 之前的操作不能重排序到写之后(StoreStore + StoreLoad 屏障);读 volatile 之后的操作不能重排序到读之前(LoadLoad + LoadStore 屏障)。
- 不保证原子性:
count++实际是三步操作(读、加、写),volatile 无法保证。
// double-checked locking(DCL)正确实现
public class Singleton {
private static volatile Singleton instance; // volatile 防止指令重排序
public static Singleton getInstance() {
if (instance == null) { // 第一次检查(无锁)
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查(有锁)
instance = new Singleton(); // 分配内存→初始化→赋值引用
// 若无 volatile,赋值引用可能被重排到初始化之前
// 其他线程看到非 null 的未初始化对象 → NPE
}
}
}
return instance;
}
}高频面试题
Q: synchronized 锁升级全流程?JDK 15 为何废弃偏向锁?(字节必问)
锁升级:无锁 → 偏向锁 → 轻量级锁 → 重量级锁(单向,不可降级)
下图用状态图展示了 synchronized 锁升级的各阶段转换条件:
偏向锁: 假设锁通常由同一线程获取,在对象头 MarkWord 写入线程 ID,后续该线程进入只需比对 MarkWord,无需 CAS。竞争时需等待全局安全点(SafePoint)撤销,成本高。
轻量级锁: 有竞争但持锁时间极短时,在栈帧创建 Lock Record,CAS 将 MarkWord 替换为 Lock Record 指针。CAS 失败则自旋等待;自旋超阈值(JDK 6 引入自适应自旋)则升级为重量级锁。
重量级锁: 基于 OS mutex,竞争失败的线程进入 Monitor 的 EntryList 阻塞,等待 Owner 线程 notify 唤醒。
JDK 15 废弃偏向锁原因: 现代 Java 应用(尤其是微服务)线程竞争普遍,偏向锁撤销需要 SafePoint,大量撤销会导致 STW 暂停频繁;同时 CAS 性能已大幅提升,偏向锁的收益远小于维护成本,故 JDK 15 默认禁用,JDK 18 正式移除。
Q: ReentrantReadWriteLock 的实现原理?什么是锁降级?(阿里)
ReadWriteLock 通过 AQS state 的高 16 位记录读锁数量,低 16 位记录写锁重入次数:
// 读写锁 state 拆分 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 0x00010000 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 0x0000FFFF static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 读锁数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 写锁重入数锁降级: 持有写锁时可以获取读锁,然后释放写锁,实现写锁→读锁的降级(不会释放数据的访问权)。锁升级(读→写)不支持,会死锁。
rwLock.writeLock().lock(); try { updateData(); rwLock.readLock().lock(); // 持有写锁时再获取读锁(降级的第一步) } finally { rwLock.writeLock().unlock(); // 释放写锁(此时仍持有读锁) } try { readData(); // 用读锁保护读取 } finally { rwLock.readLock().unlock(); }
Q: CountDownLatch 和 CyclicBarrier 的区别?(腾讯)
对比项 CountDownLatch CyclicBarrier 计数方向 从 N 倒计数到 0 从 0 计数到 N(屏障值) 重用性 不可重用 可重用(reset()) 等待主体 一个或多个线程等待 N 个线程完成 N 个线程互相等待到达屏障 回调 无 所有线程到达时可执行 barrierAction 底层 AQS 共享模式 ReentrantLock + Condition // CountDownLatch:主线程等待3个子任务完成 CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { executor.submit(() -> { doTask(); latch.countDown(); }); } latch.await(); // 主线程阻塞直到计数为0 // CyclicBarrier:3个线程都准备好后同时开始 CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("All ready!")); for (int i = 0; i < 3; i++) { executor.submit(() -> { prepare(); barrier.await(); execute(); }); }
Q: 线程池 submit() 和 execute() 的区别?线程池如何处理异常?(美团必问)
execute(Runnable):无返回值,任务抛出异常会直接打印到错误流并终止该工作线程(线程池会新建线程补充)submit(Callable):返回Future<T>,任务异常被封装在Future中,只有调用future.get()时才抛出ExecutionException线程池异常处理最佳实践:
// 方式1:submit + future.get() 捕获异常 Future<?> future = executor.submit(() -> riskyTask()); try { future.get(); } catch (ExecutionException e) { log.error("Task failed", e.getCause()); } // 方式2:设置 UncaughtExceptionHandler(execute 专用) ThreadFactory factory = r -> { Thread t = new Thread(r); t.setUncaughtExceptionHandler((thread, ex) -> log.error("Thread {} threw exception", thread.getName(), ex)); return t; }; // 方式3:重写 afterExecute(推荐,可同时处理 execute 和 submit) new ThreadPoolExecutor(...) { @Override protected void afterExecute(Runnable r, Throwable t) { if (t == null && r instanceof Future<?> f) { try { f.get(0, TimeUnit.NANOSECONDS); } catch (ExecutionException e) { t = e.getCause(); } catch (TimeoutException | InterruptedException ignored) {} } if (t != null) log.error("Task exception", t); } };
Q: Java 21 虚拟线程对线程池的影响?还需要线程池吗?(2025 字节/阿里新题)
虚拟线程改变了 IO 密集型场景的最佳实践:
传统做法:维护一个固定大小的线程池(如 200 个线程)来处理并发 HTTP/DB 请求,线程阻塞时等待。
虚拟线程做法:为每个请求创建一个虚拟线程,阻塞时自动卸载 Carrier 线程,Carrier 线程继续处理其他虚拟线程。
// JDK 21:每个任务一个虚拟线程,无需控制并发数(IO密集) try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (var request : requests) { executor.submit(() -> handleRequest(request)); // 百万级并发 } }但平台线程池仍有价值的场景:
- CPU 密集型任务:虚拟线程不能减少 CPU 竞争,仍需固定大小线程池
- 限制数据库连接数:用
Semaphore或固定线程池控制并发连接数- synchronized 块:虚拟线程在 synchronized 块内阻塞会 pin 住 Carrier 线程,仍需平台线程池
知识星球深度内容
完整大厂面经实录(字节/阿里/腾讯/美团)、简历 1v1 修改、每周高频题精讲,扫码加入「AI 工程师加速社区」知识星球 👉 立即加入
