企业级AI知识库(下):从0到1完整项目实战·核心实现篇
2026/4/30大约 9 分钟
企业级AI知识库(下):从0到1完整项目实战·核心实现篇
适读人群:看完上篇架构设计篇的读者,准备动手实现的Java工程师 阅读时长:约22分钟
架构定了,代码才是硬仗
上篇讲了架构设计,评论区有人说:"看完感觉思路很清晰,但真正写代码的时候还是不知道从哪下手。"
这话说到我心里了。架构图是蓝图,而建筑本身是砖头一块一块垒起来的。
这篇直接进代码。从文档处理引擎开始,到混合检索实现,再到Prompt工程和质量评估,把能落地的关键代码全部写出来。
第一部分:文档处理引擎实现
整个系统最耗时间的部分,不是AI调用,而是文档处理。格式各异的文档要统一处理成可以向量化的文本块。
SmartDocumentProcessor——核心处理类:
@Service
@Slf4j
public class SmartDocumentProcessor {
private final DocumentChunkRepository chunkRepository;
private final EmbeddingModel embeddingModel;
private final ApplicationEventPublisher eventPublisher;
/**
* 处理文档入口:解析 → 分块 → 向量化 → 入库
*/
@Async("documentProcessorExecutor")
public CompletableFuture<ProcessResult> processDocument(Document doc) {
log.info("开始处理文档: id={}, title={}, type={}", doc.getId(), doc.getTitle(), doc.getFileType());
StopWatch sw = new StopWatch();
sw.start();
try {
// 1. 解析文档
List<TextChunk> rawChunks = parseDocument(doc);
log.info("文档解析完成: id={}, 原始块数={}", doc.getId(), rawChunks.size());
// 2. 智能分块
List<TextChunk> chunks = smartSplit(rawChunks, doc.getFileType());
log.info("分块完成: id={}, 分块数={}", doc.getId(), chunks.size());
// 3. 批量向量化(每批20个,避免API超时)
List<DocumentChunk> vectorChunks = batchEmbed(doc.getId(), chunks);
// 4. 批量入库
chunkRepository.saveAll(vectorChunks);
sw.stop();
log.info("文档处理完成: id={}, 耗时={}ms, 入库块数={}",
doc.getId(), sw.getTotalTimeMillis(), vectorChunks.size());
// 5. 发布事件,触发ES全文索引
eventPublisher.publishEvent(new DocumentIndexedEvent(doc.getId(), chunks));
return CompletableFuture.completedFuture(
ProcessResult.success(doc.getId(), vectorChunks.size()));
} catch (Exception e) {
log.error("文档处理失败: id={}, error={}", doc.getId(), e.getMessage(), e);
return CompletableFuture.completedFuture(
ProcessResult.failure(doc.getId(), e.getMessage()));
}
}
/**
* 根据文件类型分发到不同解析器
*/
private List<TextChunk> parseDocument(Document doc) throws IOException {
byte[] fileBytes = fileStorageService.download(doc.getFilePath());
return switch (doc.getFileType().toLowerCase()) {
case "pdf" -> new PdfDocumentParser().parse(fileBytes);
case "docx" -> new WordDocumentParser().parse(fileBytes);
case "xlsx" -> new ExcelDocumentParser().parse(fileBytes);
case "md" -> new MarkdownDocumentParser().parse(fileBytes);
case "txt" -> new PlainTextParser().parse(fileBytes);
default -> throw new UnsupportedOperationException(
"不支持的文件类型: " + doc.getFileType());
};
}
/**
* 智能分块:根据内容类型选择分块策略
*/
private List<TextChunk> smartSplit(List<TextChunk> rawChunks, String fileType) {
List<TextChunk> result = new ArrayList<>();
for (TextChunk chunk : rawChunks) {
if (chunk.getType() == ChunkType.TABLE) {
// 表格不拆分,整表作为一块
result.add(chunk);
} else if (chunk.getType() == ChunkType.CODE) {
// 代码块不拆分
result.add(chunk);
} else if (chunk.getContent().length() > 800) {
// 长文本按句子分割,保持200字符重叠
result.addAll(splitBySentence(chunk, 500, 100));
} else {
result.add(chunk);
}
}
return result;
}
/**
* 批量Embedding,带重试和限流
*/
private List<DocumentChunk> batchEmbed(String docId, List<TextChunk> chunks) {
List<DocumentChunk> result = new ArrayList<>();
int batchSize = 20;
for (int i = 0; i < chunks.size(); i += batchSize) {
List<TextChunk> batch = chunks.subList(i, Math.min(i + batchSize, chunks.size()));
// 批量获取embedding
List<String> texts = batch.stream()
.map(TextChunk::getContent)
.collect(Collectors.toList());
EmbeddingResponse embeddingResponse = embeddingModel.embedForResponse(texts);
for (int j = 0; j < batch.size(); j++) {
TextChunk chunk = batch.get(j);
float[] embedding = embeddingResponse.getResults().get(j).getOutput();
DocumentChunk dc = DocumentChunk.builder()
.docId(docId)
.chunkIndex(i + j)
.content(chunk.getContent())
.embedding(embedding)
.chunkType(chunk.getType().name())
.pageNum(chunk.getPageNum())
.metadata(chunk.getMetadata())
.build();
result.add(dc);
}
// 简单限流:每批之间等100ms,避免API限速
if (i + batchSize < chunks.size()) {
try { Thread.sleep(100); } catch (InterruptedException ignored) {}
}
}
return result;
}
}第二部分:混合检索实现
混合检索是系统质量的核心,这里贴完整实现:
@Service
@Slf4j
public class HybridRetrievalService {
private final PgVectorStore vectorStore;
private final ElasticsearchClient esClient;
private final RedisTemplate<String, String> redisTemplate;
private static final int RRF_K = 60;
private static final double VECTOR_WEIGHT = 1.0;
private static final double KEYWORD_WEIGHT = 0.8;
public RetrievalResult retrieve(RetrievalRequest request) {
String cacheKey = "retrieval:" + DigestUtils.md5Hex(
request.getUserId() + ":" + request.getQuery());
// 检查缓存(热点问题5分钟缓存)
String cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
log.debug("命中缓存: query={}", request.getQuery());
return JsonUtils.parse(cached, RetrievalResult.class);
}
// 并行执行两路检索
CompletableFuture<List<ScoredDoc>> vectorFuture =
CompletableFuture.supplyAsync(() -> vectorSearch(request));
CompletableFuture<List<ScoredDoc>> keywordFuture =
CompletableFuture.supplyAsync(() -> keywordSearch(request));
List<ScoredDoc> vectorResults = vectorFuture.join();
List<ScoredDoc> keywordResults = keywordFuture.join();
log.debug("检索结果: vector={}, keyword={}", vectorResults.size(), keywordResults.size());
// RRF融合
List<ScoredDoc> merged = rrfFusion(vectorResults, keywordResults);
// 权限过滤(二次过滤,确保安全)
List<ScoredDoc> filtered = permissionFilter(merged, request.getUserContext());
RetrievalResult result = RetrievalResult.builder()
.docs(filtered.stream().limit(5).collect(Collectors.toList()))
.totalCandidates(merged.size())
.hasVectorResults(!vectorResults.isEmpty())
.hasKeywordResults(!keywordResults.isEmpty())
.build();
// 写入缓存(5分钟)
redisTemplate.opsForValue().set(cacheKey, JsonUtils.toJson(result),
Duration.ofMinutes(5));
return result;
}
private List<ScoredDoc> vectorSearch(RetrievalRequest request) {
String permFilter = buildPermissionFilter(request.getUserContext());
SearchRequest searchReq = SearchRequest.query(request.getQuery())
.withTopK(10)
.withSimilarityThreshold(0.5)
.withFilterExpression(permFilter);
return vectorStore.similaritySearch(searchReq).stream()
.map(doc -> ScoredDoc.fromDocument(doc, "vector"))
.collect(Collectors.toList());
}
private List<ScoredDoc> keywordSearch(RetrievalRequest request) {
try {
Query boolQuery = Query.of(q -> q
.bool(b -> b
.must(m -> m.match(mt -> mt
.field("content")
.query(request.getQuery())
.analyzer("ik_max_word")))
.filter(f -> f.term(t -> t
.field("status").value("PUBLISHED")))
));
SearchResponse<Map> response = esClient.search(s -> s
.index("document_chunks")
.query(boolQuery)
.size(10), Map.class);
return response.hits().hits().stream()
.map(hit -> ScoredDoc.fromEsHit(hit))
.collect(Collectors.toList());
} catch (IOException e) {
log.warn("ES检索失败,降级到纯向量检索: {}", e.getMessage());
return Collections.emptyList();
}
}
/**
* RRF算法融合两路检索结果
*/
private List<ScoredDoc> rrfFusion(List<ScoredDoc> vectorDocs, List<ScoredDoc> keywordDocs) {
Map<String, Double> scoreMap = new HashMap<>();
Map<String, ScoredDoc> docMap = new HashMap<>();
// 向量检索结果打分
for (int i = 0; i < vectorDocs.size(); i++) {
ScoredDoc doc = vectorDocs.get(i);
scoreMap.merge(doc.getId(), VECTOR_WEIGHT / (RRF_K + i + 1), Double::sum);
docMap.put(doc.getId(), doc);
}
// 关键词检索结果打分
for (int i = 0; i < keywordDocs.size(); i++) {
ScoredDoc doc = keywordDocs.get(i);
scoreMap.merge(doc.getId(), KEYWORD_WEIGHT / (RRF_K + i + 1), Double::sum);
docMap.putIfAbsent(doc.getId(), doc);
}
return scoreMap.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.map(e -> {
ScoredDoc doc = docMap.get(e.getKey());
doc.setFinalScore(e.getValue());
return doc;
})
.collect(Collectors.toList());
}
private String buildPermissionFilter(UserContext ctx) {
return String.format(
"status == 'PUBLISHED' AND (visibility == 'PUBLIC' OR department == '%s')",
ctx.getDepartment());
}
}第三部分:Prompt工程
Prompt是答案质量的关键。经过多轮测试,最终用的System Prompt:
@Component
public class KnowledgeBasePrompts {
/**
* 主问答System Prompt
*/
public static final String QA_SYSTEM_PROMPT = """
你是公司内部知识库助手,负责帮助员工快速找到工作所需的信息。
## 回答规则
1. **只基于提供的文档内容回答**,不使用你的预训练知识
2. 如果文档内容不足以回答问题,明确说"根据现有文档,暂无相关信息",并建议联系相关部门
3. **引用来源**:每个关键信息点后面标注来源,格式:[来源:{文档标题}·第{页}页]
4. 回答要简洁有条理,重点优先
5. 如果问题涉及审批流程,完整列出每个步骤
## 禁止行为
- 不能编造文档中没有的信息
- 不能给出具体的人员联系方式(除非文档明确写了)
- 不能对敏感政策(薪资、晋升)做主观解读
## 输出格式
- 直接回答,不需要重复问题
- 多个要点用序号列出
- 复杂流程用流程图文字描述
""";
/**
* 问题改写Prompt(提升检索质量)
*/
public static final String QUERY_REWRITE_PROMPT = """
将用户的自然语言问题改写成更适合文档检索的查询词。
要求:
1. 提取核心关键词
2. 扩展相关同义词(如"请假"可扩展为"假期 休假 年假 病假")
3. 输出JSON格式:{"rewritten": "改写后的查询", "keywords": ["关键词1", "关键词2"]}
用户问题:{question}
""";
/**
* 答案质量自评Prompt
*/
public static final String SELF_EVAL_PROMPT = """
评估以下回答的质量。
原始问题:{question}
检索到的文档:{context}
生成的回答:{answer}
请从以下维度打分(1-5分):
- 准确性:回答是否基于文档内容,没有编造
- 完整性:是否回答了问题的所有方面
- 清晰度:表达是否清晰易懂
输出JSON格式:
{"accuracy": 分数, "completeness": 分数, "clarity": 分数, "issues": "发现的问题(如果有)"}
""";
}带问题改写的完整问答流程:
@Service
@Slf4j
public class KnowledgeBaseQaService {
private final ChatClient chatClient;
private final HybridRetrievalService retrievalService;
public QaResponse answer(QaRequest request) {
// Step1: 问题改写,提升检索质量
String rewrittenQuery = rewriteQuery(request.getQuestion());
log.debug("问题改写: 原始={}, 改写={}", request.getQuestion(), rewrittenQuery);
// Step2: 混合检索
RetrievalResult retrieval = retrievalService.retrieve(
RetrievalRequest.builder()
.query(rewrittenQuery)
.userId(request.getUserId())
.userContext(request.getUserContext())
.build());
if (retrieval.getDocs().isEmpty()) {
return QaResponse.noResult(request.getQuestion());
}
// Step3: 组装上下文
String context = buildContext(retrieval.getDocs());
// Step4: 生成回答
String answer = chatClient.prompt()
.system(KnowledgeBasePrompts.QA_SYSTEM_PROMPT)
.user(String.format("""
参考文档:
---
%s
---
员工问题:%s
""", context, request.getQuestion()))
.call()
.content();
// Step5: 记录查询日志(用于质量评估)
queryLogService.save(QueryLog.builder()
.sessionId(request.getSessionId())
.userId(request.getUserId())
.query(request.getQuestion())
.answer(answer)
.sourceDocIds(retrieval.getDocs().stream()
.map(d -> d.getMetadata().get("docId").toString())
.collect(Collectors.toList()))
.build());
return QaResponse.builder()
.answer(answer)
.sourceDocs(retrieval.getDocs())
.build();
}
private String rewriteQuery(String question) {
try {
String result = chatClient.prompt()
.user(KnowledgeBasePrompts.QUERY_REWRITE_PROMPT
.replace("{question}", question))
.call()
.content();
JsonNode json = objectMapper.readTree(result);
return json.get("rewritten").asText();
} catch (Exception e) {
log.warn("问题改写失败,使用原始问题: {}", e.getMessage());
return question;
}
}
private String buildContext(List<ScoredDoc> docs) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < docs.size(); i++) {
ScoredDoc doc = docs.get(i);
sb.append(String.format("[文档%d] 来源:%s(第%d页)\n%s\n\n",
i + 1,
doc.getMetadata().getOrDefault("title", "未知文档"),
doc.getMetadata().getOrDefault("pageNum", 0),
doc.getContent()));
}
return sb.toString();
}
}第四部分:质量评估与持续优化
系统上线后最重要的工作是量化质量、持续改进:
质量评估服务:
@Service
@Slf4j
public class QualityEvaluationService {
private final QueryLogRepository logRepository;
private final ChatClient chatClient;
/**
* 每日批量评估答案质量(用LLM自评,成本低)
*/
@Scheduled(cron = "0 2 * * * ?") // 每天凌晨2点
public void dailyEvaluation() {
// 取前一天没有评估过的、随机抽样100条
List<QueryLog> logs = logRepository.findUnevaluatedLogs(
LocalDate.now().minusDays(1), 100);
log.info("开始日常质量评估,样本数={}", logs.size());
for (QueryLog log : logs) {
try {
EvalResult eval = evaluateAnswer(log);
logRepository.saveEvalResult(log.getId(), eval);
} catch (Exception e) {
log.warn("评估失败: logId={}", log.getId());
}
}
// 生成日报
generateDailyReport();
}
private EvalResult evaluateAnswer(QueryLog queryLog) {
// 获取当时检索的文档内容
String context = documentService.getChunkContent(queryLog.getSourceDocIds());
String evalResult = chatClient.prompt()
.user(KnowledgeBasePrompts.SELF_EVAL_PROMPT
.replace("{question}", queryLog.getQuery())
.replace("{context}", context)
.replace("{answer}", queryLog.getAnswer()))
.call()
.content();
return JsonUtils.parse(evalResult, EvalResult.class);
}
/**
* 统计关键指标
*/
public DashboardMetrics getDashboardMetrics(LocalDate date) {
return DashboardMetrics.builder()
.totalQueries(logRepository.countByDate(date))
.avgLatencyMs(logRepository.avgLatency(date))
.userSatisfactionRate(logRepository.satisfactionRate(date)) // 点赞/(点赞+点踩)
.noResultRate(logRepository.noResultRate(date)) // 返回"暂无信息"的比例
.avgAccuracyScore(logRepository.avgAccuracyScore(date)) // LLM自评分
.topQueries(logRepository.topQueries(date, 10)) // 高频问题
.topFailedQueries(logRepository.topFailedQueries(date, 10)) // 高频失败问题
.build();
}
}上线效果数据
三个月运行下来的真实数据:
| 指标 | 目标值 | 实际值 | 备注 |
|---|---|---|---|
| 用户满意率(点赞率) | >75% | 81.3% | 超预期 |
| 无结果率 | <15% | 9.8% | 混合检索效果明显 |
| P99响应时间 | <2s | 1.4s | 含缓存 |
| 日均查询量 | - | 324次 | 稳步增长 |
| LLM自评准确率 | >80% | 84.6% | - |
| 节省人工时间/月 | 200h | 估算280h | 基于问卷统计 |
最让我意外的是:上线后第一个月,提"找不到"相关内容的投诉为零。这是之前老系统最常见的抱怨。
踩坑备忘
| 坑 | 描述 | 教训 |
|---|---|---|
| 分块太碎 | 512字符分块导致上下文不完整,AI回答缺乏连贯性 | 技术文档改为按章节分块,可以到2000字符 |
| 忘了文档过期 | 有人根据旧政策操作,引起误会 | 上线就要做文档过期检测,不能后补 |
| Embedding API限速 | 大批量入库时频繁触发API限速 | 提前申请限速提升,实现指数退避重试 |
| ES中文分词 | 未配置IK分词器,中文检索效果差 | ES必须安装IK分词插件,这是基础配置 |
| 权限过滤遗漏 | 早期版本UI层过滤,API直接调可以拿到无权限文档 | 权限过滤必须在检索层做,UI层只是辅助 |
小结
两篇写完了整个企业级AI知识库的完整实现。核心收获是:
- 文档质量决定系统上限:再好的RAG,喂垃圾进去出来还是垃圾
- 混合检索不能省:纯向量检索对精确匹配太弱,混合检索是生产必选
- 质量评估要从第一天做:没有数据就没有方向,持续评估才能持续改进
- 权限是最容易被忽略的安全问题:从架构层就要想清楚
完整代码在知识星球,感兴趣的可以去拉一下,直接能跑。
