AI工程师并发编程:异步AI调用的线程模型与实战
AI工程师并发编程:异步AI调用的线程模型与实战
适读人群:有1-5年Java开发经验,想向AI工程师方向转型的开发者 阅读时长:约17分钟 文章价值:① 搞清楚AI场景下各种线程模型的本质差异 ② 掌握Virtual Thread + CompletableFuture + WebFlux的选型逻辑 ③ 拿到一套高吞吐AI服务的并发编程实战代码
有个朋友小陈,Java做了四年,最近在面AI岗。前几天来问我:
"老张,面试官问我,如果同时有1000个用户发AI请求,你怎么处理?我说用线程池,他摇头了。"
我问他怎么回答的。
"我说建一个固定大小200的线程池,每个请求占一个线程,等LLM返回后释放。"
"嗯,你这个思路没错,但AI场景有个特殊性——LLM调用时间很长,线程会阻塞很久。200个线程被锁死了,第201个请求进来,排队。这不是线程池的问题,是你在用同步线程模型处理天生异步的工作负载。"
小陈沉默了一秒:"那怎么答?"
这篇文章就是我给他整理的答案。
为什么AI场景需要特殊的并发模型
先看一组对比数据:
| 操作类型 | 典型延迟 | 线程占用时间 |
|---|---|---|
| 内存读取 | 0.0001ms | 忽略不计 |
| Redis查询 | 0.1-1ms | 极短 |
| 数据库查询 | 1-50ms | 短暂 |
| HTTP接口调用 | 10-200ms | 中等 |
| LLM调用 | 1000-30000ms | 极长 |
LLM调用的线程占用时间是普通HTTP调用的100倍。用传统的"一请求一线程"模式,服务器顶多扛住几十个并发就撑不住了。
核心问题不是并发高,是线程利用率太低——线程大部分时间在等LLM返回,什么也没做。
解法分三个方向:
- Virtual Thread(Java 21):线程变轻量,可以开几万个,等待时挂起,不占OS线程
- CompletableFuture + 自定义线程池:异步编程,但仍然是平台线程
- WebFlux + Reactor:全响应式,彻底非阻塞
三种方案深度对比
| 维度 | Virtual Thread | CompletableFuture | WebFlux |
|---|---|---|---|
| 代码复杂度 | 低(同步写法) | 中等 | 高(响应式思维转变) |
| 性能天花板 | 高 | 中等 | 最高 |
| 调试难度 | 低 | 中等 | 高 |
| 适用场景 | 新项目/Spring Boot 3.2+ | 已有项目改造 | 超高并发/流式场景 |
| 与Spring AI集成 | 最简单 | 简单 | 需要适配 |
代码实战
方案一:Virtual Thread(推荐,Spring Boot 3.2+)
Java 21的虚拟线程是处理AI调用最优雅的方案。写同步代码,获得异步性能。
# Spring Boot 3.2+ 开启虚拟线程,一行配置搞定
spring:
threads:
virtual:
enabled: true@RestController
@RequestMapping("/api/v1/ai")
@RequiredArgsConstructor
@Slf4j
public class VirtualThreadAiController {
private final ChatClient chatClient;
/**
* 开启虚拟线程后,每个请求都在独立的虚拟线程中执行
* 代码完全同步风格,但底层是虚拟线程,等待期间不占OS线程
*/
@PostMapping("/chat")
public String chat(@RequestBody ChatRequest request) {
log.info("当前线程:{},是否虚拟线程:{}",
Thread.currentThread().getName(),
Thread.currentThread().isVirtual()); // 应该是 true
// 直接同步调用,代码极简
// 等待LLM期间,虚拟线程被挂起,OS线程被释放去处理其他请求
return chatClient.prompt()
.user(request.getMessage())
.call()
.content();
}
/**
* 批量并发调用:同时向LLM发多个请求
* 虚拟线程让这个操作变得非常简单
*/
@PostMapping("/batch")
public List<String> batchChat(@RequestBody List<String> prompts) {
// 用虚拟线程池并发执行所有请求
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = prompts.stream()
.map(prompt -> executor.submit(() ->
chatClient.prompt()
.user(prompt)
.call()
.content()
))
.toList();
return futures.stream()
.map(f -> {
try {
return f.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("批量请求失败", e);
return "处理失败: " + e.getMessage();
}
})
.toList();
}
}
}方案二:CompletableFuture + 专用线程池
适合已有Spring MVC项目的改造,不需要升级到Java 21。
@Configuration
public class AiThreadPoolConfig {
/**
* AI专用线程池:核心参数需要根据LLM延迟调优
*
* 计算公式:
* 线程数 = 目标QPS × LLM平均延迟(s) × 1.2(预留20%余量)
* 例:目标QPS=100,平均延迟10s → 线程数 = 100 × 10 × 1.2 = 1200
* 但实际上VirtualThread更合适,Platform Thread不要超过1000
*/
@Bean("aiExecutor")
public ExecutorService aiExecutor() {
return new ThreadPoolExecutor(
50, // corePoolSize
200, // maxPoolSize
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500), // 队列容量500
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "ai-worker-" + count.getAndIncrement());
t.setDaemon(true);
return t;
}
},
// 队列满了,抛异常(不要用CallerRunsPolicy,会阻塞HTTP线程)
new ThreadPoolExecutor.AbortPolicy()
);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class AsyncAiService {
private final ChatClient chatClient;
@Qualifier("aiExecutor")
private final ExecutorService aiExecutor;
/**
* 返回CompletableFuture,Controller可以直接返回,Tomcat线程立即释放
*/
public CompletableFuture<AiResponse> chatAsync(String userId, String prompt) {
return CompletableFuture
// 在AI线程池里执行LLM调用
.supplyAsync(() -> {
log.info("开始LLM调用,userId={}, thread={}",
userId, Thread.currentThread().getName());
return chatClient.prompt()
.user(prompt)
.call()
.content();
}, aiExecutor)
// 结果处理(可以在另一个线程池里做,避免阻塞AI线程池)
.thenApply(content -> AiResponse.builder()
.userId(userId)
.content(content)
.timestamp(LocalDateTime.now())
.build())
// 统一异常处理
.exceptionally(e -> {
log.error("AI调用失败,userId={}", userId, e);
return AiResponse.builder()
.userId(userId)
.content("服务暂时不可用,请稍后重试")
.error(true)
.build();
});
}
/**
* 并发处理多个Prompt,等全部完成
*/
public CompletableFuture<List<String>> chatAllAsync(List<String> prompts) {
List<CompletableFuture<String>> futures = prompts.stream()
.map(prompt -> CompletableFuture.supplyAsync(() ->
chatClient.prompt().user(prompt).call().content(), aiExecutor))
.toList();
// allOf等全部完成,thenApply收集结果
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(f -> f.getNow("处理失败"))
.toList());
}
/**
* anyOf:多个模型竞速,用最快那个的结果
*/
public CompletableFuture<String> chatWithRacing(String prompt,
ChatClient gpt4o,
ChatClient claude) {
CompletableFuture<String> gptFuture = CompletableFuture.supplyAsync(
() -> gpt4o.prompt().user(prompt).call().content(), aiExecutor);
CompletableFuture<String> claudeFuture = CompletableFuture.supplyAsync(
() -> claude.prompt().user(prompt).call().content(), aiExecutor);
// 谁先返回用谁
return (CompletableFuture<String>) CompletableFuture.anyOf(gptFuture, claudeFuture);
}
}
// Controller:直接返回CompletableFuture
@RestController
@RequiredArgsConstructor
public class AsyncController {
private final AsyncAiService asyncAiService;
@PostMapping("/chat")
public CompletableFuture<AiResponse> chat(
@RequestBody ChatRequest request,
@AuthenticationPrincipal UserDetails user) {
// 直接返回Future,Servlet 3.x会挂起请求,不占Tomcat线程
return asyncAiService.chatAsync(user.getUsername(), request.getMessage());
}
}方案三:流式响应 + SSE(最佳用户体验)
LLM天然支持流式输出,让用户看到"打字机效果",而不是干等。
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
@Slf4j
public class StreamAiController {
private final ChatClient chatClient;
/**
* Server-Sent Events 流式返回
* 用户看到AI一个字一个字打出来,体验好很多
*/
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(
@RequestParam String prompt,
@RequestParam(defaultValue = "false") boolean useVirtualThread) {
return chatClient.prompt()
.user(prompt)
.stream()
.content() // 返回Flux<String>,每个元素是一个token
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.build())
.doOnComplete(() -> log.info("流式响应完成"))
.doOnError(e -> log.error("流式响应出错", e))
// 遇到错误返回错误事件,不中断连接
.onErrorReturn(ServerSentEvent.<String>builder()
.event("error")
.data("AI服务暂时不可用")
.build());
}
}线程模型选择决策树
回到小陈的面试题,正确答案应该是:
"1000个并发AI请求,我会用Java 21虚拟线程配合Spring Boot 3.2。每个请求在独立虚拟线程里同步等待LLM,等待期间虚拟线程挂起不占OS线程,OS线程可以继续处理其他请求。这样同时处理数千个AI请求完全没问题。如果需要流式返回,额外用SSE+WebFlux做推送层。"
面试官点头了。
