AI应用的响应式编程:用WebFlux构建高吞吐量AI服务
AI应用的响应式编程:用WebFlux构建高吞吐量AI服务
一、200并发就卡死:一次让人崩溃的生产事故
2025年9月某个周一早晨9点,上海某金融科技公司的AI智能客服系统,在早高峰时段完全瘫痪。
系统工程师刘杰盯着监控屏幕,看到的是一张越来越陡的延迟曲线:
- 8:30:并发用户100人,平均响应时间 2.3秒(正常,LLM调用本来就慢)
- 8:45:并发用户160人,平均响应时间 8.7秒(开始变慢)
- 9:00:并发用户210人,系统请求全部超时,Tomcat线程池耗尽
告警邮件的内容让刘杰脸色发白:
CRITICAL: Thread pool exhausted
Available threads: 0/200
Request queue: 1847 requests waiting
Current memory: 14.2GB / 16GB
Average response time: 47s (timeout threshold: 30s)问题很清楚:Spring MVC + 传统线程池,处理AI这种高延迟(单次3-30秒)的场景,天然就是灾难。
200个Tomcat线程,每个线程在等待LLM响应时被阻塞30秒。第201个请求进来,只能在队列里等。
刘杰当时做了一个临时方案:把Tomcat线程池从200扩到500,服务勉强撑住了。但他知道这只是治标,而且增加线程意味着更多内存开销。
根本解决方案是:响应式编程。
三周后,刘杰带领团队完成了从Spring MVC到Spring WebFlux的迁移。重新压测结果:
| 指标 | Spring MVC | Spring WebFlux | 提升 |
|---|---|---|---|
| 最大并发支撑 | 200并发(线程池耗尽) | 2000并发(稳定运行) | 10倍 |
| 内存占用(2000并发) | OOM崩溃 | 1.8GB | 显著降低 |
| P99响应时间(100并发) | 3.2s | 2.8s | 12.5% |
| 线程数 | 500个工作线程 | 8个Event Loop线程 | 减少98% |
这篇文章,是刘杰的完整迁移方案和踩坑记录。
二、响应式编程的核心理念:为什么和AI天然契合
2.1 传统线程模型的本质问题
传统Spring MVC处理AI请求:
线程1: [→→→ 等待LLM响应(20秒) →→→] [处理响应(0.01秒)]
线程2: [→→→ 等待LLM响应(15秒) →→→] [处理响应(0.01秒)]
线程3: [→→→ 等待LLM响应(25秒) →→→] [处理响应(0.01秒)]
...
线程200: [被阻塞,等待前面的请求完成]
问题:线程在等待IO时什么都不做,纯粹浪费资源
200个线程全部阻塞 = 系统挂死响应式WebFlux处理AI请求:
Event Loop线程1:
t=0ms: 接受请求A → 发送LLM请求 → 注册回调 → 继续处理请求B
t=1ms: 接受请求B → 发送LLM请求 → 注册回调 → 继续处理请求C
t=2ms: 接受请求C → 发送LLM请求 → 注册回调 → 继续处理请求D
...
t=20000ms: 请求A的LLM响应到达 → 触发回调 → 发送响应给用户
t=20001ms: 请求B的LLM响应到达 → 触发回调 → 发送响应给用户
结论:1个Event Loop线程可以同时管理数千个并发请求
不等待 = 不浪费 = 高并发2.2 背压机制与AI流式输出的天然契合
AI流式输出(Streaming)的特点:LLM按token速度生成内容,客户端按网络速度消费内容。
问题:如果LLM生成速度 > 客户端消费速度,怎么办?
Reactor(WebFlux底层)内置了背压机制,当客户端消费慢时,可以自动向上游施加"慢下来"的信号,这和AI流式输出的场景完美契合。
2.3 核心概念速查
// Mono:0或1个元素的响应式流(对应AI的单次请求/响应)
Mono<String> singleResponse = chatClient.prompt()
.user("一次性回答我")
.call()
.toMono(); // 结果只有一个String
// Flux:0到N个元素的响应式流(对应AI的流式输出)
Flux<String> streamResponse = chatClient.prompt()
.user("请流式回答我")
.stream()
.content(); // 每个token都是一个元素三、Spring WebFlux + Spring AI集成
3.1 依赖配置
<!-- pom.xml -->
<dependencies>
<!-- WebFlux替代Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring AI OpenAI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- 注意:不需要也不要引入spring-boot-starter-web -->
<!-- 两者同时存在时默认使用MVC -->
<!-- Reactor测试工具 -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>3.2 完整代码对比:MVC vs WebFlux
// ===== Spring MVC版本(阻塞式)=====
// 问题:每个请求占用一个Tomcat线程,等待LLM响应期间线程被阻塞
@RestController
@RequestMapping("/api/chat")
public class MvcChatController {
private final ChatClient chatClient;
// 阻塞式调用:线程被阻塞直到LLM返回
@PostMapping
public ResponseEntity<String> chat(@RequestBody ChatRequest request) {
// 这一行会阻塞线程15-30秒
String response = chatClient.prompt()
.user(request.getMessage())
.call()
.content();
return ResponseEntity.ok(response);
}
}
// ===== Spring WebFlux版本(非阻塞式)=====
// 优点:线程在等待LLM时释放,可以处理其他请求
@RestController
@RequestMapping("/api/chat")
public class WebFluxChatController {
private final ChatClient chatClient;
// 非阻塞调用:返回Mono,框架处理异步
@PostMapping
public Mono<ResponseEntity<String>> chat(@RequestBody ChatRequest request) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user(request.getMessage())
.call()
.content()
)
// publishOn指定在哪个线程执行(LLM调用是阻塞IO,需要切换到有界弹性线程池)
.publishOn(Schedulers.boundedElastic())
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.internalServerError().build());
}
}重要说明:Spring AI的ChatClient内部是阻塞调用(HTTP调用OpenAI API)。在WebFlux中使用时,必须用publishOn(Schedulers.boundedElastic())切换到有界弹性线程池,否则会阻塞Event Loop线程,导致系统更差的性能表现。
四、Mono和Flux在AI场景的正确使用
4.1 Mono:单次AI请求
// AiService.java
@Service
@Slf4j
public class AiService {
private final ChatClient chatClient;
/**
* 单次问答(Mono)
* 适用:需要完整响应后再返回的场景(如翻译、摘要)
*/
public Mono<ChatResponse> singleChat(String sessionId, String message) {
return Mono.fromCallable(() -> {
log.debug("Starting LLM call, sessionId={}", sessionId);
return chatClient.prompt()
.user(message)
.call()
.content();
})
.publishOn(Schedulers.boundedElastic())
.map(content -> new ChatResponse(sessionId, content))
.doOnSuccess(resp -> log.debug("LLM call complete, sessionId={}", sessionId))
.doOnError(e -> log.error("LLM call failed, sessionId={}", sessionId, e))
// 超时控制
.timeout(Duration.ofSeconds(30))
// 错误转换
.onErrorMap(TimeoutException.class,
e -> new AiServiceException("AI服务超时,请稍后重试", e));
}
/**
* 带缓存的AI请求(相同问题不重复调用LLM)
*/
public Mono<String> cachedChat(String question) {
return Mono.fromCallable(() ->
chatClient.prompt().user(question).call().content()
)
.publishOn(Schedulers.boundedElastic())
.cache(Duration.ofMinutes(10)); // 相同Mono被订阅时复用缓存结果
}
}4.2 Flux:流式AI响应
/**
* 流式输出(Flux)
* 适用:需要逐步展示AI生成过程的场景
*/
public Flux<String> streamChat(String sessionId, String message) {
return Flux.from(
chatClient.prompt()
.user(message)
.stream()
.content()
)
// 添加元数据:每个chunk打上sessionId标签
.doOnNext(chunk ->
MDC.put("sessionId", sessionId)
)
// 错误处理:流式传输中途失败时
.onErrorResume(e -> {
log.error("Stream interrupted, sessionId={}", sessionId, e);
// 优雅降级:发送错误信息给用户,然后终止流
return Flux.just("\n\n[AI响应中断,请刷新后重试]");
})
// 完成时清理MDC
.doFinally(signal -> MDC.remove("sessionId"));
}
/**
* 并发多路AI请求(Flux.merge)
* 适用:同时向多个AI服务发请求,取最快的那个
*/
public Mono<String> fastestResponse(String question) {
Mono<String> gpt4Response = Mono.fromCallable(() ->
chatClient.prompt().user(question).call().content()
).publishOn(Schedulers.boundedElastic());
Mono<String> claudeResponse = Mono.fromCallable(() ->
anthropicClient.prompt().user(question).call().content()
).publishOn(Schedulers.boundedElastic());
// 取最快返回的结果
return Mono.firstWithValue(gpt4Response, claudeResponse)
.timeout(Duration.ofSeconds(30));
}五、流式输出:WebFlux实现SSE流式AI响应
5.1 服务端Controller(完整实现)
// StreamChatController.java
@RestController
@RequestMapping("/api/stream")
@Slf4j
public class StreamChatController {
private final AiService aiService;
/**
* SSE流式响应端点
* produces = TEXT_EVENT_STREAM_VALUE 告知客户端使用SSE协议
*/
@PostMapping(
value = "/chat",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<ServerSentEvent<String>> streamChat(
@RequestBody StreamChatRequest request,
ServerHttpRequest httpRequest) {
String sessionId = extractSessionId(httpRequest);
log.info("New streaming request, sessionId={}, messageLen={}",
sessionId, request.getMessage().length());
return aiService.streamChat(sessionId, request.getMessage())
// 将每个chunk包装成SSE事件
.map(chunk -> ServerSentEvent.<String>builder()
.id(UUID.randomUUID().toString())
.event("message")
.data(chunk)
.build())
// 发送结束标记
.concatWith(Mono.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()))
// 错误时发送错误事件
.onErrorResume(e -> {
log.error("Stream error, sessionId={}", sessionId, e);
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data(e.getMessage())
.build());
})
// 心跳:每5秒发送一次保活(防止代理超时断连)
.mergeWith(
Flux.interval(Duration.ofSeconds(5))
.map(tick -> ServerSentEvent.<String>builder()
.event("ping")
.data("keepalive")
.build())
.takeUntilOther(/* 当主流结束时停止心跳 */ Mono.never())
)
.doOnCancel(() -> log.info("Client disconnected, sessionId={}", sessionId));
}
/**
* 带进度的长任务AI处理
* 例如:分析一份100页的PDF文档
*/
@PostMapping(
value = "/analyze",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<ServerSentEvent<ProgressEvent>> analyzeDocument(
@RequestBody AnalyzeRequest request) {
return Flux.create(sink -> {
// 发送进度:文档解析
sink.next(ServerSentEvent.builder(new ProgressEvent("parsing", 10, "正在解析文档...")).build());
String parsedContent = parseDocument(request.getDocumentUrl());
sink.next(ServerSentEvent.builder(new ProgressEvent("parsed", 30, "文档解析完成")).build());
// 发送进度:AI分析(流式)
sink.next(ServerSentEvent.builder(new ProgressEvent("analyzing", 40, "AI分析中...")).build());
// 这里需要订阅并桥接
aiService.streamChat("analyze-" + request.getDocumentId(), parsedContent)
.subscribe(
chunk -> sink.next(ServerSentEvent.builder(
new ProgressEvent("streaming", -1, chunk)).build()),
error -> sink.error(error),
() -> {
sink.next(ServerSentEvent.builder(
new ProgressEvent("done", 100, "分析完成")).build());
sink.complete();
}
);
});
}
}5.2 客户端代码(JavaScript/Java都给)
// 前端JavaScript客户端
class AiStreamClient {
constructor(baseUrl) {
this.baseUrl = baseUrl;
}
async streamChat(message, onChunk, onDone, onError) {
const response = await fetch(`${this.baseUrl}/api/stream/chat`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 解析SSE格式
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的最后一行
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
onDone();
return;
}
onChunk(data);
} else if (line.startsWith('event: error')) {
// 下一行是错误数据
} else if (line.startsWith('data: ') && previousEvent === 'error') {
onError(new Error(line.slice(6)));
return;
}
}
}
}
}
// 使用示例
const client = new AiStreamClient('http://localhost:8080');
const responseDiv = document.getElementById('response');
client.streamChat(
'请详细介绍一下Spring WebFlux',
chunk => responseDiv.textContent += chunk, // 逐token显示
() => console.log('Stream complete'),
err => console.error('Stream error:', err)
);// Java客户端(用于服务间调用)
@Component
public class AiStreamJavaClient {
private final WebClient webClient;
public AiStreamJavaClient(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("http://ai-service:8080")
.build();
}
/**
* 从Java客户端消费SSE流式响应
*/
public Flux<String> streamChat(String message) {
return webClient.post()
.uri("/api/stream/chat")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(Map.of("message", message))
.retrieve()
.bodyToFlux(ServerSentEvent.class)
.filter(sse -> "message".equals(sse.event()))
.map(sse -> (String) sse.data())
.filter(data -> !"[DONE]".equals(data));
}
/**
* 收集流式响应为完整字符串
*/
public Mono<String> streamChatCollected(String message) {
return streamChat(message)
.collect(Collectors.joining());
}
}六、背压处理:AI生产速度大于消费速度
6.1 背压策略详解
// BackpressureDemo.java
@Service
public class BackpressureHandlingService {
/**
* 策略1:缓冲(Buffer)
* 适用:客户端偶尔慢,但总体能跟上
* 风险:缓冲区满时OOM
*/
public Flux<String> bufferedStream(String message) {
return aiService.streamChat("session", message)
// 缓冲最多100个元素
.onBackpressureBuffer(100)
// 缓冲区满时丢弃新元素
.onBackpressureDrop(dropped ->
log.warn("Dropped token due to slow consumer: {}", dropped));
}
/**
* 策略2:最新值(Latest)
* 适用:只关心最新状态,中间过程不重要
* 不适用于AI流式输出(会丢失文字)
*/
public Flux<String> latestStream(String message) {
return aiService.streamChat("session", message)
.onBackpressureLatest();
}
/**
* 策略3:限速(Throttle)
* 适用:确保消费速度不超过某个上限
* AI流式场景推荐:防止客户端被淹没
*/
public Flux<String> throttledStream(String message) {
return aiService.streamChat("session", message)
// 每50ms最多发一个token给客户端
.sample(Duration.ofMillis(50));
}
/**
* 策略4:批量聚合(Window)
* 适用:把多个快速token聚合成一批再发送
* 减少网络包数量
*/
public Flux<String> batchedStream(String message) {
return aiService.streamChat("session", message)
// 每10个token或每100ms聚合一次
.bufferTimeout(10, Duration.ofMillis(100))
.map(tokens -> String.join("", tokens));
}
}6.2 生产环境的推荐背压策略
// ProductionStreamingController.java
@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> productionStreamChat(
@RequestBody ChatRequest request) {
return aiService.streamChat(request.getSessionId(), request.getMessage())
// 生产推荐:缓冲256个token,满了就丢弃
// LLM的token很小(平均4-5字节),256个约1KB
.onBackpressureBuffer(
256,
BufferOverflowStrategy.DROP_OLDEST // 优先保留最新的token
)
// 批量聚合:减少SSE事件数量(20个token一批,或50ms一批)
.bufferTimeout(20, Duration.ofMillis(50))
.map(tokens -> String.join("", tokens))
.map(batch -> ServerSentEvent.<String>builder()
.data(batch)
.build())
.timeout(Duration.ofMinutes(5)); // 流式超时:5分钟
}七、错误处理:响应式链路中的AI异常处理
7.1 完整的错误处理策略
// ReactiveErrorHandlingService.java
@Service
@Slf4j
public class ReactiveErrorHandlingService {
private final ChatClient primaryChatClient;
private final ChatClient fallbackChatClient;
public Mono<String> robustChat(String message) {
return callPrimaryModel(message)
// 超时重试:最多3次,每次超时10秒
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
// 只对网络错误和超时重试,不对业务错误重试
.filter(e -> e instanceof WebClientException ||
e instanceof TimeoutException)
.doBeforeRetry(retrySignal ->
log.warn("Retrying LLM call, attempt={}",
retrySignal.totalRetries() + 1))
)
// 主模型失败,切换到备用模型
.onErrorResume(
e -> !(e instanceof InvalidPromptException), // 参数错误不切换
e -> {
log.warn("Primary model failed, switching to fallback: {}", e.getMessage());
return callFallbackModel(message);
}
)
// 最终兜底:返回预设回答
.onErrorReturn("抱歉,AI服务暂时不可用,请稍后重试。")
// 记录最终错误
.doOnError(e -> log.error("All AI calls failed for message: {}", message, e));
}
/**
* 响应式错误转换:将技术异常转为业务异常
*/
public Mono<String> chatWithErrorMapping(String message) {
return Mono.fromCallable(() ->
primaryChatClient.prompt().user(message).call().content()
)
.publishOn(Schedulers.boundedElastic())
.onErrorMap(
ex -> ex.getMessage() != null && ex.getMessage().contains("429"),
ex -> new RateLimitException("AI服务请求频率超限,请1分钟后重试", ex)
)
.onErrorMap(
ex -> ex.getMessage() != null && ex.getMessage().contains("400"),
ex -> new InvalidPromptException("请求内容不符合规范", ex)
)
.onErrorMap(
ex -> ex.getMessage() != null && ex.getMessage().contains("503"),
ex -> new AiServiceUnavailableException("AI服务暂时不可用", ex)
);
}
/**
* 流式响应的错误恢复
* 流中途失败时,优雅地结束流而不是直接断开
*/
public Flux<String> resilientStreamChat(String message) {
return Flux.from(
primaryChatClient.prompt()
.user(message)
.stream()
.content()
)
.onErrorResume(e -> {
// 流中断时,发送一个错误提示token,然后结束
log.error("Stream interrupted: {}", e.getMessage());
return Flux.just("\n\n[响应被中断,这是截止目前的内容]");
})
// 超过3分钟的流式请求强制终止
.timeout(Duration.ofMinutes(3),
Flux.just("\n\n[响应超时,内容可能不完整]"));
}
}7.2 全局WebFlux异常处理器
// GlobalWebExceptionHandler.java
@Component
@Order(-2) // 优先级高于Spring默认处理器
public class GlobalWebExceptionHandler implements WebExceptionHandler {
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
ErrorResponse errorResponse;
if (ex instanceof RateLimitException) {
response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
response.getHeaders().add("Retry-After", "60");
errorResponse = new ErrorResponse("RATE_LIMIT", ex.getMessage());
} else if (ex instanceof InvalidPromptException) {
response.setStatusCode(HttpStatus.BAD_REQUEST);
errorResponse = new ErrorResponse("INVALID_PROMPT", ex.getMessage());
} else if (ex instanceof AiServiceUnavailableException) {
response.setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
errorResponse = new ErrorResponse("SERVICE_UNAVAILABLE", "AI服务暂时不可用");
} else {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
errorResponse = new ErrorResponse("INTERNAL_ERROR", "服务器内部错误");
}
try {
byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
DataBuffer buffer = response.bufferFactory().wrap(bytes);
return response.writeWith(Mono.just(buffer));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
}
}八、并发控制:flatMap的并发参数与AI接口限速
8.1 批量AI请求的并发控制
// BatchAiProcessingService.java
@Service
@Slf4j
public class BatchAiProcessingService {
private final ChatClient chatClient;
// AI API的并发限制(根据实际API限制设置)
private static final int MAX_CONCURRENT_LLM_CALLS = 10;
/**
* 批量处理:对一批文档进行AI摘要
* 关键:用flatMap的concurrency参数控制并发数
*/
public Flux<DocumentSummary> batchSummarize(List<Document> documents) {
return Flux.fromIterable(documents)
// concurrency=10:最多同时发起10个LLM调用
// 不设置concurrency默认是256,会直接超出OpenAI的rate limit
.flatMap(doc -> summarizeDocument(doc), MAX_CONCURRENT_LLM_CALLS)
// 失败的文档记录错误,不影响其他文档处理
.onErrorContinue((e, doc) ->
log.error("Failed to summarize document: {}", doc, e));
}
private Mono<DocumentSummary> summarizeDocument(Document doc) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user("请用3句话总结以下内容:\n" + doc.getContent())
.call()
.content()
)
.publishOn(Schedulers.boundedElastic())
.map(summary -> new DocumentSummary(doc.getId(), summary))
// 单个文档处理超时
.timeout(Duration.ofSeconds(30));
}
/**
* 令牌桶限速:确保不超过API的RPM限制
* 例如:OpenAI gpt-4o限制60 RPM = 1 request/second
*/
public Flux<String> rateLimitedBatchChat(List<String> messages) {
// 使用reactor-extra的InstrumentedFlux实现令牌桶
return Flux.fromIterable(messages)
// 每秒最多处理1个请求(适配60 RPM限制)
.delayElements(Duration.ofMillis(1100)) // 1100ms间隔确保不超限
.flatMap(msg -> Mono.fromCallable(() ->
chatClient.prompt().user(msg).call().content()
).publishOn(Schedulers.boundedElastic()), 5); // 同时最多5个并发
}
/**
* 并行多模型调用(取最快的)
*/
public Mono<String> parallelModelCall(String message) {
return Flux.merge(
callModel(message, "gpt-4o"),
callModel(message, "claude-3-5-sonnet"),
callModel(message, "gemini-1.5-pro")
)
// 取第一个成功的结果
.next()
.timeout(Duration.ofSeconds(30));
}
private Mono<String> callModel(String message, String model) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user(message)
.call()
.content()
)
.publishOn(Schedulers.boundedElastic())
.onErrorResume(e -> Mono.empty()); // 失败不影响其他模型
}
}九、线程模型:理解WebFlux的Event Loop
9.1 线程模型图解
9.2 线程切换的最佳实践
// ThreadModelBestPractice.java
@Service
public class ThreadModelBestPractice {
/**
* 正确示例:阻塞操作切换到boundedElastic
*/
public Mono<String> correct_BlockingOperation(String input) {
return Mono.just(input)
// 在Event Loop中处理非阻塞逻辑
.map(s -> s.trim().toLowerCase())
// 切换到boundedElastic处理阻塞IO
.publishOn(Schedulers.boundedElastic())
.map(s -> {
// 这里可以做阻塞操作:LLM调用、JDBC、文件IO
return chatClient.prompt().user(s).call().content();
})
// 切回Event Loop处理响应序列化
.publishOn(Schedulers.parallel());
}
/**
* 错误示例:在Event Loop中做阻塞操作(不要这样做!)
*/
public Mono<String> wrong_BlockingOnEventLoop(String input) {
return Mono.just(input)
.map(s -> {
// 错误!在map中做阻塞操作,会阻塞Event Loop
// 这会导致整个Event Loop无法处理其他请求
return chatClient.prompt().user(s).call().content(); // 阻塞20秒!
});
}
/**
* 使用Mono.fromCallable(更符合Reactor风格)
*/
public Mono<String> recommended_FromCallable(String input) {
// fromCallable将阻塞代码包装为Mono,不立即执行
return Mono.fromCallable(() ->
chatClient.prompt().user(input).call().content()
)
// 订阅时在boundedElastic上执行
.subscribeOn(Schedulers.boundedElastic());
}
}9.3 线程调度参数配置
# application.yml - 响应式线程配置
spring:
webflux:
# 事件循环线程数(默认是CPU核心数*2,通常不需要修改)
# netty:
# worker-count: 16
# 自定义boundedElastic配置
reactor:
schedulers:
defaultBoundedElasticSize: 200 # 最大线程数(默认200*CPU核数,通常足够)
defaultBoundedElasticQueueSize: 1000 # 任务队列大小
# 连接超时配置
spring:
webflux:
# 读写超时
server:
netty:
connection-timeout: 5s
idle-timeout: 30s十、性能对比:WebFlux vs MVC在AI场景的基准测试
10.1 测试环境
硬件:8核16G,4核给应用,4核给测试工具
JVM:JDK 21,-Xms4g -Xmx4g -XX:+UseZGC
测试工具:wrk2(比wrk更精确的延迟测量)
测试场景:模拟LLM调用(固定延迟20秒,模拟真实AI请求)
测试命令:wrk2 -t4 -c{并发数} -d120s -R{RPS} http://localhost:8080/api/chat10.2 测试结果数据
场景1:低并发(50并发)
Spring MVC:
Throughput: 2.47 RPS
P50: 20.1s P99: 21.3s P999: 24.8s
Threads: 50 active
Memory: 1.2GB
Spring WebFlux:
Throughput: 2.49 RPS (+0.8%)
P50: 20.0s P99: 21.0s P999: 22.1s
Threads: 8 active
Memory: 0.9GB
结论:低并发下性能相当,WebFlux内存占用更低
场景2:中并发(200并发)
Spring MVC:
Throughput: 9.8 RPS
P50: 20.4s P99: 25.8s P999: 58.3s
Threads: 200 active(满载)
Memory: 3.8GB
Spring WebFlux:
Throughput: 9.9 RPS (+1%)
P50: 20.1s P99: 21.5s P999: 23.4s
Threads: 8+120 active(120个在boundedElastic)
Memory: 1.4GB (-63%)
结论:延迟稳定性WebFlux明显优于MVC(P999降低60%)
场景3:高并发(500并发)
Spring MVC:
线程池耗尽,大量请求超时
可用线程: 0/200
Throughput: 6.1 RPS(降级)
P99: 60s(超时)
错误率: 48%
Spring WebFlux:
Throughput: 24.8 RPS (比MVC+407%)
P50: 20.3s P99: 22.1s P999: 25.8s
Memory: 1.8GB
错误率: 0.01%
结论:高并发下WebFlux优势显著!MVC直接崩溃
场景4:极高并发(2000并发)
Spring MVC:
OOM,系统崩溃
Spring WebFlux:
Throughput: 97 RPS
P50: 20.8s P99: 23.4s P999: 31.2s
Memory: 2.1GB
错误率: 0.08%
结论:2000并发WebFlux正常运行,MVC OOM崩溃
这就是刘杰从200并发扩展到2000并发的关键10.3 流式响应专项测试
场景:1000并发流式SSE请求(每个持续约30秒)
Spring MVC(模拟流式:不可行):
1000个线程各阻塞30秒 = 需要1000线程
内存估算:1000线程 × 512KB栈 = 512MB(仅栈)+ 堆内存
结论:实际上无法支持
Spring WebFlux(SSE流式):
活跃连接: 1000
活跃线程: 8 Event Loop + 50 boundedElastic
Memory: 2.3GB
P99首字延迟: 1.2s(从请求到第一个token到达客户端)
吞吐量: 满载正常十一、迁移指南:从Spring MVC迁移到WebFlux
11.1 迁移检查清单
迁移前必读:
□ 确认所有依赖库支持响应式(JDBC需要换R2DBC,Feign需要换WebClient)
□ 识别阻塞代码(使用BlockHound检测)
□ 评估Servlet API的使用(WebFlux不支持HttpServletRequest)
□ 测试框架确认支持WebFlux(MockMvc需要换成WebTestClient)
常见迁移路径:
MVC → WebFlux:
@RestController → 保持不变
ResponseEntity → Mono<ResponseEntity>
List<T> → Flux<T>
HttpServletRequest → ServerHttpRequest
RestTemplate → WebClient
JdbcTemplate → R2dbcEntityTemplate
@Transactional → @Transactional(Reactive版)11.2 渐进式迁移策略
// 阶段1:不修改Controller,只在Service层引入响应式
// 这样可以逐步迁移,降低风险
@RestController
public class HybridController {
private final ReactiveAiService reactiveAiService;
// 保持MVC风格,内部用响应式
@PostMapping("/chat")
public ResponseEntity<String> chat(@RequestBody ChatRequest request) {
// block()将响应式转为阻塞(迁移过渡期可以用)
// 注意:这样无法获得响应式的高并发优势,只是迁移的中间状态
String response = reactiveAiService.chat(request.getMessage()).block();
return ResponseEntity.ok(response);
}
}
// 阶段2:Controller也改为响应式
@RestController
public class FullReactiveController {
@PostMapping("/chat")
public Mono<ResponseEntity<String>> chat(@RequestBody ChatRequest request) {
return reactiveAiService.chat(request.getMessage())
.map(ResponseEntity::ok);
}
}十二、FAQ
Q1:WebFlux学习曲线很陡,值得学吗?
对于AI服务场景,值得。AI请求的特点(高延迟、流式输出)和WebFlux的优势(非阻塞、背压)完美匹配。如果你的AI服务需要支持超过50并发,WebFlux几乎是必选项。学习曲线确实陡,但核心概念(Mono/Flux/publishOn/onError)学会后,其他都是变体。
Q2:WebFlux能和Spring Data JPA一起用吗?
不能。JPA是阻塞式的,和WebFlux的非阻塞模型冲突。需要使用Spring Data R2DBC替代。但迁移成本高,可以用Mono.fromCallable + publishOn(boundedElastic)的方式包装JPA调用,作为过渡方案。
Q3:如何调试响应式代码?
使用.log()操作符打印流事件。使用reactor.tools.agent(BlockHound)检测意外的阻塞调用。IntelliJ IDEA对Reactor的调试支持也越来越好,建议开启"Debug Mode"。
Q4:WebFlux项目如何写单元测试?
使用StepVerifier和WebTestClient:
@Test
void testStreamChat() {
StepVerifier.create(aiService.streamChat("session", "Hello"))
.expectNextMatches(chunk -> !chunk.isEmpty())
.expectComplete()
.verify(Duration.ofSeconds(30));
}Q5:什么情况下不应该用WebFlux?
团队没有响应式经验、时间紧、系统并发量低(<50并发)的情况下不需要用。WebFlux增加了代码复杂度,如果并发量不高,MVC的简单性更有价值。只有在并发量大、需要流式输出的AI服务场景,WebFlux的优势才能体现。
十三、总结
刘杰的故事给了我们一个清晰的结论:AI服务的高延迟特性,让传统Spring MVC的线程模型成为了并发瓶颈的放大器。
WebFlux的核心价值不是"更快",而是"更高效地使用资源":
- 用8个Event Loop线程管理2000个并发连接
- 流式SSE天然支持背压,防止慢客户端撑爆内存
- 响应式链路统一了超时、重试、降级的写法
学习WebFlux的正确顺序:理解响应式概念(Mono/Flux)→ 学会线程切换(publishOn/subscribeOn)→ 掌握错误处理(onError/retry)→ 理解背压(onBackpressure)→ 性能调优。
从200并发到2000并发,本质上是从"一个请求一个线程"到"一个线程管理N个请求"的思维转变。掌握了这个转变,AI服务的并发问题就迎刃而解了。
