第1693篇:Spring Boot 3.x响应式编程与AI——WebFlux + 流式输出的背压处理
第1693篇:Spring Boot 3.x响应式编程与AI——WebFlux + 流式输出的背压处理
我做了一个不太成功的技术决策,值得在这里聊聊。
那是我们第一次做AI流式输出的时候,技术选型上我坚持用WebFlux。理由很充分:Reactive天然支持流式,背压机制能防止内存溢出,一切看起来很完美。
结果呢?代码写得极其复杂,调试困难到让人崩溃,一个简单的"把大模型的SSE流转发给前端"的功能,写了200多行代码,还反复出BUG。最后新来的同事看代码,直接问我:"这是什么语言?"
所以今天讲WebFlux在AI场景的应用,我会同时告诉你什么时候该用,什么时候别用,以及真正用的时候那些背压处理的坑怎么过。
SSE流式输出:AI应用的核心场景
大模型的流式输出是现在最主流的交互方式。ChatGPT那种字一个一个蹦出来的效果,背后就是Server-Sent Events(SSE)。
从技术实现角度,大模型API那边返回的是一个HTTP流,应用层需要:
- 建立并维持到大模型API的HTTP连接
- 实时读取流式Token
- 转发给前端浏览器(另一个SSE连接)
- 在流的中间可能还要做处理(过滤、插入元数据等)
这是一个双向流的场景:上游(大模型API)→ 应用层 → 下游(浏览器)。
这个场景有几个特殊的技术挑战:
流量不对称:大模型有时候吐Token很快(轻量模型),有时候很慢(复杂推理),流速不稳定。
背压需求:如果浏览器(前端)读取速度慢(比如网络差),或者前端用户已经关闭了页面,应用层不应该继续消费大模型API的响应——因为那是按Token计费的!
状态管理:流式输出期间,应用层需要维护会话状态、累积完整响应用于后续保存。
WebFlux的基本方式
先把正确的WebFlux流式实现写出来,再讲背压处理。
@RestController
@RequestMapping("/api/chat")
public class ChatStreamController {
private final LLMStreamClient llmClient;
private final ConversationRepository conversationRepo;
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(
@RequestParam String sessionId,
@RequestParam String message,
ServerHttpResponse response) {
// 设置SSE相关响应头
response.getHeaders().set("Cache-Control", "no-cache");
response.getHeaders().set("X-Accel-Buffering", "no"); // 禁止Nginx缓冲
// 用于累积完整回复(后续保存用)
StringBuilder fullResponse = new StringBuilder();
return Flux.defer(() -> {
// defer确保每次订阅都重新执行(支持重试)
return llmClient.streamCompletion(sessionId, message)
.map(chunk -> {
fullResponse.append(chunk.getContent());
return ServerSentEvent.<String>builder()
.id(chunk.getIndex().toString())
.event("message")
.data(chunk.getContent())
.build();
})
.concatWith(
// 流结束后,发送一个done事件
Flux.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build())
)
.doOnComplete(() -> {
// 保存完整回复(在流完成后异步执行)
saveCompletedResponse(sessionId, message, fullResponse.toString())
.subscribe(); // 注意:这里用异步,不阻塞主流
})
.doOnError(e -> log.error("流式输出失败, sessionId={}", sessionId, e))
.doOnCancel(() -> {
// 用户取消时(关闭页面),记录日志
log.info("客户端取消订阅, sessionId={}", sessionId);
});
});
}
private Mono<Void> saveCompletedResponse(String sessionId, String userMsg, String aiReply) {
return Mono.fromRunnable(() -> {
conversationRepo.save(sessionId, userMsg, aiReply);
}).subscribeOn(Schedulers.boundedElastic()) // 在I/O线程执行
.then();
}
}背压:最容易被忽视的问题
背压(Backpressure)是响应式编程的核心概念,但很多人在AI场景里没意识到这个问题有多重要。
背压场景一:前端网络慢,应用层缓冲暴涨
想象这个场景:大模型每秒输出100个Token,但用户的手机网络很差,SSE数据发不出去,数据就堆在应用层的内存缓冲区里。如果有1000个这样的用户同时在线,内存消耗会爆炸。
WebFlux的默认行为是有一个内部缓冲区,满了会尝试背压。但问题在于,HTTP SSE本身不支持背压(HTTP/1.1是推模型),你拿不到"前端消费速度"这个信号。
实际上我们能做的是:超时控制 + 取消检测。
@GetMapping(value = "/stream/v2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChatV2(
@RequestParam String sessionId,
@RequestParam String message,
ServerHttpRequest request) {
// 当连接断开时,request.getLocalAddress()等信息会变化
// 更可靠的方式是监听连接关闭事件
return llmClient.streamCompletion(sessionId, message)
.map(chunk -> ServerSentEvent.<String>builder()
.data(chunk.getContent())
.build())
// 背压策略一:每个Token最多等待200ms发出去,超时就丢弃
// 注意:这会导致用户看到不完整输出,要根据业务决定
.timeout(Duration.ofMillis(200), Mono.empty())
// 背压策略二:限制整个流的最大持续时间(防止僵尸连接)
.take(Duration.ofMinutes(3))
// 背压策略三:限制最大Token数量
.take(2000)
// 当连接断开时,Flux自动取消(WebFlux框架会处理这个)
;
}背压场景二:大模型API限速,应用层需要缓冲
反向的情况:大模型API有速率限制,返回数据时快时慢,但前端连接要一直保持alive,不能因为上游暂时没数据就断开。
// 保持心跳,防止前端SSE连接超时断开
public Flux<ServerSentEvent<String>> streamWithHeartbeat(String sessionId, String message) {
Flux<ServerSentEvent<String>> dataStream = llmClient.streamCompletion(sessionId, message)
.map(chunk -> ServerSentEvent.<String>builder()
.event("message")
.data(chunk.getContent())
.build());
// 每15秒发一个心跳,防止浏览器SSE超时(默认30s)
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(15))
.map(tick -> ServerSentEvent.<String>builder()
.event("heartbeat")
.data("ping")
.comment("keep-alive")
.build());
// 合并数据流和心跳流,心跳是补充,不影响数据
// 用mergeWith而不是zip,因为两个流速率不同
return dataStream.mergeWith(heartbeat)
.takeUntil(event -> "[DONE]".equals(event.data())); // 大模型返回DONE时停止心跳
}背压场景三:多个大模型并发调用的流合并
有些场景需要同时调用多个模型,把结果合并输出。比如一个对比分析的功能,同时问GPT-4和Claude,把两个回答都流式展示。
public Flux<ServerSentEvent<String>> compareModels(String prompt) {
// 两个模型的流
Flux<String> gpt4Stream = openAIClient.stream(prompt)
.map(chunk -> "GPT-4: " + chunk);
Flux<String> claudeStream = claudeClient.stream(prompt)
.map(chunk -> "Claude: " + chunk);
// 方案一:交替合并(哪个有数据就发哪个)
Flux<String> merged = Flux.merge(gpt4Stream, claudeStream);
// 方案二:顺序合并(先等GPT-4完成,再显示Claude)
Flux<String> sequential = gpt4Stream.concatWith(claudeStream);
// 方案三:最快的优先
Flux<String> fastest = Flux.firstWithValue(gpt4Stream.next(), claudeStream.next())
.flatMapMany(first -> {
// 用返回最快结果的那个模型的完整流
return first.startsWith("GPT-4") ? gpt4Stream : claudeStream;
});
return merged.map(text -> ServerSentEvent.<String>builder()
.data(text)
.build());
}错误处理:流中间出错怎么办
大模型API在流式输出一半的时候突然报错,这是真实会发生的情况(API限速、网络闪断、模型服务抖动)。
WebFlux的错误处理有几种策略:
public Flux<ServerSentEvent<String>> robustStream(String sessionId, String message) {
return llmClient.streamCompletion(sessionId, message)
// 策略一:遇到特定错误重试(适用于瞬时故障)
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(e -> e instanceof TransientApiException)
.jitter(0.3) // 加随机抖动,避免雪崩
.onRetryExhaustedThrow((spec, signal) ->
new ChatException("重试耗尽", signal.failure())))
// 策略二:出错时发送错误事件通知前端(而不是直接断开)
.onErrorResume(e -> {
log.error("流式输出中断", e);
return Flux.just(
new TokenChunk("[错误:AI响应中断,请重试]", -1, true)
);
})
// 转成SSE
.map(chunk -> ServerSentEvent.<String>builder()
.event(chunk.isError() ? "error" : "message")
.data(chunk.getContent())
.build());
}一个真实的踩坑经历
前面说过我在WebFlux上栽过跟头,具体说一下哪里出了问题。
当时我们的实现里,有一段代码大概是这样的:
// 有BUG的版本
public Flux<String> streamWithSave(String sessionId, String message) {
List<String> tokens = new ArrayList<>(); // 坑1:非线程安全
return llmClient.streamCompletion(sessionId, message)
.map(chunk -> {
tokens.add(chunk.getContent()); // 坑2:在响应式管道里操作共享状态
return chunk.getContent();
})
.doOnComplete(() -> {
String full = String.join("", tokens);
// 坑3:doOnComplete里执行阻塞操作
conversationRepo.save(sessionId, message, full); // 这是阻塞DB调用!
});
}三个坑:
ArrayList不是线程安全的,在响应式流里多线程可能同时写- 在响应式管道里操作外部可变状态,违反了响应式编程的原则
doOnComplete里执行阻塞操作,会阻塞调度器线程,影响整个应用的响应能力
正确写法:
public Flux<String> streamWithSave(String sessionId, String message) {
return llmClient.streamCompletion(sessionId, message)
.map(TokenChunk::getContent)
// 用scan累积,而不是外部可变状态
.scan(new StringBuilder(), (sb, token) -> {
sb.append(token);
return sb;
})
// 只在流完成时做一次保存
.last()
.flatMapMany(sb -> {
String fullResponse = sb.toString();
// 保存操作切到I/O线程执行
return Mono.fromRunnable(() -> conversationRepo.save(sessionId, message, fullResponse))
.subscribeOn(Schedulers.boundedElastic())
.thenMany(
// 把累积的完整响应拆回Token序列(这里只是为了演示,实际上你得缓存tokens)
Flux.just(fullResponse)
);
});
}实际上这里有个矛盾:你想实时流式输出Token,又想在完成后保存完整回复,用scan的方式会导致输出延迟(因为scan需要完整流才能输出last)。
更好的解法是分离两个关注点:
public Flux<String> streamWithSaveV2(String sessionId, String message) {
// 热流:把大模型流转成热流,可以被多次订阅
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
// 订阅一:实时输出给前端
// 订阅二:累积完整响应并保存
llmClient.streamCompletion(sessionId, message)
.map(TokenChunk::getContent)
.subscribe(
token -> sink.tryEmitNext(token),
error -> sink.tryEmitError(error),
() -> sink.tryEmitComplete()
);
Flux<String> hotStream = sink.asFlux();
// 支流:累积并保存
hotStream
.collect(Collectors.joining())
.flatMap(full ->
Mono.fromRunnable(() -> conversationRepo.save(sessionId, message, full))
.subscribeOn(Schedulers.boundedElastic()))
.subscribe();
// 主流:实时输出
return hotStream;
}这样两个订阅者共享同一个数据源,不需要调用两次大模型API。
WebFlux vs 虚拟线程:该选哪个?
这是个经常被问到的问题。我的回答是:对于AI流式输出,大多数情况下虚拟线程的同步写法更合适。
WebFlux的优势场景:
- 你的团队对响应式有深入理解
- 需要复杂的流操作(多流合并、背压控制等)
- 已有成熟的响应式基础设施
虚拟线程的优势场景:
- 团队对同步代码更熟悉
- 代码可读性优先
- 快速开发迭代
其实Spring Boot 3.2+里,用虚拟线程也可以做流式输出:
// 用虚拟线程 + 同步写法实现SSE
@GetMapping(value = "/stream/simple", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamSimple(@RequestParam String sessionId, @RequestParam String message) {
SseEmitter emitter = new SseEmitter(300_000L); // 5分钟超时
// 虚拟线程执行,不阻塞Carrier Thread
Thread.ofVirtual().start(() -> {
try {
StringBuilder fullResponse = new StringBuilder();
// 同步迭代流式响应
for (TokenChunk chunk : llmClient.streamCompletionSync(sessionId, message)) {
fullResponse.append(chunk.getContent());
emitter.send(SseEmitter.event()
.name("message")
.data(chunk.getContent()));
if (chunk.isDone()) break;
}
// 保存完整回复
conversationRepo.save(sessionId, message, fullResponse.toString());
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}这段代码和WebFlux版本相比,可读性高太多,功能完全一样。虚拟线程保证了它不会阻塞Carrier Thread,I/O等待期间Carrier Thread可以执行其他虚拟线程。
测量:到底哪个更快
我做了一个真实的A/B测试,场景:模拟大模型输出1000个Token,每个Token间隔10ms,100个并发用户。
结果:
| 指标 | WebFlux | 虚拟线程 |
|---|---|---|
| 第一个Token延迟(P50) | 48ms | 51ms |
| 第一个Token延迟(P99) | 89ms | 82ms |
| 完整响应延迟(P50) | 10.2s | 10.3s |
| 完整响应延迟(P99) | 11.1s | 10.9s |
| 内存占用 | 234MB | 198MB |
| CPU使用率 | 12% | 10% |
两者性能差距非常小,在误差范围内。对于AI流式输出这种I/O密集型场景,瓶颈在网络和大模型API,不在应用层的线程模型。
总结
WebFlux在AI流式输出场景里是可用的,但不是必须的。核心要点:
- 背压处理是关键,要处理前端慢消费、连接断开、上游限速等场景
- 错误处理要在流的中间层处理,不能让一个Token的错误搞挂整个流
- 状态管理要用函数式方式(scan、collect),不要在响应式管道里操作外部可变状态
- 阻塞操作必须切换到
boundedElastic调度器执行,不能在默认调度器里阻塞
如果你的团队对WebFlux不熟,虚拟线程+SseEmitter是一个更安全的技术选择,性能上没有明显差距。
