Deep Integration of Spring AI and Spring Batch: AI Processing for Hundreds of Millions of Records
2026-10-02 · ~12 min read · Spring Batch · Big Data Processing · Spring AI · Java Batch Processing
Opening Story: Zhao Qiang's "Data Factory"
In February 2026, Zhao Qiang, technical director at a leading e-commerce platform, received a seemingly impossible request:
"We have 230 million historical product reviews. We need AI to run sentiment analysis on every review and tag it (product quality / logistics / customer service / packaging), with the results feeding our recommendation algorithm. Deadline: 72 hours."
Zhao Qiang ran a quick estimate:
- Each review averages 50 tokens
- Calling GPT-4o-mini: $0.15 per million tokens
- Total cost: 230M × 50 tokens ÷ 1M × $0.15 ≈ $1,725
- Time: at 100 concurrent requests and 0.5s per review → 1.15M seconds ≈ 13.3 days (far beyond 72 hours)
Simple asynchronous calls were nowhere near enough. The solution Zhao Qiang settled on:
- Spring Batch for job scheduling, partitioning, and failure retries
- Spring AI for batched inference (batching multiple items per prompt)
- A local small model (a fine-tuned Qwen-1.8B) for the actual inference
- Redis for progress tracking
- Multi-machine distribution: 10 servers, each running 5 partitions
The final outcome:
- Actual elapsed time: 61 hours (11 hours under the estimate)
- Inference cost: roughly ¥12,000 (local GPU server electricity plus scheduling overhead)
- Throughput: a peak of 11,000 reviews processed per second
- Automatic retries recovered 230,000 failed records
That is the industrial-grade power of Spring Batch + Spring AI. This article walks through the complete implementation.
TL;DR
- Spring Batch core concepts: Job / Step / ItemReader / ItemProcessor / ItemWriter
- AI-enabling the pipeline: inject ChatClient into the ItemProcessor; batch the AI API calls
- Partitioning: a partitioned step splits a billion records across machines for parallel processing
- Failure handling: skip/retry policies absorb sporadic AI call failures
- Progress monitoring: Spring Batch's JobExecution plus custom Prometheus metrics
1. Spring Batch Architecture Refresher
1.1 Core Domain Model
Job
├── Step 1: read data and run AI processing
│   ├── ItemReader: read raw data from the database
│   ├── ItemProcessor: call the AI model
│   └── ItemWriter: write the processed results
└── Step 2: aggregate statistics
    └── Tasklet: generate the processing report

Key configuration parameters:
├── chunk-size: items per transaction (10-100 recommended)
├── skip-limit: maximum number of failed records to skip
└── retry-limit: maximum retries per record
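To make the domain model concrete, here is a minimal Spring Batch 5 style skeleton of how these pieces wire together (the bean names and the ProductReview/ReviewAnalysisResult types are the ones used later in this article; everything else is standard Spring Batch API):

```java
// Minimal Job/Step skeleton showing chunk-size, skip-limit and retry-limit in context
@Configuration
public class SkeletonJobConfig {

    @Bean
    public Job analysisJob(JobRepository jobRepository, Step analysisStep) {
        return new JobBuilder("analysisJob", jobRepository)
                .start(analysisStep)
                .build();
    }

    @Bean
    public Step analysisStep(JobRepository jobRepository,
                             PlatformTransactionManager txManager,
                             ItemReader<ProductReview> reader,
                             ItemProcessor<ProductReview, ReviewAnalysisResult> processor,
                             ItemWriter<ReviewAnalysisResult> writer) {
        return new StepBuilder("analysisStep", jobRepository)
                .<ProductReview, ReviewAnalysisResult>chunk(50, txManager) // chunk-size
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(10_000)   // skip-limit
                .retry(Exception.class)
                .retryLimit(3)       // retry-limit
                .build();
    }
}
```

1.2 Adding the Dependencies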
<!-- pom.xml -->
<dependencies>
<!-- Spring Batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Spring AI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Database -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Redis (progress tracking) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Micrometer (monitoring) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies>

2. Core Component: the AI-Enabled ItemProcessor
2.1 A Single-Item ItemProcessor
// ReviewSentimentProcessor.java
@Component
@Slf4j
@StepScope
@RequiredArgsConstructor
public class ReviewSentimentProcessor implements
        ItemProcessor<ProductReview, ReviewAnalysisResult> {

    private final ChatClient chatClient;
    private final MeterRegistry meterRegistry;
    private final ObjectMapper objectMapper;

    // Lightweight prompt for batch processing
    private static final String ANALYSIS_PROMPT = """
            Analyze the following e-commerce review and return the result as JSON:
            Review: %s
            Response format:
            {
              "sentiment": "positive|negative|neutral",
              "score": integer from 1 to 5,
              "tags": applicable labels from ["quality", "logistics", "customer service", "packaging"],
              "brief": "core issue in 10 words or fewer"
            }
            Return only the JSON, nothing else.
            """;

    @Override
    public ReviewAnalysisResult process(ProductReview review) throws Exception {
        long startTime = System.currentTimeMillis();
        try {
            String prompt = String.format(ANALYSIS_PROMPT,
                    review.getContent().substring(0, Math.min(500, review.getContent().length())));
            String response = chatClient.prompt()
                    .user(prompt)
                    .call()
                    .content();
            ReviewAnalysisResult result = parseAnalysisResult(response, review);
            // Record processing latency
            meterRegistry.timer("batch.item.process.time")
                    .record(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
            return result;
        } catch (Exception e) {
            log.warn("Failed to process review [{}]: {}", review.getId(), e.getMessage());
            // Rethrow so Spring Batch's skip/retry policies take over
            throw e;
        }
    }

    private ReviewAnalysisResult parseAnalysisResult(String jsonStr, ProductReview review) {
        try {
            // Strip a possible markdown code-block wrapper
            jsonStr = jsonStr.replaceAll("```json\\s*", "").replaceAll("```\\s*", "").trim();
            JsonNode node = objectMapper.readTree(jsonStr);
            return ReviewAnalysisResult.builder()
                    .reviewId(review.getId())
                    .productId(review.getProductId())
                    .sentiment(node.path("sentiment").asText("neutral"))
                    .score(node.path("score").asInt(3))
                    .tags(parseTagList(node.path("tags")))
                    .brief(node.path("brief").asText(""))
                    .processedAt(LocalDateTime.now())
                    .build();
        } catch (Exception e) {
            log.warn("Failed to parse AI response [reviewId={}]: {}", review.getId(), e.getMessage());
            // Fall back to a default result rather than failing the item
            return ReviewAnalysisResult.defaultResult(review);
        }
    }

    // Collect the "tags" array into a plain list
    private List<String> parseTagList(JsonNode tagsNode) {
        List<String> tags = new ArrayList<>();
        if (tagsNode.isArray()) {
            tagsNode.forEach(t -> tags.add(t.asText()));
        }
        return tags;
    }
}

2.2 Batch Processing: Boosting Throughput
Processing one review per call is inefficient; a batch ItemProcessor raises throughput:
// BatchReviewProcessor.java
@Component
@Slf4j
@StepScope
@RequiredArgsConstructor
public class BatchReviewProcessor implements
        ItemProcessor<List<ProductReview>, List<ReviewAnalysisResult>> {

    private final ChatClient chatClient;

    @Override
    public List<ReviewAnalysisResult> process(List<ProductReview> reviews) throws Exception {
        // Build the batched prompt (the batch size is controlled by the reader below)
        String batchPrompt = buildBatchPrompt(reviews);
        String response = chatClient.prompt()
                .system("""
                        You are an e-commerce review analysis assistant. Output strictly a JSON array,
                        one element per input review, in the same order.
                        """)
                .user(batchPrompt)
                .call()
                .content();
        return parseBatchResponse(response, reviews);
    }

    private String buildBatchPrompt(List<ProductReview> reviews) {
        StringBuilder sb = new StringBuilder();
        sb.append("Analyze the following ").append(reviews.size()).append(" reviews:\n\n");
        for (int i = 0; i < reviews.size(); i++) {
            sb.append("[").append(i + 1).append("] ");
            String content = reviews.get(i).getContent();
            sb.append(content, 0, Math.min(200, content.length()));
            sb.append("\n");
        }
        sb.append("""
                Return a JSON array; each element has the format:
                {"idx": 1, "sentiment": "positive/negative/neutral",
                 "score": 1-5, "tags": [], "brief": "short summary"}
                """);
        return sb.toString();
    }

    private List<ReviewAnalysisResult> parseBatchResponse(
            String jsonStr, List<ProductReview> reviews) {
        // ...parsing logic (see the sketch after this section)
        return new ArrayList<>();
    }
}

// A custom reader is needed to aggregate items into batches
@Component
@StepScope
public class BatchingReviewReader implements ItemReader<List<ProductReview>> {

    private final JdbcCursorItemReader<ProductReview> delegate;
    private final int batchSize;
    private boolean exhausted = false;

    public BatchingReviewReader(JdbcCursorItemReader<ProductReview> delegate,
                                @Value("${batch.review.batch-size:10}") int batchSize) { // 10 per batch; property name illustrative
        this.delegate = delegate;
        this.batchSize = batchSize;
    }

    @Override
    public List<ProductReview> read() throws Exception {
        if (exhausted) return null;
        List<ProductReview> batch = new ArrayList<>();
        ProductReview item;
        while (batch.size() < batchSize && (item = delegate.read()) != null) {
            batch.add(item);
        }
        if (batch.isEmpty()) {
            exhausted = true;
            return null;
        }
        return batch;
    }
}
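The parseBatchResponse stub above is left empty in the original. A minimal sketch, assuming the model honors the idx-based contract from the prompt and falling back to a default result for anything it omits or garbles (objectMapper and parseTagList as in section 2.1):

```java
// Sketch of parseBatchResponse: match model answers back to input reviews by "idx"
private List<ReviewAnalysisResult> parseBatchResponse(
        String jsonStr, List<ProductReview> reviews) {
    List<ReviewAnalysisResult> results = new ArrayList<>();
    try {
        // (markdown-fence stripping as in parseAnalysisResult omitted here)
        JsonNode array = objectMapper.readTree(jsonStr.trim());
        // Index the model's answers by their 1-based "idx" field
        Map<Integer, JsonNode> byIdx = new HashMap<>();
        if (array.isArray()) {
            array.forEach(n -> byIdx.put(n.path("idx").asInt(-1), n));
        }
        for (int i = 0; i < reviews.size(); i++) {
            ProductReview review = reviews.get(i);
            JsonNode node = byIdx.get(i + 1);
            if (node == null) {
                // The model skipped this review; use a neutral default
                results.add(ReviewAnalysisResult.defaultResult(review));
                continue;
            }
            results.add(ReviewAnalysisResult.builder()
                    .reviewId(review.getId())
                    .productId(review.getProductId())
                    .sentiment(node.path("sentiment").asText("neutral"))
                    .score(node.path("score").asInt(3))
                    .tags(parseTagList(node.path("tags")))
                    .brief(node.path("brief").asText(""))
                    .processedAt(LocalDateTime.now())
                    .build());
        }
    } catch (Exception e) {
        log.warn("Failed to parse batch response: {}", e.getMessage());
        reviews.forEach(r -> results.add(ReviewAnalysisResult.defaultResult(r)));
    }
    return results;
}
```

3. ItemReader: Reading Large Datasets Efficiently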
3.1 JdbcPagingItemReader (Paged Reading)
// BatchJobConfig.java
@Configuration
@EnableBatchProcessing
@Slf4j
@RequiredArgsConstructor
public class ReviewAnalysisBatchConfig {

    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    @Bean
    @StepScope
    public JdbcPagingItemReader<ProductReview> reviewReader(
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        JdbcPagingItemReader<ProductReview> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(1000); // rows fetched from the database at a time
        reader.setPageSize(100);   // rows per page
        // Paged query (partition-aware)
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, product_id, content, user_id, created_at");
        queryProvider.setFromClause("FROM product_reviews");
        queryProvider.setWhereClause(
                "WHERE id >= :minId AND id <= :maxId AND ai_analyzed = 0");
        queryProvider.setSortKeys(Map.of("id", Order.ASCENDING));
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(Map.of("minId", minId, "maxId", maxId));
        reader.setRowMapper(new ProductReviewRowMapper());
        return reader;
    }

    // Simple cursor-based reader over unanalyzed reviews (non-partitioned alternative)
    @Bean
    @StepScope
    public JdbcCursorItemReader<ProductReview> simpleReviewReader() {
        return new JdbcCursorItemReaderBuilder<ProductReview>()
                .name("reviewReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM product_reviews WHERE ai_analyzed = 0 ORDER BY id")
                .rowMapper(new ProductReviewRowMapper())
                .fetchSize(500)
                .build();
    }
}
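ProductReviewRowMapper is referenced above but never shown in the original. A minimal sketch, assuming ProductReview is a Lombok-built POJO carrying the five selected columns:

```java
// Sketch: maps a result-set row to ProductReview (field names assumed)
public class ProductReviewRowMapper implements RowMapper<ProductReview> {
    @Override
    public ProductReview mapRow(ResultSet rs, int rowNum) throws SQLException {
        return ProductReview.builder()
                .id(rs.getLong("id"))
                .productId(rs.getLong("product_id"))
                .content(rs.getString("content"))
                .userId(rs.getLong("user_id"))
                .createdAt(rs.getTimestamp("created_at").toLocalDateTime())
                .build();
    }
}
```

3.2 ItemWriter: Writing Results in Bulk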
// AnalysisResultWriter.java
@Component
@Slf4j
@StepScope
@RequiredArgsConstructor
public class AnalysisResultWriter implements ItemWriter<ReviewAnalysisResult> {

    private final JdbcTemplate jdbcTemplate;
    private final RedisTemplate<String, Object> redisTemplate;
    private final MeterRegistry meterRegistry;

    private static final String BATCH_INSERT_SQL = """
            INSERT INTO review_analysis (review_id, product_id, sentiment, score,
                                         tags, brief, processed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON DUPLICATE KEY UPDATE
                sentiment = VALUES(sentiment),
                score = VALUES(score),
                tags = VALUES(tags),
                processed_at = VALUES(processed_at)
            """;

    private static final String UPDATE_ANALYZED_SQL = """
            UPDATE product_reviews SET ai_analyzed = 1
            WHERE id IN (%s)
            """;

    @Override
    public void write(Chunk<? extends ReviewAnalysisResult> chunk) throws Exception {
        List<? extends ReviewAnalysisResult> items = chunk.getItems();
        // Bulk-insert the analysis results
        List<Object[]> batchArgs = items.stream()
                .map(r -> new Object[]{
                        r.getReviewId(), r.getProductId(),
                        r.getSentiment(), r.getScore(),
                        String.join(",", r.getTags()),
                        r.getBrief(), r.getProcessedAt()
                })
                .toList();
        jdbcTemplate.batchUpdate(BATCH_INSERT_SQL, batchArgs);
        // Mark the source rows as analyzed (IDs are numeric, so inlining them is safe)
        String ids = items.stream()
                .map(r -> String.valueOf(r.getReviewId()))
                .collect(Collectors.joining(","));
        jdbcTemplate.update(String.format(UPDATE_ANALYZED_SQL, ids));
        // Bump the progress counter in Redis
        redisTemplate.opsForValue().increment(
                "batch:review:analysis:processed", items.size());
        // Record metrics
        meterRegistry.counter("batch.items.written")
                .increment(items.size());
        log.debug("Wrote {} analysis results in bulk", items.size());
    }
}
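For completeness, here is a plausible shape for the ReviewAnalysisResult DTO used throughout; the original never shows it, so the fields are inferred from the builder calls above (Lombok assumed):

```java
// Sketch of the result DTO (fields inferred from usage)
@Data
@Builder
public class ReviewAnalysisResult {
    private Long reviewId;
    private Long productId;
    private String sentiment;        // positive | negative | neutral
    private Integer score;           // 1-5
    private List<String> tags;       // quality / logistics / customer service / packaging
    private String brief;
    private LocalDateTime processedAt;

    // Neutral fallback used when the AI response cannot be parsed
    public static ReviewAnalysisResult defaultResult(ProductReview review) {
        return ReviewAnalysisResult.builder()
                .reviewId(review.getId())
                .productId(review.getProductId())
                .sentiment("neutral")
                .score(3)
                .tags(List.of())
                .brief("")
                .processedAt(LocalDateTime.now())
                .build();
    }
}
```

4. Partitioning: The Key to Multi-Machine Parallelism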
4.1 Partitioning Strategy
// ReviewPartitioner.java
@Component
@Slf4j
@RequiredArgsConstructor
public class ReviewPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Find the ID range of unprocessed rows
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM product_reviews WHERE ai_analyzed = 0",
                Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM product_reviews WHERE ai_analyzed = 0",
                Long.class);
        if (minId == null || maxId == null) {
            return Map.of("partition0", new ExecutionContext());
        }
        // Split the ID range evenly into gridSize slices
        long range = maxId - minId + 1;
        long partitionSize = (range + gridSize - 1) / gridSize;
        Map<String, ExecutionContext> result = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long partitionMin = minId + (long) i * partitionSize;
            long partitionMax = Math.min(minId + (long) (i + 1) * partitionSize - 1, maxId);
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", partitionMin);
            context.putLong("maxId", partitionMax);
            context.putString("name", "partition" + i);
            result.put("partition" + i, context);
            log.info("Partition [{}]: ID range {}-{}", i, partitionMin, partitionMax);
        }
        return result;
    }
}

4.2 Partitioned Job Configuration
// PartitionedJobConfig.java
@Configuration
@Slf4j
@RequiredArgsConstructor
public class PartitionedJobConfig {

    private final PlatformTransactionManager transactionManager;
    private final JdbcTemplate jdbcTemplate;

    @Bean
    public Job reviewAnalysisJob(
            JobRepository jobRepository,
            Step masterStep,
            Step summaryStep) {
        return new JobBuilder("reviewAnalysisJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(masterStep)
                .next(summaryStep)
                .build();
    }

    @Bean
    public Step masterStep(
            JobRepository jobRepository,
            ReviewPartitioner partitioner,
            Step workerStep,
            TaskExecutor partitionTaskExecutor) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", partitioner)
                .step(workerStep)
                .gridSize(50) // 50 partitions, runnable across multiple machines
                .taskExecutor(partitionTaskExecutor)
                .build();
    }

    @Bean
    public Step workerStep(
            JobRepository jobRepository,
            JdbcPagingItemReader<ProductReview> reviewReader,
            ReviewSentimentProcessor processor,
            AnalysisResultWriter writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<ProductReview, ReviewAnalysisResult>chunk(50, transactionManager)
                .reader(reviewReader)
                .processor(processor)
                .writer(writer)
                // Skip policy: skip the current record on JSON parse errors or API rate limiting
                .faultTolerant()
                .skip(JsonProcessingException.class)
                .skip(RateLimitException.class)
                .skipLimit(10000) // skip at most 10,000 records
                // Retry policy: retry on network timeouts
                .retry(ResourceAccessException.class)
                .retry(TimeoutException.class)
                .retryLimit(3)
                // Listener (a SkipListener, so skipped items get logged)
                .listener(new SkipListener<ProductReview, ReviewAnalysisResult>() {
                    @Override
                    public void onSkipInProcess(ProductReview item, Throwable t) {
                        log.warn("Skipping review [{}]: {}", item.getId(), t.getMessage());
                    }
                })
                .build();
    }

    @Bean
    public TaskExecutor partitionTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // run 10 partitions on this machine
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(0);
        executor.initialize();
        return executor;
    }

    // Summary step
    @Bean
    public Step summaryStep(JobRepository jobRepository) {
        return new StepBuilder("summaryStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    // Generate the processing report
                    generateSummaryReport(chunkContext.getStepContext()
                            .getStepExecution().getJobExecutionId());
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

    private void generateSummaryReport(Long jobExecutionId) {
        Long total = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM review_analysis", Long.class);
        Map<String, Long> sentimentCounts = new HashMap<>();
        jdbcTemplate.query(
                "SELECT sentiment, COUNT(*) as cnt FROM review_analysis GROUP BY sentiment",
                rs -> {
                    sentimentCounts.put(rs.getString("sentiment"), rs.getLong("cnt"));
                });
        log.info("Processing complete! Total: {}, positive: {}, negative: {}, neutral: {}",
                total,
                sentimentCounts.getOrDefault("positive", 0L),
                sentimentCounts.getOrDefault("negative", 0L),
                sentimentCounts.getOrDefault("neutral", 0L));
    }
}
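To actually kick the job off (from a REST endpoint, a scheduler, or application startup), a minimal launcher sketch, assuming Spring Boot's default batch auto-configuration provides the JobLauncher:

```java
// Sketch: launching the partitioned job manually
@Component
@RequiredArgsConstructor
public class ReviewAnalysisJobLauncher {

    private final JobLauncher jobLauncher;
    private final Job reviewAnalysisJob;

    public void launch() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("run.id", System.currentTimeMillis()) // unique per run
                .toJobParameters();
        JobExecution execution = jobLauncher.run(reviewAnalysisJob, params);
        // Status can then be polled via JobExplorer or the monitoring API in section 6
    }
}
```

5. Rate Limiting and Cost Control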
5.1 A Rate Limiter for the AI API
// AiRateLimiter.java
@Component
@Slf4j
public class AiRateLimiter {

    // Resilience4j RateLimiter (not final: it is created in @PostConstruct)
    private RateLimiter rateLimiter;

    @Value("${ai.rate-limit.requests-per-minute:500}")
    private int requestsPerMinute;

    @PostConstruct
    public void init() {
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(requestsPerMinute)
                .limitRefreshPeriod(Duration.ofMinutes(1))
                .timeoutDuration(Duration.ofSeconds(30)) // wait up to 30s for a permit
                .build();
        this.rateLimiter = RateLimiter.of("ai-api", config);
    }

    public <T> T executeWithRateLimit(Supplier<T> supplier) {
        return rateLimiter.executeSupplier(supplier);
    }

    // For batch use: block until a permit is available
    public void acquirePermit() {
        boolean acquired = rateLimiter.acquirePermission(1);
        if (!acquired) {
            log.warn("AI API rate-limited, waiting...");
        }
    }
}
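Wiring the limiter into the processor is then a one-line wrapper around the ChatClient call; a sketch of how section 2.1's process() method would use it (aiRateLimiter injected like the other dependencies):

```java
// Sketch: rate-limited AI call inside ReviewSentimentProcessor.process()
String response = aiRateLimiter.executeWithRateLimit(() ->
        chatClient.prompt()
                .user(prompt)
                .call()
                .content());
```

If the 30-second wait elapses without a permit, Resilience4j throws RequestNotPermitted, which can be added to the step's retry or skip lists like the other exceptions in section 4.2.

5.2 Cost Tracking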
// BatchCostTracker.java
@Component
@Slf4j
public class BatchCostTracker {

    private final AtomicLong totalTokens = new AtomicLong(0);
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final DoubleAdder totalCost = new DoubleAdder();

    // Price per model (USD per million tokens)
    private static final Map<String, Double> MODEL_COSTS = Map.of(
            "gpt-4o", 15.0,
            "gpt-4o-mini", 0.6,
            "qwen-local", 0.01 // local model: electricity only
    );

    public void recordUsage(String model, int tokens) {
        totalTokens.addAndGet(tokens);
        totalRequests.incrementAndGet();
        // Accumulate cost at the calling model's price (default: gpt-4o-mini)
        totalCost.add(tokens / 1_000_000.0 * MODEL_COSTS.getOrDefault(model, 0.6));
    }

    @Scheduled(fixedRate = 60000) // report every minute
    public void reportCost() {
        // SLF4J placeholders do not support format specifiers, so format the cost explicitly
        log.info("Batch cost report - requests: {}, tokens consumed: {}, estimated cost: ${}",
                totalRequests.get(), totalTokens.get(),
                String.format("%.4f", totalCost.sum()));
    }

    public double getCurrentEstimatedCost() {
        return totalCost.sum();
    }
}
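recordUsage needs real token counts. With Spring AI these can be read from the response metadata; a sketch, assuming the configured provider populates usage metadata and that the Usage API exposes getTotalTokens(), as in recent Spring AI releases:

```java
// Sketch: capturing token usage from a Spring AI call
ChatResponse chatResponse = chatClient.prompt()
        .user(prompt)
        .call()
        .chatResponse();
Usage usage = chatResponse.getMetadata().getUsage();
costTracker.recordUsage("gpt-4o-mini",
        usage.getTotalTokens() != null ? usage.getTotalTokens() : 0);
```

6. Progress Monitoring and Alerting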
6.1 A Batch Progress Monitoring API
// BatchProgressController.java
@RestController
@RequestMapping("/api/batch")
@Slf4j
@RequiredArgsConstructor
public class BatchProgressController {

    private final JobExplorer jobExplorer;
    private final JobOperator jobOperator;
    private final JdbcTemplate jdbcTemplate;
    private final RedisTemplate<String, Object> redisTemplate;
    private final BatchCostTracker costTracker;

    @GetMapping("/jobs/{jobName}/status")
    public ResponseEntity<BatchJobStatus> getJobStatus(
            @PathVariable String jobName) {
        // Fetch the most recent execution
        List<JobInstance> instances = jobExplorer.findJobInstancesByJobName(
                jobName, 0, 1);
        if (instances.isEmpty()) {
            return ResponseEntity.notFound().build();
        }
        JobInstance latestInstance = instances.get(0);
        List<JobExecution> executions = jobExplorer
                .getJobExecutions(latestInstance);
        if (executions.isEmpty()) {
            return ResponseEntity.notFound().build();
        }
        JobExecution execution = executions.get(0);
        // Live progress from Redis
        Long processed = (Long) redisTemplate.opsForValue()
                .get("batch:review:analysis:processed");
        // Estimate the total
        Long total = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM product_reviews", Long.class);
        double progressPct = total != null && total > 0 ?
                (double) (processed != null ? processed : 0) / total * 100 : 0;
        // Estimate the remaining time
        Duration elapsed = Duration.between(
                execution.getStartTime().toInstant(ZoneOffset.UTC),
                Instant.now());
        long remainingMs = 0;
        if (processed != null && processed > 0) {
            double rate = (double) processed / elapsed.toSeconds();
            long remaining = (total != null ? total : 0) - processed;
            remainingMs = (long) (remaining / rate * 1000);
        }
        return ResponseEntity.ok(BatchJobStatus.builder()
                .jobName(jobName)
                .status(execution.getStatus().name())
                .processed(processed != null ? processed : 0)
                .total(total != null ? total : 0)
                .progressPercent(progressPct)
                .elapsedSeconds(elapsed.toSeconds())
                .estimatedRemainingMs(remainingMs)
                .estimatedCostUSD(costTracker.getCurrentEstimatedCost())
                .startTime(execution.getStartTime())
                .build());
    }

    // Stop a job manually
    @PostMapping("/jobs/{jobExecutionId}/stop")
    public ResponseEntity<String> stopJob(
            @PathVariable Long jobExecutionId) throws Exception {
        jobOperator.stop(jobExecutionId);
        return ResponseEntity.ok("Stop signal sent");
    }

    // Relaunch a job manually
    @PostMapping("/jobs/{jobName}/restart")
    public ResponseEntity<String> restartJob(
            @PathVariable String jobName) throws Exception {
        // Spring Batch resumes from the last checkpoint (see FAQ Q5)
        Long executionId = jobOperator.startNextInstance(jobName);
        return ResponseEntity.ok("Relaunched, executionId: " + executionId);
    }
}
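The BatchJobStatus response object is not shown in the original; its shape follows directly from the builder calls above (a Lombok sketch):

```java
// Sketch of the status DTO returned by the progress endpoint
@Data
@Builder
public class BatchJobStatus {
    private String jobName;
    private String status;              // BatchStatus name, e.g. STARTED, COMPLETED
    private long processed;
    private long total;
    private double progressPercent;
    private long elapsedSeconds;
    private long estimatedRemainingMs;
    private double estimatedCostUSD;
    private LocalDateTime startTime;
}
```

6.2 Grafana Dashboard Configuration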
// BatchMetricsListener.java
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchMetricsListener implements JobExecutionListener, StepExecutionListener {

    private final MeterRegistry meterRegistry;

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Step-level metrics
        meterRegistry.counter("batch.step.completed",
                "step", stepExecution.getStepName(),
                "status", stepExecution.getStatus().name()
        ).increment();
        // Processing rate
        long durationSec = Duration.between(
                stepExecution.getStartTime().toInstant(ZoneOffset.UTC),
                stepExecution.getEndTime().toInstant(ZoneOffset.UTC)
        ).toSeconds();
        if (durationSec > 0) {
            double itemsPerSecond = (double) stepExecution.getWriteCount() / durationSec;
            meterRegistry.gauge("batch.step.throughput",
                    Tags.of("step", stepExecution.getStepName()),
                    itemsPerSecond);
        }
        // Skip statistics
        meterRegistry.counter("batch.items.skipped",
                "step", stepExecution.getStepName()
        ).increment(stepExecution.getSkipCount());
        log.info("Step [{}] finished: read={}, written={}, skipped={}, took={}s",
                stepExecution.getStepName(),
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getSkipCount(),
                durationSec);
        return stepExecution.getExitStatus();
    }
}

7. Distributed Execution Across Machines
7.1 Remote Partitioning
// RemotePartitioningConfig.java
@Configuration
@EnableBatchIntegration // auto-configures the remote-partitioning step builder factories
@RequiredArgsConstructor
public class RemotePartitioningConfig {

    // Provided by @EnableBatchIntegration
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
    private final PlatformTransactionManager transactionManager;

    // Manager side: partition requests go out over RabbitMQ
    @Bean
    public Step distributedMasterStep(
            ReviewPartitioner partitioner,
            MessageChannel outboundChannel) {
        return managerStepBuilderFactory
                .get("distributedMasterStep")
                .partitioner("distributedWorkerStep", partitioner)
                .gridSize(100) // 100 partitions spread across machines
                .outputChannel(outboundChannel) // bridged to RabbitMQ via Spring Integration
                .build();
    }

    // Worker side (runs on every machine): consumes partition requests
    @Bean
    public Step distributedWorkerStep(
            MessageChannel inboundChannel,
            JdbcPagingItemReader<ProductReview> reviewReader,
            ReviewSentimentProcessor sentimentProcessor,
            AnalysisResultWriter analysisWriter) {
        return workerStepBuilderFactory
                .get("distributedWorkerStep")
                .inputChannel(inboundChannel)
                .<ProductReview, ReviewAnalysisResult>chunk(50, transactionManager)
                .reader(reviewReader)
                .processor(sentimentProcessor)
                .writer(analysisWriter)
                .faultTolerant()
                .skipLimit(1000)
                .skip(Exception.class)
                .build();
    }
}
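The outboundChannel/inboundChannel beans and their RabbitMQ bridge are left out above. A minimal Spring Integration sketch of that wiring (the partition.requests queue name is illustrative, and a real deployment would also configure reply aggregation and message converters):

```java
// Sketch: bridging the partition-request channel to RabbitMQ (manager side)
@Bean
public DirectChannel outboundChannel() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(RabbitTemplate rabbitTemplate) {
    return IntegrationFlow.from(outboundChannel())
            .handle(Amqp.outboundAdapter(rabbitTemplate)
                    .routingKey("partition.requests")) // queue name illustrative
            .get();
}

// Worker side: consume partition requests from the same queue
@Bean
public DirectChannel inboundChannel() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Amqp.inboundAdapter(connectionFactory, "partition.requests"))
            .channel(inboundChannel())
            .get();
}
```

8. FAQ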
Q1: Can Spring Batch's JobRepository handle high concurrency?
A: The default JDBC JobRepository can become a bottleneck under high concurrency. Optimizations:
- Give the JobRepository its own database (so it does not compete with business data)
- Tune the connection pool: HikariCP with a minimum of 10 and a maximum of 50 connections
- At very large scale (>1,000 concurrent partitions), consider an in-memory (map-based) JobRepository, accepting that state is lost on restart
Q2: How is idempotency guaranteed during partitioned processing?
A: Three layers of idempotency:
- ItemReader: the WHERE clause includes ai_analyzed = 0, so processed records are never re-read
- ItemWriter: uses INSERT ... ON DUPLICATE KEY UPDATE
- Job level: a Job with identical parameters is never created twice (JobInstanceAlreadyCompleteException)
Q3: What if the AI API failure rate is high during batch processing?
A:
- Lower the chunk-size (from 50 to 10) to shrink the blast radius of a failed transaction
- Raise the retry count (retry-limit from 3 to 5) and configure exponential backoff (see the sketch after this list)
- Add a local fallback model: when the API fails, fall back to a small local model
- Reduce concurrency: if rate limiting bites too hard, shrink the partitionTaskExecutor thread pool
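A sketch of the backoff configuration, assuming the worker step bean from section 4.2 and Spring Retry's ExponentialBackOffPolicy hooked in via the fault-tolerant builder (intervals illustrative):

```java
// Sketch: retries with exponential backoff between attempts
ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
backOff.setInitialInterval(1_000);  // 1s before the first retry
backOff.setMultiplier(2.0);         // then 2s, 4s, 8s, ...
backOff.setMaxInterval(30_000);     // capped at 30s

return new StepBuilder("workerStep", jobRepository)
        .<ProductReview, ReviewAnalysisResult>chunk(10, transactionManager)
        .reader(reviewReader)
        .processor(processor)
        .writer(writer)
        .faultTolerant()
        .retry(ResourceAccessException.class)
        .retryLimit(5)
        .backOffPolicy(backOff) // waits between retry attempts
        .build();
```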
Q4: How do you handle very long texts (reviews that exceed the model's context window)?
A:
// Truncate long text in the ItemProcessor
String content = review.getContent();
if (content.length() > 2000) {
    // Keep the first 500 + last 500 characters (head and tail usually carry the most signal)
    content = content.substring(0, 500) + "..." +
            content.substring(content.length() - 500);
}

Alternatively, split the text into segments, process each segment, and aggregate the results.
Q5: How do you resume a failed batch run from a checkpoint?
A: Spring Batch supports restart natively (see the sketch below), provided that:
- The JobRepository is configured correctly (database-backed, not in-memory)
- The Job parameters carry a unique identifier (such as run.id)
- You do not rely on RunIdIncrementer for restarts (it creates a fresh instance every time); resubmit with the same Job parameters instead
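A sketch of a checkpoint restart via JobOperator; the failed execution's ID would come from JobExplorer or the monitoring API in section 6 (42L is a placeholder):

```java
// Sketch: restart a FAILED/STOPPED execution from its last committed chunk
Long failedExecutionId = 42L; // look up via JobExplorer in practice
Long newExecutionId = jobOperator.restart(failedExecutionId);
log.info("Restarted as execution {}", newExecutionId);
```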
9. Summary
Spring Batch + Spring AI is a golden combination for AI batch workloads at the scale of hundreds of millions of records:

| Component | Responsibility | Key configuration |
|---|---|---|
| Spring Batch | Scheduling, partitioning, retries | chunk-size=50, skip-limit=10000 |
| Spring AI | AI inference calls | Batched prompts, token optimization |
| Partitioning strategy | ID-range slicing | gridSize=50-100 |
| Rate limiter | Protects the AI API | 500 RPM, 30s wait timeout |
| Cost tracker | Live spend monitoring | Report every minute |
| Prometheus | Performance monitoring | Throughput / latency / error rate |

Zhao Qiang's case proves the point: large-scale AI data processing is not a question of technical impossibility but of engineering. With the right tools and the right architecture, 230 million records were finished in 61 hours, a clean landing.
