JDK21虚拟线程Virtual Threads:百万并发连接的正确实现方式
2026/4/30大约 13 分钟
JDK21虚拟线程Virtual Threads:百万并发连接的正确实现方式
适读人群:做过高并发服务、遭遇过线程池瓶颈的Java后端开发者 | 阅读时长:约22分钟
开篇故事
2022年,我们有个HTTP调用网关服务,用的是传统线程池,配置了500个线程。正常情况下没问题,但每到整点有大量定时任务触发,服务就开始报错:RejectedExecutionException - pool is full。
我们的应用实际上大部分时间都在等待下游响应(IO等待),CPU利用率只有15%,但线程全被占满了。
当时的解决方案是用Reactor(响应式编程),用一套完全不同的编程模型——回调、Publisher、Subscriber,代码复杂度翻了三倍,调试难度翻了十倍。我们团队花了三个月时间重写,上线后状况良好,但维护成本极高。
如果当时有JDK21虚拟线程,这个问题可以用一行代码解决。
今天把虚拟线程的原理和最佳实践全部讲透。
一、为什么需要虚拟线程?
1.1 传统平台线程的瓶颈
传统Java平台线程(Platform Thread):
- 一个Java线程 = 一个OS线程
- 每个OS线程默认占用约1MB栈内存
- 线程切换需要内核态切换(开销约1-10微秒)
瓶颈:
- 1000个线程 = 1GB内存(仅栈内存)
- 10000个线程会让OS调度开销显著增大
- 实际上,大多数IO密集型线程90%以上时间在等待1.2 虚拟线程的核心理念
虚拟线程(Virtual Thread):
- 由JVM管理,不是OS线程
- 栈内存按需分配,初始约几百字节(后面会扩展)
- 大量虚拟线程共享少量OS线程(载体线程,Carrier Thread)
- 阻塞时"卸载"(unmount)出载体线程,让出OS线程给其他虚拟线程
理论上限:
- 可以创建数百万个虚拟线程
- Java 21实测:10万个虚拟线程约30MB内存(vs 平台线程100GB)1.3 虚拟线程 vs 响应式编程
响应式编程(Reactor/RxJava):
- 编程模型复杂(回调地狱升级版)
- 调试困难(堆栈跟踪分散)
- 学习成本高
- 但性能好,适合极端场景
虚拟线程:
- 编程模型和传统线程完全一致(同步风格)
- 调试容易(连续堆栈)
- 学习成本极低(已有Java基础就够了)
- 性能在IO密集型场景媲美响应式
- 不适合CPU密集型(CPU密集型还是用平台线程)1.4 版本时间线
JDK19:Virtual Threads Preview(JEP 425)
JDK20:第二次Preview
JDK21:Virtual Threads GA(2023年9月,JEP 444)二、虚拟线程工作原理深度解析
2.1 Mount/Unmount机制
┌────────────────────────────────────────────────────────────────┐
│ 虚拟线程工作原理 │
│ │
│ 虚拟线程1 ─── mount ──► 载体线程(OS线程1) ──► 执行代码 │
│ │ │
│ │ IO等待(如socket.read()) │
│ │ │
│ └── unmount ──► 虚拟线程1挂起 │
│ 载体线程(OS线程1)释放出来 │
│ │
│ 虚拟线程2 ─── mount ──► 载体线程(OS线程1) ──► 执行代码 │
│ (之前在等待,现在可以运行) │
│ │
│ 效果:少量OS线程高效服务大量虚拟线程 │
└────────────────────────────────────────────────────────────────┘Mermaid图:
2.2 关键概念:Pinning(钉住)
虚拟线程在某些情况下无法unmount,被"钉住"在载体线程上:
导致Pinning的情况:
1. synchronized块/方法(JDK21及之前)
synchronized (lock) {
Thread.sleep(1000); // 虚拟线程被钉住!载体线程无法释放
}
2. native方法调用(JNI)
影响:
- 虚拟线程被钉住时,占用了一个载体线程
- 如果很多虚拟线程被钉住,可能耗尽载体线程池
解决方案:
- 用ReentrantLock替代synchronized(JDK21)
- JDK24会解决synchronized的Pinning问题(JEP 491)2.3 吞吐量对比测试数据
实测环境:MacBook M2,8核,JDK21
场景:模拟HTTP服务,每个请求sleep 100ms(模拟IO等待)
并发请求数:1000
平台线程池(固定500线程):
- 前500个请求立即处理
- 后500个请求等待线程释放
- 总耗时:约200ms
- 线程内存:约500MB
虚拟线程(每请求一个虚拟线程):
- 1000个虚拟线程同时运行
- 总耗时:约100ms(每个请求只需100ms)
- 虚拟线程内存:约5MB
吞吐量提升:约2倍(IO等待比例越高,提升越明显)三、完整代码示例
3.1 虚拟线程的四种创建方式
import java.util.concurrent.*;
import java.util.*;
import java.time.*;
import java.util.stream.*;
/**
* 虚拟线程完整示例
* 引入版本:JDK19 Preview;GA版本:JDK21(2023年9月,JEP 444)
*/
public class VirtualThreadDemo {
// ===== 方式1:Thread.ofVirtual() 创建单个虚拟线程 =====
static void createSingleVirtualThread() throws Exception {
// 旧写法:创建平台线程
Thread platformThread = new Thread(() -> {
System.out.println("Platform thread: " + Thread.currentThread());
});
platformThread.start();
platformThread.join();
// 新写法:创建虚拟线程
Thread virtualThread = Thread.ofVirtual()
.name("my-virtual-thread")
.start(() -> {
System.out.println("Virtual thread: " + Thread.currentThread());
System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
});
virtualThread.join();
// 或者:Thread.startVirtualThread()(最简洁)
Thread.startVirtualThread(() -> {
System.out.println("Quick virtual thread");
}).join();
}
// ===== 方式2:Executors.newVirtualThreadPerTaskExecutor() =====
static void useVirtualThreadExecutor() throws Exception {
// 旧写法:固定线程池
try (var oldPool = Executors.newFixedThreadPool(10)) {
// 最多10个并发
}
// 新写法:虚拟线程执行器(每个任务一个虚拟线程)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = new ArrayList<Future<String>>();
// 提交1000个任务(会创建1000个虚拟线程)
for (int i = 0; i < 1000; i++) {
final int taskId = i;
futures.add(executor.submit(() -> {
Thread.sleep(10); // 模拟IO等待
return "Task " + taskId + " done";
}));
}
// 收集结果
int completedCount = 0;
for (var f : futures) {
f.get(); // 等待每个任务完成
completedCount++;
}
System.out.println("Completed: " + completedCount + " tasks");
}
}
// ===== 方式3:StructuredTaskScope(JDK21,结构化并发)=====
static void useStructuredConcurrency() throws Exception {
// 场景:并行调用多个服务,全部成功才继续
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userFuture = scope.fork(() -> fetchUser(1L));
var orderFuture = scope.fork(() -> fetchOrders(1L));
var scoreFuture = scope.fork(() -> fetchScore(1L));
scope.join(); // 等待所有任务
scope.throwIfFailed(); // 如果有失败,抛出异常
// 所有任务成功,获取结果
var user = userFuture.get();
var orders = orderFuture.get();
var score = scoreFuture.get();
System.out.println("User: " + user + ", Orders: " + orders + ", Score: " + score);
}
// 场景:并行调用,取最快的结果
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> { Thread.sleep(100); return "primary"; });
scope.fork(() -> { Thread.sleep(50); return "fallback"; });
scope.join();
String result = scope.result(); // 第一个成功的结果
System.out.println("Fastest result: " + result);
}
}
// ===== 方式4:在Spring Boot中启用(配置化)=====
// application.properties:
// spring.threads.virtual.enabled=true
// 这样Spring MVC/WebFlux的每个请求都会用虚拟线程处理
// 模拟IO操作
static String fetchUser(long id) throws Exception {
Thread.sleep(50); // 模拟数据库查询
return "User#" + id;
}
static List<String> fetchOrders(long userId) throws Exception {
Thread.sleep(80); // 模拟查询
return List.of("Order#1", "Order#2");
}
static int fetchScore(long userId) throws Exception {
Thread.sleep(30); // 模拟查询
return 95;
}
public static void main(String[] args) throws Exception {
System.out.println("=== 创建单个虚拟线程 ===");
createSingleVirtualThread();
System.out.println("\n=== 虚拟线程执行器 ===");
long start = System.currentTimeMillis();
useVirtualThreadExecutor();
System.out.println("耗时: " + (System.currentTimeMillis() - start) + "ms");
System.out.println("\n=== 结构化并发 ===");
start = System.currentTimeMillis();
useStructuredConcurrency();
System.out.println("耗时: " + (System.currentTimeMillis() - start) + "ms");
}
}3.2 吞吐量对比测试
import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.atomic.*;
/**
* 虚拟线程 vs 平台线程 性能对比
*/
public class VirtualThreadBenchmark {
static final int REQUESTS = 10_000;
static final int IO_SLEEP_MS = 50; // 模拟50ms的IO等待
// 模拟IO密集型操作
static String simulateIOTask(int id) throws InterruptedException {
Thread.sleep(IO_SLEEP_MS);
return "Result-" + id;
}
// ===== 测试1:平台线程池(固定大小)=====
static long testPlatformThreadPool(int poolSize) throws Exception {
long start = System.currentTimeMillis();
try (var executor = Executors.newFixedThreadPool(poolSize)) {
var futures = new ArrayList<Future<String>>(REQUESTS);
for (int i = 0; i < REQUESTS; i++) {
final int id = i;
futures.add(executor.submit(() -> simulateIOTask(id)));
}
for (var f : futures) f.get();
}
return System.currentTimeMillis() - start;
}
// ===== 测试2:虚拟线程(每任务一个)=====
static long testVirtualThreads() throws Exception {
long start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = new ArrayList<Future<String>>(REQUESTS);
for (int i = 0; i < REQUESTS; i++) {
final int id = i;
futures.add(executor.submit(() -> simulateIOTask(id)));
}
for (var f : futures) f.get();
}
return System.currentTimeMillis() - start;
}
// ===== 内存占用对比 =====
static void measureMemory() throws Exception {
Runtime runtime = Runtime.getRuntime();
// 测试平台线程内存
runtime.gc();
long beforePlatform = runtime.totalMemory() - runtime.freeMemory();
List<Thread> platformThreads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 1000; i++) {
Thread t = new Thread(() -> {
try { latch.await(); } catch (InterruptedException e) {}
});
t.start();
platformThreads.add(t);
}
Thread.sleep(500);
long afterPlatform = runtime.totalMemory() - runtime.freeMemory();
latch.countDown();
for (Thread t : platformThreads) t.join();
System.out.printf("1000平台线程额外内存: %.1f MB%n",
(afterPlatform - beforePlatform) / 1024.0 / 1024.0);
// 测试虚拟线程内存
runtime.gc();
Thread.sleep(200);
long beforeVirtual = runtime.totalMemory() - runtime.freeMemory();
CountDownLatch latch2 = new CountDownLatch(1);
List<Thread> virtualThreads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Thread t = Thread.ofVirtual().start(() -> {
try { latch2.await(); } catch (InterruptedException e) {}
});
virtualThreads.add(t);
}
Thread.sleep(500);
long afterVirtual = runtime.totalMemory() - runtime.freeMemory();
latch2.countDown();
for (Thread t : virtualThreads) t.join();
System.out.printf("1000虚拟线程额外内存: %.1f MB%n",
(afterVirtual - beforeVirtual) / 1024.0 / 1024.0);
}
public static void main(String[] args) throws Exception {
System.out.println("测试场景:" + REQUESTS + "个任务,每个IO等待" + IO_SLEEP_MS + "ms");
// 预热
System.out.println("预热中...");
testPlatformThreadPool(100);
testVirtualThreads();
// 正式测试
System.out.println("\n正式测试:");
long t100 = testPlatformThreadPool(100);
System.out.printf("平台线程池(100): %d ms (理论最少: %d ms)%n",
t100, IO_SLEEP_MS * REQUESTS / 100);
long t200 = testPlatformThreadPool(200);
System.out.printf("平台线程池(200): %d ms%n", t200);
long t500 = testPlatformThreadPool(500);
System.out.printf("平台线程池(500): %d ms%n", t500);
long tv = testVirtualThreads();
System.out.printf("虚拟线程: %d ms (接近理论最少: %d ms)%n",
tv, IO_SLEEP_MS);
System.out.printf("%n提升倍数(相比100线程池): %.1fx%n", (double) t100 / tv);
System.out.println("\n内存占用对比:");
measureMemory();
}
}3.3 Pinning问题和解决方案
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
/**
* 虚拟线程Pinning问题演示和解决
*/
public class VirtualThreadPinning {
private static final Object MONITOR = new Object();
private static final ReentrantLock REENTRANT_LOCK = new ReentrantLock();
// ===== 有Pinning问题的代码 =====
static void withPinning(int id) throws InterruptedException {
synchronized (MONITOR) { // synchronized导致Pinning
Thread.sleep(100); // IO等待时虚拟线程被钉住!
System.out.println("Pinned task " + id + " done");
}
}
// ===== 避免Pinning的正确写法 =====
static void withoutPinning(int id) throws InterruptedException {
REENTRANT_LOCK.lock(); // 使用ReentrantLock
try {
Thread.sleep(100); // IO等待时虚拟线程可以正常unmount
System.out.println("Non-pinned task " + id + " done");
} finally {
REENTRANT_LOCK.unlock();
}
}
// ===== 检测Pinning事件 =====
static void detectPinning() throws Exception {
// 开启Pinning诊断:
// java -Djdk.tracePinnedThreads=full ...
// 或者用JFR(Java Flight Recorder):
// java -XX:StartFlightRecording=settings=default,filename=pinning.jfr ...
// 代码层面检测:
System.setProperty("jdk.tracePinnedThreads", "short"); // 打印Pinned事件
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 5; i++) {
final int id = i;
executor.submit(() -> withPinning(id));
}
}
}
// ===== 数据库连接池适配 =====
// 问题:HikariCP等连接池内部用synchronized,虚拟线程会Pinning
// 解决:使用支持虚拟线程的版本
// HikariCP 5.1.0+:已解决synchronized问题
// JDBC驱动:大多数主流驱动已更新
// ===== ThreadLocal的注意事项 =====
static void threadLocalCaution() {
// 警告:大量虚拟线程使用ThreadLocal会增加内存压力
// 因为每个虚拟线程都有自己的ThreadLocal副本
ThreadLocal<String> context = new ThreadLocal<>();
// 推荐改用ScopedValue(JDK21,更适合虚拟线程)
// ScopedValue<String> scoped = ScopedValue.newInstance();
// ScopedValue.where(scoped, "value").run(() -> {
// System.out.println(scoped.get()); // "value"
// });
// ThreadLocal在虚拟线程里还是可以用,但要注意清理
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
context.set("virtual-thread-value");
try {
System.out.println(context.get());
} finally {
context.remove(); // 重要!用完要清理
}
});
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== Pinning对比 ===");
// 对比:有Pinning vs 无Pinning
long start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var tasks = new ArrayList<Future<?>>();
for (int i = 0; i < 100; i++) {
final int id = i;
tasks.add(executor.submit(() -> withPinning(id)));
}
for (var t : tasks) t.get();
}
System.out.println("有Pinning(synchronized): " + (System.currentTimeMillis() - start) + "ms");
start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var tasks = new ArrayList<Future<?>>();
for (int i = 0; i < 100; i++) {
final int id = i;
tasks.add(executor.submit(() -> withoutPinning(id)));
}
for (var t : tasks) t.get();
}
System.out.println("无Pinning(ReentrantLock): " + (System.currentTimeMillis() - start) + "ms");
}
}四、踩坑实录
坑1:用虚拟线程做CPU密集型任务反而更慢
// 误区:虚拟线程不适合CPU密集型任务!
// CPU密集型任务不会有IO等待,虚拟线程没有优势
// 反而因为调度开销更大
// 错误用法:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
// CPU密集型:矩阵计算、加密解密等
return computeFibonacci(40); // 纯计算
});
}
}
// 问题:虚拟线程调度器是Work-Stealing Pool,
// CPU密集型会导致载体线程全被占满,后续虚拟线程无法调度
// 正确做法:CPU密集型用固定大小的平台线程池
int cpuCores = Runtime.getRuntime().availableProcessors();
try (var executor = Executors.newFixedThreadPool(cpuCores)) {
// CPU密集型任务
}坑2:synchronized导致Pinning影响整体性能
// 生产问题:第三方库用了synchronized,导致虚拟线程Pinning
// 排查方法:
// 1. JFR分析
// java -XX:+EnableDynamicAgentLoading \
// -XX:StartFlightRecording=filename=vt.jfr \
// -Djdk.tracePinnedThreads=full \
// YourApp
// 2. 代码层面监控
Thread.currentThread().isVirtual(); // 检查是否是虚拟线程
// 临时解决方案(等待第三方库升级):
// 增加载体线程池大小
// System.setProperty("jdk.virtualThreadScheduler.parallelism", "32"); // 默认是CPU核心数
// 根本解决:等待JDK24的JEP 491(解决synchronized pinning)坑3:线程池混用导致混淆
// 常见混淆:把虚拟线程执行器和普通线程池混用
// 错误:用信号量控制虚拟线程并发数(通常不必要)
Semaphore semaphore = new Semaphore(100);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
semaphore.acquire(); // 限制并发,但这对IO等待没意义
try {
doIOWork();
} finally {
semaphore.release();
}
});
}
}
// 如果是为了保护下游系统不被过载,这是合理的
// 但如果只是担心"太多线程",对虚拟线程来说不必要
// 虚拟线程的设计初衷就是可以创建大量线程坑4:ThreadLocal内存泄漏
// 问题:虚拟线程生命周期短,但ThreadLocal的值可能持有大对象
// 危险:在虚拟线程里设置大对象到ThreadLocal,不清理
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100000; i++) {
executor.submit(() -> {
MY_THREAD_LOCAL.set(new HeavyObject()); // 危险!
doWork();
// 如果虚拟线程被复用(实际上不会),HeavyObject会一直留着
// 虽然虚拟线程不复用,但每次创建都会有ThreadLocal副本
});
}
}
// 正确:用try-finally清理
MY_THREAD_LOCAL.set(new HeavyObject());
try {
doWork();
} finally {
MY_THREAD_LOCAL.remove(); // 必须清理
}
// 更好:JDK21的ScopedValue(自动清理)
// ScopedValue<HeavyObject> sv = ScopedValue.newInstance();
// ScopedValue.where(sv, new HeavyObject()).run(this::doWork);
// // doWork执行完毕后,ScopedValue自动失效,不需要手动清理坑5:虚拟线程不适合作为线程池(不要Pooling虚拟线程)
// 错误理解:把虚拟线程当平台线程一样做池化
// 错误:
class WrongVirtualThreadPool {
// 不要这样!虚拟线程设计上就是"用完即丢"
// 不需要pooling,也不应该pooling
private final Queue<Thread> pool = new LinkedList<>();
}
// 正确理解:
// 1. newVirtualThreadPerTaskExecutor() 每个任务创建一个虚拟线程,自动管理
// 2. 不要担心创建虚拟线程的开销(比创建平台线程便宜很多)
// 3. 不要尝试重用虚拟线程(JVM会优化的)
// 正确用法:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交任务,每个任务自动获得一个虚拟线程
executor.submit(() -> doWork());
}五、总结与延伸
5.1 虚拟线程适用场景决策树
你的任务是:
IO密集型(数据库查询、HTTP调用、文件读写)
├── 需要高并发(数千~百万请求)
│ └── 虚拟线程(每任务一个虚拟线程)
└── 并发不高(几十~几百)
└── 普通线程池也够用
CPU密集型(计算、加密、图像处理)
└── 固定大小平台线程池(CPU核心数)
混合型(IO+CPU)
├── IO部分:虚拟线程
└── CPU部分:平台线程池5.2 性能数字总结
| 对比项 | 平台线程 | 虚拟线程 |
|---|---|---|
| 创建开销 | ~1ms | ~微秒级 |
| 内存(每个) | ~1MB | ~几百字节 |
| 实际百万并发内存 | ~1TB(不可能) | ~几百MB(可行) |
| IO密集型吞吐量(1000并发,50ms IO) | 受线程数限制 | 理论最大吞吐 |
| CPU密集型性能 | 好 | 略差(调度开销) |
5.3 版本兼容建议
- JDK21(LTS):虚拟线程GA,生产可用
- Spring Boot 3.2+:一键启用虚拟线程(
spring.threads.virtual.enabled=true) - Tomcat 10.1.x+, Jetty 12+:原生支持虚拟线程
- 数据库驱动:PostgreSQL JDBC 42.7+, MySQL Connector/J 8.2+等均已兼容
- JDK24:预计解决synchronized的Pinning问题(JEP 491)
