Spring AI 集成私有化文档存储——MinIO + 向量数据库的完整方案
Spring AI 集成私有化文档存储——MinIO + 向量数据库的完整方案
去年年初,我们给一家制造业客户搭建内部知识库系统。客户的要求很明确:文档必须存在自己的服务器上,不能上云,包括向量化之后的数据。
第一版方案,我图省事,直接把文档存在 Linux 本地文件系统里,/data/docs/ 目录下按部门分文件夹。跑了两个月没出问题,然后有一天运维同事告诉我:服务做了水平扩展,部署了三个实例,但只有第一台机器上有文档。
我说:哦。
然后默默地花了一周把文档存储层全部重写成 MinIO。
这篇文章就是那次重写的完整记录,从为什么不能用本地文件系统,到 MinIO 的文档管理、向量化、查询全流程,包括一个让我当时想了很久才想通的文档版本管理设计。
为什么不能直接用本地文件系统
说清楚几个具体问题:
问题一:多实例部署时数据不一致
这就是我踩的那个坑。本地文件系统是机器级别的,水平扩展之后,每台机器各自为政。用户在实例 A 上传的文档,实例 B 根本不知道。
问题二:文档版本管理困难
如果用户更新了一份文档,你要怎么处理旧版本对应的向量数据?本地文件系统里,你拿到的只是一个文件名和文件内容,历史版本需要自己维护。
问题三:文档元数据管理
向量数据库里存的是向量片段(chunk),每个 chunk 关联到原始文档的哪个段落?文档属于哪个部门、什么权限级别?这些元数据放在哪里?本地文件系统里没有这些能力,你只能再搭一张数据库表,然后路径管理和数据库管理之间的同步又是一个坑。
MinIO 解决了第一个问题(分布式对象存储),同时它的元数据能力和版本控制能力可以帮助解决第二、三个问题。
整体架构设计
先看全局:
核心组件:
- MinIO:存原始文档文件(PDF、Word、txt 等)
- MySQL:存文档元数据(文件名、版本、上传人、状态、分块信息等)
- pgvector(或其他向量库):存向量数据,每条记录关联 MinIO 中的文件路径
- Spring AI:负责解析、分块、向量化、查询的流程编排
MinIO 的配置和基础操作
先把 MinIO 客户端搭起来:
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.7</version>
</dependency>minio:
endpoint: http://192.168.1.100:9000
access-key: minioadmin
secret-key: minioadmin123
bucket-name: knowledge-docs
# 文档向量化完成后的备份桶
processed-bucket-name: knowledge-docs-processed@Configuration
public class MinioConfig {
@Value("${minio.endpoint}")
private String endpoint;
@Value("${minio.access-key}")
private String accessKey;
@Value("${minio.secret-key}")
private String secretKey;
@Bean
public MinioClient minioClient() {
return MinioClient.builder()
.endpoint(endpoint)
.credentials(accessKey, secretKey)
.build();
}
}MinIO 基础操作封装:
@Service
public class MinioStorageService {
private static final Logger log = LoggerFactory.getLogger(MinioStorageService.class);
@Value("${minio.bucket-name}")
private String bucketName;
private final MinioClient minioClient;
public MinioStorageService(MinioClient minioClient) {
this.minioClient = minioClient;
}
/**
* 上传文件,返回 objectName(MinIO 中的路径)
*/
public String upload(String fileName, InputStream inputStream,
long fileSize, String contentType) {
// 按年月组织目录,避免单目录文件过多
String datePrefix = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy/MM"));
String objectName = datePrefix + "/" + UUID.randomUUID() + "_" + fileName;
try {
// 确保 bucket 存在
ensureBucketExists(bucketName);
minioClient.putObject(
PutObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.stream(inputStream, fileSize, -1)
.contentType(contentType)
.build()
);
log.info("文件上传成功: {}", objectName);
return objectName;
} catch (Exception e) {
throw new StorageException("文件上传失败: " + fileName, e);
}
}
/**
* 下载文件,返回 InputStream
*/
public InputStream download(String objectName) {
try {
return minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
throw new StorageException("文件下载失败: " + objectName, e);
}
}
/**
* 删除文件
*/
public void delete(String objectName) {
try {
minioClient.removeObject(
RemoveObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.build()
);
log.info("文件删除成功: {}", objectName);
} catch (Exception e) {
throw new StorageException("文件删除失败: " + objectName, e);
}
}
/**
* 获取文件预签名 URL(用于前端直接下载,有效期 1 小时)
*/
public String getPresignedUrl(String objectName) {
try {
return minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket(bucketName)
.object(objectName)
.expiry(1, TimeUnit.HOURS)
.build()
);
} catch (Exception e) {
throw new StorageException("获取预签名 URL 失败", e);
}
}
private void ensureBucketExists(String bucket) throws Exception {
boolean exists = minioClient.bucketExists(
BucketExistsArgs.builder().bucket(bucket).build()
);
if (!exists) {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucket).build());
log.info("Bucket 创建成功: {}", bucket);
}
}
}文档元数据的数据库设计
光有 MinIO 还不够,需要一张表来记录文档的完整元信息:
CREATE TABLE doc_knowledge (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
doc_id VARCHAR(64) NOT NULL UNIQUE COMMENT '文档唯一ID',
file_name VARCHAR(255) NOT NULL COMMENT '原始文件名',
object_name VARCHAR(512) NOT NULL COMMENT 'MinIO 中的 objectName',
file_size BIGINT COMMENT '文件大小(字节)',
file_type VARCHAR(50) COMMENT '文件类型 pdf/docx/txt',
department VARCHAR(100) COMMENT '所属部门',
uploader VARCHAR(100) COMMENT '上传人',
version INT NOT NULL DEFAULT 1 COMMENT '版本号',
status VARCHAR(20) NOT NULL DEFAULT 'PENDING'
COMMENT 'PENDING/VECTORIZING/READY/FAILED/DELETED',
chunk_count INT DEFAULT 0 COMMENT '分块数量',
error_msg TEXT COMMENT '向量化失败原因',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_department (department),
INDEX idx_status (status)
);
-- 文档版本历史表
CREATE TABLE doc_version_history (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
doc_id VARCHAR(64) NOT NULL COMMENT '文档ID',
version INT NOT NULL COMMENT '版本号',
object_name VARCHAR(512) NOT NULL COMMENT '该版本的 MinIO objectName',
replaced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
replaced_by VARCHAR(100) COMMENT '替换人',
INDEX idx_doc_id (doc_id)
);完整的 DocumentService 实现
这是核心,包含上传、向量化、删除三个主要操作:
@Service
@Transactional
public class DocumentService {
private static final Logger log = LoggerFactory.getLogger(DocumentService.class);
private final MinioStorageService storageService;
private final DocKnowledgeMapper docMapper;
private final VectorStore vectorStore;
private final DocumentParser documentParser; // Spring AI 的 DocumentParser
private final TextSplitter textSplitter;
public DocumentService(MinioStorageService storageService,
DocKnowledgeMapper docMapper,
VectorStore vectorStore) {
this.storageService = storageService;
this.docMapper = docMapper;
this.vectorStore = vectorStore;
// 使用 TikaDocumentReader 支持 PDF、Word、PPT 等格式
this.textSplitter = new TokenTextSplitter(800, 100, 5, 10000, true);
}
/**
* 上传文档并触发向量化
*/
public DocKnowledge uploadDocument(MultipartFile file, String department, String uploader) {
String fileName = file.getOriginalFilename();
String fileType = getFileType(fileName);
// 1. 上传到 MinIO
String objectName;
try {
objectName = storageService.upload(
fileName,
file.getInputStream(),
file.getSize(),
file.getContentType()
);
} catch (IOException e) {
throw new DocumentUploadException("文件读取失败", e);
}
// 2. 写入元数据
String docId = UUID.randomUUID().toString().replace("-", "");
DocKnowledge doc = new DocKnowledge();
doc.setDocId(docId);
doc.setFileName(fileName);
doc.setObjectName(objectName);
doc.setFileSize(file.getSize());
doc.setFileType(fileType);
doc.setDepartment(department);
doc.setUploader(uploader);
doc.setStatus("PENDING");
docMapper.insert(doc);
// 3. 异步触发向量化(不阻塞上传接口)
triggerVectorize(docId);
return doc;
}
/**
* 更新文档(版本升级)
*/
public DocKnowledge updateDocument(String docId, MultipartFile newFile, String operator) {
DocKnowledge existing = docMapper.findByDocId(docId);
if (existing == null) {
throw new DocumentNotFoundException("文档不存在: " + docId);
}
// 1. 把旧版本记录到历史表
DocVersionHistory history = new DocVersionHistory();
history.setDocId(docId);
history.setVersion(existing.getVersion());
history.setObjectName(existing.getObjectName());
history.setReplacedBy(operator);
docMapper.insertVersionHistory(history);
// 2. 上传新文件到 MinIO
String newObjectName;
try {
newObjectName = storageService.upload(
newFile.getOriginalFilename(),
newFile.getInputStream(),
newFile.getSize(),
newFile.getContentType()
);
} catch (IOException e) {
throw new DocumentUploadException("新文件读取失败", e);
}
// 3. 删除旧的向量数据
deleteVectors(docId, existing.getVersion());
// 4. 更新元数据
existing.setObjectName(newObjectName);
existing.setFileName(newFile.getOriginalFilename());
existing.setFileSize(newFile.getSize());
existing.setVersion(existing.getVersion() + 1);
existing.setStatus("PENDING");
existing.setChunkCount(0);
existing.setErrorMsg(null);
docMapper.update(existing);
// 5. 触发重新向量化
triggerVectorize(docId);
return existing;
}
/**
* 删除文档(逻辑删除 + 清理向量数据)
*/
public void deleteDocument(String docId) {
DocKnowledge doc = docMapper.findByDocId(docId);
if (doc == null) return;
// 1. 删除向量数据
deleteVectors(docId, null);
// 2. 从 MinIO 删除文件(也可以选择保留,看业务需求)
storageService.delete(doc.getObjectName());
// 3. 逻辑删除元数据记录
docMapper.updateStatus(docId, "DELETED");
log.info("文档删除完成: docId={}", docId);
}
/**
* 向量化处理(核心逻辑)
*/
@Async("vectorizeExecutor")
public void vectorizeDocument(String docId) {
DocKnowledge doc = docMapper.findByDocId(docId);
if (doc == null || !"PENDING".equals(doc.getStatus())) {
return;
}
docMapper.updateStatus(docId, "VECTORIZING");
try {
// 1. 从 MinIO 下载文件
InputStream inputStream = storageService.download(doc.getObjectName());
// 2. 用 Tika 解析文档内容
TikaDocumentReader reader = new TikaDocumentReader(
new InputStreamResource(inputStream)
);
List<Document> rawDocuments = reader.get();
// 3. 文本分块
List<Document> chunks = textSplitter.apply(rawDocuments);
// 4. 给每个 chunk 打上元数据标签(方便后续过滤)
String version = String.valueOf(doc.getVersion());
chunks.forEach(chunk -> {
chunk.getMetadata().put("docId", docId);
chunk.getMetadata().put("fileName", doc.getFileName());
chunk.getMetadata().put("department", doc.getDepartment());
chunk.getMetadata().put("version", version);
chunk.getMetadata().put("objectName", doc.getObjectName());
});
// 5. 写入向量数据库(Spring AI 会自动调用 Embedding 模型)
vectorStore.add(chunks);
// 6. 更新文档状态
docMapper.updateStatusAndChunkCount(docId, "READY", chunks.size());
log.info("文档向量化完成: docId={}, chunks={}", docId, chunks.size());
} catch (Exception e) {
log.error("文档向量化失败: docId={}", docId, e);
docMapper.updateStatusWithError(docId, "FAILED", e.getMessage());
}
}
private void deleteVectors(String docId, Integer version) {
// 通过 metadata 过滤删除
// 注意:不同向量数据库的删除 API 不同,这里是 pgvector 的方式
FilterExpressionBuilder fb = new FilterExpressionBuilder();
Filter.Expression filter;
if (version != null) {
filter = fb.and(
fb.eq("docId", docId),
fb.eq("version", String.valueOf(version))
).build();
} else {
filter = fb.eq("docId", docId).build();
}
vectorStore.delete(filter);
}
private void triggerVectorize(String docId) {
// 发布事件,由监听器异步处理
applicationEventPublisher.publishEvent(new VectorizeEvent(docId));
}
private String getFileType(String fileName) {
if (fileName == null) return "unknown";
int dotIndex = fileName.lastIndexOf('.');
if (dotIndex < 0) return "unknown";
return fileName.substring(dotIndex + 1).toLowerCase();
}
}查询时的部门权限过滤
这是个容易被忽视的点。如果系统有多个部门,查询时要加权限过滤,避免跨部门数据泄露:
@Service
public class KnowledgeQueryService {
private final VectorStore vectorStore;
private final ChatClient chatClient;
public String query(String question, String department, String userId) {
// 构建过滤条件:只搜本部门 + READY 状态的文档
FilterExpressionBuilder fb = new FilterExpressionBuilder();
Filter.Expression filter = fb.and(
fb.eq("department", department),
fb.eq("status", "READY") // 这个 status 是 chunk 级别的,在向量化时写入
).build();
SearchRequest searchRequest = SearchRequest.builder()
.query(question)
.topK(5)
.filterExpression(filter)
.build();
List<Document> relevantDocs = vectorStore.similaritySearch(searchRequest);
if (relevantDocs.isEmpty()) {
return "抱歉,在 " + department + " 部门的知识库中没有找到相关信息。";
}
// 拼装上下文
String context = relevantDocs.stream()
.map(doc -> {
String fileName = (String) doc.getMetadata().get("fileName");
return "【来源:" + fileName + "】\n" + doc.getContent();
})
.collect(Collectors.joining("\n\n---\n\n"));
return chatClient.prompt()
.system("""
你是企业内部知识助手,请基于以下参考资料回答问题。
如果资料中没有相关信息,请明确说明。
不要编造没有依据的内容。
""")
.user("参考资料:\n" + context + "\n\n问题:" + question)
.call()
.content();
}
}文档版本管理的设计思路
这里我想多说几句,因为这个设计当时我想了好几种方案。
方案一:向量数据按版本叠加
每次更新文档,不删旧向量,新旧共存,查询时只拿最新版本的 chunk。
优点:可以查询历史版本。
缺点:向量数据库越来越大,同一文档的旧版本 chunk 占用空间。而且如果文档更新频繁,同一段相似内容会有大量重复 chunk,污染检索结果。
方案二:更新时删旧建新(我采用的)
每次更新,先删除旧版本的所有 chunk,再向量化新版本写入。旧文件在 MinIO 里保留(写入历史版本表),但向量数据只保留最新版本。
优点:向量数据库干净,检索不受旧版本干扰。
缺点:不能基于向量检索历史版本,如果需要历史版本内容,要从 MinIO 重新下载原文。
方案三:双写
向量数据库里同时保留所有版本,但 metadata 里标记版本号,查询时过滤最新版本。
优点:兼顾当前和历史。
缺点:实现复杂,「最新版本」的 metadata 需要在每次更新时批量修改(把旧版本 chunk 的「isLatest」字段改为 false),向量数据库的批量更新操作往往比较重。
我选了方案二,主要原因是这个业务场景里历史版本查询需求极少,偶尔需要查历史就从 MinIO 拉原文,不值得为此把向量数据库搞复杂。
向量化任务的异步处理
文档向量化是耗时操作,特别是大文件,可能需要几十秒甚至几分钟。必须异步处理,不能卡在上传接口里。
@Configuration
public class AsyncConfig {
@Bean("vectorizeExecutor")
public Executor vectorizeExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 向量化是 I/O 密集型(等 Embedding API),线程不用太多
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100); // 队列容量要够,避免任务丢失
executor.setThreadNamePrefix("vectorize-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Component
@Async("vectorizeExecutor")
public class VectorizeEventListener {
private final DocumentService documentService;
@EventListener
public void onVectorizeEvent(VectorizeEvent event) {
documentService.vectorizeDocument(event.getDocId());
}
}状态查询接口,让前端可以轮询向量化进度:
@GetMapping("/docs/{docId}/status")
public ResponseEntity<DocStatusVO> getDocStatus(@PathVariable String docId) {
DocKnowledge doc = docService.findByDocId(docId);
if (doc == null) {
return ResponseEntity.notFound().build();
}
DocStatusVO vo = new DocStatusVO();
vo.setDocId(docId);
vo.setStatus(doc.getStatus());
vo.setChunkCount(doc.getChunkCount());
vo.setErrorMsg(doc.getErrorMsg());
return ResponseEntity.ok(vo);
}一些生产中发现的细节问题
PDF 里有扫描图片
用 Tika 解析 PDF,遇到扫描件(图片 PDF)会拿不到文字,rawDocuments 里内容是空的,然后 chunk count = 0,状态 READY,但查询时完全搜不到东西。
解法:解析完之后检查内容长度,如果太短(比如小于 100 字符),标记为 FAILED 并提示「检测到扫描件,请上传文字版 PDF 或 OCR 后的版本」。
大文件内存溢出
上传一个 300MB 的 Word 文件,Tika 解析完了,但向量化时分出来几千个 chunk,一次性调 Embedding API,直接把线程池打满加内存撑爆。
解法:chunk 分批处理,每批 50 个 chunk 调一次 Embedding API,批次之间加 100ms 的间隔(限流)。
// 分批向量化
int batchSize = 50;
for (int i = 0; i < chunks.size(); i += batchSize) {
List<Document> batch = chunks.subList(i, Math.min(i + batchSize, chunks.size()));
vectorStore.add(batch);
if (i + batchSize < chunks.size()) {
Thread.sleep(100); // 避免打满 Embedding API 限流
}
}这些都是教科书里不会写、只有上了生产才能踩到的坑。
