第2124篇:LLM成本优化实战——Token预算管理与精打细算的工程之道
第2124篇:LLM成本优化实战——Token预算管理与精打细算的工程之道
适读人群:需要控制AI调用成本的工程师和技术负责人 | 阅读时长:约19分钟 | 核心价值:系统性地分析和降低LLM调用成本,用工程手段把token浪费降低30-50%
"上个月AI调用费用8万块,这个月怎么突然变成20万了?"
这是我在一个客户那里遇到的真实情况。排查发现:有个新功能上线,用户可以上传文档提问,但没有限制文档大小,也没有优化文档处理方式。有用户上传了200页的PDF,每次提问都把整份PDF发给LLM。一个用户一次对话,就烧掉了几十万token。
LLM的成本和传统软件完全不同:每次API调用都在花钱,而且花多少取决于你发出了多少字。工程师如果没有成本意识,很容易写出"token无限消耗"的代码。
Token成本的核心矩阵
/**
* LLM成本分析框架
*
* ===== 成本公式 =====
*
* 总成本 = Σ (input_tokens × input_price + output_tokens × output_price)
*
* ===== 主要模型价格(2024年,美元/1M tokens)=====
*
* GPT-4o: input $5.0 output $15.0
* GPT-4o-mini: input $0.15 output $0.6
* GPT-4-turbo: input $10.0 output $30.0
* Claude 3.5 Sonnet: input $3.0 output $15.0
* Claude 3 Haiku: input $0.25 output $1.25
* Gemini 1.5 Pro: input $3.5 output $10.5
*
* ===== 常见的Token浪费来源 =====
*
* 1. System Prompt过长
* 问题:每次对话都发同一个System Prompt,重复计费
* 例:1000字的System Prompt,10000次对话 = 1000万token白白浪费
*
* 2. 历史对话无限累积
* 问题:不截断对话历史,后期每次请求带上几十轮历史
* 现象:同一个问题越到后面越贵
*
* 3. RAG检索内容过多
* 问题:检索了10段内容,实际有用的只有2段
* 多余的8段都是浪费的input token
*
* 4. 不必要地用强模型
* 问题:简单问题用GPT-4o,成本是GPT-4o-mini的33倍
*
* 5. 没有缓存相同请求
* 问题:FAQ类问题被重复生成,每次都收费
*/Token预算管理器
/**
* Token预算管理器
*
* 在发送请求前,计算和控制token用量
* 避免超出预算或产生不必要的费用
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class TokenBudgetManager {
private final TokenEstimator tokenEstimator;
private final RedisTemplate<String, Object> redisTemplate;
/**
* 检查请求是否在预算范围内,并返回优化建议
*/
public BudgetCheckResult checkAndOptimize(TokenBudgetRequest request) {
// 估算本次请求的token用量
int estimatedInputTokens = tokenEstimator.estimate(request);
List<OptimizationSuggestion> suggestions = new ArrayList<>();
// 检查System Prompt
if (request.getSystemPrompt() != null) {
int systemPromptTokens = tokenEstimator.countTokens(request.getSystemPrompt());
if (systemPromptTokens > 500) {
suggestions.add(new OptimizationSuggestion(
"LONG_SYSTEM_PROMPT",
String.format("System Prompt过长(%d tokens),建议精简到500 tokens以内", systemPromptTokens),
systemPromptTokens - 500
));
}
}
// 检查对话历史
if (request.getConversationHistory() != null) {
int historyTokens = tokenEstimator.countMessages(request.getConversationHistory());
if (historyTokens > 2000) {
suggestions.add(new OptimizationSuggestion(
"LONG_HISTORY",
String.format("对话历史过长(%d tokens),建议启用滑动窗口压缩", historyTokens),
historyTokens - 2000
));
}
}
// 检查RAG上下文
if (request.getRetrievedContext() != null) {
int contextTokens = tokenEstimator.countTokens(request.getRetrievedContext());
if (contextTokens > 3000) {
suggestions.add(new OptimizationSuggestion(
"EXCESSIVE_CONTEXT",
String.format("检索内容过多(%d tokens),建议减少检索数量或启用精排", contextTokens),
contextTokens - 3000
));
}
}
// 检查用户级配额
String userId = request.getUserId();
if (userId != null) {
long todayUsage = getDailyUsage(userId);
if (request.getDailyLimit() != null && todayUsage + estimatedInputTokens > request.getDailyLimit()) {
return BudgetCheckResult.exceeded(
String.format("今日Token用量即将超出上限(已用%d,上限%d)",
todayUsage, request.getDailyLimit()),
suggestions
);
}
}
return BudgetCheckResult.approved(estimatedInputTokens, suggestions);
}
/**
* 记录实际Token消耗(请求完成后调用)
*/
public void recordUsage(String userId, String requestId,
int inputTokens, int outputTokens, String modelId) {
// 记录到Redis滑动窗口(用于实时配额检查)
String dailyKey = "token_usage:" + userId + ":" + LocalDate.now();
redisTemplate.opsForValue().increment(dailyKey, inputTokens + outputTokens);
redisTemplate.expire(dailyKey, Duration.ofDays(2));
// 计算费用
double cost = calculateCost(modelId, inputTokens, outputTokens);
// 记录到月度账单
String monthlyKey = "monthly_cost:" + userId + ":" +
LocalDate.now().getYear() + "-" + LocalDate.now().getMonthValue();
redisTemplate.opsForValue().increment(monthlyKey, (long)(cost * 10000)); // 存储微分
redisTemplate.expire(monthlyKey, Duration.ofDays(35));
log.debug("Token消耗记录: userId={}, input={}, output={}, cost=${:.6f}",
userId, inputTokens, outputTokens, cost);
}
private long getDailyUsage(String userId) {
String key = "token_usage:" + userId + ":" + LocalDate.now();
Object usage = redisTemplate.opsForValue().get(key);
return usage != null ? Long.parseLong(usage.toString()) : 0;
}
private double calculateCost(String modelId, int inputTokens, int outputTokens) {
ModelPricing pricing = MODEL_PRICING.getOrDefault(modelId, DEFAULT_PRICING);
return inputTokens * pricing.inputPricePerToken()
+ outputTokens * pricing.outputPricePerToken();
}
// 模型定价表(每token的价格,美元)
private static final Map<String, ModelPricing> MODEL_PRICING = Map.of(
"gpt-4o", new ModelPricing(5.0 / 1_000_000, 15.0 / 1_000_000),
"gpt-4o-mini", new ModelPricing(0.15 / 1_000_000, 0.6 / 1_000_000),
"claude-3-5-sonnet", new ModelPricing(3.0 / 1_000_000, 15.0 / 1_000_000),
"claude-3-haiku", new ModelPricing(0.25 / 1_000_000, 1.25 / 1_000_000)
);
private static final ModelPricing DEFAULT_PRICING =
new ModelPricing(5.0 / 1_000_000, 15.0 / 1_000_000);
record ModelPricing(double inputPricePerToken, double outputPricePerToken) {}
record OptimizationSuggestion(String type, String description, int estimatedSavings) {}
@Data
@Builder
public static class BudgetCheckResult {
private boolean approved;
private String denyReason;
private int estimatedInputTokens;
private List<OptimizationSuggestion> suggestions;
public static BudgetCheckResult approved(int tokens, List<OptimizationSuggestion> suggestions) {
return BudgetCheckResult.builder()
.approved(true).estimatedInputTokens(tokens).suggestions(suggestions).build();
}
public static BudgetCheckResult exceeded(String reason, List<OptimizationSuggestion> suggestions) {
return BudgetCheckResult.builder()
.approved(false).denyReason(reason).suggestions(suggestions).build();
}
public int getTotalEstimatedSavings() {
return suggestions.stream().mapToInt(OptimizationSuggestion::estimatedSavings).sum();
}
}
}对话历史压缩
/**
* 对话历史压缩器
*
* 长对话的大问题:历史越来越长,每次请求成本越来越高
*
* 策略一:滑动窗口(保留最近N轮)
* 策略二:摘要压缩(把早期对话摘要化)
* 策略三:重要性过滤(保留关键轮次,丢弃闲聊)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ConversationHistoryCompressor {
private final ChatLanguageModel summaryModel;
private final TokenEstimator tokenEstimator;
/**
* 压缩对话历史到指定token限制
*
* @param maxTokens 历史部分的最大token数
*/
public List<ChatMessage> compress(
List<ChatMessage> history,
int maxTokens,
CompressionStrategy strategy) {
int currentTokens = tokenEstimator.countMessages(history);
if (currentTokens <= maxTokens) {
return history; // 不需要压缩
}
log.debug("压缩对话历史: original={}tokens, target={}tokens, strategy={}",
currentTokens, maxTokens, strategy);
return switch (strategy) {
case SLIDING_WINDOW -> slidingWindowCompress(history, maxTokens);
case SUMMARY -> summaryCompress(history, maxTokens);
case IMPORTANCE_FILTER -> importanceFilterCompress(history, maxTokens);
};
}
/**
* 滑动窗口策略:丢弃最早的对话,保留最近的
*
* 优点:简单快速,不需要额外LLM调用
* 缺点:早期重要信息会丢失
*/
private List<ChatMessage> slidingWindowCompress(
List<ChatMessage> history, int maxTokens) {
// 从最新消息往前保留,直到token限制
List<ChatMessage> result = new ArrayList<>();
int usedTokens = 0;
// 倒序遍历
for (int i = history.size() - 1; i >= 0; i--) {
ChatMessage msg = history.get(i);
int msgTokens = tokenEstimator.countMessage(msg);
if (usedTokens + msgTokens > maxTokens) {
break;
}
result.add(0, msg); // 插入到最前面(保持顺序)
usedTokens += msgTokens;
}
if (result.size() < history.size()) {
log.debug("滑动窗口压缩:保留 {}/{} 条消息", result.size(), history.size());
}
return result;
}
/**
* 摘要压缩策略:把早期对话总结成摘要,保留最近几轮完整内容
*
* 优点:早期上下文以摘要形式保留
* 缺点:需要额外的LLM调用(有成本)
*/
private List<ChatMessage> summaryCompress(
List<ChatMessage> history, int maxTokens) {
// 保留最后6轮对话(12条消息),其余摘要化
int keepRounds = 6;
int keepMessages = keepRounds * 2;
if (history.size() <= keepMessages) {
return history;
}
List<ChatMessage> toSummarize = history.subList(0, history.size() - keepMessages);
List<ChatMessage> toKeep = history.subList(history.size() - keepMessages, history.size());
// 生成历史摘要
String summary = generateHistorySummary(toSummarize);
// 构建新的历史:摘要作为第一条系统消息 + 最近几轮完整内容
List<ChatMessage> compressed = new ArrayList<>();
compressed.add(SystemMessage.from("以下是之前对话的摘要:\n" + summary));
compressed.addAll(toKeep);
log.debug("摘要压缩:原{}条消息,压缩后{}条(含摘要)",
history.size(), compressed.size());
return compressed;
}
private String generateHistorySummary(List<ChatMessage> messages) {
String historyText = messages.stream()
.map(msg -> {
if (msg instanceof UserMessage um) return "用户:" + um.singleText();
if (msg instanceof AiMessage am) return "AI:" + am.text();
return "";
})
.filter(s -> !s.isEmpty())
.collect(Collectors.joining("\n"));
String prompt = """
请简洁地总结以下对话的要点(不超过200字):
%s
总结:
""".formatted(historyText.substring(0, Math.min(4000, historyText.length())));
try {
return summaryModel.generate(prompt);
} catch (Exception e) {
log.warn("历史摘要生成失败: {}", e.getMessage());
return "(历史对话摘要不可用)";
}
}
/**
* 重要性过滤策略:保留包含关键决策、重要信息的轮次
*/
private List<ChatMessage> importanceFilterCompress(
List<ChatMessage> history, int maxTokens) {
// 简化实现:基于启发式规则过滤
// 完整实现需要LLM对每轮对话评分
List<ChatMessage> result = new ArrayList<>();
int usedTokens = 0;
// 始终保留最后4轮
int alwaysKeepFromIndex = Math.max(0, history.size() - 8);
List<ChatMessage> alwaysKeep = history.subList(alwaysKeepFromIndex, history.size());
// 对早期消息评分
for (int i = 0; i < alwaysKeepFromIndex; i += 2) {
if (i + 1 >= alwaysKeepFromIndex) break;
ChatMessage user = history.get(i);
ChatMessage ai = history.get(i + 1);
// 如果用户消息包含关键词,保留这轮
if (isImportantMessage(user)) {
int pairTokens = tokenEstimator.countMessage(user) + tokenEstimator.countMessage(ai);
if (usedTokens + pairTokens <= maxTokens - tokenEstimator.countMessages(alwaysKeep)) {
result.add(user);
result.add(ai);
usedTokens += pairTokens;
}
}
}
result.addAll(alwaysKeep);
return result;
}
private boolean isImportantMessage(ChatMessage message) {
if (!(message instanceof UserMessage um)) return false;
String text = um.singleText().toLowerCase();
// 包含决策性词语的消息被认为重要
return text.contains("要求") || text.contains("必须") ||
text.contains("约束") || text.contains("不能") ||
text.contains("前提") || text.contains("注意");
}
public enum CompressionStrategy {
SLIDING_WINDOW, // 丢弃早期消息(默认,成本最低)
SUMMARY, // 摘要化(保留更多上下文,但有额外成本)
IMPORTANCE_FILTER // 重要性过滤(启发式,无额外成本)
}
}智能模型路由降本
/**
* 基于请求复杂度的模型路由
*
* 不是所有请求都需要GPT-4o
* 简单问题用GPT-4o-mini,成本降低33倍
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CostOptimizedModelRouter {
/**
* 根据请求复杂度选择合适的模型
*
* 复杂度判断维度:
* - 问题长度和复杂性
* - 是否需要推理
* - 是否需要代码生成
* - 是否需要专业知识
*/
public String selectModel(String userMessage, String feature, Map<String, Object> metadata) {
RequestComplexity complexity = assessComplexity(userMessage, feature);
String model = switch (complexity) {
case SIMPLE -> "gpt-4o-mini"; // 简单问答:成本最低
case MEDIUM -> "gpt-4o-mini"; // 中等复杂度:仍用mini
case COMPLEX -> "gpt-4o"; // 复杂推理:用主力模型
case EXPERT -> "gpt-4o"; // 专家级:用主力模型
};
log.debug("模型选择: complexity={}, model={}, feature={}", complexity, model, feature);
return model;
}
private RequestComplexity assessComplexity(String message, String feature) {
// 特定功能强制使用强模型
if ("code_generation".equals(feature) || "architecture_design".equals(feature)) {
return RequestComplexity.COMPLEX;
}
// 问题长度
int wordCount = message.length();
// 是否包含需要推理的指标词
boolean requiresReasoning = message.contains("分析") || message.contains("比较") ||
message.contains("为什么") || message.contains("如何设计") ||
message.contains("优缺点") || message.contains("建议");
// 是否是简单信息查询
boolean isSimpleLookup = wordCount < 50 && !requiresReasoning;
if (isSimpleLookup) return RequestComplexity.SIMPLE;
if (!requiresReasoning && wordCount < 200) return RequestComplexity.MEDIUM;
if (requiresReasoning) return RequestComplexity.COMPLEX;
return RequestComplexity.MEDIUM;
}
/**
* 计算节省的成本
*/
public double estimateSavings(int tokens, String selectedModel, String fullModel) {
ModelCost selected = MODEL_COSTS.getOrDefault(selectedModel, DEFAULT_COST);
ModelCost full = MODEL_COSTS.getOrDefault(fullModel, DEFAULT_COST);
double selectedCost = tokens * (selected.inputPrice() + selected.outputPrice()) / 2;
double fullCost = tokens * (full.inputPrice() + full.outputPrice()) / 2;
return fullCost - selectedCost;
}
private static final Map<String, ModelCost> MODEL_COSTS = Map.of(
"gpt-4o", new ModelCost(5.0 / 1_000_000, 15.0 / 1_000_000),
"gpt-4o-mini", new ModelCost(0.15 / 1_000_000, 0.6 / 1_000_000)
);
private static final ModelCost DEFAULT_COST =
new ModelCost(5.0 / 1_000_000, 15.0 / 1_000_000);
enum RequestComplexity { SIMPLE, MEDIUM, COMPLEX, EXPERT }
record ModelCost(double inputPrice, double outputPrice) {}
}实践建议
先测量,再优化。不要凭感觉优化
我见过的最大浪费:一个团队花了两周"优化System Prompt",把它从1500 tokens缩减到800 tokens,节省了700 tokens/请求。但他们忽略了另一个问题:RAG检索每次返回10段内容,每段500 tokens,共5000 tokens,其中只有2段真正被用到。优化对话历史和检索内容的ROI远高于优化System Prompt。先用审计日志把token消耗拆解清楚,再决定优化哪里。
对话历史的Token增长是隐性成本杀手
大多数工程师测试AI功能时,只测前几轮对话,这时候成本是正常的。但真实用户可能聊50轮,到第50轮时每次请求的成本可能是第1轮的10倍(因为要带上所有历史)。建议:设置历史Token上限(如4000 tokens),超过时启用压缩策略。这个改动通常能把长对话的平均成本降低60-70%。
模型分流是最高ROI的优化
一个日均10万次请求的系统,如果30%的简单请求从GPT-4o换到GPT-4o-mini,假设平均每次1000 tokens,每月节省:30000次 × 1000 tokens × (5-0.15)/1000000美元 = 约1455美元/月。这个改动只需要几小时的工程实现,ROI极高。关键是要有足够的数据知道哪些请求是"简单的",这又回到了前面说的:先测量。
