第2327篇:Java虚拟线程与AI并发——Project Loom如何改变LLM调用的编程模式
第2327篇:Java虚拟线程与AI并发——Project Loom如何改变LLM调用的编程模式
适读人群:Java后端工程师,特别是在AI应用中遇到并发和吞吐量瓶颈的开发者 | 阅读时长:约20分钟 | 核心价值:理解虚拟线程的本质优势,掌握在LLM调用场景下的正确用法和避坑点
去年我们做了一个压测,用来对比传统线程池和虚拟线程处理LLM调用的吞吐量差异。
测试场景:模拟100个并发用户同时发送问题,每次LLM调用耗时约3-8秒(这是实际生产中的典型延迟)。
结果让我印象很深:传统Executors.newFixedThreadPool(200),200线程吃满,CPU使用率8%,大部分时间在等LLM响应;虚拟线程版本,同样200个并发用户,CPU使用率5%,但可以轻松扩展到2000个并发用户而不需要改任何代码。
这个结果其实并不意外——LLM调用是典型的IO密集型任务(等待远端API响应),而虚拟线程就是专门为这类场景设计的。
但真正用起来,有不少细节需要注意。
虚拟线程:解决的是什么问题
先把概念说清楚,再看代码。
传统线程(平台线程)的问题:每个线程对应一个OS线程,内存开销约1-2MB。一台8G内存的服务器,最多支撑几千个线程,超过这个数就会OOM或者线程切换开销激增。
对于CPU密集型任务,这个限制不是大问题——CPU忙着算,线程数够用就行。
但对于IO密集型任务(数据库查询、HTTP调用、LLM API调用),线程80%的时间在等待,白白占着OS资源什么都不做。这就是"阻塞问题"。
传统的解法是响应式编程(WebFlux + Project Reactor),但代价是编程模型完全变了:Mono<String>、Flux<T>、.flatMap()、.subscribeOn()……熟悉Spring MVC的工程师需要重新学习一套思维体系。
虚拟线程的思路是:让你继续写同步代码,但底层自动变成异步的。
当虚拟线程遇到IO等待时,JVM会自动挂起(不是阻塞OS线程),把当前OS线程让给其他可运行的虚拟线程。IO完成后,从任意空闲的OS线程恢复执行。
平台线程模型:
平台线程1 [====等待LLM====等待LLM====] → OS线程被占用,无法干其他事
平台线程2 [==等待DB=====等待DB======] → 又一个OS线程被占用
虚拟线程模型:
虚拟线程1 [====]→挂起,OS线程去执行VT3→[恢复==]→挂起→[恢复]
虚拟线程2 [==]→挂起→[恢复=====]→挂起→[恢复]
虚拟线程3 [=====]→挂起→[恢复========]
→ 少量OS线程 × 大量虚拟线程,OS线程利用率接近100%在Spring Boot中启用虚拟线程
从Java 21开始,虚拟线程正式GA。Spring Boot 3.2+内置了对虚拟线程的支持:
# application.yml
spring:
threads:
virtual:
enabled: true # 一行配置,把Spring MVC、@Async等都切换到虚拟线程这一行配置做了什么?它把Spring Boot的内嵌Tomcat(或Jetty/Undertow)的请求处理线程池切换到虚拟线程,把@Async的默认执行器切换到虚拟线程执行器。
验证是否生效:
@RestController
public class ThreadInfoController {
@GetMapping("/thread-info")
public Map<String, Object> threadInfo() {
Thread current = Thread.currentThread();
return Map.of(
"threadName", current.getName(),
"isVirtual", current.isVirtual(), // Java 21新增
"threadId", current.threadId()
);
}
}开启后,响应会包含"isVirtual": true。
LLM调用的并发模式
场景一:简单并发调用,不需要额外配置
开启虚拟线程后,Spring MVC的每个请求已经在虚拟线程上运行,直接写同步代码就可以了:
@RestController
@RequiredArgsConstructor
@Slf4j
public class AiController {
private final ChatClient chatClient;
@PostMapping("/analyze")
public AnalysisResult analyze(@RequestBody AnalysisRequest request) {
// 直接写同步代码,虚拟线程负责处理等待
// 不需要 CompletableFuture,不需要 Mono
log.info("处理请求,线程:{},是否虚拟:{}",
Thread.currentThread().getName(),
Thread.currentThread().isVirtual());
String result = chatClient.prompt()
.user(request.content())
.call()
.content();
return new AnalysisResult(result);
}
}100个用户同时请求,就会有100个虚拟线程同时运行,每个都在等待LLM响应时被挂起,OS线程可以去服务其他虚拟线程。
场景二:一个请求内并行调用多个LLM
这是更典型的场景——比如对同一段代码同时请求"bug分析"和"优化建议",希望并行执行以减少总延迟:
@Service
@RequiredArgsConstructor
public class ParallelAnalysisService {
private final ChatClient chatClient;
// 对同一内容并行执行多种分析
public ComprehensiveAnalysis analyzeComprehensively(String code) {
// 用虚拟线程并行执行三个分析任务
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
Future<String> bugFuture = executor.submit(() ->
analyzeWithRole("你是代码安全专家,找出这段代码的潜在bug和安全问题", code));
Future<String> perfFuture = executor.submit(() ->
analyzeWithRole("你是性能优化专家,分析这段代码的性能问题", code));
Future<String> styleFuture = executor.submit(() ->
analyzeWithRole("你是代码质量专家,评估代码风格和可读性", code));
// 等待所有结果(每个Future.get()都会阻塞当前虚拟线程,但不阻塞OS线程)
String bugs = bugFuture.get(30, TimeUnit.SECONDS);
String performance = perfFuture.get(30, TimeUnit.SECONDS);
String style = styleFuture.get(30, TimeUnit.SECONDS);
return new ComprehensiveAnalysis(bugs, performance, style);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AiAnalysisException("并行分析失败", e);
}
}
private String analyzeWithRole(String systemPrompt, String content) {
return chatClient.prompt()
.system(systemPrompt)
.user(content)
.call()
.content();
}
public record ComprehensiveAnalysis(String bugs, String performance, String style) {}
}注意try-with-resources的用法:Executors.newVirtualThreadPerTaskExecutor()返回的是ExecutorService,实现了AutoCloseable,try块结束时会自动关闭执行器并等待所有任务完成。这是Java 21推荐的用法,不需要手动shutdown()。
场景三:批量文档处理
处理大量文档时,既要并发又要控制速率(避免LLM API限流):
@Service
@Slf4j
public class BatchDocumentProcessor {
private final ChatClient chatClient;
// 用Semaphore控制并发数(避免API限流)
private final Semaphore rateLimiter;
public BatchDocumentProcessor(ChatClient chatClient) {
this.chatClient = chatClient;
// 最多10个并发LLM调用
this.rateLimiter = new Semaphore(10);
}
public List<DocumentSummary> processDocuments(List<Document> documents) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<DocumentSummary>> futures = documents.stream()
.map(doc -> executor.submit(() -> processOne(doc)))
.toList();
List<DocumentSummary> results = new ArrayList<>();
for (Future<DocumentSummary> future : futures) {
try {
results.add(future.get(60, TimeUnit.SECONDS));
} catch (ExecutionException e) {
log.error("文档处理失败", e.getCause());
results.add(DocumentSummary.failed(e.getMessage()));
} catch (TimeoutException e) {
log.warn("文档处理超时,已跳过");
results.add(DocumentSummary.timeout());
}
}
return results;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("批量处理被中断", e);
}
}
private DocumentSummary processOne(Document doc) throws InterruptedException {
rateLimiter.acquire(); // 获取许可(虚拟线程等待时不占用OS线程)
try {
String summary = chatClient.prompt()
.system("你是文档摘要专家,用100字以内总结文档核心内容")
.user(doc.content())
.call()
.content();
return DocumentSummary.success(doc.id(), summary);
} finally {
rateLimiter.release();
}
}
}虚拟线程的陷阱:Pinning问题
虚拟线程不是万能的。有一类场景会导致虚拟线程"固定"到OS线程上(称为Pinning),退化成平台线程的行为:
1. synchronized块/方法
// 危险:synchronized会导致虚拟线程pin住OS线程
public class BrokenCache {
private final Map<String, String> cache = new HashMap<>();
public synchronized String get(String key) {
if (!cache.containsKey(key)) {
// 这里有LLM调用,但因为synchronized,虚拟线程无法挂起
// 会pin住OS线程,彻底失去虚拟线程的优势
String value = callLlm(key);
cache.put(key, value);
}
return cache.get(key);
}
}
// 正确:用ReentrantLock替换synchronized
public class GoodCache {
private final Map<String, String> cache = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
public String get(String key) {
return cache.computeIfAbsent(key, k -> {
// ConcurrentHashMap的computeIfAbsent内部用了synchronized
// 但我们可以改用显式锁
return callLlm(k);
});
}
}
// 更好的方式:用ConcurrentHashMap的原子操作
public class BestCache {
private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
public String get(String key) {
CompletableFuture<String> future = cache.computeIfAbsent(key,
k -> CompletableFuture.supplyAsync(() -> callLlm(k),
Executors.newVirtualThreadPerTaskExecutor()));
try {
return future.get();
} catch (Exception e) {
cache.remove(key); // 失败时清除缓存
throw new RuntimeException(e);
}
}
}2. 检测Pinning的方法
JVM提供了系统属性来打印Pinning警告:
# 启动时添加JVM参数,检测到Pinning时打印警告
java -Djdk.tracePinnedThreads=full -jar your-app.jar或者在Spring Boot的配置里:
# application.yml(等效方式)
spring:
jvm:
args: "-Djdk.tracePinnedThreads=short"虚拟线程与响应式编程的取舍
这个问题经常被问到:有了虚拟线程,还需要WebFlux吗?
简单回答:对于LLM调用这类典型场景,虚拟线程 + Spring MVC已经足够,不需要WebFlux。
但流式输出(SSE)场景是例外:
// 流式输出必须用响应式API,因为LLM是持续吐出token的
// 这里WebFlux的Flux更自然
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat(@RequestParam String message) {
return chatClient.prompt()
.user(message)
.stream()
.content();
}流式场景仍然推荐用Spring WebFlux来处理——不是因为性能,而是因为SSE/WebSocket本身就是流式协议,Flux的语义更匹配。
决策树:
性能数据参考
用一段标准的压测代码,测一下虚拟线程在LLM调用场景下的实际效果:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class VirtualThreadBenchmark {
@Test
void benchmarkConcurrentLlmCalls() throws InterruptedException {
int concurrency = 50;
CountDownLatch latch = new CountDownLatch(concurrency);
AtomicLong totalTime = new AtomicLong(0);
AtomicInteger successCount = new AtomicInteger(0);
long start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < concurrency; i++) {
final int taskId = i;
executor.submit(() -> {
try {
long taskStart = System.currentTimeMillis();
// 模拟LLM调用
String result = chatClient.prompt()
.user("请用一句话解释什么是线程" + taskId)
.call()
.content();
totalTime.addAndGet(System.currentTimeMillis() - taskStart);
successCount.incrementAndGet();
} finally {
latch.countDown();
}
});
}
}
latch.await(120, TimeUnit.SECONDS);
long totalElapsed = System.currentTimeMillis() - start;
System.out.printf("并发数:%d%n", concurrency);
System.out.printf("总耗时:%dms%n", totalElapsed);
System.out.printf("成功次数:%d%n", successCount.get());
System.out.printf("平均单次耗时:%dms%n", totalTime.get() / successCount.get());
System.out.printf("等效吞吐量:%.1f req/s%n",
(double) successCount.get() / totalElapsed * 1000);
}
}在我们实际测试中(DeepSeek API,平均响应3秒),50个并发请求:
- 平台线程池(50线程):总耗时约8秒,因为队列等待
- 虚拟线程:总耗时约4秒(受限于API并发限制,不是线程限制)
虚拟线程对LLM应用的意义,不只是性能提升,更是编程模型的简化——你可以继续写熟悉的同步代码,不需要学习响应式编程的思维体系,就能获得高并发能力。这对以Spring MVC为主的Java团队来说,是一个非常实际的收益。
