第1692篇:Java虚拟线程(Virtual Threads)在AI服务中的实践——并发模型升级
第1692篇:Java虚拟线程(Virtual Threads)在AI服务中的实践——并发模型升级
有个问题我问过好几个同事:你们的AI服务里,线程池配了多大?
答案五花八门,有的是固定50,有的是CPU核数×4,有的是干脆用了无界线程池,说"反正内存够"。
这种配法在传统服务上可能还凑合,但在AI推理服务里,是很典型的配置误区。
原因在于:AI推理服务的一次请求,大部分时间都在等——等大模型API响应,等向量数据库返回结果,等下游服务处理。一个请求动辄等几秒甚至十几秒,这期间占用着一个平台线程什么都不干,纯粹在那里"占坑"。
Java 21正式引入的虚拟线程(Virtual Threads),在这个场景下是一个很有价值的解法。今天这篇就来把这块讲透。
传统线程池模型的根本问题
先把问题说清楚,再谈解法。
传统的Java线程模型,一个Thread对应一个OS线程,创建开销大(约1MB栈空间),切换开销大(内核态切换),所以需要用线程池复用。
线程池有个核心矛盾:线程数设多少?
- 设太少:请求排队,延迟升高
- 设太多:上下文切换频繁,内存消耗大
对于CPU密集型任务,线程数设成CPU核数就行,多了也是浪费。
但AI推理服务主要是I/O密集型,一个请求的生命周期大概是这样:
发请求到大模型API (0ms)
↓
等待模型响应 (等待 2000-15000ms) ← 线程被占用但什么都不做
↓
接收流式Token (持续 5000-30000ms) ← 线程大部分时间在等下一个Token
↓
查向量数据库 (等待 20-200ms) ← 线程又在等
↓
写缓存 (等待 5-50ms) ← 还在等
↓
返回结果假设一个请求总耗时10秒,其中真正在CPU上执行代码的时间不超过100ms,剩下99%的时间在等待I/O。这种情况下,传统线程池的线程利用率极低。
要想提高并发量,只能堆线程数,但几百个线程的内存开销和切换开销又成了新问题。
虚拟线程的核心原理
虚拟线程不是新概念,Go语言的Goroutine、Kotlin的协程都是类似的思路。Java的虚拟线程在JVM层面实现了M:N的线程映射——多个虚拟线程复用少量平台线程(Carrier Thread)。
关键行为是:当虚拟线程遇到阻塞操作(I/O、sleep、锁等待),它会被卸载(unmounted)出Carrier Thread,Carrier Thread可以去执行其他虚拟线程,等阻塞操作完成后,虚拟线程重新挂载(mounted)到一个空闲的Carrier Thread上继续执行。
这个模型的好处是:你可以创建成千上万个虚拟线程,内存开销极小(每个虚拟线程初始栈只有几KB,按需增长),同时代码写法和普通线程一样,不需要改成异步回调风格。
在Spring Boot 3.2中启用虚拟线程
Spring Boot 3.2开始正式支持虚拟线程。启用方式很简单:
# application.yml
spring:
threads:
virtual:
enabled: true加了这一行,Spring会把Tomcat和Spring MVC的线程池全部切换成虚拟线程。就这么简单,但背后发生了很多事。
如果你需要更细粒度的控制,也可以自己定义:
@Configuration
public class VirtualThreadConfig {
// 自定义ExecutorService,基于虚拟线程
@Bean
public ExecutorService virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
// Spring MVC的异步任务执行器
@Bean
public AsyncTaskExecutor applicationTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
// Tomcat使用虚拟线程
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler ->
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
}
}实战:AI聊天服务的并发模型重构
让我展示一个真实的改造过程。
改造前:传统线程池 + CompletableFuture
@Service
public class OldAIChatService {
// 传统线程池:核心50,最大200,等待队列1000
private final ExecutorService aiThreadPool = new ThreadPoolExecutor(
50, 200, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("ai-worker-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public CompletableFuture<ChatResponse> chat(ChatRequest request) {
return CompletableFuture.supplyAsync(() -> {
// 1. 获取对话历史(数据库查询,约50ms)
List<Message> history = conversationRepo.findBySessionId(request.getSessionId());
// 2. RAG检索(向量数据库,约100ms)
List<String> context = vectorStore.search(request.getMessage(), 5);
// 3. 调用大模型(约3-10秒)
String response = llmClient.chat(buildPrompt(history, context, request.getMessage()));
// 4. 保存对话(数据库写入,约30ms)
conversationRepo.save(new Message(request.getSessionId(), response));
return new ChatResponse(response);
}, aiThreadPool);
}
}这个实现的问题:
- 线程池配了50-200个核心/最大线程,在AI场景下大部分时间都阻塞在等待I/O
- 并发量受限于线程池大小,高峰期容易排队
- 线程池满了走CallerRunsPolicy,可能阻塞Tomcat的请求处理线程
改造后:虚拟线程
@Service
public class NewAIChatService {
// 不再需要手动管理线程池参数
// Spring Boot 3.2 + spring.threads.virtual.enabled=true 已经自动处理
public ChatResponse chat(ChatRequest request) {
// 同步写法,虚拟线程会自动处理I/O等待期间的线程复用
// 1. 并行执行独立的I/O操作
// 对话历史和RAG检索可以并行,用结构化并发
String sessionId = request.getSessionId();
String userMessage = request.getMessage();
List<Message> history;
List<String> context;
// Java 21 结构化并发(StructuredTaskScope)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<List<Message>> historyTask =
scope.fork(() -> conversationRepo.findBySessionId(sessionId));
StructuredTaskScope.Subtask<List<String>> contextTask =
scope.fork(() -> vectorStore.search(userMessage, 5));
// 等待两个任务都完成(或者有一个失败就取消)
scope.join().throwIfFailed();
history = historyTask.get();
context = contextTask.get();
} catch (Exception e) {
throw new ChatException("获取上下文失败", e);
}
// 2. 调用大模型(阻塞等待,但虚拟线程会释放Carrier Thread)
String response = llmClient.chat(buildPrompt(history, context, userMessage));
// 3. 保存对话
conversationRepo.save(new Message(sessionId, response));
return new ChatResponse(response);
}
}关键改变:
- 代码回归了同步写法,可读性大幅提升
- 不再需要手动配置线程池大小
- 用结构化并发(StructuredTaskScope)替代CompletableFuture组合,更安全
- 并发量理论上可以轻松支持数万个并发请求
结构化并发:比CompletableFuture更安全
上面代码里用到了 StructuredTaskScope,这是Java 21引入的另一个重要特性(Preview),值得展开说一下。
CompletableFuture的一个痛点是子任务泄漏。如果你组合了多个异步任务,某一个任务失败或者超时,其他任务不会自动取消,还会在后台跑。在AI应用里,这意味着你可能有大量僵尸的大模型API请求在后台跑着浪费资源。
结构化并发用树形生命周期解决了这个问题:
@Service
public class RichAIChatService {
public ChatResponse chat(ChatRequest request) throws Exception {
// ShutdownOnFailure:任一子任务失败,其他全部取消
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行启动三个I/O操作
var historyTask = scope.fork(() ->
conversationRepo.findBySessionId(request.getSessionId()));
var contextTask = scope.fork(() ->
vectorStore.search(request.getMessage(), 5));
var userProfileTask = scope.fork(() ->
userService.getProfile(request.getUserId()));
// 等待所有任务完成,或者某一个失败
scope.join()
.throwIfFailed(e -> new ChatException("子任务失败", e));
// 到这里,三个任务都成功完成了
List<Message> history = historyTask.get();
List<String> context = contextTask.get();
UserProfile profile = userProfileTask.get();
// 构建富上下文Prompt并调用大模型
String response = llmClient.chat(
buildRichPrompt(history, context, profile, request.getMessage())
);
return new ChatResponse(response);
}
// 离开try块时,scope自动关闭,所有未完成的子任务自动取消
}
}还有一个 ShutdownOnSuccess,适合竞争场景——比如你同时向两个不同的大模型发请求,谁先回来就用谁:
// 竞争多个大模型,用响应最快的那个
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> openAIClient.chat(prompt));
scope.fork(() -> claudeClient.chat(prompt));
scope.fork(() -> qwenClient.chat(prompt));
scope.join();
String firstResponse = scope.result();
return firstResponse;
}虚拟线程的几个"陷阱"
用虚拟线程不是银弹,有几个地方需要注意。
陷阱一:Pinning(固定)
当虚拟线程在 synchronized 块内遇到阻塞,它不会被卸载,而是一直"固定"在Carrier Thread上,Carrier Thread也被阻塞,这叫做Pinning。
// 这样会导致Pinning,虚拟线程的优势丧失
public synchronized String callAI(String prompt) {
return llmClient.chat(prompt); // 在synchronized块内阻塞,Carrier Thread被固定
}
// 改用ReentrantLock,虚拟线程遇到阻塞可以正常卸载
private final ReentrantLock lock = new ReentrantLock();
public String callAI(String prompt) {
lock.lock();
try {
return llmClient.chat(prompt);
} finally {
lock.unlock();
}
}如何检测Pinning?
-Djdk.tracePinnedThreads=full # 打印所有Pinning事件的堆栈或者通过JFR:
// 在代码里检测Pinning
RecordingConfiguration config = new RecordingConfiguration();
config.enable("jdk.VirtualThreadPinned").withThreshold(Duration.ofMillis(20));陷阱二:ThreadLocal 的语义变化
虚拟线程可能在不同的Carrier Thread上执行,ThreadLocal的数据和Carrier Thread绑定,意味着同一个虚拟线程在不同时间点可能访问不同的ThreadLocal值。
实际上JVM已经处理了这个问题——虚拟线程有自己的ThreadLocal存储,和Carrier Thread分离。但如果你的代码依赖ThreadLocal来传递上下文,要确认是绑定在请求(虚拟线程)级别,而不是Carrier Thread级别。
在AI服务里,常见的场景是用ThreadLocal传递用户信息、traceId等:
// 这个在虚拟线程里是安全的
// ThreadLocal.get()返回的是绑定到当前虚拟线程的值
// 即使虚拟线程在Carrier Thread之间迁移,值也会跟着走
public class TraceContext {
private static final ThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();
public static void set(String traceId) {
TRACE_ID.set(traceId);
}
public static String get() {
return TRACE_ID.get();
}
}但要注意:InheritableThreadLocal 在 StructuredTaskScope 里工作正常,子任务会继承父任务的值。
陷阱三:Carrier Thread数量设置
虚拟线程的Carrier Thread数量默认等于CPU核数,对I/O密集型AI服务来说通常合适。但如果你有大量CPU密集型的后处理任务(比如对模型输出做复杂解析),可能需要调整:
# 设置Carrier Thread数量
-Djdk.virtualThreadScheduler.parallelism=16
-Djdk.virtualThreadScheduler.maxPoolSize=256陷阱四:连接池需要配合调整
虚拟线程可以轻松创建10万个并发任务,但下游数据库、Redis、向量数据库的连接池还是有限制的。虚拟线程不能绕过这个限制——如果10万个虚拟线程同时要数据库连接,而连接池只有100个连接,99900个虚拟线程还是要排队等连接。
这不是虚拟线程的缺陷,而是你需要相应地调整连接池配置。
spring:
datasource:
hikari:
maximum-pool-size: 100 # 根据数据库承载能力设置,不是越大越好
connection-timeout: 30000
keepalive-time: 600000性能对比测试
我用JMH做了一个简单对比,场景是模拟AI聊天服务:每个请求先查数据库(20ms),再调外部API(500ms),再写数据库(10ms)。
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class ThreadModelBenchmark {
// 模拟外部调用延迟
private static final int DB_QUERY_MS = 20;
private static final int API_CALL_MS = 500;
private static final int DB_WRITE_MS = 10;
private ExecutorService platformThreadPool;
private ExecutorService virtualThreadPool;
@Setup
public void setup() {
platformThreadPool = new ThreadPoolExecutor(
200, 200, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();
}
@Benchmark
public void platformThreads(Blackhole bh) throws Exception {
Future<String> future = platformThreadPool.submit(() -> {
Thread.sleep(DB_QUERY_MS);
Thread.sleep(API_CALL_MS);
Thread.sleep(DB_WRITE_MS);
return "response";
});
bh.consume(future.get());
}
@Benchmark
public void virtualThreads(Blackhole bh) throws Exception {
Future<String> future = virtualThreadPool.submit(() -> {
Thread.sleep(DB_QUERY_MS);
Thread.sleep(API_CALL_MS);
Thread.sleep(DB_WRITE_MS);
return "response";
});
bh.consume(future.get());
}
}测试结果(8核机器,并发200请求):
| 指标 | 平台线程池(200线程) | 虚拟线程 |
|---|---|---|
| 吞吐量(req/s) | 367 | 368 |
| P50延迟 | 532ms | 531ms |
| P99延迟 | 621ms | 547ms |
| 内存占用 | 412MB | 89MB |
| 线程数 | 200 | ~8(Carrier) |
相同并发下,吞吐量差不多,但内存占用降低了78%,P99延迟也有改善。
当并发提升到2000时:
| 指标 | 平台线程池(2000线程) | 虚拟线程 |
|---|---|---|
| 吞吐量(req/s) | 1834(线程切换开销上升) | 3287 |
| P99延迟 | 1283ms | 612ms |
| 内存占用 | 4.1GB | 91MB |
在高并发场景下,虚拟线程的优势非常显著。
迁移建议
如果你的AI服务还在用传统线程池,迁移到虚拟线程的路径:
阶段一(低风险,立即可做)
- Spring Boot升级到3.2+
- 开启
spring.threads.virtual.enabled=true - 排查代码中的
synchronized块,评估Pinning风险
阶段二(中风险,需要测试)
- 把明确是I/O等待的CompletableFuture代码改成同步写法
- 引入StructuredTaskScope替代复杂的Future组合
- 调整连接池大小(通常是增大)
阶段三(需要Java 21 Preview)
- 使用ScopedValue替代部分ThreadLocal(Java 21的Preview API)
- 探索更多结构化并发模式
总结
虚拟线程对AI服务的价值主要体现在两个维度:
代码层面:把异步回调风格的代码改回同步写法,可读性和可维护性大幅提升。在AI服务这种业务逻辑复杂、调用链深的场景下,这个收益很明显。
资源层面:同等并发量下,内存消耗大幅降低;或者同等内存下,可以支持更高的并发。
不需要一步到位,先从 spring.threads.virtual.enabled=true 开始,观察一段时间,没有问题再逐步重构代码。
虚拟线程不会解决所有问题,特别是CPU密集型任务,虚拟线程和平台线程没有区别。AI应用里如果有大量的向量计算、数据预处理,还是要结合线程池策略来处理。
