第1680篇:生产级Agent系统的成本控制——Token预算与任务终止策略
第1680篇:生产级Agent系统的成本控制——Token预算与任务终止策略
月底结算,一个同事拿着账单来找我:这个月OpenAI的费用比上个月涨了7倍。排查下来,发现有个Agent任务在一个死循环里转,一直在调LLM,调了3000多次。总共花了400多美元,做了个寂寞。
这件事之后,我把成本控制提到了和功能实现同等重要的位置。今天把我们做的这套Token预算和任务终止系统完整讲一下。
Agent成本的构成
先搞清楚成本从哪里来,才能针对性地控制:
直接成本:
- LLM调用的Token费用(输入Token + 输出Token)
- 嵌入向量API调用费用(记忆检索)
- 语音识别/图像识别API费用
间接成本:
- 向量数据库的存储和查询费用
- 中间件(Redis、消息队列)的使用费用
- 计算资源(代码沙箱的容器运行费用)
在大多数Agent系统里,LLM调用费用占到总成本的70%以上。所以成本控制的核心是:控制Token的使用量。
Token消耗的四大来源
我们测量过,一个典型的10轮对话任务,Token分布大概是:
- System Prompt:约5%
- 对话历史:约45%
- 工具描述:约20%
- LLM输出:约30%
Token预算系统设计
核心思路是:在任务启动时分配Token预算,执行过程中实时追踪消耗,超预算时触发管控策略。
@Data
@Builder
public class TokenBudget {
private String budgetId;
private String taskId;
private String userId;
// 预算配置
private int totalBudget; // 总Token预算
private int llmCallBudget; // LLM调用的Token预算
private int embeddingBudget; // 向量化的Token预算
private int maxLlmCalls; // 最大LLM调用次数
private int maxToolCalls; // 最大工具调用次数
// 实时消耗追踪
private int usedInputTokens; // 已用输入Token
private int usedOutputTokens; // 已用输出Token
private int llmCallCount; // LLM调用次数
private int toolCallCount; // 工具调用次数
// 成本计算
private String modelName; // 用的哪个模型(影响单价)
private double estimatedCostUsd; // 预估美元成本
// 超预算策略
private BudgetExceededPolicy policy;
// WARN_AND_CONTINUE / SWITCH_TO_CHEAPER / STOP
public int getTotalUsedTokens() {
return usedInputTokens + usedOutputTokens;
}
public double getBudgetUtilization() {
return (double) getTotalUsedTokens() / totalBudget;
}
public boolean isExceeded() {
return getTotalUsedTokens() > totalBudget;
}
public boolean isNearLimit(double threshold) {
return getBudgetUtilization() > threshold;
}
}
// 模型价格配置(实时从配置中心加载)
@Component
public class ModelPricing {
private final Map<String, ModelPrice> prices = Map.of(
"gpt-4o", new ModelPrice(5.0, 15.0), // 输入$5/1M, 输出$15/1M
"gpt-4o-mini", new ModelPrice(0.15, 0.6),
"gpt-3.5-turbo", new ModelPrice(0.5, 1.5),
"claude-3-opus", new ModelPrice(15.0, 75.0),
"claude-3-haiku", new ModelPrice(0.25, 1.25)
);
public double estimateCost(String modelName, int inputTokens, int outputTokens) {
ModelPrice price = prices.getOrDefault(modelName,
new ModelPrice(5.0, 15.0)); // 默认按gpt-4o计算
return (inputTokens * price.getInputPer1M() +
outputTokens * price.getOutputPer1M()) / 1_000_000;
}
}Token预算管理器
@Service
public class TokenBudgetManager {
private final RedisTemplate<String, TokenBudget> redis;
private final ModelPricing modelPricing;
/**
* 为任务创建Token预算
*/
public TokenBudget createBudget(String taskId, String userId,
BudgetConfig config) {
// 根据用户等级和任务类型设置预算
int totalBudget = determineBudget(userId, config.getTaskType());
TokenBudget budget = TokenBudget.builder()
.budgetId(UUID.randomUUID().toString())
.taskId(taskId)
.userId(userId)
.totalBudget(totalBudget)
.llmCallBudget((int)(totalBudget * 0.9))
.embeddingBudget((int)(totalBudget * 0.1))
.maxLlmCalls(config.getMaxLlmCalls() != null ? config.getMaxLlmCalls() : 20)
.maxToolCalls(config.getMaxToolCalls() != null ? config.getMaxToolCalls() : 50)
.modelName(config.getPreferredModel())
.policy(config.getBudgetExceededPolicy())
.build();
// 存入Redis,有效期24小时
redis.opsForValue().set("budget:" + taskId, budget, Duration.ofHours(24));
log.info("Token预算创建: taskId={}, budget={}, maxCalls={}",
taskId, totalBudget, budget.getMaxLlmCalls());
return budget;
}
/**
* 记录Token消耗,返回是否超预算
*/
public BudgetCheckResult recordUsage(String taskId, int inputTokens,
int outputTokens, String modelName) {
TokenBudget budget = getBudget(taskId);
if (budget == null) {
return BudgetCheckResult.noBudget();
}
budget.setUsedInputTokens(budget.getUsedInputTokens() + inputTokens);
budget.setUsedOutputTokens(budget.getUsedOutputTokens() + outputTokens);
budget.setLlmCallCount(budget.getLlmCallCount() + 1);
double callCost = modelPricing.estimateCost(modelName, inputTokens, outputTokens);
budget.setEstimatedCostUsd(budget.getEstimatedCostUsd() + callCost);
saveBudget(budget);
// 检查各项限制
if (budget.isExceeded()) {
return BudgetCheckResult.exceeded(
BudgetExceededReason.TOKEN_LIMIT,
budget.getTotalUsedTokens(),
budget.getTotalBudget(),
budget.getPolicy()
);
}
if (budget.getLlmCallCount() > budget.getMaxLlmCalls()) {
return BudgetCheckResult.exceeded(
BudgetExceededReason.CALL_COUNT_LIMIT,
budget.getLlmCallCount(),
budget.getMaxLlmCalls(),
budget.getPolicy()
);
}
// 接近限制时发出警告
if (budget.isNearLimit(0.8)) {
return BudgetCheckResult.warning(
String.format("Token使用量已达%d%%",
(int)(budget.getBudgetUtilization() * 100))
);
}
return BudgetCheckResult.ok(budget);
}
private int determineBudget(String userId, String taskType) {
UserTier tier = userService.getTier(userId);
// 基础预算 * 任务类型倍率
int baseBudget = switch (tier) {
case FREE -> 10_000;
case BASIC -> 50_000;
case PRO -> 200_000;
case ENTERPRISE -> 1_000_000;
};
double multiplier = switch (taskType) {
case "simple_qa" -> 0.5;
case "data_analysis" -> 1.0;
case "code_generation" -> 1.5;
case "complex_research" -> 3.0;
default -> 1.0;
};
return (int)(baseBudget * multiplier);
}
}动态模型降级:超预算时自动切换便宜模型
当Token消耗接近预算时,自动切换到更便宜的模型:
@Service
public class ModelSelectionStrategy {
// 模型按性能从高到低,按成本从高到低
private static final List<ModelOption> MODEL_FALLBACK_CHAIN = List.of(
new ModelOption("gpt-4o", 1.0, 20.0), // 最强,最贵
new ModelOption("gpt-4o-mini", 0.8, 0.75), // 稍弱,便宜很多
new ModelOption("gpt-3.5-turbo", 0.6, 1.5), // 基础,便宜
new ModelOption("claude-3-haiku", 0.75, 1.3) // 平衡型
);
/**
* 根据预算状态选择合适的模型
*/
public String selectModel(TokenBudget budget, TaskComplexity complexity) {
double utilization = budget.getBudgetUtilization();
// 预算充足时,用最佳模型
if (utilization < 0.6) {
return budget.getModelName(); // 用户指定的模型
}
// 预算告急(60%-80%),降一级
if (utilization < 0.8) {
return downgradeModel(budget.getModelName(), 1);
}
// 预算严重告急(>80%),降两级
return downgradeModel(budget.getModelName(), 2);
}
private String downgradeModel(String currentModel, int levels) {
int currentIndex = MODEL_FALLBACK_CHAIN.stream()
.map(ModelOption::getName)
.collect(Collectors.toList())
.indexOf(currentModel);
if (currentIndex < 0) {
return "gpt-4o-mini"; // 找不到当前模型,用默认降级
}
int targetIndex = Math.min(
currentIndex + levels,
MODEL_FALLBACK_CHAIN.size() - 1
);
String downgraded = MODEL_FALLBACK_CHAIN.get(targetIndex).getName();
log.info("模型降级: {} -> {}", currentModel, downgraded);
return downgraded;
}
}Context压缩:减少输入Token
随着对话轮数增加,输入Token会线性增长,是最大的成本来源。主动压缩Context可以显著降低成本:
@Service
public class ContextCompressor {
private final LLMClient llmClient;
/**
* 渐进式Context压缩策略
*/
public List<Message> compress(List<Message> messages, int targetTokens) {
int currentTokens = countTokens(messages);
if (currentTokens <= targetTokens) {
return messages; // 不需要压缩
}
log.info("Context压缩: {} -> {} tokens", currentTokens, targetTokens);
// 策略1:保留最新N条消息(滑动窗口)
if (currentTokens > targetTokens * 2) {
messages = slideWindow(messages, targetTokens);
}
// 策略2:摘要化旧消息
if (countTokens(messages) > targetTokens) {
messages = summarizeOldMessages(messages, targetTokens);
}
return messages;
}
/**
* 滑动窗口:保留System消息 + 最近的N条消息
*/
private List<Message> slideWindow(List<Message> messages, int targetTokens) {
List<Message> systemMessages = messages.stream()
.filter(m -> "system".equals(m.getRole()))
.collect(Collectors.toList());
List<Message> nonSystemMessages = messages.stream()
.filter(m -> !"system".equals(m.getRole()))
.collect(Collectors.toList());
// 从最新的消息往前加,直到达到Token限制
int tokensLeft = targetTokens - countTokens(systemMessages);
List<Message> recentMessages = new ArrayList<>();
for (int i = nonSystemMessages.size() - 1; i >= 0; i--) {
int msgTokens = countTokens(List.of(nonSystemMessages.get(i)));
if (tokensLeft - msgTokens < 0) break;
recentMessages.add(0, nonSystemMessages.get(i));
tokensLeft -= msgTokens;
}
List<Message> result = new ArrayList<>(systemMessages);
// 如果有消息被截断,加一条说明
if (recentMessages.size() < nonSystemMessages.size()) {
int removedCount = nonSystemMessages.size() - recentMessages.size();
result.add(Message.system(
String.format("[早期对话已省略,共省略%d条消息]", removedCount)
));
}
result.addAll(recentMessages);
return result;
}
/**
* 摘要化:用LLM把早期对话压缩成摘要
*/
private List<Message> summarizeOldMessages(List<Message> messages,
int targetTokens) {
int totalMessages = messages.size();
int keepRecent = totalMessages / 3; // 保留最近1/3的消息原文
List<Message> toSummarize = messages.subList(0, totalMessages - keepRecent);
List<Message> toKeep = messages.subList(totalMessages - keepRecent, totalMessages);
// 对旧消息做摘要
String conversationText = toSummarize.stream()
.map(m -> m.getRole() + ": " + m.getContent())
.collect(Collectors.joining("\n"));
String summaryPrompt = """
请将以下对话内容压缩为简洁的摘要,保留关键信息、决策和结果:
""" + conversationText;
// 用便宜的模型做摘要,节省成本
String summary = llmClient.complete(summaryPrompt, "gpt-4o-mini");
List<Message> result = new ArrayList<>();
// 保留原始System消息
messages.stream()
.filter(m -> "system".equals(m.getRole()))
.findFirst()
.ifPresent(result::add);
result.add(Message.system("[早期对话摘要]\n" + summary));
result.addAll(toKeep.stream()
.filter(m -> !"system".equals(m.getRole()))
.collect(Collectors.toList()));
log.info("Context摘要压缩: {} messages -> {} messages",
toSummarize.size(), result.size());
return result;
}
/**
* 工具描述按需加载:只加载当前步骤可能用到的工具
*/
public String selectRelevantTools(String currentTask,
List<ToolRegistration> allTools,
int maxTools) {
// 用嵌入向量找最相关的工具,而不是每次把所有工具都发给LLM
float[] taskEmbedding = embeddingClient.embed(currentTask);
return allTools.stream()
.sorted(Comparator.comparingDouble(tool ->
-cosineSimilarity(taskEmbedding, tool.getDescriptionEmbedding())
))
.limit(maxTools) // 最多加载maxTools个工具
.map(this::formatToolDescription)
.collect(Collectors.joining("\n\n"));
}
}任务终止策略
除了Token预算,还需要有明确的任务终止条件,防止无限循环:
@Service
public class TaskTerminationManager {
/**
* 检查任务是否应该终止
*/
public TerminationDecision shouldTerminate(AgentExecutionContext ctx) {
// 条件1:Token预算超限
BudgetCheckResult budgetCheck = budgetManager.recordUsage(
ctx.getTaskId(), ctx.getLastInputTokens(), ctx.getLastOutputTokens(),
ctx.getModelName()
);
if (budgetCheck.isExceeded()) {
if (budgetCheck.getPolicy() == BudgetExceededPolicy.STOP) {
return TerminationDecision.terminate(
TerminationReason.BUDGET_EXCEEDED,
String.format("Token预算已耗尽(%d/%d)",
budgetCheck.getUsed(), budgetCheck.getLimit())
);
}
}
// 条件2:执行时间超限
Duration elapsed = ctx.getElapsedTime();
Duration maxDuration = ctx.getMaxDuration();
if (elapsed.compareTo(maxDuration) > 0) {
return TerminationDecision.terminate(
TerminationReason.TIMEOUT,
String.format("执行超时(%d分钟/%d分钟)",
elapsed.toMinutes(), maxDuration.toMinutes())
);
}
// 条件3:步骤数超限
if (ctx.getStepCount() >= ctx.getMaxSteps()) {
return TerminationDecision.terminate(
TerminationReason.MAX_STEPS_REACHED,
"已达最大步骤数: " + ctx.getMaxSteps()
);
}
// 条件4:检测到无进展循环
if (detectProgressStagnation(ctx)) {
return TerminationDecision.terminate(
TerminationReason.NO_PROGRESS,
"检测到任务无实质进展(可能陷入循环)"
);
}
// 条件5:累计错误次数超限
if (ctx.getConsecutiveErrors() >= 3) {
return TerminationDecision.terminate(
TerminationReason.TOO_MANY_ERRORS,
"连续失败次数超限: " + ctx.getConsecutiveErrors()
);
}
return TerminationDecision.continueExecution();
}
/**
* 检测无进展循环:相同的工具调用连续出现3次
*/
private boolean detectProgressStagnation(AgentExecutionContext ctx) {
List<String> recentToolCalls = ctx.getRecentToolCallSignatures(6);
if (recentToolCalls.size() < 6) return false;
// 检查是否有重复的工具调用序列
String lastThree = String.join(",", recentToolCalls.subList(3, 6));
String prevThree = String.join(",", recentToolCalls.subList(0, 3));
return lastThree.equals(prevThree);
}
/**
* 优雅终止:在终止前做清理和摘要
*/
public AgentResult gracefulTermination(AgentExecutionContext ctx,
TerminationDecision decision) {
log.warn("任务优雅终止: taskId={}, reason={}, message={}",
ctx.getTaskId(), decision.getReason(), decision.getMessage());
// 生成已完成工作的摘要
String partialResult = summarizeCompletedWork(ctx);
// 记录终止事件
executionTracer.recordTermination(ctx.getTaskId(), decision);
// 通知等待结果的用户
AgentResult result = AgentResult.terminated(
ctx.getTaskId(),
decision.getReason(),
decision.getMessage(),
partialResult
);
return result;
}
private String summarizeCompletedWork(AgentExecutionContext ctx) {
if (ctx.getStepCount() == 0) {
return "任务尚未开始执行。";
}
List<StepResult> completedSteps = ctx.getCompletedSteps();
String stepsDescription = completedSteps.stream()
.map(s -> "- " + s.getDescription())
.collect(Collectors.joining("\n"));
String summaryPrompt = String.format("""
以下是一个Agent任务已完成的步骤,任务因为%s被终止,请生成一个简短的工作摘要:
已完成步骤:
%s
请说明:已完成了什么,还差什么没完成。
""",
ctx.getTerminationReason(),
stepsDescription
);
return llmClient.complete(summaryPrompt, "gpt-4o-mini");
}
}成本监控与告警
@Service
public class CostMonitoringService {
/**
* 实时成本仪表盘数据
*/
public CostDashboard getDashboard(String period) {
LocalDateTime since = parsePeriod(period);
// 按模型统计
List<ModelCostStats> modelStats = nodeRepository.aggregateCostByModel(since);
// 按用户统计(找出成本最高的用户)
List<UserCostStats> userStats = nodeRepository.aggregateCostByUser(since)
.stream()
.sorted(Comparator.comparingDouble(UserCostStats::getTotalCost).reversed())
.limit(10)
.collect(Collectors.toList());
// 按任务类型统计
List<TaskTypeCostStats> taskTypeStats = nodeRepository.aggregateCostByTaskType(since);
// 计算总成本趋势
List<DailyCostTrend> trend = nodeRepository.getDailyCostTrend(since);
return CostDashboard.builder()
.period(period)
.totalCostUsd(modelStats.stream().mapToDouble(m -> m.getTotalCost()).sum())
.totalTokens(modelStats.stream().mapToLong(m -> m.getTotalTokens()).sum())
.modelStats(modelStats)
.topUsers(userStats)
.taskTypeStats(taskTypeStats)
.dailyTrend(trend)
.build();
}
/**
* 预算预警:预测本月是否会超过预算
*/
@Scheduled(cron = "0 0 9 * * ?") // 每天早9点检查
public void checkMonthlyBudgetForecast() {
LocalDate today = LocalDate.now();
int daysInMonth = today.lengthOfMonth();
int daysPassed = today.getDayOfMonth();
double costSoFar = getTotalCostThisMonth();
double projectedCost = costSoFar / daysPassed * daysInMonth;
double monthlyBudget = config.getMonthlyBudgetUsd();
double projectedUtilization = projectedCost / monthlyBudget;
if (projectedUtilization > 0.9) {
alertService.sendAlert(
AlertType.BUDGET_FORECAST_EXCEEDED,
null,
String.format("预测本月成本$%.2f,超过月度预算($%.2f)的%.0f%%," +
"请及时处理",
projectedCost, monthlyBudget, projectedUtilization * 100)
);
}
}
/**
* 自动优化建议
*/
public List<CostOptimizationSuggestion> generateSuggestions() {
List<CostOptimizationSuggestion> suggestions = new ArrayList<>();
// 建议1:找出可以用更便宜模型的任务类型
Map<String, Double> taskTypeAvgQuality = getTaskTypeQualityScores();
Map<String, String> taskTypeCurrentModel = getTaskTypeModelUsage();
for (Map.Entry<String, String> entry : taskTypeCurrentModel.entrySet()) {
String taskType = entry.getKey();
String model = entry.getValue();
double qualityScore = taskTypeAvgQuality.getOrDefault(taskType, 0.9);
// 质量评分高且用的是贵模型,可以考虑降级
if (qualityScore > 0.85 && (model.startsWith("gpt-4") || model.contains("opus"))) {
suggestions.add(CostOptimizationSuggestion.of(
"模型降级",
String.format("任务类型'%s'当前用%s,质量评分%.0f%%," +
"建议改用更便宜的模型可节省约70%%成本",
taskType, model, qualityScore * 100)
));
}
}
// 建议2:找出Context压缩率低的场景
double avgContextCompressionRate = getAvgContextCompressionRate();
if (avgContextCompressionRate < 0.5) {
suggestions.add(CostOptimizationSuggestion.of(
"Context压缩",
String.format("当前Context压缩率%.0f%%,低于建议值," +
"建议启用积极的Context摘要策略",
avgContextCompressionRate * 100)
));
}
return suggestions;
}
}完整的成本感知Agent执行器
把所有组件整合起来:
@Service
public class CostAwareAgentExecutor {
private final TokenBudgetManager budgetManager;
private final ModelSelectionStrategy modelStrategy;
private final ContextCompressor contextCompressor;
private final TaskTerminationManager terminationManager;
public AgentResult execute(AgentTask task) {
// 1. 分配Token预算
TokenBudget budget = budgetManager.createBudget(
task.getTaskId(),
task.getUserId(),
task.getBudgetConfig()
);
AgentExecutionContext ctx = new AgentExecutionContext(task, budget);
List<Message> messages = initMessages(task);
for (int step = 0; step < task.getMaxSteps(); step++) {
// 2. 检查是否应该终止
TerminationDecision termination = terminationManager.shouldTerminate(ctx);
if (termination.shouldTerminate()) {
return terminationManager.gracefulTermination(ctx, termination);
}
// 3. 根据预算状态选择模型
String selectedModel = modelStrategy.selectModel(budget, ctx.getComplexity());
// 4. 压缩Context(如果需要)
int availableTokens = budget.getTotalBudget() - budget.getTotalUsedTokens();
if (availableTokens < 8000) {
messages = contextCompressor.compress(messages, 4000);
}
// 5. 只加载相关工具(按需加载)
String currentTaskDesc = getLastUserMessage(messages);
String toolsDesc = contextCompressor.selectRelevantTools(
currentTaskDesc,
toolRegistry.getAllTools(),
10 // 最多10个工具
);
// 6. 调用LLM
ChatRequest request = ChatRequest.builder()
.model(selectedModel)
.messages(messages)
.toolsDescription(toolsDesc)
.maxTokens(Math.min(2000, availableTokens / 2)) // 限制输出长度
.build();
LLMResponse response = llmClient.chat(request);
// 7. 记录Token消耗
budgetManager.recordUsage(
task.getTaskId(),
response.getUsage().getInputTokens(),
response.getUsage().getOutputTokens(),
selectedModel
);
ctx.addTokenUsage(response.getUsage());
messages.add(Message.assistant(response.getContent()));
if (response.getToolCalls() == null || response.getToolCalls().isEmpty()) {
return AgentResult.success(task.getTaskId(), response.getContent());
}
// 执行工具调用...
executeToolCalls(response.getToolCalls(), messages, ctx);
}
return terminationManager.gracefulTermination(ctx,
TerminationDecision.terminate(TerminationReason.MAX_STEPS_REACHED, ""));
}
}实际案例:成本优化前后对比
我们在一个文档分析Agent上做了完整的成本优化,效果如下:
| 优化项 | 优化前 | 优化后 | 节省 |
|---|---|---|---|
| 模型选择 | 全用gpt-4o | 简单任务用gpt-4o-mini | -68% |
| Context长度 | 不压缩,平均15K tokens | 压缩后平均6K tokens | -60% |
| 工具描述 | 全量加载,约2K tokens | 按需加载,约0.5K tokens | -75% |
| 综合成本 | $0.15/次 | $0.04/次 | -73% |
几点踩坑经验
坑1:Token计数不准确导致预算穿透。
我们最开始用简单的字符数估算Token,结果中文和特殊符号的Token计数偏差很大,实际消耗比预算高了40%。后来改用tiktoken库(OpenAI官方)精确计数,才解决这个问题。
坑2:Context压缩导致任务失败。
Context摘要化会丢失一些细节信息,如果被摘要的内容里有关键参数(如某个ID、某个配置值),任务后续步骤会因为找不到这些信息而失败。后来在摘要前做了一步"关键信息提取",确保重要数据不会被压缩掉。
坑3:模型降级后质量下滑明显,用户投诉。
不是所有任务都适合降级到便宜模型。对于需要推理能力的复杂任务,用gpt-4o-mini代替gpt-4o,质量可能下降30%以上。后来加了任务复杂度评估,复杂任务不降级。
坑4:月度成本预测不够及时。
每天预测一次不够及时,某天突发大量任务,等第二天才告警已经超支了。改成实时监控当月累计成本,超过预算50%时立即告警,避免被动。
成本控制在很多团队里被视为"以后再说"的事情,等到账单来了才追悔莫及。事实上,从第一天就把成本感知设计进架构里,并不会增加多少工作量,但能避免巨大的损失。把这套Token预算和任务终止系统搭建起来,让你的Agent系统在自动化的同时,也在成本上是可控的。
