Article #2342: Batch Processing Design for Java AI - Engineering Patterns for Handling Large Document Sets Efficiently
2026/4/30
Audience: AI engineers who need to batch-process documents, images, or data, and developers focused on throughput optimization for large-scale AI tasks | Reading time: ~17 minutes | Core value: master the core engineering patterns of AI batch processing and build batch systems that are high-throughput, monitorable, and resumable
We once took on a requirement: vectorize the company's internal technical documentation accumulated over the past five years (roughly 8,000 Word documents) and load it into a vector store to power a RAG question-answering system.
The first version was trivial: loop over the files, chunk, embed, insert, all single-threaded. Estimated running time: six hours.
The second version added a thread pool with 20-way concurrency. Halfway through, the LLM API started returning 429s, and after a restart there was no way to tell where to resume.
The third version added progress records, rate limiting, failure retries... The code grew more and more complex, yet one problem remained: if the machine died mid-run, the finished work was wasted and everything restarted from scratch.
After walking through all of these pitfalls, a stable set of batch processing design patterns gradually took shape.
The five core problems of batch processing design
- Resumability: after an interruption, continue from where you left off without reprocessing anything
- Rate control: never trip the LLM API's rate limits
- Concurrency control: use resources fully without blowing up memory
- Error handling: partial failures must not sink the whole batch, and failed items can be retried individually
- Observability: know the progress, know what failed, and know why it failed
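The rest of the article builds one component for each of these concerns. As a preview, here is a hypothetical sketch of how they end up wired together: a BatchManager records one task per file, a BatchProcessor works through them, and a monitoring controller exposes progress. The class names match those defined below, but the runner class itself, the ApplicationRunner wiring, and the directory path are assumptions for illustration.

// Hypothetical glue code: create (or resume) a batch, then execute it.
// DocumentBatchRunner and the /data/tech-docs path are illustrative only.
@Component
@RequiredArgsConstructor
public class DocumentBatchRunner implements ApplicationRunner {
    private final BatchManager batchManager;
    private final BatchProcessor batchProcessor;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Already-completed files are skipped, so this is safe to rerun
        String batchId = batchManager.createBatch(Path.of("/data/tech-docs"));

        // A crash here is recoverable: rerunning picks up PENDING and FAILED tasks
        BatchProcessor.BatchResult result = batchProcessor.executeBatch(batchId);
        System.out.printf("success rate: %.1f%%%n", result.successRate());
    }
}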
Core component design
Task sharding: split the big job into small units
// Task lifecycle states
public enum TaskStatus {
    PENDING,     // waiting to be processed
    PROCESSING,  // in progress
    COMPLETED,   // done
    FAILED,      // failed (will be retried, up to 3 attempts)
    SKIPPED      // parked after exhausting retries (not retried again)
}
// Batch task record: one row per document per batch, the unit of resumability
@Entity
@Table(name = "batch_document_task")
@Getter
@Setter
public class BatchDocumentTask {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false)
    private String batchId;          // batch ID

    @Column(nullable = false)
    private String documentPath;     // document path

    @Column(nullable = false)
    private String documentHash;     // MD5 of the file (detects content changes)

    @Enumerated(EnumType.STRING)
    private TaskStatus status = TaskStatus.PENDING;

    private int retryCount = 0;
    private String errorMessage;
    private LocalDateTime createdAt;
    private LocalDateTime processedAt;
    private int chunkCount;          // number of chunks produced
    private long processingTimeMs;   // processing time
}

// Batch manager: creates and manages batches
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchManager {
    private final BatchDocumentTaskRepository taskRepository;

    /**
     * Create a batch: scan the directory and create one task per file.
     * Files that were already processed successfully are not re-created.
     */
    @Transactional
    public String createBatch(Path directory) throws IOException {
        // Short, human-friendly batch id
        String batchId = UUID.randomUUID().toString().substring(0, 8);
        int created = 0, skipped = 0;
        try (Stream<Path> files = Files.walk(directory)) {
            for (Path file : files.filter(Files::isRegularFile).toList()) {
                String hash = computeFileHash(file);

                // Skip files already processed successfully (same path + same hash)
                boolean alreadyDone = taskRepository
                    .existsByDocumentPathAndDocumentHashAndStatus(
                        file.toString(), hash, TaskStatus.COMPLETED);
                if (alreadyDone) {
                    skipped++;
                    continue;
                }

                // Create the task
                BatchDocumentTask task = new BatchDocumentTask();
                task.setBatchId(batchId);
                task.setDocumentPath(file.toString());
                task.setDocumentHash(hash);
                task.setCreatedAt(LocalDateTime.now());
                taskRepository.save(task);
                created++;
            }
        }
        log.info("Batch created: batchId={}, new tasks={}, skipped (already done)={}",
            batchId, created, skipped);
        return batchId;
    }

    private String computeFileHash(Path file) throws IOException {
        // commons-codec DigestUtils; MD5 is fine here since it only detects changes
        try (InputStream is = Files.newInputStream(file)) {
            return DigestUtils.md5Hex(is);
        }
    }
}

Batch executor: concurrent processing with rate control
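One piece the listings never show is the Spring Data repository that every component injects. Here is a minimal sketch of it, with the derived-query method names inferred from the call sites in BatchManager, the executor below, and the monitoring controller; treat it as an assumption rather than part of the original code.

// Assumed Spring Data JPA repository; each method is a derived query
// whose name is inferred from how the other components call it
public interface BatchDocumentTaskRepository extends JpaRepository<BatchDocumentTask, Long> {

    // Used by BatchManager to skip files already processed successfully
    boolean existsByDocumentPathAndDocumentHashAndStatus(
            String documentPath, String documentHash, TaskStatus status);

    // Used by BatchProcessor to pick up PENDING and FAILED tasks
    List<BatchDocumentTask> findByBatchIdAndStatusIn(String batchId, Collection<TaskStatus> statuses);

    // Used by the monitoring endpoints
    List<BatchDocumentTask> findByBatchIdAndStatus(String batchId, TaskStatus status);
    long countByBatchId(String batchId);
    long countByBatchIdAndStatus(String batchId, TaskStatus status);
}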
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchProcessor {
    private final BatchDocumentTaskRepository taskRepository;
    private final DocumentIngestionService ingestionService;

    // Rate limiting (Guava RateLimiter): at most 5 tasks per second
    private final RateLimiter rateLimiter = RateLimiter.create(5.0);

    // Concurrency cap: at most 10 tasks in flight
    private final Semaphore concurrencySemaphore = new Semaphore(10);
    /**
     * Execute a batch. Picks up PENDING tasks plus previously FAILED ones,
     * so rerunning the same batch is also how failures get retried.
     */
    public BatchResult executeBatch(String batchId) {
        List<BatchDocumentTask> pendingTasks = taskRepository
            .findByBatchIdAndStatusIn(batchId,
                List.of(TaskStatus.PENDING, TaskStatus.FAILED));
        log.info("Starting batch: batchId={}, tasks to process={}", batchId, pendingTasks.size());

        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failCount = new AtomicInteger(0);
        AtomicInteger total = new AtomicInteger(pendingTasks.size());

        // One virtual thread per task; cheap even for thousands of tasks
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Void>> futures = pendingTasks.stream()
                .map(task -> CompletableFuture.runAsync(() -> {
                    // Rate control
                    rateLimiter.acquire();
                    try {
                        concurrencySemaphore.acquire();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        processTask(task);
                        successCount.incrementAndGet();
                        // Log progress every 100 tasks
                        int processed = successCount.get() + failCount.get();
                        if (processed % 100 == 0) {
                            log.info("Batch progress: {}/{} ({}%)",
                                processed, total.get(),
                                String.format("%.1f", processed * 100.0 / total.get()));
                        }
                    } catch (Exception e) {
                        failCount.incrementAndGet();
                        markTaskFailed(task, e.getMessage());
                    } finally {
                        concurrencySemaphore.release();
                    }
                }, executor))
                .toList();

            // Wait for all tasks to finish
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .get(24, TimeUnit.HOURS);
        } catch (Exception e) {
            log.error("Batch execution failed", e);
        }

        BatchResult result = new BatchResult(batchId,
            successCount.get(), failCount.get(), total.get());
        log.info("Batch finished: {}", result);
        return result;
    }
    protected void processTask(BatchDocumentTask task) {
        // Mark as PROCESSING so other instances skip the task. No @Transactional
        // here: this method is self-invoked, so a Spring proxy would not apply
        // anyway, and each save() should commit immediately so the status change
        // becomes visible right away.
        task.setStatus(TaskStatus.PROCESSING);
        taskRepository.save(task);
        long start = System.currentTimeMillis();
        try {
            // The actual work: parse, chunk, embed, store
            int chunkCount = ingestionService.ingestFile(Path.of(task.getDocumentPath()));

            // Mark as done
            task.setStatus(TaskStatus.COMPLETED);
            task.setChunkCount(chunkCount);
            task.setProcessingTimeMs(System.currentTimeMillis() - start);
            task.setProcessedAt(LocalDateTime.now());
            taskRepository.save(task);
        } catch (Exception e) {
            // Failure bookkeeping (retry count, status, error message) happens
            // exactly once, in markTaskFailed, which executeBatch calls
            throw e instanceof RuntimeException re ? re : new RuntimeException(e);
        }
    }
    private void markTaskFailed(BatchDocumentTask task, String errorMessage) {
        try {
            task.setRetryCount(task.getRetryCount() + 1);
            // After 3 failed attempts, park the task as SKIPPED
            task.setStatus(task.getRetryCount() >= 3 ? TaskStatus.SKIPPED : TaskStatus.FAILED);
            task.setErrorMessage(errorMessage);
            taskRepository.save(task);
        } catch (Exception e) {
            log.error("Failed to update task status", e);
        }
    }

    public record BatchResult(String batchId, int success, int failed, int total) {
        public double successRate() { return total > 0 ? (double) success / total * 100 : 0; }
        @Override
        public String toString() {
            return String.format("BatchResult{batchId=%s, success=%d, failed=%d, total=%d, rate=%.1f%%}",
                batchId, success, failed, total, successRate());
        }
    }
}

Progress monitoring endpoint
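The endpoints below report raw counts; a back-of-the-envelope check helps interpret them. With the executor capped at 5 tasks per second, the 8,000-document corpus from the intro cannot finish faster than 8000 / 5 = 1,600 seconds, roughly 27 minutes. If the observed completion rate sits far below that floor, the bottleneck is per-document processing time or the 10-slot concurrency cap, not the rate limiter.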
@RestController
@RequestMapping("/admin/batch")
@RequiredArgsConstructor
public class BatchMonitorController {
    private final BatchDocumentTaskRepository taskRepository;

    @GetMapping("/{batchId}/progress")
    public BatchProgress getProgress(@PathVariable String batchId) {
        long total = taskRepository.countByBatchId(batchId);
        long completed = taskRepository.countByBatchIdAndStatus(batchId, TaskStatus.COMPLETED);
        long failed = taskRepository.countByBatchIdAndStatus(batchId, TaskStatus.FAILED);
        long processing = taskRepository.countByBatchIdAndStatus(batchId, TaskStatus.PROCESSING);
        long pending = taskRepository.countByBatchIdAndStatus(batchId, TaskStatus.PENDING);
        return new BatchProgress(batchId, total, completed, failed, processing, pending);
    }

    @GetMapping("/{batchId}/failures")
    public List<FailureDetail> getFailures(@PathVariable String batchId) {
        return taskRepository.findByBatchIdAndStatus(batchId, TaskStatus.FAILED)
            .stream()
            .map(t -> new FailureDetail(t.getDocumentPath(), t.getErrorMessage(), t.getRetryCount()))
            .toList();
    }

    public record BatchProgress(String batchId, long total, long completed,
                                long failed, long processing, long pending) {
        public double completionRate() { return total > 0 ? (double) completed / total * 100 : 0; }
    }

    public record FailureDetail(String path, String error, int retryCount) {}
}

Spring Batch integration: heavier-duty scenarios
For batch jobs that run on a schedule, say nightly, Spring Batch provides more complete infrastructure out of the box:
@Configuration
@EnableBatchProcessing
public class DocumentBatchJobConfig {

    @Bean
    public Job documentIngestionJob(JobRepository jobRepository,
                                    Step ingestionStep) {
        return new JobBuilder("documentIngestionJob", jobRepository)
            .start(ingestionStep)
            .build();
    }

    @Bean
    public Step ingestionStep(JobRepository jobRepository,
                              PlatformTransactionManager txManager,
                              ItemReader<Path> documentReader,
                              ItemProcessor<Path, List<Document>> documentProcessor,
                              ItemWriter<List<Document>> vectorStoreWriter) {
        return new StepBuilder("ingestionStep", jobRepository)
            .<Path, List<Document>>chunk(10, txManager)  // one transaction per 10 files
            .reader(documentReader)
            .processor(documentProcessor)
            .writer(vectorStoreWriter)
            .faultTolerant()
            .skip(Exception.class)                  // skip files that fail processing
            .skipLimit(100)                         // but at most 100 of them
            .retry(RateLimitedException.class)      // retry when rate-limited
            .retryLimit(3)
            .build();
    }
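    // The documentReader/documentProcessor/vectorStoreWriter beans referenced
    // above never appear in the article. A minimal sketch of the reader follows,
    // as an assumption: the directory path is made up, and ListItemReader is not
    // restartable mid-step, so a production job would prefer a database- or
    // cursor-backed reader.
    @Bean
    public ItemReader<Path> documentReader() throws IOException {
        List<Path> files;
        try (Stream<Path> s = Files.walk(Path.of("/data/tech-docs"))) {
            files = s.filter(Files::isRegularFile).toList();
        }
        return new ListItemReader<>(files);
    }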
}

Batch processing is an easily overlooked but critical part of AI engineering. A batch system that is resumable, monitored, and rate-controlled is the foundation of any enterprise-grade AI knowledge base.
