第2328篇:Java Reactive与AI流式处理——Project Reactor在AI应用中的深度实践
第2328篇:Java Reactive与AI流式处理——Project Reactor在AI应用中的深度实践
适读人群:熟悉Spring WebFlux基础,希望在AI流式输出场景中深入应用响应式编程的工程师 | 阅读时长:约18分钟 | 核心价值:掌握AI流式处理的响应式实现模式,解决实际生产中的背压、错误处理和状态管理问题
流式输出是AI应用里最常见的需求之一。用户提问后,文字一个一个蹦出来,比等10秒后一次性出现的体验好得多。
但流式输出在工程实现上比普通HTTP请求复杂很多——你需要处理连接保持、断线重传、背压控制、局部错误处理……这些问题在传统同步编程里都很难优雅地解决,而Project Reactor的响应式编程模型,是目前处理这类问题最成熟的方案。
我在做一个面向C端用户的AI写作助手时,踩遍了流式处理的坑:SSE连接在某些移动网络下频繁断开、用户快速切换对话时token仍在从旧的stream里流出、某个文档处理步骤出错导致整个流终止……
这篇文章把这些场景都拆开来讲,给出工程上可用的解法。
基础:Spring AI的Flux是怎么来的
先把底层搞清楚,才能在出问题时不慌。
Spring AI的流式调用返回Flux<String>或Flux<ChatResponse>,本质是把LLM的SSE(Server-Sent Events)流包装成了Reactor的Flux:
// Spring AI内部简化示意(非真实源码,便于理解)
public Flux<ChatResponse> stream(Prompt prompt) {
return Flux.create(sink -> {
// 向OpenAI/DeepSeek发送请求,开启SSE连接
sseClient.stream(prompt)
.subscribe(
event -> sink.next(parseChatResponse(event)), // 每个token
error -> sink.error(error), // 发生错误
() -> sink.complete() // 流结束
);
});
}理解这个基础,后面所有的操作符都有了根基。
场景一:基础的SSE流式输出
最简单的情况:用户问问题,前端通过SSE接收流式回答。
@RestController
@RequiredArgsConstructor
public class StreamController {
private final ChatClient chatClient;
// 基础SSE端点
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(
@RequestParam String message,
@RequestParam(defaultValue = "default") String sessionId) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 每个token包装成SSE事件
.map(token -> ServerSentEvent.builder(token)
.event("token")
.build())
// 流结束时发送完成事件
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
))
// 错误处理:出错时发送错误事件,不直接断开连接
.onErrorResume(e -> {
log.error("流式输出错误,sessionId={}", sessionId, e);
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("生成失败,请重试")
.build()
);
});
}
}前端JavaScript接收:
const eventSource = new EventSource(`/chat/stream?message=${encodeURIComponent(userMessage)}`);
let fullText = '';
eventSource.addEventListener('token', (e) => {
fullText += e.data;
updateUI(fullText);
});
eventSource.addEventListener('done', () => {
eventSource.close();
markComplete();
});
eventSource.addEventListener('error', (e) => {
showError(e.data);
eventSource.close();
});场景二:流式输出 + 实时Token统计
生产环境往往需要统计Token用量,但流式输出时Token信息只在最后一个chunk里:
@Service
@RequiredArgsConstructor
@Slf4j
public class TokenAwareStreamService {
private final ChatClient chatClient;
/**
* 流式输出,同时收集Token统计信息
* 前端收到的是文本token,最后接收一个统计事件
*/
public Flux<StreamEvent> streamWithTokenStats(String sessionId, String message) {
// 使用AtomicReference收集最终的TokenStats
AtomicReference<TokenUsage> tokenUsage = new AtomicReference<>();
return chatClient.prompt()
.user(message)
.stream()
.chatResponse() // 使用ChatResponse而不是content,获取元数据
.doOnNext(response -> {
// 收集token统计(每次chunk都检查,取最后一个非空的)
if (response.getMetadata() != null &&
response.getMetadata().getUsage() != null) {
tokenUsage.set(response.getMetadata().getUsage());
}
})
// 提取文本内容
.map(response -> {
String content = "";
if (response.getResult() != null &&
response.getResult().getOutput() != null) {
content = response.getResult().getOutput().getContent();
}
return StreamEvent.token(content != null ? content : "");
})
// 过滤空token(某些模型会发送空chunk)
.filter(event -> !event.content().isEmpty())
// 流结束后追加统计事件
.concatWith(Mono.fromCallable(() -> {
TokenUsage usage = tokenUsage.get();
if (usage != null) {
log.info("Token用量:input={}, output={}, total={}",
usage.getPromptTokens(),
usage.getGenerationTokens(),
usage.getTotalTokens());
return StreamEvent.stats(
usage.getPromptTokens(),
usage.getGenerationTokens());
}
return StreamEvent.done();
}));
}
// 流事件类型
public sealed interface StreamEvent {
record Token(String content) implements StreamEvent {}
record Stats(int promptTokens, int completionTokens) implements StreamEvent {}
record Done() implements StreamEvent {}
static StreamEvent token(String content) { return new Token(content); }
static StreamEvent stats(int prompt, int completion) { return new Stats(prompt, completion); }
static StreamEvent done() { return new Done(); }
}
}场景三:流式输出的超时和取消
用户可能中途关闭页面,这时候后端应该停止对LLM API的调用,不然白白浪费Token:
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithCancellation(
@RequestParam String message,
ServerHttpRequest request) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 单个token最长等待5秒(防止LLM中途卡住)
.timeout(Duration.ofSeconds(5))
.map(token -> ServerSentEvent.builder(token).event("token").build())
// 客户端断开时,Flux会自动取消(响应式的背压传播)
.doOnCancel(() -> log.info("客户端断开连接,已取消流式输出"))
.doOnComplete(() -> log.debug("流式输出正常完成"))
.onErrorResume(TimeoutException.class, e -> {
log.warn("流式输出超时");
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("响应超时,请重试")
.build());
});
}重要细节:当Spring WebFlux检测到客户端断开连接时,它会自动取消订阅上游的Flux。这意味着如果你的ChatClient实现支持取消(大多数都支持),LLM的API调用也会被中断。这是响应式背压的天然优势。
场景四:多步骤AI流水线的流式输出
真实的AI应用往往不是单次LLM调用,而是多步骤流水线:先检索文档,再生成回答,最后格式化输出。
@Service
@RequiredArgsConstructor
public class PipelineStreamService {
private final ChatClient chatClient;
private final VectorStore vectorStore;
/**
* RAG流式流水线:
* 1. 先检索相关文档(同步,不流式)
* 2. 基于文档流式生成回答(流式)
* 3. 在流开始前发送一个"检索完成"事件(让前端知道在等什么)
*/
public Flux<PipelineEvent> ragStream(String question) {
return Mono.fromCallable(() -> {
// Step 1:检索文档(同步操作)
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(question).withTopK(3));
String context = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
return context;
})
.subscribeOn(Schedulers.boundedElastic()) // IO操作在弹性线程池执行
// 发出"检索完成"事件
.flatMapMany(context -> {
// 先发一个状态事件
Flux<PipelineEvent> statusEvent = Flux.just(
PipelineEvent.status("检索完成,正在生成回答..."));
// 再流式生成回答
Flux<PipelineEvent> answerStream = chatClient.prompt()
.system("基于以下资料回答问题:\n" + context)
.user(question)
.stream()
.content()
.map(PipelineEvent::token);
return statusEvent.concatWith(answerStream);
})
// 全局错误处理
.onErrorResume(e -> {
log.error("RAG流水线错误", e);
return Flux.just(PipelineEvent.error(e.getMessage()));
});
}
public sealed interface PipelineEvent {
record Token(String content) implements PipelineEvent {}
record Status(String message) implements PipelineEvent {}
record Error(String message) implements PipelineEvent {}
static PipelineEvent token(String content) { return new Token(content); }
static PipelineEvent status(String message) { return new Status(message); }
static PipelineEvent error(String message) { return new Error(message); }
}
}场景五:流式输出的内容缓存
有些问题(比如静态的FAQ)回答是固定的,没必要每次都调LLM。但流式缓存比普通缓存复杂——你不能把Flux直接存进Redis:
@Service
@RequiredArgsConstructor
public class CachingStreamService {
private final ChatClient chatClient;
private final ReactiveRedisTemplate<String, String> redisTemplate;
private static final Duration CACHE_TTL = Duration.ofHours(1);
public Flux<String> streamWithCache(String question) {
String cacheKey = "stream:cache:" + DigestUtils.md5Hex(question);
// 先检查是否有缓存的完整回答
return redisTemplate.opsForValue().get(cacheKey)
.flatMapMany(cached -> {
// 有缓存:模拟流式发送(分块输出)
log.debug("命中缓存:{}", cacheKey);
return simulateStream(cached);
})
.switchIfEmpty(
// 无缓存:真实流式调用,同时收集完整内容存入缓存
streamAndCache(question, cacheKey)
);
}
private Flux<String> streamAndCache(String question, String cacheKey) {
StringBuilder fullContent = new StringBuilder();
return chatClient.prompt()
.user(question)
.stream()
.content()
.doOnNext(token -> fullContent.append(token))
.doOnComplete(() -> {
// 流完成后存入缓存(异步,不影响主流程)
redisTemplate.opsForValue()
.set(cacheKey, fullContent.toString(), CACHE_TTL)
.subscribe(
result -> log.debug("缓存已存储:{}", cacheKey),
error -> log.warn("缓存存储失败", error)
);
});
}
// 把完整文本模拟成流式输出(每50字一个chunk)
private Flux<String> simulateStream(String fullContent) {
int chunkSize = 50;
List<String> chunks = new ArrayList<>();
for (int i = 0; i < fullContent.length(); i += chunkSize) {
chunks.add(fullContent.substring(i, Math.min(i + chunkSize, fullContent.length())));
}
// 模拟真实的流速
return Flux.fromIterable(chunks)
.delayElements(Duration.ofMillis(30));
}
}常见坑和解法
坑1:流式输出乱码
某些中文内容在流式传输时会被截断在多字节UTF-8字符的中间,导致乱码:
// 确保SSE响应的字符编码正确
@Bean
public WebFluxConfigurer webFluxConfigurer() {
return new WebFluxConfigurer() {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs()
.serverSentEventEncoder(); // SSE编码器默认使用UTF-8
}
};
}坑2:Flux没有被订阅就返回了
这是响应式编程初学者最常见的错误:
// 错误:创建了Flux但没有订阅,LLM根本没被调用
public void wrongUsage() {
Flux<String> stream = chatClient.prompt().user("hello").stream().content();
// 没有.subscribe(),stream只是一个描述,什么都没发生
}
// 正确:在Controller层返回Flux,让Spring WebFlux负责订阅
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> correctUsage(@RequestParam String message) {
return chatClient.prompt().user(message).stream().content();
// Spring WebFlux会自动订阅,并把每个元素发送给客户端
}坑3:在非响应式上下文里订阅Flux
如果你需要在非WebFlux的Spring MVC环境里消费流式输出:
// 在Spring MVC(同步)中消费Flux
@PostMapping("/process")
public String processSync(@RequestBody String question) {
// 用blockLast()阻塞等待所有token
// 注意:不要在虚拟线程里用这个,会有Pinning问题
StringBuilder result = new StringBuilder();
chatClient.prompt()
.user(question)
.stream()
.content()
.doOnNext(result::append)
.blockLast(Duration.ofSeconds(60));
return result.toString();
}响应式和流式输出是AI应用里的一块硬骨头,但掌握了这些模式后,处理更复杂的场景就有了基础。
