Spring AI Batch Processing: A Hands-On Spring Batch + AI Pipeline
2026/4/30
Target readers: Java developers with 1-5 years of experience who want to move toward AI engineering. Reading time: about 18 minutes. What you'll get: ① a complete approach to building an AI batch-processing pipeline with Spring Batch + Spring AI ② how to handle partitioning, parallelism, and restart-from-checkpoint for large datasets ③ a reusable, production-grade AI batch-processing skeleton.
Lao Wu is a former colleague from my previous company. He has written Java for seven years and knows the Spring stack inside out. Last month he got this requirement:
The company has a customer-feedback database with 500,000 historical user comments. The product manager wants AI to tag each comment (complaint / suggestion / praise), extract keywords, and generate a summary.
"Lao Zhang, I started with a scheduled task that polled the database and called the AI once per record. After running all night it still hadn't finished the 500,000 rows, then it hit an OOM halfway through and everything had to start over."
As soon as I heard that, I knew what was wrong.
"What you need is Spring Batch, not a scheduled task. Large-scale AI processing has three core problems: how to partition the data, how to resume after a failure, and how to throttle AI calls so you don't blow through your quota. The Batch framework has already solved all three for you."
This article is the solution I walked Lao Wu through.
Why Spring Batch
The problems with processing a large dataset in a scheduled task:
- No checkpoint/restart: a mid-run failure means starting from scratch
- No parallelism control: it either crawls or hammers the system
- No progress monitoring: you have no idea how far it has gotten
- No error handling: one failed record derails the whole run
Spring Batch was designed around exactly these problems: job metadata gives you restartability and progress tracking, chunk-oriented steps give you skip/retry for bad records, and partitioning plus a task executor gives you controlled parallelism.
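One setup detail the article otherwise skips: Spring Batch keeps its restart and progress metadata in BATCH_* tables, and if you rely on Spring Boot's Batch auto-configuration it can create those tables for you and can be told not to auto-run jobs at startup (we trigger them over HTTP later). A minimal application.yml sketch, assuming Spring Boot 3.x and its standard property names; note that Boot's Batch auto-configuration backs off when @EnableBatchProcessing is present (as in the config class below), in which case you would create the metadata tables yourself:

spring:
  batch:
    jdbc:
      initialize-schema: always   # create the BATCH_* metadata tables on startup
    job:
      enabled: false              # do not launch jobs automatically at startup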
Overall Pipeline Design
The AI batch pipeline we are going to build: a partitioner splits the pending records into ID ranges, and each partition runs a chunk-oriented step that pages feedback out of the database, sends each record to an AI processor for classification, keyword extraction, summarization, and sentiment scoring, then batch-writes the results back.
Implementation
Step 1: The Data Model
// Customer feedback awaiting processing
@Entity
@Table(name = "customer_feedback")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CustomerFeedback {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String userId;
private String content; // raw user feedback text
private LocalDateTime createdAt;
// Filled in by AI processing
private String category; // complaint / suggestion / praise / inquiry
private String keywords; // keywords, stored as a JSON array
private String summary; // one-sentence summary
private Double sentiment; // sentiment score, 0-1
private String processStatus; // PENDING/COMPLETED/FAILED
private LocalDateTime processedAt;
}
// DTO for the AI analysis result
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AiFeedbackAnalysis {
private String category;
private List<String> keywords;
private String summary;
private double sentiment;
}
Step 2: The Core AI Processor
@Component
@RequiredArgsConstructor
@Slf4j
public class FeedbackAiProcessor implements ItemProcessor<CustomerFeedback, CustomerFeedback> {
private final ChatClient chatClient;
private final ObjectMapper objectMapper;
private final RateLimiter rateLimiter; // throttles the AI call rate
private static final String ANALYSIS_PROMPT = """
Analyze the following customer feedback and return the result as JSON.
[Dimensions]
1. category: classification (must be one of: complaint / suggestion / praise / inquiry)
2. keywords: keyword list (3-5 items)
3. summary: one-sentence summary (at most 50 characters)
4. sentiment: sentiment score (0.0 = extremely negative, 1.0 = extremely positive)
[Return strictly as JSON]:
{"category":"complaint","keywords":["shipping","delay","refund"],"summary":"Customer complains the order is late and no refund has been issued","sentiment":0.1}
[Customer feedback]:
%s
""";
@Override
public CustomerFeedback process(CustomerFeedback feedback) throws Exception {
// Throttle: avoid exhausting the API quota
if (!rateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
throw new RateLimitException("AI call rate-limited; this item will be retried");
}
log.debug("Processing feedback, id={}", feedback.getId());
try {
String prompt = String.format(ANALYSIS_PROMPT, feedback.getContent());
String response = chatClient.prompt()
.user(prompt)
.call()
.content();
AiFeedbackAnalysis analysis = parseAnalysis(response);
// Fill in the AI analysis results
feedback.setCategory(analysis.getCategory());
feedback.setKeywords(objectMapper.writeValueAsString(analysis.getKeywords()));
feedback.setSummary(analysis.getSummary());
feedback.setSentiment(analysis.getSentiment());
feedback.setProcessStatus("COMPLETED");
feedback.setProcessedAt(LocalDateTime.now());
return feedback;
} catch (Exception e) {
log.warn("AI processing failed, id={}, reason={}", feedback.getId(), e.getMessage());
feedback.setProcessStatus("FAILED");
feedback.setProcessedAt(LocalDateTime.now());
// Re-throw so the Batch framework's skip policy decides whether to skip this item
throw new AiProcessingException("AI analysis failed", e);
}
}
private AiFeedbackAnalysis parseAnalysis(String jsonStr) throws JsonProcessingException {
String cleaned = jsonStr.replaceAll("```json\\s*|```", "").trim();
return objectMapper.readValue(cleaned, AiFeedbackAnalysis.class);
}
}
Step 3: Job Configuration (The Core)
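One thing the article doesn't show: the processor above injects a RateLimiter and throws RateLimitException and AiProcessingException, none of which are defined anywhere. Before wiring up the job, here is a minimal sketch of those pieces, assuming Guava's RateLimiter (the 10-permits-per-second figure is a placeholder you would tune to your AI provider's quota):

import com.google.common.util.concurrent.RateLimiter;
// Hypothetical definitions, not from the original article (each class in its own file)
public class RateLimitException extends RuntimeException {
public RateLimitException(String message) { super(message); }
}
public class AiProcessingException extends RuntimeException {
public AiProcessingException(String message, Throwable cause) { super(message, cause); }
}
@Configuration
public class AiRateLimitConfig {
@Bean
public RateLimiter aiRateLimiter() {
// 10 requests per second to the AI API (assumed value)
return RateLimiter.create(10.0);
}
}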
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
@Slf4j
public class FeedbackAiJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final DataSource dataSource;
private final FeedbackAiProcessor feedbackAiProcessor;
/**
* Define the Job
*/
@Bean
public Job feedbackAiAnalysisJob(Step partitionedStep) {
return new JobBuilder("feedbackAiAnalysisJob", jobRepository)
.start(partitionedStep)
.listener(new JobExecutionListener() {
@Override
public void afterJob(JobExecution jobExecution) {
log.info("Job完成,状态={},耗时={}s",
jobExecution.getStatus(),
Duration.between(jobExecution.getStartTime(),
jobExecution.getEndTime()).getSeconds());
}
})
.build();
}
/**
* Partitioned Step: splits the 500k records into multiple partitions processed in parallel
*/
@Bean
public Step partitionedStep(Step workerStep) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", rangePartitioner())
.step(workerStep)
.gridSize(5) // 5 partitions, processed by 5 threads in parallel
.taskExecutor(partitionTaskExecutor())
.build();
}
/**
* Worker Step: the actual processing logic for each partition
*/
@Bean
public Step workerStep(JdbcPagingItemReader<CustomerFeedback> reader,
JdbcBatchItemWriter<CustomerFeedback> writer) {
return new StepBuilder("workerStep", jobRepository)
.<CustomerFeedback, CustomerFeedback>chunk(50, transactionManager)
// Read 50 items at a time, process and write them, then read the next 50
.reader(reader)
.processor(feedbackAiProcessor)
.writer(writer)
// Skip policy: skip records whose AI processing failed instead of failing the whole run
.faultTolerant()
.skip(AiProcessingException.class)
.skipLimit(1000) // skip at most 1000 records
// Retry policy: retry rate-limit errors up to 3 times
.retry(RateLimitException.class)
.retryLimit(3)
.listener(new StepExecutionListener() {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("Partition finished, read={}, written={}, skipped={}, write-skipped={}",
stepExecution.getReadCount(),
stepExecution.getWriteCount(),
stepExecution.getSkipCount(),
stepExecution.getWriteSkipCount());
return stepExecution.getExitStatus();
}
})
.build();
}
/**
* Partitioner: splits the data by ID range
*/
@Bean
public RangePartitioner rangePartitioner() {
return new RangePartitioner(dataSource);
}
/**
* Paging reader: reads one page at a time and supports restart from the last checkpoint
*/
@Bean
@StepScope
public JdbcPagingItemReader<CustomerFeedback> feedbackReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
Map<String, Order> sortKeys = new LinkedHashMap<>();
sortKeys.put("id", Order.ASCENDING);
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("SELECT *");
queryProvider.setFromClause("FROM customer_feedback");
queryProvider.setWhereClause("WHERE id BETWEEN :minId AND :maxId AND process_status = 'PENDING'");
queryProvider.setSortKeys(sortKeys);
return new JdbcPagingItemReaderBuilder<CustomerFeedback>()
.name("feedbackReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.pageSize(50)
.rowMapper((rs, rowNum) -> CustomerFeedback.builder()
.id(rs.getLong("id"))
.userId(rs.getString("user_id"))
.content(rs.getString("content"))
.createdAt(rs.getTimestamp("created_at").toLocalDateTime())
.processStatus(rs.getString("process_status"))
.build())
.build();
}
/**
* Batch writer
*/
@Bean
public JdbcBatchItemWriter<CustomerFeedback> feedbackWriter() {
return new JdbcBatchItemWriterBuilder<CustomerFeedback>()
.sql("""
UPDATE customer_feedback SET
category = :category,
keywords = :keywords,
summary = :summary,
sentiment = :sentiment,
process_status = :processStatus,
processed_at = :processedAt
WHERE id = :id
""")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public TaskExecutor partitionTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("ai-batch-");
executor.initialize();
return executor;
}
}
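Lao Wu's note at the end mentions that skipped records were written to a separate error report, which the configuration above doesn't show. One way to do it is a SkipListener registered on the fault-tolerant step via .listener(...); the FeedbackSkipListener below is a minimal sketch (the class name and the log-based "report" are assumptions, not part of the original):

@Component
@Slf4j
public class FeedbackSkipListener implements SkipListener<CustomerFeedback, CustomerFeedback> {
@Override
public void onSkipInProcess(CustomerFeedback item, Throwable t) {
// Record items skipped during AI processing; could also insert into a dedicated error table
log.error("Skipped feedback id={}, reason={}", item.getId(), t.getMessage());
}
@Override
public void onSkipInWrite(CustomerFeedback item, Throwable t) {
log.error("Skipped on write, feedback id={}, reason={}", item.getId(), t.getMessage());
}
}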
/**
* Custom partitioner: splits evenly by ID range
*/
@RequiredArgsConstructor
public class RangePartitioner implements Partitioner {
private final DataSource dataSource;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Look up the ID range of the pending records
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM customer_feedback WHERE process_status = 'PENDING'",
Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM customer_feedback WHERE process_status = 'PENDING'",
Long.class);
if (minId == null || maxId == null) {
return Map.of("partition0", new ExecutionContext());
}
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> result = new LinkedHashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long partMinId = minId + i * range;
long partMaxId = Math.min(partMinId + range - 1, maxId);
context.putLong("minId", partMinId);
context.putLong("maxId", partMaxId);
context.putString("name", "partition" + i);
result.put("partition" + i, context);
}
return result;
}
}
Step 4: Triggering and Monitoring the Job
@RestController
@RequestMapping("/api/batch")
@RequiredArgsConstructor
@Slf4j
public class BatchJobController {
private final JobLauncher jobLauncher;
private final Job feedbackAiAnalysisJob;
private final JobExplorer jobExplorer;
/**
* Trigger the Job manually (a cron schedule also works; a sketch follows below)
*/
@PostMapping("/start")
public ResponseEntity<String> startJob() throws Exception {
// Use a timestamp as a job parameter so the Job can be launched repeatedly
JobParameters params = new JobParametersBuilder()
.addString("runAt", LocalDateTime.now().toString())
.toJobParameters();
JobExecution execution = jobLauncher.run(feedbackAiAnalysisJob, params);
return ResponseEntity.ok("Job已启动,executionId=" + execution.getId());
}
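/**
 * The comment on startJob() notes that a cron schedule also works. A minimal sketch,
 * assuming @EnableScheduling is configured elsewhere; the method name and cron
 * expression are placeholders, not from the original article.
 */
@Scheduled(cron = "0 0 2 * * ?") // hypothetical: every day at 02:00
public void startJobOnSchedule() throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("runAt", LocalDateTime.now().toString())
.toJobParameters();
jobLauncher.run(feedbackAiAnalysisJob, params);
}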
/**
* Query the Job's progress
*/
@GetMapping("/status/{executionId}")
public ResponseEntity<JobStatusDTO> getStatus(@PathVariable Long executionId) {
JobExecution execution = jobExplorer.getJobExecution(executionId);
if (execution == null) {
return ResponseEntity.notFound().build();
}
long totalRead = execution.getStepExecutions().stream()
.mapToLong(StepExecution::getReadCount).sum();
long totalWrite = execution.getStepExecutions().stream()
.mapToLong(StepExecution::getWriteCount).sum();
long totalSkip = execution.getStepExecutions().stream()
.mapToLong(StepExecution::getSkipCount).sum();
return ResponseEntity.ok(JobStatusDTO.builder()
.executionId(executionId)
.status(execution.getStatus().name())
.totalRead(totalRead)
.totalWritten(totalWrite)
.totalSkipped(totalSkip)
.startTime(execution.getStartTime())
.endTime(execution.getEndTime())
.build());
}
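// The JobStatusDTO used above isn't defined in the original article. A minimal sketch as a
// nested builder-style DTO (an assumption); in Spring Batch 5, JobExecution start/end times
// are LocalDateTime, so the fields use that type.
@Data
@Builder
public static class JobStatusDTO {
private Long executionId;
private String status;
private long totalRead;
private long totalWritten;
private long totalSkipped;
private LocalDateTime startTime;
private LocalDateTime endTime;
}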
}
Lao Wu's Feedback
After reworking the job along these lines, Lao Wu sent me this:
"Lao Zhang, with 5 partitions in parallel plus rate limiting on the AI calls, all 500,000 records were processed in about 4 hours. My machine rebooted once in the middle; I relaunched the Job and it picked up from where it left off, and nothing that was already done had to be redone. Just over 800 records were skipped because the content was too short for the AI to classify, and they went into a separate error report."
That is the value of the Batch framework: it doesn't make your code faster, it makes large-scale data processing reliable, observable, and recoverable.
