Building a Complete Document Q&A System with Spring AI: The Full Pipeline from Upload to Answer
I've read a lot of RAG tutorials online, and most of them go like this:
Import two dependencies, write twenty lines of code, and: "Done! Your knowledge base is ready!"
But in those twenty lines there is no access control, no error handling, no performance monitoring, no user feedback, and no source citations. As soon as there are more documents, more users, and harder questions, the cracks show immediately.
This article is not that kind of tutorial. I want to lay out the design of a complete, near-production system: not every detail, but at least the critical path and the key decision points.
The core requirements: a user uploads a document → the document is vectorized → the user asks a question → the system retrieves relevant documents and answers → the user can see the cited sources → the user can give feedback on answer quality.
It looks simple, but getting every step right takes a lot of engineering decisions.
System Architecture
The overall shape of the system first:
Tech stack: Spring Boot 3.x + Spring AI + Milvus + MySQL + Redis (caching) + Kafka (asynchronous processing)
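The Kafka leg deserves one concrete illustration up front. DocumentService.enqueueForProcessing, which the upload controller below calls, is not shown in this article, so here is a minimal Spring Kafka sketch of what the producer side might look like. The topic name and the DocumentProcessingMessage type match the consumer shown later; the class names, the topic settings, and the assumption that DocumentProcessingMessage has a (documentId, storageKey) constructor are mine:
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Declares the processing topic; partition and replica counts are illustrative
@Configuration
class DocumentProcessingTopicConfig {
    @Bean
    NewTopic documentProcessingTopic() {
        return TopicBuilder.name("document-processing")
                .partitions(3)
                .replicas(1)
                .build();
    }
}

// Producer used by DocumentService.enqueueForProcessing after the raw file is stored
@Service
@RequiredArgsConstructor
class DocumentProcessingProducer {
    private final KafkaTemplate<String, DocumentProcessingMessage> kafkaTemplate;

    void enqueue(String documentId, String storageKey) {
        // Keying by documentId keeps all events for one document on the same partition
        kafkaTemplate.send("document-processing", documentId,
                new DocumentProcessingMessage(documentId, storageKey));
    }
}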
Document Upload and Processing
The core design principle for this part: document processing is asynchronous, and the upload endpoint must return quickly.
Parsing and vectorizing a 100-page PDF can take 30 seconds or more. With a synchronous endpoint the user waits a long time and the request can easily time out, so the upload endpoint only does quick validation and enqueuing and hands the real work to an asynchronous task.
@RestController
@RequestMapping("/api/documents")
@Slf4j
@RequiredArgsConstructor
public class DocumentUploadController {
private final DocumentService documentService;
private final UserContext userContext;
/**
* Document upload endpoint.
* Returns quickly; the actual processing happens asynchronously.
*/
@PostMapping("/upload")
public ResponseEntity<DocumentUploadResponse> upload(
@RequestParam("file") MultipartFile file,
@RequestParam(value = "visibility", defaultValue = "DEPARTMENT") String visibility,
@RequestParam(value = "tags", required = false) List<String> tags,
@AuthenticationPrincipal UserPrincipal userPrincipal) {
// 1. Basic validation
validateFile(file);
// 2. Create the document record (PENDING status)
DocumentRecord record = documentService.createRecord(
file.getOriginalFilename(),
file.getSize(),
userPrincipal.getUserId(),
VisibilityLevel.valueOf(visibility),
tags
);
// 3. Store the raw file in object storage
String storageKey = documentService.storeRawFile(file, record.getId());
// 4. Publish to the processing queue
documentService.enqueueForProcessing(record.getId(), storageKey);
log.info("Document upload accepted: {} (id: {})",
file.getOriginalFilename(), record.getId());
return ResponseEntity.accepted()
.body(DocumentUploadResponse.builder()
.documentId(record.getId())
.status("PROCESSING")
.estimatedTimeSeconds(estimateProcessingTime(file.getSize()))
.message("文档已接收,正在处理中")
.build());
}
/**
* Query the processing status of a document
*/
@GetMapping("/{documentId}/status")
public ResponseEntity<DocumentStatusResponse> getStatus(@PathVariable String documentId) {
DocumentRecord record = documentService.findById(documentId);
return ResponseEntity.ok(DocumentStatusResponse.fromRecord(record));
}
private void validateFile(MultipartFile file) {
if (file.isEmpty()) {
throw new IllegalArgumentException("文件不能为空");
}
long maxSizeBytes = 50 * 1024 * 1024; // 50MB
if (file.getSize() > maxSizeBytes) {
throw new IllegalArgumentException("文件大小超过限制(最大50MB)");
}
String filename = file.getOriginalFilename();
if (filename == null || !isSupportedFormat(filename)) {
throw new IllegalArgumentException("不支持的文件格式,请上传 PDF、DOCX 或 TXT 文件");
}
}
private boolean isSupportedFormat(String filename) {
String lower = filename.toLowerCase();
return lower.endsWith(".pdf") || lower.endsWith(".docx") || lower.endsWith(".txt");
}
private int estimateProcessingTime(long fileSizeBytes) {
// Rough estimate: 1 MB ≈ 3 seconds
return (int) (fileSizeBytes / (1024 * 1024) * 3) + 5;
}
}
The asynchronous document-processing consumer:
@Component
@Slf4j
@RequiredArgsConstructor
public class DocumentProcessingConsumer {
private final DocumentParserFactory parserFactory;
private final DocumentSplitter splitter;
private final EmbeddingClient embeddingClient;
private final VectorStore vectorStore;
private final DocumentVersionRepository versionRepo;
private final DocumentRecordRepository recordRepo;
@KafkaListener(topics = "document-processing", groupId = "doc-processor")
public void processDocument(DocumentProcessingMessage message) {
String documentId = message.getDocumentId();
log.info("Processing document: {}", documentId);
DocumentRecord record = recordRepo.findById(documentId)
.orElseThrow(() -> new RuntimeException("Document not found: " + documentId));
try {
// Update status: processing
record.setStatus(ProcessingStatus.PROCESSING);
record.setProcessingStartedAt(LocalDateTime.now());
recordRepo.save(record);
// Step 1: Parse the document content
String rawContent = parserFactory.getParser(record.getFileFormat())
.parse(message.getStorageKey());
// Step 2: Language detection and preprocessing
String processedContent = preprocessContent(rawContent, record);
// Step 3: Split the document into chunks
List<TextChunk> chunks = splitter.split(processedContent, SplitConfig.builder()
.chunkSize(512)
.chunkOverlap(50)
.splitByParagraph(true)
.build());
log.info("Document {} split into {} chunks", documentId, chunks.size());
// Step 4: Embed and index the chunks in batches
List<String> vectorIds = indexChunks(chunks, record);
// Step 5: Update the version record
DocumentVersion version = DocumentVersion.builder()
.documentKey(record.getId())
.contentHash(computeHash(rawContent))
.metadataHash(computeMetadataHash(record))
.vectorChunkIds(vectorIds)
.lastIndexedAt(LocalDateTime.now())
.indexStatus(DocumentVersion.IndexStatus.INDEXED)
.build();
versionRepo.save(version);
// Update status: indexed
record.setStatus(ProcessingStatus.INDEXED);
record.setChunkCount(chunks.size());
record.setProcessingCompletedAt(LocalDateTime.now());
recordRepo.save(record);
log.info("Document {} processing completed: {} chunks indexed",
documentId, chunks.size());
} catch (Exception e) {
log.error("Document processing failed: {}", documentId, e);
record.setStatus(ProcessingStatus.FAILED);
record.setErrorMessage(e.getMessage());
recordRepo.save(record);
}
}
private List<String> indexChunks(List<TextChunk> chunks, DocumentRecord record) {
List<String> vectorIds = new ArrayList<>();
int batchSize = 50;
for (int i = 0; i < chunks.size(); i += batchSize) {
List<TextChunk> batch = chunks.subList(i, Math.min(i + batchSize, chunks.size()));
List<Document> documents = batch.stream()
.map(chunk -> {
String chunkId = record.getId() + "_chunk_" + chunk.getIndex();
Map<String, Object> metadata = buildChunkMetadata(chunk, record);
return new Document(chunkId, chunk.getContent(), metadata);
})
.collect(Collectors.toList());
vectorStore.add(documents);
documents.forEach(doc -> vectorIds.add(doc.getId()));
log.debug("Indexed batch {}/{} for document {}",
(i / batchSize) + 1,
(int) Math.ceil((double) chunks.size() / batchSize),
record.getId());
}
return vectorIds;
}
private Map<String, Object> buildChunkMetadata(TextChunk chunk, DocumentRecord record) {
Map<String, Object> metadata = new HashMap<>();
metadata.put("document_id", record.getId());
metadata.put("document_name", record.getFileName());
metadata.put("source_file", record.getFileName());
metadata.put("chunk_index", chunk.getIndex());
metadata.put("visibility", record.getVisibility().name());
metadata.put("department", record.getUploaderDepartment());
metadata.put("upload_time", record.getCreatedAt().toString());
metadata.put("page_number", chunk.getPageNumber());
if (record.getTags() != null) {
metadata.put("tags", String.join(",", record.getTags()));
}
return metadata;
}
}
Query Endpoints
The query endpoint is the core of the whole system; it has to chain several components together:
@RestController
@RequestMapping("/api/query")
@Slf4j
@RequiredArgsConstructor
public class QueryController {
private final QueryPipeline queryPipeline;
private final FeedbackService feedbackService;
private final QueryLogService queryLogService;
@PostMapping
public ResponseEntity<QueryResponse> query(
@RequestBody QueryRequest request,
@AuthenticationPrincipal UserPrincipal userPrincipal) {
long startTime = System.currentTimeMillis();
QueryContext context = QueryContext.builder()
.question(request.getQuestion())
.userId(userPrincipal.getUserId())
.department(userPrincipal.getDepartment())
.roleLevel(userPrincipal.getRoleLevel())
.sessionId(request.getSessionId())
.conversationHistory(request.getConversationHistory())
.build();
QueryResult result = queryPipeline.execute(context);
long latency = System.currentTimeMillis() - startTime;
// Log the query (asynchronously)
queryLogService.logAsync(context, result, latency);
QueryResponse response = QueryResponse.builder()
.queryId(result.getQueryId())
.answer(result.getAnswer())
.sources(buildSourceReferences(result.getUsedDocuments()))
.confidence(result.getConfidence())
.latencyMs(latency)
.build();
return ResponseEntity.ok(response);
}
/**
* User feedback endpoint
*/
@PostMapping("/{queryId}/feedback")
public ResponseEntity<Void> submitFeedback(
@PathVariable String queryId,
@RequestBody FeedbackRequest feedback,
@AuthenticationPrincipal UserPrincipal userPrincipal) {
feedbackService.saveFeedback(UserFeedback.builder()
.queryId(queryId)
.userId(userPrincipal.getUserId())
.rating(feedback.getRating()) // score from 1 to 5
.helpful(feedback.isHelpful()) // whether the answer helped
.comment(feedback.getComment()) // free-form user note
.issueType(feedback.getIssueType()) // WRONG_ANSWER / INCOMPLETE / IRRELEVANT
.timestamp(LocalDateTime.now())
.build());
return ResponseEntity.ok().build();
}
private List<SourceReference> buildSourceReferences(List<Document> documents) {
return documents.stream()
.map(doc -> SourceReference.builder()
.documentId((String) doc.getMetadata().get("document_id"))
.documentName((String) doc.getMetadata().get("document_name"))
.excerpt(truncate(doc.getContent(), 200)) // excerpt shown to the user
.pageNumber((Integer) doc.getMetadata().get("page_number"))
.relevanceScore((Double) doc.getMetadata().get("rerank_score"))
.build())
.collect(Collectors.toList());
}
}
The Core Query Pipeline
This is where all the optimization techniques get wired together:
@Service
@Slf4j
@RequiredArgsConstructor
public class QueryPipeline {
private final CombinedQueryRewriter queryRewriter;
private final PermissionAwareVectorStore vectorStore;
private final CohereRerankService rerankService;
private final ContextCompressionPipeline contextCompressor;
private final ConversationAwareAnswerGenerator answerGenerator;
private final SourceConsistencyChecker consistencyChecker;
private final RedisCache cache;
public QueryResult execute(QueryContext context) {
String queryId = UUID.randomUUID().toString();
// Step 0: Cache check (an identical question from the same permission scope returns the cached result)
String cacheKey = buildCacheKey(context);
QueryResult cachedResult = cache.get(cacheKey, QueryResult.class);
if (cachedResult != null) {
log.debug("Cache hit for query: {}", context.getQuestion());
return cachedResult.withQueryId(queryId);
}
// Step 1: Query rewriting
List<String> expandedQueries = queryRewriter.rewrite(context.getQuestion());
log.debug("Expanded to {} queries", expandedQueries.size());
// Step 2: Vector retrieval with permission filtering
PermissionAwareVectorStore.setCurrentUser(UserContext.fromQueryContext(context));
List<Document> rawDocuments;
try {
rawDocuments = retrieveDocuments(expandedQueries);
} finally {
PermissionAwareVectorStore.clearCurrentUser();
}
if (rawDocuments.isEmpty()) {
return buildNoResultResponse(queryId, context.getQuestion());
}
// Step 3: Rerank
List<Document> rerankedDocuments = rerankService.rerank(
context.getQuestion(), rawDocuments, 8
);
// Step 4: Context compression
String compressedContext = contextCompressor.compress(
context.getQuestion(), rerankedDocuments
);
// Step 5: Generate the answer (taking conversation history into account)
String answer = answerGenerator.generate(
context.getQuestion(),
compressedContext,
context.getConversationHistory(),
extractDocumentNames(rerankedDocuments)
);
// Step 6: Hallucination check (for high-risk questions)
double confidence = 1.0;
if (isHighRiskQuestion(context.getQuestion())) {
SourceConsistencyChecker.ConsistencyCheckResult check =
consistencyChecker.check(answer, rerankedDocuments);
if (check.isHasIssues()) {
log.warn("Potential hallucination detected for query: {}", context.getQuestion());
confidence = check.getConfidence() * 0.7; // lower the confidence
answer = annotateWithUncertainty(answer, check);
}
}
// Step 7: Build the result
QueryResult result = QueryResult.builder()
.queryId(queryId)
.question(context.getQuestion())
.answer(answer)
.usedDocuments(rerankedDocuments.subList(0, Math.min(5, rerankedDocuments.size())))
.confidence(confidence)
.expandedQueryCount(expandedQueries.size())
.retrievedDocumentCount(rawDocuments.size())
.build();
// Cache the result (skip low-confidence answers)
if (confidence > 0.8) {
cache.set(cacheKey, result, Duration.ofMinutes(30));
}
return result;
}
private List<Document> retrieveDocuments(List<String> queries) {
Map<String, Document> deduped = new LinkedHashMap<>();
// Note: these searches run on common ForkJoinPool threads; the permission context set via
// setCurrentUser must be visible to them (e.g. not a plain ThreadLocal), otherwise the filter silently won't apply
queries.parallelStream().forEach(query -> {
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(query).withTopK(6)
);
synchronized (deduped) {
docs.forEach(doc -> deduped.putIfAbsent(doc.getId(), doc));
}
});
return new ArrayList<>(deduped.values());
}
private QueryResult buildNoResultResponse(String queryId, String question) {
return QueryResult.builder()
.queryId(queryId)
.question(question)
.answer("抱歉,在知识库中未找到与您的问题相关的内容。请尝试换个方式描述问题,或联系管理员确认相关文档是否已上传。")
.usedDocuments(Collections.emptyList())
.confidence(0.0)
.build();
}
private boolean isHighRiskQuestion(String question) {
// Deadline, prohibition, contract, and legal keywords mark a question as high-risk
return question.contains("截止") || question.contains("期限") ||
question.contains("不允许") || question.contains("禁止") ||
question.matches(".*[0-9].*") || // questions that contain numbers (dates, amounts, deadlines)
question.contains("合同") || question.contains("法律");
}
private String buildCacheKey(QueryContext context) {
// The cache key combines the question with the user's department and role level (different permissions may yield different answers)
return String.format("query:%s:%s:%d",
context.getQuestion().hashCode(),
context.getDepartment(),
context.getRoleLevel());
}
private String annotateWithUncertainty(String answer,
SourceConsistencyChecker.ConsistencyCheckResult check) {
StringBuilder annotated = new StringBuilder(answer);
annotated.append("\n\n");
annotated.append("⚠️ 系统提示:以上回答中部分内容可信度较低,建议查阅原始文档确认。");
if (!check.getUnsupportedClaims().isEmpty()) {
annotated.append("以下内容未在文档中找到直接支持:");
check.getUnsupportedClaims().forEach(claim ->
annotated.append("\n- ").append(claim));
}
return annotated.toString();
}
}
Conversation History Management
The system targets conversational use, so it needs to support multi-turn dialogue and let users ask follow-up questions.
@Service
@Slf4j
@RequiredArgsConstructor
public class ConversationAwareAnswerGenerator {
private final ChatClient chatClient;
private static final String CONVERSATION_RAG_PROMPT = """
你是一个专业的企业知识库助手。请基于检索到的参考文档,回答用户的问题。
重要说明:
1. 只基于参考文档中的内容回答,不要使用文档外的知识
2. 如果文档中没有足够信息,明确说明"文档中未提及此内容"
3. 在回答中引用来源,格式:(来源:文件名)
4. 使用清晰的中文表达,必要时可以使用列表格式
{conversation_history_section}
参考文档:
{context}
当前问题:{question}
请回答:
""";
public String generate(
String question,
String context,
List<ConversationTurn> history,
List<String> documentNames) {
String historySection = buildHistorySection(history);
return chatClient.prompt()
.user(u -> u.text(CONVERSATION_RAG_PROMPT)
.param("conversation_history_section", historySection)
.param("context", context)
.param("question", question))
.call()
.content();
}
private String buildHistorySection(List<ConversationTurn> history) {
if (history == null || history.isEmpty()) {
return "";
}
StringBuilder sb = new StringBuilder("对话历史:\n");
// Keep only the 5 most recent turns so the context does not grow too large
List<ConversationTurn> recentHistory = history.stream()
.sorted(Comparator.comparing(ConversationTurn::getTimestamp).reversed())
.limit(5)
.sorted(Comparator.comparing(ConversationTurn::getTimestamp))
.collect(Collectors.toList());
for (ConversationTurn turn : recentHistory) {
sb.append("用户:").append(turn.getUserMessage()).append("\n");
sb.append("助手:").append(turn.getAssistantMessage()).append("\n");
}
sb.append("\n(以上是对话历史,请结合上下文回答当前问题)\n");
return sb.toString();
}
@Data
@Builder
public static class ConversationTurn {
private String userMessage;
private String assistantMessage;
private LocalDateTime timestamp;
}
}
Feedback-Driven Quality Improvement
Collecting feedback is only the first step; the point is to use it to improve the system.
@Service
@Slf4j
@RequiredArgsConstructor
@Transactional
public class FeedbackDrivenImprovement {
private final FeedbackRepository feedbackRepo;
private final QueryLogRepository queryLogRepo;
private final EvaluationDatasetRepository evalDatasetRepo;
/**
* Runs every morning to analyze the previous day's negative feedback and spot weak points in the system
*/
@Scheduled(cron = "0 0 7 * * *")
public void analyzeDailyFeedback() {
LocalDate yesterday = LocalDate.now().minusDays(1);
List<UserFeedback> negativeFeedbacks = feedbackRepo.findByDateAndLowRating(
yesterday, 2 // rating <= 2
);
log.info("Analyzing {} negative feedbacks from {}",
negativeFeedbacks.size(), yesterday);
// Group by issue type
Map<String, Long> issueTypeCounts = negativeFeedbacks.stream()
.filter(f -> f.getIssueType() != null)
.collect(Collectors.groupingBy(
f -> f.getIssueType().name(),
Collectors.counting()
));
log.info("Issue type distribution: {}", issueTypeCounts);
// Add low-rated cases to the evaluation dataset
int addedCases = 0;
for (UserFeedback feedback : negativeFeedbacks) {
QueryLog queryLog = queryLogRepo.findByQueryId(feedback.getQueryId());
if (queryLog != null) {
// These become hard cases that deserve extra attention in evaluation
evalDatasetRepo.save(EvaluationCase.builder()
.question(queryLog.getQuestion())
.source("user_feedback")
.priority("HIGH")
.userNote(feedback.getComment())
.build());
addedCases++;
}
}
// Count only the cases that actually had a matching query log
log.info("Added {} hard cases to evaluation dataset", addedCases);
}
/**
* Identify the question categories that fail most often
*/
public List<FailurePattern> identifyFailurePatterns(int daysBack) {
LocalDateTime since = LocalDateTime.now().minusDays(daysBack);
List<UserFeedback> negatives = feedbackRepo.findLowRatingsSince(since, 2);
// Simple keyword-based clustering of the question text
Map<String, List<String>> clusters = new HashMap<>();
for (UserFeedback feedback : negatives) {
QueryLog queryLog = queryLogRepo.findByQueryId(feedback.getQueryId());
if (queryLog != null) {
String category = categorizeQuestion(queryLog.getQuestion());
clusters.computeIfAbsent(category, k -> new ArrayList<>())
.add(queryLog.getQuestion());
}
}
return clusters.entrySet().stream()
.map(e -> FailurePattern.builder()
.category(e.getKey())
.count(e.getValue().size())
.exampleQuestions(e.getValue().subList(0, Math.min(3, e.getValue().size())))
.build())
.sorted(Comparator.comparingInt(FailurePattern::getCount).reversed())
.collect(Collectors.toList());
}
private String categorizeQuestion(String question) {
if (question.contains("日期") || question.contains("时间") || question.contains("截止"))
return "date_time_query";
if (question.contains("流程") || question.contains("步骤") || question.contains("怎么"))
return "process_query";
if (question.contains("多少") || question.contains("金额") || question.contains("费用"))
return "numeric_query";
return "general_query";
}
}
A Few Key API Design Decisions
With the code laid out, I want to call out a few design decisions; each has a concrete reason behind it.
Decision 1: Why document processing is asynchronous
Processing a large PDF synchronously can take over 30 seconds: the HTTP connection risks timing out, the user experience is poor, and it ties up web container threads. With asynchronous processing the upload endpoint consistently returns within 200 ms, and users can either poll for status or receive a completion notification over WebSocket, as sketched below.
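For the WebSocket side, one hedged option is to have the processing consumer publish a small status event once a document reaches INDEXED or FAILED. This assumes a standard Spring STOMP/WebSocket setup; the notifier class and the /queue/documents destination below are illustrative, not part of the system shown above:
import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class DocumentProcessingNotifier {
    private final SimpMessagingTemplate messagingTemplate;

    // Called by DocumentProcessingConsumer after it saves the final status
    public void notifyUploader(String uploaderUserId, String documentId, String status) {
        // Delivered to the uploader's personal destination, i.e. /user/{id}/queue/documents
        messagingTemplate.convertAndSendToUser(uploaderUserId, "/queue/documents",
                Map.of("documentId", documentId, "status", status));
    }
}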
Decision 2: Why the cache key is question + department + role level
Users with different permissions may see different documents, so the same question can legitimately have different answers at different permission levels. If the question alone were the key, user A's answer could be served to user B, which is a potential information-security problem.
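One caveat about buildCacheKey above: String.hashCode() is a 32-bit value, so two unrelated questions can occasionally collide and share a cache entry within the same department and role level. A collision-resistant variant is easy; the sketch below (my code, not the article's) hashes a lightly normalized question with SHA-256:
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

private String buildCacheKey(QueryContext context) {
    try {
        // Light normalization so trivial whitespace/case differences still hit the cache
        String normalized = context.getQuestion().trim().toLowerCase();
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(normalized.getBytes(StandardCharsets.UTF_8));
        return String.format("query:%s:%s:%d",
                HexFormat.of().formatHex(digest),
                context.getDepartment(),
                context.getRoleLevel());
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException("SHA-256 not available", e);
    }
}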
Decision 3: Why citations show an excerpt, not just a file name
Showing only "Source: 员工手册.pdf" gives the user no way to verify the answer. Showing an excerpt lets users quickly judge whether the source really supports the answer, and it also constrains the LLM: readers can check whether the model has misrepresented the original text.
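The truncate helper referenced in buildSourceReferences is not shown earlier; a minimal version (just a sketch, with an arbitrary ellipsis suffix) could be:
// Cuts the excerpt to at most maxChars characters and marks the cut visibly
private String truncate(String text, int maxChars) {
    if (text == null || text.length() <= maxChars) {
        return text;
    }
    return text.substring(0, maxChars) + "…";
}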
Decision 4: Why conversation history keeps only the last 5 turns
More history inflates the context, burns more tokens, and adds more noise. Five turns cover 95% of follow-up scenarios; a conversation running past five turns usually means the user is really asking a new question.
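If the fixed five-turn window ever proves too coarse, trimming by a size budget is a natural refinement. The sketch below is an assumption on my part rather than something the system above does: it approximates token usage by character count (plus the usual java.util imports) and could sit next to buildHistorySection:
// Keeps the most recent turns whose combined length fits a rough character budget.
// Assumes the list is in chronological order; character count is only a proxy for tokens.
private List<ConversationTurn> trimHistoryByBudget(List<ConversationTurn> history, int maxChars) {
    List<ConversationTurn> kept = new ArrayList<>();
    int used = 0;
    // Walk backwards from the newest turn so the most recent context survives
    for (int i = history.size() - 1; i >= 0; i--) {
        ConversationTurn turn = history.get(i);
        int size = turn.getUserMessage().length() + turn.getAssistantMessage().length();
        if (used + size > maxChars) {
            break;
        }
        kept.add(0, turn); // prepend to preserve chronological order
        used += size;
    }
    return kept;
}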
Performance and Cost
Finally, some numbers from real operation, taken from one of our medium-sized knowledge bases (20,000 documents, 500 daily active users):
| Metric | Value |
|---|---|
| Median query latency (P50) | 1.8 s |
| P99 query latency | 4.2 s |
| Average daily queries | 3,000 |
| Average tokens per query | 3,200 |
| Average monthly LLM API cost | ¥2,400 |
| Cache hit rate | 23% |
| Positive rating share (4 or higher) | 82% |
Caching saves roughly 23% of the API cost. Query rewriting, reranking, and context compression lifted the positive rating share from an initial 68% to 82%.
Summary
This system involves quite a few components, but the reasoning behind each one is straightforward. Read together with the ten earlier articles in this series, this piece covers the main technical decisions of an enterprise-grade RAG system.
The real challenge is not any single technique; it is combining these components correctly and continuously tuning them in real operation.
