第2379篇:流式RAG响应的工程实现——边检索边生成的系统架构
大约 5 分钟
第2379篇:流式RAG响应的工程实现——边检索边生成的系统架构
适读人群:关注RAG响应延迟体验的AI工程师 | 阅读时长:约18分钟 | 核心价值:掌握流式RAG的端到端工程实现,解决首字节延迟和用户等待体验问题
我们RAG系统上线初期,用户反馈里出现最多的一个词是"慢"。
实际测了一下:从用户发问到看到第一个字的时间,平均是3.8秒。这里面包括:查询向量化(300ms)+ 向量检索(500ms)+ 构建Prompt(100ms)+ LLM生成(3秒)。
用户等3.8秒看到第一个字,这个体验和普通的ChatGPT相比差太多了。ChatGPT用了流式响应,你问完问题0.5秒内就开始有字出来,心理上感觉快很多。
这篇讲怎么把RAG做成流式的,把"首字节时间"从秒级降到毫秒级。
流式RAG的架构思路
/**
* 普通RAG vs 流式RAG 的响应时序
*
* 普通RAG:
* t=0 用户发问
* t=0.3 完成查询向量化
* t=0.8 完成向量检索
* t=0.9 构建完Prompt
* t=3.9 LLM生成完毕
* t=3.9 用户看到第一个字(首字节延迟:3.9秒)
*
* 流式RAG:
* t=0 用户发问
* t=0.3 完成查询向量化
* t=0.8 完成向量检索
* t=0.9 构建完Prompt,开始发LLM请求
* t=1.2 LLM生成第一个token
* t=1.2 用户看到第一个字(首字节延迟:1.2秒)
* t=4.0 LLM生成完毕
*
* 用户感知的延迟从3.9秒降到1.2秒
* 实际总耗时差不多,但用户感知大幅改善
*/后端:Spring Boot的流式响应实现
@RestController
@RequestMapping("/api/rag")
public class StreamingRAGController {
private final StreamingRAGService streamingRAGService;
/**
* SSE(Server-Sent Events)流式接口
*
* 选用SSE而不是WebSocket:
* - RAG问答是单向流(服务端 -> 客户端)
* - SSE更简单,HTTP原生支持
* - 自动重连
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamQuery(
@RequestParam String question,
@RequestParam String userId,
HttpServletRequest request) {
return streamingRAGService.streamAnswer(question, userId)
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.build())
.onErrorReturn(
ServerSentEvent.<String>builder()
.event("error")
.data("生成过程中出现错误,请重试")
.build()
)
.concatWith(
// 流结束时发送完成信号
Flux.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build())
);
}
}
@Service
public class StreamingRAGService {
private final VectorStore vectorStore;
private final StreamingChatClient streamingChatClient;
private final EmbeddingModel embeddingModel;
/**
* 流式RAG的核心实现
* 返回Flux<String>,每个元素是一个token
*/
public Flux<String> streamAnswer(String question, String userId) {
return Mono.fromCallable(() -> {
// 第一步:检索(同步执行,必须等检索完才能生成)
float[] queryEmbedding = embeddingModel.embed(question);
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(question).withTopK(5)
);
String prompt = buildPrompt(question, docs);
return prompt;
})
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(prompt -> {
// 第二步:流式生成
return streamingChatClient
.prompt(prompt)
.stream()
.content();
});
}
private String buildPrompt(String question, List<Document> docs) {
String context = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
return """
基于以下参考内容,回答用户问题:
%s
问题:%s
回答:
""".formatted(context, question);
}
}进阶:检索和生成的部分并行
能不能让检索和生成有部分重叠,进一步降低延迟?
@Service
public class AdvancedStreamingRAGService {
/**
* 两阶段并行策略:
* 1. 先做快速初步检索(Top-3,速度快)
* 2. 用Top-3的结果立即开始生成
* 3. 同时在后台做精细检索(Top-10)
* 4. 如果精细检索找到了更好的文档,插入补充信息
*
* 注意:这个策略有复杂度,适合对延迟非常敏感的场景
* 大多数场景用标准流式就够了
*/
public Flux<StreamToken> twoStageStreamAnswer(String question) {
return Flux.create(emitter -> {
// 快速检索:Top-3,通常200-300ms
CompletableFuture<List<Document>> quickSearchFuture =
CompletableFuture.supplyAsync(() ->
vectorStore.similaritySearch(
SearchRequest.query(question).withTopK(3)
)
);
// 精细检索:Top-10,可能需要更长时间
CompletableFuture<List<Document>> fullSearchFuture =
CompletableFuture.supplyAsync(() ->
vectorStore.similaritySearch(
SearchRequest.query(question).withTopK(10)
)
);
try {
// 等待快速检索完成,立即开始生成
List<Document> quickDocs = quickSearchFuture.get(2, TimeUnit.SECONDS);
String initialPrompt = buildPrompt(question, quickDocs);
// 发送"检索完成"事件
emitter.next(StreamToken.ofEvent("retrieval_done",
"已找到 " + quickDocs.size() + " 条相关内容"));
// 开始流式生成
streamingChatClient.prompt(initialPrompt)
.stream()
.content()
.subscribe(
token -> emitter.next(StreamToken.ofContent(token)),
error -> emitter.error(error),
() -> {
emitter.next(StreamToken.ofEvent("generation_done", null));
emitter.complete();
}
);
} catch (Exception e) {
emitter.error(e);
}
});
}
}前端:正确处理SSE流
/**
* 前端处理SSE的关键点
* (以下是服务端逻辑,实际前端用JavaScript实现)
*
* 主要注意事项:
* 1. 处理[DONE]信号,正确关闭连接
* 2. 处理错误事件
* 3. 实现重连机制
* 4. 处理中文字符被截断的情况(token可能是半个中文字)
*/流式响应中的问题处理
@Service
public class StreamingErrorHandler {
/**
* 流式生成中出错时的处理策略
*
* 挑战:流已经开始输出了,不能像普通HTTP那样
* 返回一个错误状态码
*
* 策略:在流中插入错误标记,前端识别后处理
*/
public Flux<String> wrapWithErrorHandling(Flux<String> sourceStream) {
return sourceStream
.onErrorResume(TimeoutException.class, e -> {
// LLM超时:输出已生成的内容,然后说明被截断
return Flux.just("\n\n[由于响应超时,回答已被截断。请重新提问。]");
})
.onErrorResume(RateLimitException.class, e -> {
return Flux.just("\n\n[当前请求量较大,请稍后重试。]");
})
.onErrorResume(Exception.class, e -> {
log.error("Streaming error", e);
return Flux.just("\n\n[生成过程中出现错误:" + e.getMessage() + "]");
})
.timeout(Duration.ofSeconds(30)); // 总超时时间30秒
}
}流式RAG的日志记录挑战
流式响应给日志记录带来了新问题:你不能等到整个回答生成完才记录,但在流式过程中每个token都记录又太贵了。
@Service
public class StreamingRAGLogger {
/**
* 流式RAG的日志策略
*
* 方法:不记录中间token,只记录完整的最终结果
* 使用.collectList()收集所有token后再记录
*/
public Flux<String> withLogging(Flux<String> stream, String question, String userId) {
StringBuilder fullResponse = new StringBuilder();
long startTime = System.currentTimeMillis();
return stream
.doOnNext(token -> fullResponse.append(token))
.doOnComplete(() -> {
long duration = System.currentTimeMillis() - startTime;
// 异步记录日志,不阻塞流式输出
asyncLogService.log(RAGQueryLog.builder()
.userId(userId)
.question(question)
.fullAnswer(fullResponse.toString())
.duration(duration)
.tokenCount(estimateTokenCount(fullResponse.toString()))
.build()
);
});
}
}一个常见误区
很多工程师实现流式RAG时,误以为要把检索也做成流式的。实际上检索必须是同步的——你必须先检索完,才知道给LLM什么上下文,才能开始生成。
"流式"的部分只是LLM的生成阶段。检索仍然是同步的,但因为LLM生成一旦开始就立即输出token,用户感知到的等待时间大幅减少了。
这个认知对做好流式RAG的Prompt设计也有影响:既然检索是同步的,就要尽量在检索阶段做足准备,让LLM生成时尽可能流畅,不要中途因为上下文不足而"卡壳"。
