第1906篇:Spring AI的流式输出与WebFlux——响应式流的背压与错误处理
第1906篇:Spring AI的流式输出与WebFlux——响应式流的背压与错误处理
流式输出这个需求,几乎每个 AI 项目都会遇到。道理很简单:等 LLM 把几百个 token 全部生成完再一次性返回,用户盯着空白页面等三五秒,体验很差。逐字输出让用户感知到"AI 正在思考、正在回答",这种感觉完全不同。
在 Spring 生态里做流式输出,自然要用到 WebFlux。但 WebFlux 的响应式编程模型对很多习惯了命令式代码的同学来说是个坎。更麻烦的是,流式场景下的背压处理和错误处理有一套和同步调用完全不一样的规则。
这篇文章就把这套东西讲清楚,从基础的流式 API 用法,到背压机制的工程实践,到错误处理的各种姿势,尽量结合我踩过的坑来讲。
流式输出的基本结构
先看 Spring AI 的流式调用是什么样的:
@Service
public class StreamChatService {
@Autowired
private ChatClient chatClient;
/**
* 返回 Flux<String>,每个元素是一个 token 片段
*/
public Flux<String> streamChat(String sessionId, String userMessage) {
return chatClient.prompt()
.user(userMessage)
.advisors(a -> a.param(
AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY,
sessionId))
.stream()
.content(); // 直接返回内容字符串的流
}
/**
* 返回 Flux<ChatResponse>,包含完整的元数据
*/
public Flux<ChatResponse> streamChatWithMetadata(String userMessage) {
return chatClient.prompt()
.user(userMessage)
.stream()
.chatResponse(); // 返回完整响应对象
}
}控制器层直接把 Flux 返回给客户端,配合 SSE(Server-Sent Events)或者 text/event-stream:
@RestController
@RequestMapping("/ai")
public class StreamController {
@Autowired
private StreamChatService streamChatService;
/**
* SSE 流式输出
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(
@RequestParam String sessionId,
@RequestParam String message) {
return streamChatService.streamChat(sessionId, message)
.map(content -> ServerSentEvent.<String>builder()
.id(UUID.randomUUID().toString())
.event("message")
.data(content)
.build())
// 发送完成信号
.concatWith(Mono.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()))
// 错误处理
.onErrorResume(e -> {
log.error("流式输出异常", e);
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("服务异常,请重试")
.build());
});
}
/**
* 纯文本流式输出(更简单,但客户端处理稍复杂)
*/
@PostMapping(value = "/stream/text",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamText(@RequestBody ChatRequest request) {
return streamChatService.streamChat(request.getSessionId(), request.getMessage());
}
}背压(Backpressure):被忽视的重要机制
背压是响应式编程里很核心的概念,但大多数入门教程蜻蜓点水,导致很多人在生产环境遇到问题才意识到它的重要性。
背压的核心问题:LLM 产出 token 的速度,和下游消费(网络传输、客户端渲染)的速度,不一定匹配。如果 LLM 吐字很快,而客户端或者中间网络比较慢,就会在某个缓冲区积压数据,最终可能导致 OOM 或者请求超时。
Spring AI 返回的 Flux<String> 是一个"冷流",天然支持背压协议。问题往往出在中间的操作符和下游处理上。
来看一个典型的背压场景处理:
@Service
public class BackpressureAwareStreamService {
@Autowired
private ChatClient chatClient;
/**
* 带限速的流式输出
* 适用场景:需要控制 token 输出速度(比如模拟打字机效果)
*/
public Flux<String> rateLimitedStream(String message, int tokenPerSecond) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 限制流速:每秒最多输出指定数量的 token
.delayElements(Duration.ofMillis(1000 / tokenPerSecond))
// 使用 publishOn 指定下游在独立线程池处理,避免阻塞 Netty IO 线程
.publishOn(Schedulers.boundedElastic());
}
/**
* 带缓冲的流式输出
* 适用场景:客户端处理能力弱,需要批量接收
*/
public Flux<String> bufferedStream(String message) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 每 5 个 token 或者 200ms 汇聚一次
.bufferTimeout(5, Duration.ofMillis(200))
.map(tokens -> String.join("", tokens));
}
/**
* 带背压策略的流式输出
* 适用场景:下游可能跟不上速度,需要丢弃或者缓冲
*/
public Flux<String> withBackpressureStrategy(String message) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 订阅时使用 BUFFER 策略(默认),缓冲来不及消费的元素
// 当缓冲区满时会切换到 DROP(丢弃)
.onBackpressureBuffer(
256, // 缓冲区大小
dropped -> log.warn("背压触发,丢弃 token: {}", dropped),
BufferOverflowStrategy.DROP_OLDEST
);
}
}流式场景下的错误处理
流的错误处理比普通同步调用复杂,因为错误可能在流进行到一半时才发生,这时候客户端已经收到了部分内容。
错误的几种类型和处理策略:
@Service
public class ResilientStreamService {
@Autowired
private ChatClient chatClient;
@Autowired
private FallbackChatClient fallbackClient;
/**
* 带重试的流式调用
* 注意:重试只对启动流之前的错误有效,流开始后的错误重试会重新开始
*/
public Flux<String> streamWithRetry(String message) {
return Flux.defer(() ->
chatClient.prompt()
.user(message)
.stream()
.content()
)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(e -> isRetryableError(e))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new ServiceUnavailableException("AI 服务暂时不可用,请稍后再试"))
);
}
/**
* 带降级的流式调用
* 主服务失败时自动切换到降级服务
*/
public Flux<String> streamWithFallback(String message) {
return chatClient.prompt()
.user(message)
.stream()
.content()
.onErrorResume(e -> {
log.error("主服务流式调用失败,切换到降级服务", e);
// 降级到更便宜/更稳定的模型
return fallbackClient.prompt()
.user(message)
.stream()
.content();
});
}
/**
* 带超时的流式调用
* 流启动超时 + 流内部 token 间隔超时,两个都要处理
*/
public Flux<String> streamWithTimeout(String message) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 整个流的超时(从订阅到完成)
.timeout(Duration.ofSeconds(60))
// 相邻 token 间隔超时(如果某个 token 超过 5 秒没来,可能是网络问题)
.timeout(Duration.ofSeconds(5),
Mono.error(new StreamTimeoutException("Token 间隔超时,流中断")))
.onErrorMap(TimeoutException.class,
e -> new StreamTimeoutException("流式响应超时"));
}
/**
* 带部分内容保存的流式调用
* 当发生错误时,已经生成的内容不丢失
*/
public Flux<StreamChunk> streamWithPartialSave(String sessionId, String message) {
StringBuilder partialContent = new StringBuilder();
return chatClient.prompt()
.user(message)
.stream()
.content()
.doOnNext(token -> partialContent.append(token))
.map(token -> StreamChunk.builder()
.token(token)
.type(ChunkType.CONTENT)
.build())
.doOnComplete(() -> {
// 流完成,保存完整内容到对话记忆
String fullContent = partialContent.toString();
conversationMemoryService.saveAiResponse(sessionId, fullContent);
log.debug("流完成,总 token 数: {}", fullContent.length());
})
.doOnError(e -> {
// 出错时保存已生成的部分内容
String partialResponse = partialContent.toString();
if (!partialResponse.isEmpty()) {
conversationMemoryService.savePartialAiResponse(
sessionId, partialResponse + "...[响应中断]");
}
log.error("流式输出中断,已保存部分内容,长度: {}",
partialResponse.length(), e);
})
.onErrorResume(e -> Flux.just(
StreamChunk.builder()
.token("\n\n[响应中断,请重试]")
.type(ChunkType.ERROR)
.build()
));
}
private boolean isRetryableError(Throwable e) {
// 网络超时和服务器错误可以重试,但认证错误和参数错误不重试
return e instanceof ConnectTimeoutException
|| e instanceof ReadTimeoutException
|| (e instanceof HttpClientErrorException hce
&& hce.getStatusCode().is5xxServerError());
}
}流式输出的全链路追踪
流式调用的日志和监控比同步调用难做,因为 token 是逐个来的,不能等流结束才记录。来看一个完整的全链路追踪实现:
@Component
public class StreamMetricsAdvisor implements StreamAroundAdvisor {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private TraceService traceService;
@Override
public Flux<AdvisedResponse> aroundStream(AdvisedRequest request,
StreamAroundAdvisorChain chain) {
String traceId = traceService.startTrace("ai.stream.chat");
AtomicInteger tokenCount = new AtomicInteger(0);
AtomicLong firstTokenTime = new AtomicLong(0);
long startTime = System.currentTimeMillis();
return chain.nextAroundStream(request)
.doOnNext(response -> {
tokenCount.incrementAndGet();
// 记录首个 token 时间(TTFT 指标)
if (tokenCount.get() == 1) {
firstTokenTime.set(System.currentTimeMillis());
long ttft = firstTokenTime.get() - startTime;
meterRegistry.timer("ai.stream.ttft")
.record(ttft, TimeUnit.MILLISECONDS);
log.debug("TTFT: {}ms, traceId={}", ttft, traceId);
}
})
.doOnComplete(() -> {
long totalTime = System.currentTimeMillis() - startTime;
int total = tokenCount.get();
// 记录完成指标
meterRegistry.counter("ai.stream.completed").increment();
meterRegistry.timer("ai.stream.total.time")
.record(totalTime, TimeUnit.MILLISECONDS);
meterRegistry.summary("ai.stream.token.count").record(total);
log.info("流式完成: traceId={}, totalTokens={}, totalTimeMs={}",
traceId, total, totalTime);
traceService.endTrace(traceId, "COMPLETED");
})
.doOnError(e -> {
long totalTime = System.currentTimeMillis() - startTime;
meterRegistry.counter("ai.stream.error",
"type", e.getClass().getSimpleName()).increment();
log.error("流式出错: traceId={}, tokensBeforeError={}, timeMs={}",
traceId, tokenCount.get(), totalTime, e);
traceService.endTrace(traceId, "ERROR");
})
.doOnCancel(() -> {
// 客户端断开连接时触发
log.info("客户端取消订阅: traceId={}, tokensDelivered={}",
traceId, tokenCount.get());
traceService.endTrace(traceId, "CANCELLED");
});
}
@Override
public String getName() { return "StreamMetricsAdvisor"; }
@Override
public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; }
}实现打字机效果的前后端完整方案
来看一个完整的实现,包括后端的 Flux 和前端的 EventSource:
后端:
@RestController
@RequestMapping("/api/chat")
public class TypewriterChatController {
@Autowired
private ChatClient chatClient;
@GetMapping(value = "/typewriter", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> typewriterChat(
@RequestParam String message,
@RequestParam(defaultValue = "default") String sessionId) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 把每个 token 包装成 SSE 事件
.map(token -> ServerSentEvent.<String>builder()
.event("token")
.data(token)
.build())
// 流结束时发送完成信号
.concatWith(
Mono.just(ServerSentEvent.<String>builder()
.event("done")
.data("")
.build())
)
// 收集完整内容后保存(异步,不阻塞流)
.mergeWith(
chatClient.prompt()
.user(message)
.stream()
.content()
.collect(Collectors.joining())
.doOnNext(fullContent ->
memoryService.saveAiMessage(sessionId, fullContent))
.flatMapMany(ignored -> Flux.empty())
)
.onErrorResume(e -> Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("生成回答时出现问题,请重试")
.build()
));
}
}前端 JavaScript(简化版):
function startStream(message, sessionId) {
const url = `/api/chat/typewriter?message=${encodeURIComponent(message)}&sessionId=${sessionId}`;
const eventSource = new EventSource(url);
const outputElement = document.getElementById('ai-output');
eventSource.addEventListener('token', (event) => {
outputElement.textContent += event.data;
// 自动滚动到底部
outputElement.scrollTop = outputElement.scrollHeight;
});
eventSource.addEventListener('done', (event) => {
eventSource.close();
console.log('流式输出完成');
});
eventSource.addEventListener('error', (event) => {
outputElement.textContent += '\n[出现错误,请重试]';
eventSource.close();
});
// 页面关闭时关闭 SSE 连接
window.addEventListener('beforeunload', () => eventSource.close());
return eventSource; // 返回引用,供外部控制(比如手动取消)
}流式处理的架构拓扑
踩坑记录
坑1:WebFlux 和 Servlet 混用
我们项目一开始是传统 Spring MVC 应用,AI 流式接口想用 WebFlux,于是引入了 spring-webflux 依赖。结果启动时报各种冲突,因为同一个 Spring Boot 应用里不能同时有 Servlet Web 和 Reactive Web 两套完整的 Web 容器。
解决方案:要么整体迁移到 WebFlux,要么在 Spring MVC 里用 SseEmitter 或者 ResponseBodyEmitter 来做 SSE,不引入 WebFlux 依赖。
如果不想全量迁移 WebFlux,用 SseEmitter 的做法:
@GetMapping(value = "/stream/mvc", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamMvc(@RequestParam String message) {
SseEmitter emitter = new SseEmitter(60_000L); // 60 秒超时
// 在独立线程里订阅 Flux
Executors.newSingleThreadExecutor().submit(() -> {
try {
chatClient.prompt()
.user(message)
.stream()
.content()
.subscribe(
token -> {
try {
emitter.send(SseEmitter.event()
.name("token")
.data(token));
} catch (IOException e) {
emitter.completeWithError(e);
}
},
emitter::completeWithError,
emitter::complete
);
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}坑2:流没有被消费就被 GC
Reactor 的 Flux 是惰性求值的,如果没有订阅者,什么都不会发生。我有次写了一段代码,觉得 doOnComplete 里的逻辑会自动执行,结果发现根本没执行——因为我忘记了这个 Flux 没有被订阅。
坑3:Reactor 的 Context 丢失
在响应式链里,Spring Security 和 MDC(日志上下文)等依赖 ThreadLocal 的机制会失效。特别是用 subscribeOn 或 publishOn 切换线程后,ThreadLocal 里的信息会丢失。
解决方案:用 Reactor Context 传递跨线程的上下文信息:
Flux<String> stream = chatClient.prompt()
.user(message)
.stream()
.content()
.contextWrite(ctx -> ctx
.put("userId", userId)
.put("traceId", traceId)
);
// 在 doOnNext 里读取
.doOnNext(token -> {
Mono.deferContextual(ctx -> {
String currentUserId = ctx.get("userId");
// 使用 userId
return Mono.empty();
}).subscribe();
})小结
Spring AI 的流式输出和 WebFlux 配合,能构建出非常流畅的 AI 交互体验。重点需要掌握的几个点:
Flux.stream().content()是基础,ServerSentEvent封装给前端- 背压策略要根据场景选择:buffer、drop 还是 error
- 错误处理分层:retry 处理瞬时故障,fallback 处理服务不可用,onErrorResume 向客户端传递友好错误
- 流的生命周期事件(doOnNext/doOnComplete/doOnError/doOnCancel)用好了,监控和日志就完整了
- WebFlux 和 Servlet 不能混用,没准备全面响应式化的项目用 SseEmitter 来过渡
