第1773篇:批量推理优化——异步批处理降低API调用成本的实战方案
第1773篇:批量推理优化——异步批处理降低API调用成本的实战方案
有一类AI应用场景经常被忽视,但成本潜力非常大:批量任务。
比如每天凌晨批量给10万条商品描述打标签,或者每周批量分析几万条用户反馈,或者定期给文档库做向量化。这些任务有个共同特点:不需要实时响应,但量很大,如果用普通同步调用,成本和耗时都是灾难。
我之前做过一个项目,需要每天对20万条日志做意图分析。最开始用同步调用,跑了8小时才能跑完,而且因为并发太高,经常触发Rate Limit,结果反而更慢。后来把整个方案重做成异步批处理,耗时降到1.2小时,成本降了50%。
这篇文章,我来分享批处理的工程方案和踩过的坑。
批处理 vs 实时调用,差异有多大
先建立认知基础。
OpenAI 的 Batch API(2024年推出)相比普通 API:
- 价格折扣:输入和输出均打5折
- 限制:响应时间最长24小时(通常在1小时内完成)
- 适合:大批量、对延迟不敏感的任务
Anthropic 也有类似的批处理方案,价格同样有折扣。
自建批处理系统(不用厂商Batch API)的优势:
- 可以精细控制并发、重试、优先级
- 可以混合不同的模型提供商
- 任务可以跨天调度
实际项目里,两种方式结合用效果最好:官方Batch API处理纯批量任务,自建批处理系统处理需要调度控制的复杂场景。
官方Batch API的使用
先说最简单的,直接用 OpenAI Batch API。
@Service
@Slf4j
public class OpenAIBatchService {
@Autowired
private OpenAIClient openAIClient;
@Autowired
private ObjectMapper objectMapper;
/**
* 提交批量任务
* @param requests 请求列表
* @return batchId
*/
public String submitBatch(List<BatchRequest> requests) throws IOException {
// 1. 构建JSONL格式的输入文件
String jsonlContent = buildJsonlContent(requests);
// 2. 上传文件
String fileId = uploadFile(jsonlContent);
log.info("文件上传成功: fileId={}, requests={}", fileId, requests.size());
// 3. 创建批次
Map<String, Object> batchParams = Map.of(
"input_file_id", fileId,
"endpoint", "/v1/chat/completions",
"completion_window", "24h",
"metadata", Map.of(
"biz_type", "daily_classification",
"total_count", String.valueOf(requests.size())
)
);
String batchId = openAIClient.createBatch(batchParams);
log.info("批次创建成功: batchId={}", batchId);
return batchId;
}
/**
* 构建JSONL格式内容
* 每行是一个独立的请求,格式固定
*/
private String buildJsonlContent(List<BatchRequest> requests) {
StringBuilder sb = new StringBuilder();
for (BatchRequest req : requests) {
Map<String, Object> line = new LinkedHashMap<>();
line.put("custom_id", req.getCustomId()); // 用于结果匹配
line.put("method", "POST");
line.put("url", "/v1/chat/completions");
line.put("body", Map.of(
"model", req.getModel(),
"messages", List.of(
Map.of("role", "system", "content", req.getSystemPrompt()),
Map.of("role", "user", "content", req.getUserMessage())
),
"max_tokens", req.getMaxTokens(),
"temperature", req.getTemperature()
));
try {
sb.append(objectMapper.writeValueAsString(line)).append("\n");
} catch (JsonProcessingException e) {
log.error("序列化失败: customId={}", req.getCustomId(), e);
}
}
return sb.toString();
}
/**
* 轮询批次状态并获取结果
*/
public List<BatchResult> waitAndGetResults(String batchId, Duration timeout)
throws InterruptedException {
long deadline = System.currentTimeMillis() + timeout.toMillis();
int pollIntervalSeconds = 30;
while (System.currentTimeMillis() < deadline) {
BatchStatus status = openAIClient.getBatchStatus(batchId);
log.info("批次状态: batchId={}, status={}, completed={}/{}",
batchId, status.getStatus(),
status.getRequestCounts().getCompleted(),
status.getRequestCounts().getTotal());
if ("completed".equals(status.getStatus())) {
return downloadResults(status.getOutputFileId());
}
if ("failed".equals(status.getStatus()) || "expired".equals(status.getStatus())) {
throw new RuntimeException("批次失败: " + status.getStatus());
}
Thread.sleep(pollIntervalSeconds * 1000L);
// 随时间推移增加轮询间隔,避免浪费API调用
if (pollIntervalSeconds < 300) pollIntervalSeconds = Math.min(pollIntervalSeconds * 2, 300);
}
throw new TimeoutException("批次超时: batchId=" + batchId);
}
/**
* 下载并解析结果文件
*/
private List<BatchResult> downloadResults(String outputFileId) throws IOException {
String jsonlContent = openAIClient.downloadFile(outputFileId);
List<BatchResult> results = new ArrayList<>();
for (String line : jsonlContent.split("\n")) {
if (line.trim().isEmpty()) continue;
try {
JsonNode node = objectMapper.readTree(line);
BatchResult result = BatchResult.builder()
.customId(node.get("custom_id").asText())
.statusCode(node.get("response").get("status_code").asInt())
.content(extractContent(node))
.error(extractError(node))
.build();
results.add(result);
} catch (Exception e) {
log.error("解析结果行失败: line={}", line, e);
}
}
return results;
}
}自建异步批处理系统
对于更复杂的场景——需要优先级控制、支持多模型、需要实时进度反馈——自建批处理系统更灵活。
整体架构:
任务定义
@Data
@Builder
public class BatchJob {
private String jobId;
private String jobName;
private String featureCode;
private JobPriority priority; // HIGH / NORMAL / LOW
private List<BatchTask> tasks; // 任务列表
private JobConfig config; // 调度配置
private String callbackUrl; // 完成后回调地址
private LocalDateTime scheduledTime; // 计划执行时间
private JobStatus status;
@Data
@Builder
public static class JobConfig {
private String modelName;
private int maxConcurrency; // 最大并发数
private int batchSize; // 每批处理数量
private int maxRetries; // 最大重试次数
private Duration taskTimeout; // 单任务超时
private boolean stopOnHighErrorRate; // 错误率高时停止
private double maxErrorRate; // 最大容忍错误率
}
}
@Data
@Builder
public class BatchTask {
private String taskId;
private String jobId;
private String bizObjectId; // 关联的业务对象ID
private String systemPrompt;
private String userMessage;
private Map<String, Object> metadata;
private TaskStatus status; // PENDING / PROCESSING / SUCCESS / FAILED
private int retryCount;
private String result;
private String errorMsg;
}批处理调度器
@Service
@Slf4j
public class BatchJobScheduler {
@Autowired
private BatchJobRepository jobRepository;
@Autowired
private BatchTaskRepository taskRepository;
@Autowired
private RateLimiterService rateLimiter;
@Autowired
private AIClientFactory clientFactory;
@Autowired
private BatchResultProcessor resultProcessor;
// 按优先级维护执行队列
private final PriorityBlockingQueue<BatchJob> executionQueue =
new PriorityBlockingQueue<>(100, Comparator.comparing(BatchJob::getPriority));
@Async
public CompletableFuture<JobResult> executeJob(BatchJob job) {
log.info("开始执行批处理任务: jobId={}, taskCount={}", job.getJobId(), job.getTasks().size());
jobRepository.updateStatus(job.getJobId(), JobStatus.RUNNING);
List<BatchTask> tasks = job.getTasks();
JobConfig config = job.getConfig();
// 分批处理
List<List<BatchTask>> batches = partition(tasks, config.getBatchSize());
int successCount = 0;
int failCount = 0;
for (List<BatchTask> batch : batches) {
// 检查错误率,过高时停止
if (config.isStopOnHighErrorRate() && failCount + successCount > 0) {
double errorRate = (double) failCount / (failCount + successCount);
if (errorRate > config.getMaxErrorRate()) {
log.error("错误率超阈值,停止任务: jobId={}, errorRate={}",
job.getJobId(), errorRate);
break;
}
}
// 并发处理一个批次
List<CompletableFuture<TaskResult>> futures = batch.stream()
.map(task -> processTaskWithRateLimit(task, config))
.collect(Collectors.toList());
// 等待批次完成
List<TaskResult> batchResults = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 统计
for (TaskResult result : batchResults) {
if (result.isSuccess()) successCount++;
else failCount++;
}
// 批次间短暂休眠,避免突发请求
ThreadUtil.sleep(100);
}
// 任务完成,触发回调
JobResult jobResult = JobResult.builder()
.jobId(job.getJobId())
.successCount(successCount)
.failCount(failCount)
.totalCount(tasks.size())
.build();
jobRepository.updateStatus(job.getJobId(), JobStatus.COMPLETED);
triggerCallback(job, jobResult);
return CompletableFuture.completedFuture(jobResult);
}
private CompletableFuture<TaskResult> processTaskWithRateLimit(
BatchTask task, JobConfig config) {
return CompletableFuture.supplyAsync(() -> {
// 获取令牌,控制请求速率
rateLimiter.acquire(config.getModelName());
return executeTaskWithRetry(task, config);
}, getExecutor(config.getMaxConcurrency()));
}
private TaskResult executeTaskWithRetry(BatchTask task, JobConfig config) {
int maxRetries = config.getMaxRetries();
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
AIClient client = clientFactory.getClient(config.getModelName());
AIResponse response = client.chat(task.getSystemPrompt(), task.getUserMessage());
taskRepository.markSuccess(task.getTaskId(), response.getContent());
return TaskResult.success(task.getTaskId(), response.getContent());
} catch (RateLimitException e) {
// Rate limit:等待后重试
long waitMs = e.getRetryAfterMs() > 0 ? e.getRetryAfterMs() : (1000L * (attempt + 1));
log.warn("Rate limit,等待重试: taskId={}, attempt={}, waitMs={}",
task.getTaskId(), attempt, waitMs);
ThreadUtil.sleep(waitMs);
} catch (Exception e) {
if (attempt == maxRetries) {
log.error("任务最终失败: taskId={}", task.getTaskId(), e);
taskRepository.markFailed(task.getTaskId(), e.getMessage());
return TaskResult.failure(task.getTaskId(), e.getMessage());
}
// 指数退避
ThreadUtil.sleep(1000L * (long)Math.pow(2, attempt));
}
}
return TaskResult.failure(task.getTaskId(), "超出最大重试次数");
}
}限流器实现
批处理系统最容易踩的坑就是限流。各家模型API都有 RPM(每分钟请求数)和 TPM(每分钟Token数)的限制,超了就会被拒绝。
@Component
public class RateLimiterService {
// 每个模型维护独立的限流器
private final Map<String, RateLimiter> rpmLimiters = new ConcurrentHashMap<>();
private final Map<String, TokenBucketRateLimiter> tpmLimiters = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
// 根据各模型的Rate Limit配置初始化
// OpenAI GPT-4o-mini: 500 RPM, 200K TPM (Tier 1)
initLimiter("gpt-4o-mini", 500, 200_000);
initLimiter("gpt-4o", 500, 30_000);
initLimiter("claude-3-haiku-20240307", 1000, 100_000);
initLimiter("claude-3-5-sonnet-20241022", 50, 40_000);
}
private void initLimiter(String modelName, int rpm, int tpm) {
// Guava RateLimiter 基于令牌桶,平滑限流
rpmLimiters.put(modelName, RateLimiter.create(rpm / 60.0)); // 转换为每秒
tpmLimiters.put(modelName, new TokenBucketRateLimiter(tpm));
}
/**
* 请求调用许可(会阻塞直到获取到许可)
*/
public void acquire(String modelName) {
RateLimiter limiter = rpmLimiters.get(modelName);
if (limiter != null) {
limiter.acquire();
}
}
/**
* 消耗Token预算
*/
public boolean consumeTokens(String modelName, int tokenCount) {
TokenBucketRateLimiter tpmLimiter = tpmLimiters.get(modelName);
if (tpmLimiter == null) return true;
return tpmLimiter.tryConsume(tokenCount);
}
/**
* 动态调整限流参数(当检测到频繁429错误时降速)
*/
public void throttleDown(String modelName, double factor) {
RateLimiter limiter = rpmLimiters.get(modelName);
if (limiter != null) {
double currentRate = limiter.getRate();
double newRate = Math.max(currentRate * factor, 1.0); // 最低1 RPS
log.warn("限流降速: model={}, {} -> {} RPS", modelName, currentRate, newRate);
// 注意:Guava RateLimiter不支持动态调整,需要重建
rpmLimiters.put(modelName, RateLimiter.create(newRate));
}
}
}断点续传:大批次任务的必备能力
处理几十万条数据时,任务可能中途失败。没有断点续传,就得从头来过,时间和成本都浪费掉了。
@Service
public class ResumableBatchService {
@Autowired
private BatchTaskRepository taskRepository;
@Autowired
private BatchJobScheduler scheduler;
/**
* 恢复中断的任务,只处理未完成的部分
*/
public JobResult resumeJob(String jobId) {
BatchJob job = jobRepository.findById(jobId);
if (job.getStatus() == JobStatus.COMPLETED) {
log.info("任务已完成,无需恢复: jobId={}", jobId);
return buildResult(job);
}
// 查出未完成的任务
List<BatchTask> pendingTasks = taskRepository.findByJobIdAndStatus(
jobId, List.of(TaskStatus.PENDING, TaskStatus.FAILED)
);
log.info("恢复任务: jobId={}, pendingCount={}", jobId, pendingTasks.size());
// 重置失败任务的状态
List<BatchTask> failedTasks = pendingTasks.stream()
.filter(t -> t.getStatus() == TaskStatus.FAILED && t.getRetryCount() < 3)
.collect(Collectors.toList());
failedTasks.forEach(t -> {
t.setStatus(TaskStatus.PENDING);
t.setErrorMsg(null);
});
taskRepository.saveAll(failedTasks);
// 只提交未完成的任务
job.setTasks(pendingTasks);
return scheduler.executeJob(job).join();
}
/**
* 检查任务进度
*/
public JobProgress getProgress(String jobId) {
long total = taskRepository.countByJobId(jobId);
long success = taskRepository.countByJobIdAndStatus(jobId, TaskStatus.SUCCESS);
long failed = taskRepository.countByJobIdAndStatus(jobId, TaskStatus.FAILED);
long pending = total - success - failed;
return JobProgress.builder()
.jobId(jobId)
.total(total)
.success(success)
.failed(failed)
.pending(pending)
.progressPercent(total > 0 ? (double)(success + failed) / total * 100 : 0)
.estimatedRemainingMinutes(estimateRemaining(jobId, pending))
.build();
}
}成本对比分析
做完这套系统,我们做了一次对比测试,结果很直观。
场景:处理10万条商品描述,提取结构化属性(品牌、规格、价格区间等)。
模型:gpt-4o-mini 平均每条:500 input tokens + 150 output tokens
方案A:同步调用(原方案)
- 并发数:50
- 耗时:约4.5小时(Rate Limit频繁触发,实际更长)
- 成本:$100,000 * (0.00015/1000 * 500 + 0.0006/1000 * 150) = 约$97
方案B:OpenAI Batch API
- 无需管理并发
- 耗时:约1.2小时(OpenAI后台处理)
- 成本:约$48.5(5折)
方案C:自建批处理 + 智能限流
- 并发数:动态调整
- 耗时:约2小时
- 成本:约$97(但加上优先级调度等附加价值)
结论:纯批量任务首选 OpenAI Batch API,能省一半成本。自建批处理适合需要复杂调度、混合模型、实时进度监控的场景。
一些实战建议
任务设计要幂等。同一个任务ID被处理两次,结果应该一样。这在重试和断点续传时非常重要,否则会产生重复结果或计费问题。
Prompt要为批处理优化。同步调用时可以用复杂的多轮对话,批处理时尽量用单轮、输出格式固定的Prompt,方便批量解析结果。
批次大小不是越大越好。太大的批次失败重试代价高,太小的批次调度开销大。我们的经验是单批次500-2000条比较合适。
结果要有校验层。批处理完成后,要对结果进行格式和业务逻辑校验,不合格的记下来留给人工处理,而不是直接写入生产数据库。
批处理优化看起来不如实时对话那么glamorous,但对于有大量数据处理需求的业务,这套东西能实实在在省下来很多钱,还能让整个系统更稳。
