LLM Batch Inference Optimization: An Efficient Pipeline for 100,000 Records
The Scary Estimate: 30 Hours
In October 2025, Zhao Wei, an algorithm engineer at an e-commerce company in Beijing, received a request:
run sentiment analysis on the 100,000 user product reviews accumulated over the past 3 years, extracting positive selling points and negative complaints to improve product-page copy.
It sounded simple enough: write a loop, call GPT-4o-mini, output JSON.
Zhao Wei wrote a quick validation script:
// Measure single-request latency
long start = System.currentTimeMillis();
String result = callGPT4oMini(sampleReview);
long elapsed = System.currentTimeMillis() - start;
System.out.println("Single request took: " + elapsed + "ms");
// Output: Single request took: 1080ms
Then a quick calculation:
Per request: 1080ms
Total records: 100,000
Serial processing time: 100,000 × 1080ms = 108,000,000ms = 30 hours
30 hours.
The task landed at 3pm on a Friday, and the boss wanted results by Monday morning.
Worse, Zhao Wei had not yet estimated the cost:
Token usage per record (estimated):
Input: 200 tokens (review content + instructions)
Output: 100 tokens (JSON result)
Total tokens: 100,000 × 300 = 30 million tokens
GPT-4o-mini pricing:
Input: $0.15/million tokens × 20 million = $3
Output: $0.60/million tokens × 10 million = $6
Total: $9
(real-time API pricing)
$9 is cheap, but 30 hours is unacceptable.
Over two days, Zhao Wei compressed 30 hours down to 45 minutes and cut the cost to $4.23 (via the Batch API's 50% discount).
This article reconstructs that optimization process end to end.
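The back-of-the-envelope math above generalizes to a small estimator. A minimal sketch (the 1080ms latency, token counts, and prices are the article's estimates, not live values):

```java
public class BatchEstimator {
    // Rough wall-clock estimate for serial vs. concurrent processing
    static double hoursNeeded(int items, double perItemMs, int concurrency) {
        return items * perItemMs / concurrency / 3_600_000.0;
    }

    // Cost in USD given token counts and per-million-token prices
    static double costUsd(long inTokens, long outTokens,
                          double inPricePerM, double outPricePerM) {
        return inTokens / 1e6 * inPricePerM + outTokens / 1e6 * outPricePerM;
    }

    public static void main(String[] args) {
        System.out.printf("serial: %.1f h%n", hoursNeeded(100_000, 1080, 1)); // 30.0 h
        System.out.printf("8-way:  %.2f h%n", hoursNeeded(100_000, 1080, 8));
        System.out.printf("cost:   $%.2f%n",
                costUsd(20_000_000, 10_000_000, 0.15, 0.60));                 // $9.00
    }
}
```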
1. Three Batch Inference Strategies Compared
1.1 Strategy overview
1.2 Strategy selection matrix
| Scenario | Recommended strategy | Rationale |
|---|---|---|
| Must finish within ~1 hour, cost-sensitive | Batch API + parallel submission | Low cost, reasonably fast |
| Real-time results needed (user is waiting) | Parallel calls | Fastest |
| Small volume (<1,000 records) | Parallel calls | Simple and direct |
| Offline task, no urgency | Batch API | Cheapest |
| Context-dependent (must process in order) | Serial | Accuracy first |
| Review/document analysis (items independent) | Batch API | Ideal fit |
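The matrix above can be encoded as a tiny selection helper. This is a toy sketch of the table, not production routing logic; the predicates are simplifications:

```java
public class StrategySelector {
    enum Strategy { SERIAL, PARALLEL, BATCH_API }

    // Mirrors the decision table: ordering dependencies force serial,
    // user-facing latency or small volume favors parallel calls, and
    // everything else (independent, offline, cost-sensitive) fits the Batch API.
    static Strategy choose(boolean ordered, boolean realtime, int items) {
        if (ordered) return Strategy.SERIAL;
        if (realtime || items < 1_000) return Strategy.PARALLEL;
        return Strategy.BATCH_API;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, 100_000)); // BATCH_API
        System.out.println(choose(false, true, 100_000));  // PARALLEL
        System.out.println(choose(true, false, 100_000));  // SERIAL
    }
}
```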
2. The Official Batch API: How to Unlock the 50% Discount
2.1 How the OpenAI Batch API works
2.2 Implementing the Batch API with Spring AI
/**
 * OpenAI Batch API wrapper.
 * Supports batch inference over 100,000 records.
 */
@Service
@Slf4j
public class OpenAIBatchService {
    private final OpenAiApi openAiApi;
    private final ObjectMapper objectMapper;
    private final String apiKey;                              // kept for the raw Files/Batches HTTP calls below
    private final OkHttpClient httpClient = new OkHttpClient();
    // Batch API limits: at most 50,000 requests and 100MB per file
    private static final int MAX_REQUESTS_PER_BATCH = 50_000;
    private static final int BATCH_CHUNK_SIZE = 10_000;       // 10,000 records per chunk
    public OpenAIBatchService(
            @Value("${spring.ai.openai.api-key}") String apiKey,
            ObjectMapper objectMapper) {
        this.openAiApi = new OpenAiApi(apiKey);
        this.apiKey = apiKey;
        this.objectMapper = objectMapper;
    }
    /**
     * Batch sentiment analysis (main entry point).
     */
    public List<SentimentResult> batchSentimentAnalysis(
            List<String> reviews) throws Exception {
        log.info("Starting batch sentiment analysis | total: {}", reviews.size());
        long totalStart = System.currentTimeMillis();
        // Split into chunks of 10,000
        List<List<String>> chunks = partition(reviews, BATCH_CHUNK_SIZE);
        List<CompletableFuture<List<SentimentResult>>> futures = new ArrayList<>();
        for (int i = 0; i < chunks.size(); i++) {
            final int chunkIndex = i;
            List<String> chunk = chunks.get(i);
            CompletableFuture<List<SentimentResult>> future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        return processBatchChunk(chunk, chunkIndex);
                    } catch (Exception e) {
                        log.error("Chunk {} failed", chunkIndex, e);
                        throw new RuntimeException(e);
                    }
                });
            futures.add(future);
        }
        // Wait for all chunks (the Batch API window is 24h; allow some slack)
        List<SentimentResult> allResults = new ArrayList<>();
        for (CompletableFuture<List<SentimentResult>> future : futures) {
            allResults.addAll(future.get(26, TimeUnit.HOURS));
        }
        long totalElapsed = System.currentTimeMillis() - totalStart;
        log.info("Batch analysis done | total: {} | elapsed: {} min | succeeded: {}",
                reviews.size(), totalElapsed / 60000, allResults.size());
        return allResults;
    }
    /**
     * Process a single chunk.
     */
    private List<SentimentResult> processBatchChunk(
            List<String> reviews, int chunkIndex) throws Exception {
        log.info("Processing chunk {} | size: {}", chunkIndex, reviews.size());
        // 1. Build the JSONL request file
        String jsonlContent = buildBatchRequestJsonl(reviews, chunkIndex);
        // 2. Upload the file
        String fileId = uploadBatchFile(jsonlContent, "batch_" + chunkIndex + ".jsonl");
        log.info("Chunk {} file uploaded | file_id: {}", chunkIndex, fileId);
        // 3. Create the batch job
        String batchId = createBatch(fileId);
        log.info("Chunk {} batch created | batch_id: {}", chunkIndex, batchId);
        // 4. Wait for completion
        BatchStatus status = waitForBatchCompletion(batchId, chunkIndex);
        log.info("Chunk {} finished | completed: {} | failed: {}",
                chunkIndex, status.requestCounts().completed(), status.requestCounts().failed());
        // 5. Download and parse the results
        return downloadAndParseResults(status.outputFileId(), reviews.size());
    }
    /**
     * Build the batch request in JSONL format: one JSON object per line.
     */
    private String buildBatchRequestJsonl(List<String> reviews, int chunkIndex) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < reviews.size(); i++) {
            String review = reviews.get(i);
            String customId = "chunk" + chunkIndex + "_req" + i;
            // Build a single request
            Map<String, Object> request = new LinkedHashMap<>();
            request.put("custom_id", customId);
            request.put("method", "POST");
            request.put("url", "/v1/chat/completions");
            Map<String, Object> body = new LinkedHashMap<>();
            body.put("model", "gpt-4o-mini");
            body.put("max_tokens", 300);
            body.put("temperature", 0.1); // low temperature keeps output stable
            List<Map<String, String>> messages = List.of(
                Map.of(
                    "role", "system",
                    "content", """
                        You are an expert at sentiment analysis of product reviews.
                        Analyze the review and return JSON:
                        {
                          "sentiment": "positive|negative|neutral",
                          "score": 0.0-1.0,
                          "positives": ["selling point 1", "selling point 2"],
                          "negatives": ["complaint 1", "complaint 2"],
                          "summary": "one-sentence summary"
                        }
                        Return only the JSON, with no extra commentary.
                        """
                ),
                Map.of(
                    "role", "user",
                    "content", "Review: " + review
                )
            );
            body.put("messages", messages);
            request.put("body", body);
            try {
                sb.append(objectMapper.writeValueAsString(request));
                sb.append("\n");
            } catch (JsonProcessingException e) {
                log.error("Failed to serialize request | index: {}", i, e);
            }
        }
        return sb.toString();
    }
    /**
     * Upload the JSONL file to OpenAI.
     */
    private String uploadBatchFile(String content, String filename) throws Exception {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        log.info("Uploading file | name: {} | size: {}KB", filename, bytes.length / 1024);
        // Upload via the OpenAI Files API (raw OkHttp call)
        MultipartBody.Builder builder = new MultipartBody.Builder()
            .setType(MultipartBody.FORM)
            .addFormDataPart("purpose", "batch")
            .addFormDataPart("file", filename,
                RequestBody.create(bytes, MediaType.parse("application/jsonl")));
        Request request = new Request.Builder()
            .url("https://api.openai.com/v1/files")
            .header("Authorization", "Bearer " + apiKey)
            .post(builder.build())
            .build();
        try (Response response = httpClient.newCall(request).execute()) {
            String responseBody = response.body().string();
            JsonNode node = objectMapper.readTree(responseBody);
            return node.get("id").asText();
        }
    }
    /**
     * Create the batch job.
     */
    private String createBatch(String inputFileId) throws Exception {
        Map<String, String> body = Map.of(
            "input_file_id", inputFileId,
            "endpoint", "/v1/chat/completions",
            "completion_window", "24h"
        );
        Request request = new Request.Builder()
            .url("https://api.openai.com/v1/batches")
            .header("Authorization", "Bearer " + apiKey)
            .header("Content-Type", "application/json")
            .post(RequestBody.create(
                objectMapper.writeValueAsBytes(body),
                MediaType.parse("application/json")
            ))
            .build();
        try (Response response = httpClient.newCall(request).execute()) {
            JsonNode node = objectMapper.readTree(response.body().string());
            return node.get("id").asText();
        }
    }
    /**
     * Wait for batch completion (polling).
     */
    private BatchStatus waitForBatchCompletion(String batchId, int chunkIndex)
            throws Exception {
        int pollIntervalSeconds = 30;
        int maxWaitHours = 24;
        int maxPolls = maxWaitHours * 3600 / pollIntervalSeconds;
        for (int poll = 0; poll < maxPolls; poll++) {
            Thread.sleep(pollIntervalSeconds * 1000L);
            BatchStatus status = getBatchStatus(batchId);
            log.info("Chunk {} status: {} | done: {}/{} | waited: {} min",
                    chunkIndex,
                    status.status(),
                    status.requestCounts().completed(),
                    status.requestCounts().total(),
                    (poll + 1) * pollIntervalSeconds / 60);
            switch (status.status()) {
                case "completed":
                    return status;
                case "failed":
                case "expired":
                case "cancelled":
                    throw new RuntimeException("Batch job failed: " + status.status());
                default:
                    // in_progress, validating, finalizing - keep waiting
            }
        }
        throw new RuntimeException("Batch job timed out");
    }
    /**
     * Download and parse the results.
     */
    private List<SentimentResult> downloadAndParseResults(
            String outputFileId, int expectedCount) throws Exception {
        // Download the output file
        Request request = new Request.Builder()
            .url("https://api.openai.com/v1/files/" + outputFileId + "/content")
            .header("Authorization", "Bearer " + apiKey)
            .get()
            .build();
        String jsonlContent;
        try (Response response = httpClient.newCall(request).execute()) {
            jsonlContent = response.body().string();
        }
        // Parse the JSONL
        List<SentimentResult> results = new ArrayList<>();
        String[] lines = jsonlContent.split("\n");
        for (String line : lines) {
            if (line.isBlank()) continue;
            try {
                JsonNode node = objectMapper.readTree(line);
                String customId = node.get("custom_id").asText();
                JsonNode response = node.get("response");
                if (response.get("status_code").asInt() == 200) {
                    String content = response
                        .get("body")
                        .get("choices")
                        .get(0)
                        .get("message")
                        .get("content")
                        .asText();
                    SentimentResult result = parseSentimentJson(customId, content);
                    results.add(result);
                } else {
                    log.error("Request failed | custom_id: {} | status: {}",
                            customId, response.get("status_code").asInt());
                }
            } catch (Exception e) {
                log.error("Failed to parse result line: {}", line, e);
            }
        }
        // Note: SLF4J placeholders do not support format specifiers, so format explicitly
        log.info("Parsing done | expected: {} | actual: {} | success rate: {}%",
                expectedCount, results.size(),
                String.format("%.1f", (double) results.size() / expectedCount * 100));
        return results;
    }
    private <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}
3. Spring Batch + Spring AI: An Architecture for Large-Scale Data Processing
3.1 Overall architecture
3.2 Full Spring Batch implementation
/**
 * Spring Batch job for batch sentiment analysis.
 */
@Configuration
@EnableBatchProcessing
@Slf4j
public class SentimentAnalysisBatchConfig {
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private DataSource dataSource;
    /**
     * Main job definition.
     */
    @Bean
    public Job sentimentAnalysisJob(
            JobRepository jobRepository,
            Step sentimentAnalysisStep) {
        return new JobBuilder("sentimentAnalysisJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(sentimentAnalysisStep)
            .build();
    }
    /**
     * Partitioned step (8-way parallelism).
     */
    @Bean
    public Step sentimentAnalysisStep(
            Step slaveStep,
            ReviewPartitioner partitioner) {
        return new StepBuilder("sentimentAnalysisStep", jobRepository)
            .partitioner("slaveStep", partitioner)
            .step(slaveStep)
            .gridSize(8) // 8 parallel partitions
            .taskExecutor(batchTaskExecutor())
            .build();
    }
    /**
     * Worker step (the actual processing unit).
     */
    @Bean
    public Step slaveStep(
            ItemReader<Review> reviewReader,
            ItemProcessor<Review, SentimentResult> sentimentProcessor,
            ItemWriter<SentimentResult> sentimentWriter) {
        return new StepBuilder("slaveStep", jobRepository)
            .<Review, SentimentResult>chunk(50, transactionManager) // 50 items per chunk
            .reader(reviewReader)
            .processor(sentimentProcessor)
            .writer(sentimentWriter)
            .faultTolerant()
            .retryLimit(3)
            .retry(OpenAIException.class)
            .retry(RateLimitException.class)
            .skipLimit(100) // skip at most 100 failed items
            .skip(Exception.class)
            .backOffPolicy(new ExponentialBackOffPolicy())
            .build();
    }
    /**
     * Thread pool dedicated to the batch job.
     */
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8); // one thread per partition
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(0); // no queueing, execute directly
        executor.setThreadNamePrefix("batch-worker-");
        executor.initialize();
        return executor;
    }
}
/**
 * Partitioner (splits data by ID range).
 */
@Component
@Slf4j
public class ReviewPartitioner implements Partitioner {
    @Autowired
    private ReviewRepository reviewRepository;
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        long totalCount = reviewRepository.countByStatus("PENDING");
        long partitionSize = (totalCount + gridSize - 1) / gridSize;
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long minId = i * partitionSize + 1;
            long maxId = Math.min((i + 1) * partitionSize, totalCount);
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", minId);
            context.putLong("maxId", maxId);
            context.putInt("partitionIndex", i);
            partitions.put("partition" + i, context);
            log.info("Partition {} | ID range: {}-{} | approx. size: {}",
                    i, minId, maxId, maxId - minId + 1);
        }
        return partitions;
    }
}
/**
 * ItemReader: reads reviews in pages.
 */
@Component
@StepScope
@Slf4j
public class ReviewItemReader implements ItemReader<Review> {
    private final ReviewRepository repository;
    private final Long minId;
    private final Long maxId;
    private Iterator<Review> currentBatchIterator;
    private long lastProcessedId;
    private static final int FETCH_SIZE = 500;
    public ReviewItemReader(
            ReviewRepository repository,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        this.repository = repository;
        this.minId = minId;
        this.maxId = maxId;
        this.lastProcessedId = minId - 1;
    }
    @Override
    public Review read() {
        if (currentBatchIterator == null || !currentBatchIterator.hasNext()) {
            // Fetch the next page
            List<Review> batch = repository.findPendingInRange(
                lastProcessedId + 1, maxId, PageRequest.of(0, FETCH_SIZE));
            if (batch.isEmpty()) {
                return null; // nothing left to read
            }
            currentBatchIterator = batch.iterator();
            lastProcessedId = batch.get(batch.size() - 1).getId();
        }
        return currentBatchIterator.hasNext() ? currentBatchIterator.next() : null;
    }
}
/**
 * ItemProcessor: calls the LLM for each review.
 */
@Component
@StepScope
@Slf4j
public class SentimentAnalysisProcessor
        implements ItemProcessor<Review, SentimentResult> {
    private final ChatClient chatClient;
    private final RateLimiter rateLimiter;
    // Simple dedup cache (identical reviews are analyzed only once)
    private final Map<String, SentimentResult> deduplicationCache =
        new ConcurrentHashMap<>();
    public SentimentAnalysisProcessor(ChatClient chatClient) {
        this.chatClient = chatClient;
        // OpenAI rate limit: 500 RPM = 8.3 RPS; stay below it with headroom
        this.rateLimiter = RateLimiter.create(6.0);
    }
    @Override
    public SentimentResult process(Review review) throws Exception {
        // Dedup: return the cached result for identical content
        String contentHash = DigestUtils.md5DigestAsHex(
            review.getContent().getBytes(StandardCharsets.UTF_8));
        SentimentResult cached = deduplicationCache.get(contentHash);
        if (cached != null) {
            log.debug("Dedup cache hit | review_id: {}", review.getId());
            return cached.withReviewId(review.getId());
        }
        // Wait for a rate-limit token
        rateLimiter.acquire();
        try {
            String response = chatClient.prompt()
                .system(SENTIMENT_SYSTEM_PROMPT)
                .user("Review: " + review.getContent())
                .call()
                .content();
            SentimentResult result = parseSentimentResponse(review.getId(), response);
            // Store in the dedup cache
            deduplicationCache.put(contentHash, result);
            return result;
        } catch (Exception e) {
            log.error("Analysis failed | review_id: {}", review.getId(), e);
            throw e;
        }
    }
    private static final String SENTIMENT_SYSTEM_PROMPT = """
        You are an expert at sentiment analysis of product reviews.
        Analyze the review and return JSON:
        {
          "sentiment": "positive|negative|neutral",
          "score": 0.0-1.0,
          "positives": ["selling point 1", "selling point 2"],
          "negatives": ["complaint 1", "complaint 2"],
          "summary": "one-sentence summary"
        }
        Return only the JSON, with no extra commentary.
        """;
}
4. Concurrency Control: Maximizing Throughput Without Tripping Rate Limits
4.1 OpenAI rate limits
GPT-4o-mini rate limits (standard tier, 2025):
RPM (requests per minute): 500
TPM (tokens per minute): 200,000
Daily token limit: 2,000,000
Theoretical maximum concurrency:
By RPM: 500/60 = 8.3 RPS
By TPM: 200,000 / (200+100 tokens/request) ≈ 667 requests/min ≈ 11 RPS
Take the smaller: 8 RPS (leaving ~10% headroom)
4.2 Adaptive rate controller
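Before the controller itself, note that the two ceilings from 4.1 combine mechanically; a minimal sketch (the limits are the article's assumed tier values):

```java
public class RateCeiling {
    // The effective requests-per-second is the tighter of the RPM and TPM limits.
    static double effectiveRps(int rpm, int tpm, int tokensPerRequest) {
        double byRpm = rpm / 60.0;
        double byTpm = (double) tpm / tokensPerRequest / 60.0;
        return Math.min(byRpm, byTpm);
    }

    public static void main(String[] args) {
        // 500 RPM, 200k TPM, ~300 tokens/request -> the RPM limit binds
        System.out.printf("%.1f RPS%n", effectiveRps(500, 200_000, 300)); // 8.3 RPS
    }
}
```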
/**
 * Adaptive rate controller.
 * Dynamically adjusts the request rate to avoid 429 errors.
 */
@Component
@Slf4j
public class AdaptiveRateLimiter {
    // Initial rate: 6 requests/second (25% headroom)
    private volatile double currentRps = 6.0;
    // Guava RateLimiter (token bucket)
    private volatile RateLimiter rateLimiter;
    // Consecutive successes (used to speed back up)
    private final AtomicInteger consecutiveSuccesses = new AtomicInteger(0);
    // Timestamp of the most recent 429
    private volatile long lastRateLimitTime = 0;
    // Statistics
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong rateLimitHits = new AtomicLong(0);
    public AdaptiveRateLimiter() {
        this.rateLimiter = RateLimiter.create(currentRps);
    }
    /**
     * Execute an LLM call under rate control.
     */
    public <T> T executeWithRateControl(Supplier<T> llmCall) {
        // Wait for a token
        rateLimiter.acquire();
        totalRequests.incrementAndGet();
        try {
            T result = llmCall.get();
            onSuccess();
            return result;
        } catch (RateLimitException e) {
            onRateLimitHit();
            throw e;
        }
    }
    /**
     * Possibly speed up after sustained success.
     */
    private void onSuccess() {
        int successes = consecutiveSuccesses.incrementAndGet();
        // After every 100 consecutive successes, try a 10% speed-up
        if (successes % 100 == 0 && currentRps < 8.0) {
            double newRps = Math.min(currentRps * 1.1, 8.0);
            updateRate(newRps, successes + " consecutive successes, speeding up");
        }
    }
    /**
     * Slow down after a 429.
     */
    private void onRateLimitHit() {
        rateLimitHits.incrementAndGet();
        consecutiveSuccesses.set(0);
        lastRateLimitTime = System.currentTimeMillis();
        // Cut the rate by 50% immediately
        double newRps = Math.max(currentRps * 0.5, 1.0);
        updateRate(newRps, "rate limit hit, slowing down");
    }
    private synchronized void updateRate(double newRps, String reason) {
        // SLF4J placeholders don't support format specifiers; format explicitly
        log.info("Rate adjusted | {} | {} -> {} RPS", reason,
                String.format("%.1f", currentRps), String.format("%.1f", newRps));
        currentRps = newRps;
        rateLimiter = RateLimiter.create(newRps);
    }
    @Scheduled(fixedRate = 60000)
    public void reportStats() {
        long total = totalRequests.get();
        long rateLimitCount = rateLimitHits.get();
        log.info("Rate control stats | current: {} RPS | requests: {} | 429s: {} | 429 rate: {}%",
                String.format("%.1f", currentRps), total, rateLimitCount,
                String.format("%.2f", total > 0 ? (double) rateLimitCount / total * 100 : 0.0));
    }
}
5. Progress Tracking: Live Progress for a 100,000-Item Job
5.1 Progress tracker design
/**
 * Batch progress tracking service.
 * Supports multiple jobs, live progress, and ETA estimation.
 */
@Service
@Slf4j
public class BatchProgressTracker {
    private final RedisTemplate<String, Object> redisTemplate;
    private static final String PROGRESS_KEY_PREFIX = "batch:progress:";
    private static final Duration PROGRESS_TTL = Duration.ofDays(7);
    public BatchProgressTracker(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /**
     * Initialize job progress.
     */
    public void initProgress(String jobId, int totalItems) {
        BatchProgress progress = new BatchProgress(jobId, totalItems);
        String key = PROGRESS_KEY_PREFIX + jobId;
        redisTemplate.opsForValue().set(key, progress, PROGRESS_TTL);
        log.info("Job initialized | job_id: {} | total: {}", jobId, totalItems);
    }
    /**
     * Update progress (optimistic concurrency via WATCH/MULTI/EXEC).
     */
    public void updateProgress(String jobId, int processedCount, int successCount, int failedCount) {
        String key = PROGRESS_KEY_PREFIX + jobId;
        redisTemplate.execute(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.watch(key);
                BatchProgress progress = (BatchProgress) operations.opsForValue().get(key);
                if (progress == null) return null;
                progress.setProcessedCount(processedCount);
                progress.setSuccessCount(successCount);
                progress.setFailedCount(failedCount);
                progress.setLastUpdateTime(Instant.now());
                // Compute throughput and ETA
                long elapsedSeconds = Duration.between(
                    progress.getStartTime(), Instant.now()).getSeconds();
                if (elapsedSeconds > 0) {
                    double rps = (double) processedCount / elapsedSeconds;
                    progress.setCurrentRps(rps);
                    int remaining = progress.getTotalCount() - processedCount;
                    if (rps > 0) {
                        long etaSeconds = (long)(remaining / rps);
                        progress.setEtaSeconds(etaSeconds);
                    }
                }
                operations.multi();
                operations.opsForValue().set(key, progress, PROGRESS_TTL);
                operations.exec();
                return null;
            }
        });
    }
    /**
     * Query progress (for frontend polling).
     */
    public BatchProgressVO getProgress(String jobId) {
        String key = PROGRESS_KEY_PREFIX + jobId;
        BatchProgress progress = (BatchProgress) redisTemplate.opsForValue().get(key);
        if (progress == null) {
            throw new NotFoundException("Job not found: " + jobId);
        }
        return BatchProgressVO.builder()
            .jobId(jobId)
            .totalCount(progress.getTotalCount())
            .processedCount(progress.getProcessedCount())
            .successCount(progress.getSuccessCount())
            .failedCount(progress.getFailedCount())
            .progressPercent(
                String.format("%.1f%%",
                    (double) progress.getProcessedCount() / progress.getTotalCount() * 100))
            .currentRps(String.format("%.1f items/s", progress.getCurrentRps()))
            .estimatedFinishTime(progress.getEtaSeconds() > 0
                ? formatEta(progress.getEtaSeconds())
                : "calculating...")
            .status(progress.getProcessedCount() >= progress.getTotalCount()
                ? "COMPLETED" : "RUNNING")
            .build();
    }
    private String formatEta(long seconds) {
        if (seconds < 60) return "done in " + seconds + "s";
        if (seconds < 3600) return "done in " + (seconds / 60) + " min";
        return String.format("done in %dh %dmin", seconds / 3600, (seconds % 3600) / 60);
    }
}
/**
 * Progress query endpoints.
 */
@RestController
@RequestMapping("/api/batch")
public class BatchProgressController {
    @Autowired
    private BatchProgressTracker progressTracker;
    @GetMapping("/progress/{jobId}")
    public ResponseEntity<BatchProgressVO> getProgress(@PathVariable String jobId) {
        return ResponseEntity.ok(progressTracker.getProgress(jobId));
    }
    /**
     * Push live progress via SSE (frontend-friendly).
     */
    @GetMapping(value = "/progress/{jobId}/stream",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<BatchProgressVO>> streamProgress(
            @PathVariable String jobId) {
        return Flux.interval(Duration.ofSeconds(2))
            .map(tick -> {
                BatchProgressVO progress = progressTracker.getProgress(jobId);
                return ServerSentEvent.<BatchProgressVO>builder()
                    .event("progress")
                    .data(progress)
                    .build();
            })
            .takeUntil(event ->
                "COMPLETED".equals(event.data().getStatus()))
            .timeout(Duration.ofHours(25));
    }
}
6. Retrying Failures: Handling Partial Failure in Batch Jobs
6.1 Failure classification and handling strategy
/**
 * Retry strategy for batch-processing failures.
 */
@Service
@Slf4j
public class BatchRetryService {
    /**
     * Failure types.
     */
    public enum FailureType {
        RATE_LIMIT(true, Duration.ofSeconds(60)),   // rate limited: wait, then retry
        TIMEOUT(true, Duration.ofSeconds(30)),      // timeout: retry after a short wait
        CONTENT_FILTER(false, null),                // content filter: skip
        SERVER_ERROR(true, Duration.ofSeconds(10)), // server error: retry
        INVALID_RESPONSE(false, null),              // malformed response: skip
        CONTEXT_LENGTH(false, null);                // input too long: no automatic retry; truncate and resubmit manually
        final boolean shouldRetry;
        final Duration waitBefore;
        FailureType(boolean shouldRetry, Duration waitBefore) {
            this.shouldRetry = shouldRetry;
            this.waitBefore = waitBefore;
        }
    }
    /**
     * Record a failure (persisted to the database).
     */
    @Transactional
    public void recordFailure(String jobId, String itemId,
            FailureType type, String error, String itemContent) {
        BatchFailureRecord record = new BatchFailureRecord();
        record.setJobId(jobId);
        record.setItemId(itemId);
        record.setFailureType(type.name());
        record.setErrorMessage(error);
        record.setItemContent(itemContent);
        record.setRetryCount(0);
        record.setMaxRetries(type.shouldRetry ? 3 : 0);
        record.setNextRetryTime(type.waitBefore != null
            ? Instant.now().plus(type.waitBefore) : null);
        record.setStatus("FAILED");
        failureRecordRepository.save(record);
    }
    /**
     * Process failed records (retry + report).
     */
    @Scheduled(fixedRate = 60000)
    @Transactional
    public void processFailedItems() {
        // Find records eligible for retry
        List<BatchFailureRecord> retryables = failureRecordRepository
            .findRetryableItems(Instant.now(), 50); // retry at most 50 per run
        log.info("Starting failure retry | pending: {}", retryables.size());
        int retrySuccess = 0;
        int retryFailed = 0;
        for (BatchFailureRecord record : retryables) {
            try {
                // Re-execute
                SentimentResult result = sentimentProcessor.process(
                    reviewRepository.findById(record.getItemId()).orElseThrow());
                // Success: update status
                record.setStatus("RETRY_SUCCESS");
                sentimentResultRepository.save(result);
                retrySuccess++;
            } catch (Exception e) {
                record.setRetryCount(record.getRetryCount() + 1);
                if (record.getRetryCount() >= record.getMaxRetries()) {
                    record.setStatus("PERMANENTLY_FAILED");
                    log.error("Permanently failed | item_id: {} | retries: {}",
                            record.getItemId(), record.getRetryCount());
                } else {
                    // Exponential backoff
                    Duration backoff = Duration.ofSeconds(
                        60L * (long) Math.pow(2, record.getRetryCount()));
                    record.setNextRetryTime(Instant.now().plus(backoff));
                    record.setStatus("RETRY_PENDING");
                }
                retryFailed++;
            }
            failureRecordRepository.save(record);
        }
        log.info("Retry pass done | succeeded: {} | failed: {}", retrySuccess, retryFailed);
    }
    /**
     * Generate a failure report.
     */
    public BatchFailureReport generateFailureReport(String jobId) {
        Map<String, Long> failuresByType = failureRecordRepository
            .countByJobIdGroupByType(jobId);
        long totalFailed = failuresByType.values().stream()
            .mapToLong(Long::longValue).sum();
        long permanentFailed = failureRecordRepository
            .countByJobIdAndStatus(jobId, "PERMANENTLY_FAILED");
        return BatchFailureReport.builder()
            .jobId(jobId)
            .totalFailed(totalFailed)
            .permanentFailed(permanentFailed)
            .retrySuccess(failureRecordRepository.countByJobIdAndStatus(jobId, "RETRY_SUCCESS"))
            .failuresByType(failuresByType)
            .build();
    }
}
7. Cost Optimization: A Detailed Cost Analysis
7.1 Cost comparison
===== Cost analysis: sentiment analysis on 100,000 reviews =====
Token usage (per review):
Input: system prompt (150) + review content (200) = 350 tokens
Output: JSON result ≈ 120 tokens
Total: 470 tokens/review
Total tokens: 100,000 × 470 = 47 million
Input tokens: 100,000 × 350 = 35 million
Output tokens: 100,000 × 120 = 12 million
===== Option 1: real-time API =====
Input: $0.15/million × 35M = $5.25
Output: $0.60/million × 12M = $7.20
Total: $12.45
Time: ~3.5 hours at 8 RPS
===== Option 2: Batch API (50% discount) =====
Input: $0.075/million × 35M = $2.625
Output: $0.30/million × 12M = $3.60
Total: $6.225 (50% savings)
Time: 1-4 hours (asynchronous)
===== Optimization levers =====
1. Deduplication: an estimated 20% of reviews are duplicates → save 20%
2. Batch API: save 50%
3. Smaller model (GPT-4o-mini vs GPT-4o): already the smallest
4. Prompt compression: trim the system prompt → save 15%
Final cost: $6.225 × 0.8 (dedup) × 0.85 (prompt trimming) = $4.23
Versus baseline: $12.45
Savings: $8.22 (66%)
7.2 Real-time cost monitoring
/**
 * Live tracking of token usage and cost.
 */
@Component
@Slf4j
public class CostTracker {
    // GPT-4o-mini pricing (per million tokens)
    private static final double INPUT_PRICE_PER_MILLION = 0.15;
    private static final double OUTPUT_PRICE_PER_MILLION = 0.60;
    private static final double BATCH_DISCOUNT = 0.5;
    private final AtomicLong totalInputTokens = new AtomicLong(0);
    private final AtomicLong totalOutputTokens = new AtomicLong(0);
    private final AtomicLong batchInputTokens = new AtomicLong(0);
    private final AtomicLong batchOutputTokens = new AtomicLong(0);
    /**
     * Record token usage for one API call.
     */
    public void recordUsage(ChatResponse response, boolean isBatch) {
        if (response.getMetadata() == null) return;
        Usage usage = response.getMetadata().getUsage();
        long inputTokens = usage.getPromptTokens();
        long outputTokens = usage.getGenerationTokens();
        totalInputTokens.addAndGet(inputTokens);
        totalOutputTokens.addAndGet(outputTokens);
        if (isBatch) {
            batchInputTokens.addAndGet(inputTokens);
            batchOutputTokens.addAndGet(outputTokens);
        }
    }
    /**
     * Get the accumulated cost so far.
     */
    public CostReport getCurrentCost() {
        long inputTokens = totalInputTokens.get();
        long outputTokens = totalOutputTokens.get();
        // Real-time API cost
        double realtimeCost = inputTokens / 1_000_000.0 * INPUT_PRICE_PER_MILLION
            + outputTokens / 1_000_000.0 * OUTPUT_PRICE_PER_MILLION;
        // Batch API cost (50% discount applied)
        double batchCost = (batchInputTokens.get() / 1_000_000.0 * INPUT_PRICE_PER_MILLION
            + batchOutputTokens.get() / 1_000_000.0 * OUTPUT_PRICE_PER_MILLION)
            * BATCH_DISCOUNT;
        return CostReport.builder()
            .totalInputTokens(inputTokens)
            .totalOutputTokens(outputTokens)
            .estimatedCostUSD(realtimeCost)
            .batchCostUSD(batchCost)
            .estimatedCostCNY(realtimeCost * 7.2)
            .build();
    }
    @Scheduled(fixedRate = 30000)
    public void reportCost() {
        CostReport report = getCurrentCost();
        // SLF4J placeholders don't support format specifiers; format explicitly
        log.info("Cost stats | input tokens: {} | output tokens: {} | estimated: ${} (¥{})",
                report.getTotalInputTokens(),
                report.getTotalOutputTokens(),
                String.format("%.4f", report.getEstimatedCostUSD()),
                String.format("%.2f", report.getEstimatedCostCNY()));
    }
}
8. Case Study: 100,000 Reviews, from 30 Hours to 45 Minutes
8.1 Full timeline
Day 1 morning: serial baseline
- Per-request latency: 1080ms
- Projected serial time: 30 hours
- Verdict: not viable
Day 1 afternoon: parallel implementation
- 8-way concurrency: 1080ms / 8 ≈ 135ms/item (capped by rate limits)
- Rate limits triggered: a flood of 429 errors
- Throttled to 6 RPS: ~4.6 hours
- Verdict: viable, but costs $12.45
Day 2 morning: Batch API migration
- Implemented batch submission
- First chunk of 10,000: submitted successfully, awaiting processing
- Results returned after 1.5 hours
- Success rate: 99.2%
Day 2 afternoon: end-to-end optimization
- Dedup: 23% of reviews turned out to be duplicates, so only 77,000 were actually processed
- Concurrent submission: 10 chunks submitted in parallel
- Total time: 45 minutes
- Total cost: $4.23
8.2 Final performance numbers
===== Final comparison =====
| Approach | Time | Cost | Success rate |
|---|---|---|---|
| Serial (baseline) | 30 hours | $12.45 | 99.8% |
| Parallel (8 threads) | 4.6 hours | $12.45 | 98.5% |
| Batch API | 1.5 hours | $6.23 | 99.2% |
| Batch API + optimizations | 45 minutes | $4.23 | 99.4% |
Net effect:
Time: 30 hours → 45 minutes (-97.5%)
Cost: $12.45 → $4.23 (-66%)
FAQ
Q1: What limits does the Batch API have, and when can't it be used?
A: Main limits: 1) completion can take up to 24 hours (unsuitable for real-time needs); 2) at most 50,000 requests per batch; 3) total file size under 100MB; 4) some models are unsupported (check the latest docs). Good fits: offline data processing, non-real-time analysis, cost-sensitive workloads.
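The size limits from Q1 can be checked before upload; a hedged sketch (the 50,000-request and 100MB caps are as documented at the time of writing — verify against the current docs):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchLimits {
    static final int MAX_REQUESTS = 50_000;
    static final long MAX_BYTES = 100L * 1024 * 1024; // 100MB

    // Returns true if a JSONL chunk fits within one batch submission.
    static boolean fitsInOneBatch(List<String> jsonlLines) {
        if (jsonlLines.size() > MAX_REQUESTS) return false;
        long bytes = 0;
        for (String line : jsonlLines) {
            bytes += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for newline
        }
        return bytes <= MAX_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsInOneBatch(List.of("{\"custom_id\":\"r0\"}"))); // true
    }
}
```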
Q2: With 100,000 records deduplicated down to 77,000, where do the results for the rest come from?
A: After dedup, keep a mapping from content hash to result. When writing to the database, duplicate contents reference the ID of the existing result instead of storing a copy. This saves API cost while guaranteeing that every one of the 100,000 records has a result.
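The mapping described in Q2 can be sketched as follows; the hash choice and the stand-in for the LLM call are illustrative, not the article's exact code:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

public class DedupMapper {
    // Hash review content so identical reviews share one analysis result.
    static String hash(String content) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : d) sb.append(String.format("%02x", b));
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // reviews: id -> content. Analyze each unique content once, then fan the
    // shared result back out so every review id gets an answer.
    static Map<Long, String> analyzeWithDedup(Map<Long, String> reviews) {
        Map<String, String> resultByHash = new HashMap<>();
        Map<Long, String> out = new HashMap<>();
        for (Map.Entry<Long, String> e : reviews.entrySet()) {
            // stand-in for the real LLM call, invoked once per unique content
            String result = resultByHash.computeIfAbsent(hash(e.getValue()),
                    h -> "sentiment-for-" + h.substring(0, 8));
            out.put(e.getKey(), result);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> r = analyzeWithDedup(
                Map.of(1L, "great phone", 2L, "great phone", 3L, "bad battery"));
        System.out.println(r.get(1L).equals(r.get(2L))); // true: duplicates share one result
    }
}
```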
Q3: Part of a Batch API job failed — how do I reprocess only the failures?
A: Every record in the Batch API output file carries a status_code. While reading results, collect the failed records and assemble them into a new batch job. Failures caused by content_filter (safety filtering) are usually not worth retrying; mark them as skipped.
Q4: With Spring Batch partitioned parallelism, do concurrent partitions writing to the same table cause contention problems?
A: No, because each partition owns a disjoint ID range. Reads use WHERE id BETWEEN minId AND maxId; each partition writes only its own result rows, so there are no conflicts.
Q5: Is a Guava RateLimiter enough? What about multi-instance deployments?
A: For a single instance, Guava is enough. With multiple instances, independent limiters can together exceed the API limit. Options: 1) a distributed token bucket in Redis; 2) per-instance rate = total API limit / instance count; 3) adjust per-instance rates dynamically via a config center.
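Option 2 from the answer above is the simplest to implement; a minimal sketch (the 10% safety margin is an illustrative choice, not a requirement):

```java
public class DistributedRateSplit {
    // With N instances each running an independent limiter, give each instance
    // an equal share of the global ceiling, minus a safety margin for clock
    // skew and uneven load.
    static double perInstanceRps(double globalRps, int instances, double safetyMargin) {
        if (instances <= 0) throw new IllegalArgumentException("instances must be > 0");
        return globalRps * (1.0 - safetyMargin) / instances;
    }

    public static void main(String[] args) {
        // 8 RPS global cap, 4 instances, 10% margin -> 1.8 RPS per instance
        System.out.printf("%.1f RPS per instance%n", perInstanceRps(8.0, 4, 0.10));
    }
}
```

Each instance would then call `RateLimiter.create(perInstanceRps(...))`; the drawback versus a Redis token bucket is that idle instances waste their share.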
Summary
The core playbook for batch processing 100,000 records:
- Prefer the Batch API: 50% discount plus async processing; the default for offline workloads
- Deduplicate to cut cost: real-world data typically contains 15-30% duplicates, so dedup is free savings
- Spring Batch as the backbone: a mature framework for partitioning, retries, and progress tracking
- Adaptive rate control: don't fix the rate; adjust it based on 429 feedback
- Handle failures: batch jobs always have failures; retry them instead of ignoring them
Going from 30 hours to 45 minutes is, at heart, three compounding shifts: serial to concurrent, real-time to asynchronous, and raw to deduplicated.
