第1869篇:Spring AI与Spring Batch结合——批量文档处理的工程化方案
去年做了个企业知识库项目,客户给了一个需求:把他们内部积累的三万多份 Word 文档全部向量化,存到知识库里,然后让员工可以通过对话查询。
三万多份文档,每份平均 20KB,加起来大概 600MB 的文本量。最早的方案是用一个循环跑,结果跑了一个多小时还没跑完,而且中途崩了一次,所有进度全丢了,从头再来。
后来换成 Spring Batch,花了三天时间把批处理框架搭好,最终三万份文档用了约 40 分钟处理完,支持断点续传,支持失败重试,支持并发处理。
这件事让我意识到:批量 AI 处理场景,是典型的体力活——不是技术上有多难,而是工程化需要认真对待。随便一个循环能跑不代表它能打,能打的批处理需要 Spring Batch 这样的框架来托底。
今天把这套方案完整讲出来。
一、为什么需要 Spring Batch
先说清楚,Spring Batch 解决的不是"怎么调 AI",而是批量处理的工程化问题:
- 断点续传:任务中途崩溃后能从上次中断的位置继续,不用从头再来
- 失败处理:单个文档解析失败可以跳过,AI 调用超时可以自动重试
- 并发控制:多线程处理的同时,把并发压在 AI API 的限流范围内
- 进度追踪:每个任务读了多少、写了多少、跳过多少,都有持久化的执行记录
Spring Batch 把这些问题都处理好了,我们只需要专注写业务逻辑。
二、依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-pgvector-store-spring-boot-starter</artifactId>
</dependency>
<!-- Batch 需要数据库存储 Job 状态 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
application.yml 中配置 Batch:
spring:
  batch:
    jdbc:
      initialize-schema: always # 自动创建 Batch 元数据表
    job:
      enabled: false # 关闭启动时自动执行,改为按需触发
三、整体架构设计
批量文档处理分三个 Step,每个 Step 的职责清晰(三个 Step 共同读写的 DocumentTask 实体见列表后的示意):
- Step1 扫描目录,把文件路径写入数据库(状态:PENDING)
- Step2 读取 PENDING 文件,解析内容,调 AI Embedding,存入向量库,更新状态为 DONE
- Step3 统计处理结果,发邮件通知
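后文的 Tasklet、Processor、Writer 都围绕一个 DocumentTask 实体和对应的 DocumentTaskRepository 展开,原文没有贴出它们的定义。下面按照后文的调用方式反推一个最小示意(字段、枚举值和查询方法名都是推测的,注解来自 jakarta.persistence、Lombok 和 Spring Data JPA):
@Entity
@Table(name = "document_task")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DocumentTask {

    public enum Status { PENDING, PROCESSING, DONE, FAILED, SKIPPED }

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String filePath;
    private String fileName;
    private String fileHash;
    private Long fileSize;

    @Enumerated(EnumType.STRING)
    private Status status;

    private String errorMessage;
    private Instant createdAt;
    private Instant updatedAt;
    private Instant completedAt;
}

public interface DocumentTaskRepository extends JpaRepository<DocumentTask, Long> {
    boolean existsByFilePathAndFileHash(String filePath, String fileHash);
    long countByStatus(DocumentTask.Status status);
}
状态流转是 PENDING → PROCESSING → DONE,解析失败或内容为空时分别落到 FAILED / SKIPPED。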
四、Job 配置
@Configuration
@EnableBatchProcessing
@Slf4j
public class DocumentBatchJobConfig {
@Bean
public Job documentIndexingJob(JobRepository jobRepository,
Step scanDocumentsStep,
Step processDocumentsStep,
Step summaryStep) {
return new JobBuilder("documentIndexingJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(scanDocumentsStep)
.next(processDocumentsStep)
.next(summaryStep)
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("批处理任务开始 jobId={}",
jobExecution.getJobId());
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("批处理任务结束 status={} jobId={}",
jobExecution.getStatus(),
jobExecution.getJobId());
}
})
.build();
}
// Step1:文档扫描
@Bean
public Step scanDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
DocumentScanTasklet documentScanTasklet) {
return new StepBuilder("scanDocumentsStep", jobRepository)
.tasklet(documentScanTasklet, txManager)
.build();
}
// Step2:文档处理(核心步骤,Chunk 模式)
@Bean
public Step processDocumentsStep(
JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<DocumentTask> documentReader,
ItemProcessor<DocumentTask, List<Document>> documentProcessor,
ItemWriter<List<Document>> vectorStoreWriter) {
return new StepBuilder("processDocumentsStep", jobRepository)
.<DocumentTask, List<Document>>chunk(10, txManager)
.reader(documentReader)
.processor(documentProcessor)
.writer(vectorStoreWriter)
// 跳过策略:处理失败的单个文档跳过,记录日志,不影响整体任务
.faultTolerant()
.skip(DocumentProcessException.class)
.skipLimit(100) // 最多跳过 100 个文件
// 重试策略:AI 调用超时自动重试
.retry(AiCallTimeoutException.class)
.retryLimit(3)
.listener(new DocumentProcessStepListener())
// 多线程处理
.taskExecutor(documentProcessExecutor())
.throttleLimit(5) // 同时最多 5 个线程(控制 AI API 并发)
.build();
}
// Step3:汇总报告
@Bean
public Step summaryStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
SummaryTasklet summaryTasklet) {
return new StepBuilder("summaryStep", jobRepository)
.tasklet(summaryTasklet, txManager)
.build();
}
@Bean
public TaskExecutor documentProcessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("doc-batch-");
executor.initialize();
return executor;
}
}
五、Step1:文档扫描 Tasklet
@Component
@Slf4j
public class DocumentScanTasklet implements Tasklet {
private final DocumentTaskRepository documentTaskRepository;
@Value("${app.batch.scan-directory}")
private String scanDirectory;
@Value("${app.batch.supported-extensions:pdf,docx,txt,md}")
private List<String> supportedExtensions;
public DocumentScanTasklet(DocumentTaskRepository documentTaskRepository) {
this.documentTaskRepository = documentTaskRepository;
}
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
Path rootPath = Paths.get(scanDirectory);
log.info("开始扫描目录: {}", rootPath);
// 扫描所有支持的文件(Files.walk 返回的 Stream 需要关闭,用 try-with-resources)
List<Path> files;
try (Stream<Path> walk = Files.walk(rootPath)) {
files = walk
.filter(Files::isRegularFile)
.filter(path -> supportedExtensions.stream()
.anyMatch(ext -> path.toString().endsWith("." + ext)))
.collect(Collectors.toList());
}
log.info("扫描到 {} 个文件", files.size());
int newFiles = 0;
for (Path file : files) {
String fileHash = calculateFileHash(file);
// 避免重复处理已处理过的文件
boolean exists = documentTaskRepository
.existsByFilePathAndFileHash(file.toString(), fileHash);
if (!exists) {
DocumentTask task = DocumentTask.builder()
.filePath(file.toString())
.fileName(file.getFileName().toString())
.fileHash(fileHash)
.fileSize(Files.size(file))
.status(DocumentTask.Status.PENDING)
.createdAt(Instant.now())
.build();
documentTaskRepository.save(task);
newFiles++;
}
}
log.info("发现新文件 {} 个", newFiles);
contribution.incrementWriteCount(newFiles);
// 把统计信息写入 Job 上下文,后续 Step 可以读取
chunkContext.getStepContext().getStepExecution()
.getJobExecution().getExecutionContext()
.put("totalFiles", files.size());
chunkContext.getStepContext().getStepExecution()
.getJobExecution().getExecutionContext()
.put("newFiles", newFiles);
return RepeatStatus.FINISHED;
}
private String calculateFileHash(Path file) throws IOException {
byte[] bytes = Files.readAllBytes(file);
return DigestUtils.sha256Hex(bytes);
}
}
六、Step2:文档处理的 Reader/Processor/Writer
ItemReader:读取待处理的文档任务
@Bean
@StepScope // 必须加 @StepScope,保证每次 Step 执行都是新实例
public JpaPagingItemReader<DocumentTask> documentReader(
EntityManagerFactory entityManagerFactory) {
return new JpaPagingItemReaderBuilder<DocumentTask>()
.name("documentReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT t FROM DocumentTask t WHERE t.status = 'PENDING' " +
"ORDER BY t.createdAt")
.pageSize(10) // 每次从数据库读取 10 条
.build();
}
ItemProcessor:文档解析 + 分块(Embedding 在 Writer 写入向量库时完成)
@Component
@Slf4j
public class DocumentEmbeddingProcessor
implements ItemProcessor<DocumentTask, List<Document>> {
private final DocumentParser documentParser;
private final DocumentTaskRepository taskRepository;
// 控制 AI 调用速率
private final RateLimiter rateLimiter = RateLimiter.create(10.0); // 每秒最多10个请求
public DocumentEmbeddingProcessor(DocumentParser documentParser,
DocumentTaskRepository taskRepository) {
this.documentParser = documentParser;
this.taskRepository = taskRepository;
}
@Override
public List<Document> process(DocumentTask task) throws Exception {
log.debug("处理文档: {}", task.getFileName());
try {
// 1. 解析文档内容
String content = documentParser.parse(task.getFilePath());
if (content == null || content.isBlank()) {
log.warn("文档内容为空,跳过: {}", task.getFileName());
updateTaskStatus(task, DocumentTask.Status.SKIPPED, "内容为空");
return null; // 返回 null 表示跳过这条记录
}
// 2. 文本分块(大文档需要切分)
List<String> chunks = splitIntoChunks(content, 500, 50); // 500字一块,50字重叠
// 3. 限流控制
rateLimiter.acquire();
// 4. 生成 Document 对象(Spring AI 会在 Writer 里负责 embed)
List<Document> documents = new ArrayList<>();
for (int i = 0; i < chunks.size(); i++) {
Map<String, Object> metadata = new HashMap<>();
metadata.put("source", task.getFilePath());
metadata.put("fileName", task.getFileName());
metadata.put("chunkIndex", i);
metadata.put("totalChunks", chunks.size());
metadata.put("taskId", task.getId());
documents.add(new Document(chunks.get(i), metadata));
}
updateTaskStatus(task, DocumentTask.Status.PROCESSING, null);
log.info("文档 {} 分成 {} 个块", task.getFileName(), chunks.size());
return documents;
} catch (IOException e) {
throw new DocumentProcessException(
"文档解析失败: " + task.getFilePath(), e);
}
}
/**
* 文本分块:按字符数切分,保留重叠以维持上下文连续性
*/
private List<String> splitIntoChunks(String text, int chunkSize, int overlap) {
List<String> chunks = new ArrayList<>();
int start = 0;
while (start < text.length()) {
int end = Math.min(start + chunkSize, text.length());
chunks.add(text.substring(start, end));
if (end == text.length()) break; // 已切到结尾,避免因重叠再补一个重复的尾块
start += chunkSize - overlap;
}
return chunks;
}
private void updateTaskStatus(DocumentTask task, DocumentTask.Status status,
String errorMsg) {
task.setStatus(status);
task.setErrorMessage(errorMsg);
task.setUpdatedAt(Instant.now());
taskRepository.save(task);
}
}
ItemWriter:写入向量存储
@Component
@Slf4j
public class VectorStoreDocumentWriter implements ItemWriter<List<Document>> {
private final VectorStore vectorStore;
private final DocumentTaskRepository taskRepository;
public VectorStoreDocumentWriter(VectorStore vectorStore,
DocumentTaskRepository taskRepository) {
this.vectorStore = vectorStore;
this.taskRepository = taskRepository;
}
@Override
public void write(Chunk<? extends List<Document>> chunk) throws Exception {
// 把多个文档列表展平成一个列表
List<Document> allDocuments = chunk.getItems().stream()
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (allDocuments.isEmpty()) return;
try {
// 批量写入向量存储
vectorStore.add(allDocuments);
// 更新任务状态为完成
// metadata 里的 taskId 是 Processor 放进去的 task.getId()(Long 类型),统一转成 Long 处理
Set<Long> taskIds = allDocuments.stream()
.map(doc -> doc.getMetadata().get("taskId"))
.filter(Objects::nonNull)
.map(id -> Long.valueOf(String.valueOf(id)))
.collect(Collectors.toSet());
taskIds.forEach(taskId ->
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(DocumentTask.Status.DONE);
task.setCompletedAt(Instant.now());
taskRepository.save(task);
})
);
log.info("成功写入 {} 个向量块", allDocuments.size());
} catch (Exception e) {
log.error("向量存储写入失败: {}", e.getMessage());
throw e;
}
}
}
七、Step3:汇总报告 Tasklet
@Component
@Slf4j
public class SummaryTasklet implements Tasklet {
private final DocumentTaskRepository taskRepository;
private final NotificationService notificationService;
public SummaryTasklet(DocumentTaskRepository taskRepository,
NotificationService notificationService) {
this.taskRepository = taskRepository;
this.notificationService = notificationService;
}
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// 统计结果
long doneCount = taskRepository.countByStatus(DocumentTask.Status.DONE);
long failedCount = taskRepository.countByStatus(DocumentTask.Status.FAILED);
long skippedCount = taskRepository.countByStatus(DocumentTask.Status.SKIPPED);
// 从 Job 上下文获取总文件数
int totalFiles = chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getExecutionContext().getInt("totalFiles", 0);
String summary = String.format(
"批处理完成报告:\n" +
"总文件数: %d\n" +
"成功处理: %d\n" +
"处理失败: %d\n" +
"跳过(内容为空): %d\n",
totalFiles, doneCount, failedCount, skippedCount
);
log.info(summary);
// 发送通知
notificationService.sendBatchCompletionNotice(summary);
return RepeatStatus.FINISHED;
}
}
八、触发批处理任务的接口
@RestController
@RequestMapping("/api/admin/batch")
@PreAuthorize("hasRole('ADMIN')")
@Slf4j
public class BatchJobController {
private final JobLauncher jobLauncher;
private final Job documentIndexingJob;
private final JobExplorer jobExplorer;
public BatchJobController(JobLauncher jobLauncher,
Job documentIndexingJob,
JobExplorer jobExplorer) {
this.jobLauncher = jobLauncher;
this.documentIndexingJob = documentIndexingJob;
this.jobExplorer = jobExplorer;
}
@PostMapping("/document-indexing/start")
public ResponseEntity<Map<String, Object>> startJob(
@RequestParam(required = false) String scanDirectory) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("scanDirectory",
scanDirectory != null ? scanDirectory : "")
.addLong("startTime", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncher.run(documentIndexingJob, params);
return ResponseEntity.ok(Map.of(
"jobId", execution.getJobId(),
"status", execution.getStatus().toString(),
"startTime", execution.getStartTime()
));
}
@GetMapping("/document-indexing/{jobId}/status")
public ResponseEntity<Map<String, Object>> getJobStatus(
@PathVariable long jobId) {
JobExecution execution = jobExplorer.getJobExecution(jobId);
if (execution == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(Map.of(
"jobId", jobId,
"status", execution.getStatus().toString(),
"startTime", execution.getStartTime(),
"endTime", execution.getEndTime(),
"stepExecutions", execution.getStepExecutions().stream()
.map(step -> Map.of(
"stepName", step.getStepName(),
"status", step.getStatus().toString(),
"readCount", step.getReadCount(),
"writeCount", step.getWriteCount(),
"skipCount", step.getSkipCount()
))
.collect(Collectors.toList())
));
}
}
九、断点续传:Spring Batch 的核心能力
Spring Batch 的 JobRepository 会把 Job 的执行状态持久化到数据库里。如果任务执行到一半崩溃了,重新启动时会自动从上次中断的地方继续:
// 重启失败的任务
@PostMapping("/document-indexing/{jobId}/restart")
public ResponseEntity<Map<String, Object>> restartJob(
@PathVariable long jobId) throws Exception {
JobExecution lastExecution = jobExplorer.getJobExecution(jobId);
if (lastExecution == null
|| (lastExecution.getStatus() != BatchStatus.FAILED
&& lastExecution.getStatus() != BatchStatus.STOPPED)) {
return ResponseEntity.badRequest()
.body(Map.of("error", "任务不存在或状态不是 FAILED/STOPPED"));
}
// 使用相同的 JobParameters 重新执行,Batch 会自动跳过已完成的 Step
JobExecution newExecution = jobLauncher.run(
documentIndexingJob,
lastExecution.getJobParameters()
);
return ResponseEntity.ok(Map.of(
"newJobId", newExecution.getJobId(),
"status", newExecution.getStatus().toString()
));
}
断点续传的关键:已经成功完成的 Step(COMPLETED 状态)不会重新执行,只有 FAILED 或 STOPPED 的 Step 才会重跑。这对处理中途失败的批任务非常重要。
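补充一个细节:如果某个 Step 希望在重启时总是重跑(比如扫描 Step 每次都重新扫一遍目录,把新增文件补进来),可以用 Spring Batch 自带的 allowStartIfComplete 开关,下面是一个示意:
// 示意:扫描 Step 即使上次已经 COMPLETED,重启 Job 时也重新执行
@Bean
public Step scanDocumentsStep(JobRepository jobRepository,
                              PlatformTransactionManager txManager,
                              DocumentScanTasklet documentScanTasklet) {
    return new StepBuilder("scanDocumentsStep", jobRepository)
            .tasklet(documentScanTasklet, txManager)
            .allowStartIfComplete(true) // 不受"COMPLETED 的 Step 不重跑"规则限制
            .build();
}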
十、性能调优经验
经验1:Chunk 大小要根据 AI 调用耗时调整
每个 Chunk 是一个事务,如果 Chunk 里有 AI 调用,要把 Chunk 大小设小一点(10-20),避免事务超时。
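一个简单的做法是把 Chunk 大小和事务超时一起做成配置项,按"单次 AI 调用耗时 × Chunk 大小"留出余量。下面是一个放在 DocumentBatchJobConfig 里的示意,app.batch.chunk-size 和 app.batch.chunk-tx-timeout-seconds 这两个属性名是我为演示临时起的:
// 示意:Chunk 大小和事务超时外置成配置(为突出重点,省略了前文的容错和并发配置)
@Value("${app.batch.chunk-size:10}")
private int chunkSize;

@Value("${app.batch.chunk-tx-timeout-seconds:300}")
private int chunkTxTimeoutSeconds;

@Bean
public Step processDocumentsStep(JobRepository jobRepository,
                                 PlatformTransactionManager txManager,
                                 ItemReader<DocumentTask> documentReader,
                                 ItemProcessor<DocumentTask, List<Document>> documentProcessor,
                                 ItemWriter<List<Document>> vectorStoreWriter) {
    // 每个 Chunk 一个事务,超时时间要覆盖"Chunk 大小 × 单次 AI 调用耗时"
    DefaultTransactionAttribute txAttr = new DefaultTransactionAttribute();
    txAttr.setTimeout(chunkTxTimeoutSeconds);

    return new StepBuilder("processDocumentsStep", jobRepository)
            .<DocumentTask, List<Document>>chunk(chunkSize, txManager)
            .reader(documentReader)
            .processor(documentProcessor)
            .writer(vectorStoreWriter)
            .transactionAttribute(txAttr)
            .build();
}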
经验2:并发 Step 的 throttleLimit 要匹配 AI 限流
以 OpenAI 为例,RPM(每分钟请求数)的配额大约在 10000 这个量级,TPM(每分钟 token 数)大约在 200 万这个量级(具体取决于模型和账户等级)。处理大文档时更容易先触发 TPM 限制,所以并发要适当收紧,并加上 RateLimiter。
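RateLimiter 也可以从"每秒请求数"换成"每秒 token 数":按每块文本估算 token 数,acquire 对应数量的许可,这样限的是 TPM 而不是 RPM。下面是一个放在 Processor 里的示意,token 估算用的是很粗的经验值,TOKENS_PER_MINUTE 也只是占位:
// 示意:把 Guava RateLimiter 的限流单位从请求数换成 token 数
private static final double TOKENS_PER_MINUTE = 200_000; // 占位值,按自己账户的 TPM 配额填
private final RateLimiter tokenRateLimiter = RateLimiter.create(TOKENS_PER_MINUTE / 60.0);

private int estimateTokens(String text) {
    // 粗略估算:按 1 个字符 1 个 token 计,中文场景宁可估多不估少
    return Math.max(1, text.length());
}

private void acquireForChunks(List<String> chunks) {
    // 在真正触发 AI 调用之前,按整批 chunk 的估算 token 数扣配额
    int estimated = chunks.stream().mapToInt(this::estimateTokens).sum();
    tokenRateLimiter.acquire(estimated);
}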
经验3:大量文档分批次处理
三万份文档一个 Job 跑完不是好主意,建议按日期或文件夹分批,每批 5000 份,这样单次任务失败影响范围小,也便于监控。
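落到代码上,可以按子目录(或按日期归档的文件夹)一批一批地触发 Job,每批用不同的 JobParameters 区分,哪批失败只重跑哪批。下面是一个示意;要让扫描 Step 按批次目录扫描,还需要把 DocumentScanTasklet 的目录改成从 JobParameters 里取(@StepScope 加 @Value("#{jobParameters['scanDirectory']}") 的迟绑定写法),原文是从配置文件读的:
// 示意:按子目录分批触发,默认的同步 JobLauncher 会一批跑完再跑下一批
public void launchPerFolder(Path rootDir) throws Exception {
    try (Stream<Path> subDirs = Files.list(rootDir)) {
        for (Path folder : subDirs.filter(Files::isDirectory).toList()) {
            JobParameters params = new JobParametersBuilder()
                    .addString("scanDirectory", folder.toString())
                    .addLong("startTime", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution execution = jobLauncher.run(documentIndexingJob, params);
            log.info("批次 {} 执行完成 status={}", folder.getFileName(), execution.getStatus());
        }
    }
}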
经验4:向量化不要在 Processor 里做
ItemProcessor 的职责是数据转换,不是 I/O 操作。向量化(EmbeddingModel.embed())是 I/O 密集操作,Spring AI 的 VectorStore.add() 内部会自动处理向量化,不需要在 Processor 里手动调用 embeddingModel。
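顺带补一下 Processor 里用到的 DocumentParser,原文没有贴出。一个常见做法是用 Apache Tika 统一解析 pdf/docx/txt/md,下面是按文中调用方式(parse(String filePath) 返回纯文本)补的最小示意,接口定义是推测的,需要额外引入 tika-core 和 tika-parsers-standard-package 依赖:
// 文中 Processor 依赖的接口,按调用方式推测
public interface DocumentParser {
    String parse(String filePath) throws IOException;
}

// 基于 Apache Tika 的实现示意:根据文件类型自动选择解析器,返回纯文本
@Component
public class TikaDocumentParser implements DocumentParser {

    private final Tika tika = new Tika();

    @Override
    public String parse(String filePath) throws IOException {
        try {
            return tika.parseToString(new File(filePath));
        } catch (TikaException e) {
            throw new IOException("解析文件失败: " + filePath, e);
        }
    }
}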
这套方案处理企业级的文档批量索引场景是完全够用的。核心代码大概 600 行,但背后的工程价值——断点续传、并发控制、失败处理、进度追踪——却是随便写个 for 循环完全无法比拟的。
