AI成本控制实战:Token计量·限流·费用优化方案
2026/4/30大约 9 分钟
AI成本控制实战:Token计量·限流·费用优化方案
适读人群:有1-5年Java开发经验,想向AI工程师方向转型的开发者 阅读时长:约17分钟 文章价值:
- 掌握AI接口成本的完整计量和追踪方案
- 学会多层次限流策略,防止成本失控
- 获得可落地的Token优化技巧,最高节省60%费用
这张账单让老板拍了桌子
某天下午,我朋友小郑急匆匆给我发了条消息:
"老张,我完了。这个月OpenAI的账单出来了,2.3万美元。老板已经叫我去办公室了。"
他们的AI写作辅助工具上线刚满一个月,用户量不算大,但账单触目惊心。
我帮他查了调用日志,发现几个问题叠加在一起:
- Prompt模板里带了一段500字的系统说明,每次调用都原封不动地传,根本没必要这么长
- 用户每输入一个字就实时调用一次AI(联想补全),完全没有防抖
- 用gpt-4处理一些简单的格式化任务,杀鸡用了牛刀
- 没有任何限流,某个测试账号跑了压力测试,一天消耗了整个月预算的20%
这四个问题单拎出来任何一个都不致命,但加在一起就是2.3万美元的事故。
今天这篇文章,我来系统讲AI成本控制的完整方案。
成本问题的根本原因分析
mindmap
root((AI成本失控原因))
Prompt设计问题
System Prompt过长
无效上下文携带
不必要的Few-shot示例
调用频率问题
无防抖频繁调用
缓存命中率低
重复查询
模型选型问题
用高价模型做简单任务
未根据场景选择模型
缺乏控制
无限流机制
无预算告警
无异常检测成本计量体系
首先要做到"知道钱花在哪里",才能有针对性地优化:
package com.laozhang.ai.cost;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* AI费用计量服务
* 精确追踪每个业务场景、每个模型的Token消耗和成本
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class AICostMeterService {
private final MeterRegistry meterRegistry;
private final CostAlertService alertService;
// 实时Token计数(按日期+场景+模型三维度)
private final Map<String, AtomicLong> inputTokenCounters = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> outputTokenCounters = new ConcurrentHashMap<>();
// 2026年最新价格(美元/百万Token)
private static final Map<String, ModelCost> MODEL_COSTS = Map.of(
"gpt-4o", new ModelCost(5.0, 15.0),
"gpt-4o-mini", new ModelCost(0.15, 0.60),
"gpt-4-turbo", new ModelCost(10.0, 30.0),
"claude-3-5-sonnet", new ModelCost(3.0, 15.0),
"claude-3-haiku", new ModelCost(0.25, 1.25),
"qwen-long", new ModelCost(0.5, 1.5),
"qwen-turbo", new ModelCost(0.3, 0.6)
);
/**
* 记录一次AI调用的Token消耗
*/
public void recordUsage(String scene, String model,
long inputTokens, long outputTokens) {
String date = LocalDate.now().toString();
String key = date + ":" + scene + ":" + model;
// 累计Token数
inputTokenCounters.computeIfAbsent(key, k -> new AtomicLong())
.addAndGet(inputTokens);
outputTokenCounters.computeIfAbsent(key, k -> new AtomicLong())
.addAndGet(outputTokens);
// 发布到Micrometer(用于Prometheus/Grafana)
Counter.builder("ai.token.input")
.tag("scene", scene)
.tag("model", model)
.register(meterRegistry)
.increment(inputTokens);
Counter.builder("ai.token.output")
.tag("scene", scene)
.tag("model", model)
.register(meterRegistry)
.increment(outputTokens);
// 计算本次调用成本
double callCost = calculateCost(model, inputTokens, outputTokens);
Counter.builder("ai.cost.usd")
.tag("scene", scene)
.tag("model", model)
.register(meterRegistry)
.increment(callCost);
}
/**
* 计算成本(美元)
*/
public double calculateCost(String model, long inputTokens, long outputTokens) {
ModelCost cost = MODEL_COSTS.getOrDefault(model, new ModelCost(0.01, 0.03));
double inputCost = (inputTokens / 1_000_000.0) * cost.inputPricePerMillion();
double outputCost = (outputTokens / 1_000_000.0) * cost.outputPricePerMillion();
return inputCost + outputCost;
}
/**
* 每小时生成费用报告,超阈值告警
*/
@Scheduled(cron = "0 0 * * * *")
public void hourlyCostReport() {
String today = LocalDate.now().toString();
double todayTotalCost = inputTokenCounters.entrySet().stream()
.filter(e -> e.getKey().startsWith(today))
.mapToDouble(e -> {
String[] parts = e.getKey().split(":");
String model = parts[2];
long input = e.getValue().get();
long output = outputTokenCounters.getOrDefault(
e.getKey(), new AtomicLong()).get();
return calculateCost(model, input, output);
})
.sum();
log.info("[费用报告] 今日已消耗: ${}",
BigDecimal.valueOf(todayTotalCost).setScale(2, RoundingMode.HALF_UP));
// 超过每日预算的80%则告警
double dailyBudget = 100.0; // 每日预算100美元
if (todayTotalCost > dailyBudget * 0.8) {
alertService.sendCostAlert(todayTotalCost, dailyBudget);
}
}
public record ModelCost(double inputPricePerMillion, double outputPricePerMillion) {}
}多层次限流方案
graph TB
Request[用户请求] --> L1[第一层: 全局限流<br/>系统整体QPS上限]
L1 --> L2[第二层: 用户级限流<br/>每用户每分钟/天限额]
L2 --> L3[第三层: 场景级限流<br/>不同功能不同配额]
L3 --> L4[第四层: 预算限流<br/>预算耗尽则降级]
L4 --> Process[正常处理]
L1 -->|超限| R1[返回429]
L2 -->|超限| R2[返回用户配额说明]
L3 -->|超限| R3[降级到免费功能]
L4 -->|超限| R4[降级到本地模型]package com.laozhang.ai.ratelimit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
/**
* 基于Redis的多层AI调用限流器
*
* 实现了用户级 + 功能级 + 预算级三层限流
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class AIRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
// Lua脚本:原子性地检查并更新计数器
// 保证分布式环境下的原子性
private static final String RATE_LIMIT_LUA = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0
end
return 1
""";
/**
* 检查用户是否超过每分钟调用限额
*
* 限额设计:
* - 免费用户:5次/分钟,50次/天
* - 基础版用户:20次/分钟,500次/天
* - 专业版用户:100次/分钟,5000次/天
*/
public RateLimitResult checkUserLimit(String userId, UserTier tier, String scene) {
// 检查每分钟限额
String minuteKey = "ratelimit:user:" + userId + ":minute:" +
(System.currentTimeMillis() / 60000);
boolean minuteOk = executeLimit(minuteKey, tier.minuteLimit(), 60);
if (!minuteOk) {
return RateLimitResult.limited(
"每分钟调用超限,请稍后再试",
"MINUTE_LIMIT"
);
}
// 检查每天限额
String dayKey = "ratelimit:user:" + userId + ":day:" +
java.time.LocalDate.now();
boolean dayOk = executeLimit(dayKey, tier.dailyLimit(), 86400);
if (!dayOk) {
return RateLimitResult.limited(
"今日调用次数已达上限,明日0点恢复",
"DAILY_LIMIT"
);
}
// 检查场景级限额(特定功能有单独限额)
if (scene != null && tier.sceneLimit(scene) > 0) {
String sceneKey = "ratelimit:user:" + userId + ":scene:" + scene +
":day:" + java.time.LocalDate.now();
boolean sceneOk = executeLimit(sceneKey, tier.sceneLimit(scene), 86400);
if (!sceneOk) {
return RateLimitResult.limited(
"该功能今日使用次数已达上限",
"SCENE_LIMIT"
);
}
}
return RateLimitResult.allowed();
}
private boolean executeLimit(String key, int limit, int windowSeconds) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(
RATE_LIMIT_LUA, Long.class
);
Long result = redisTemplate.execute(
script,
List.of(key),
String.valueOf(limit),
String.valueOf(windowSeconds)
);
return result != null && result == 1L;
}
public enum UserTier {
FREE(5, 50),
BASIC(20, 500),
PRO(100, 5000);
private final int minuteLimit;
private final int dailyLimit;
UserTier(int minuteLimit, int dailyLimit) {
this.minuteLimit = minuteLimit;
this.dailyLimit = dailyLimit;
}
public int minuteLimit() { return minuteLimit; }
public int dailyLimit() { return dailyLimit; }
public int sceneLimit(String scene) {
// 某些高消耗场景单独限额
return switch (scene) {
case "essay-generation" -> this == FREE ? 3 : this == BASIC ? 20 : 100;
case "code-review" -> this == FREE ? 5 : this == BASIC ? 50 : 200;
default -> 0; // 0表示不单独限制,走通用限额
};
}
}
public record RateLimitResult(boolean allowed, String message, String limitType) {
static RateLimitResult allowed() {
return new RateLimitResult(true, null, null);
}
static RateLimitResult limited(String message, String limitType) {
return new RateLimitResult(false, message, limitType);
}
}
}Token优化策略
策略1:动态Prompt压缩
/**
* Prompt压缩服务
* 在不影响效果的前提下,最大化减少输入Token
*/
@Service
@Slf4j
public class PromptOptimizerService {
/**
* 智能压缩对话历史
* 保留最近N轮完整对话,更早的对话只保留摘要
*/
public String compressHistory(List<ChatMessage> history, int keepRecentRounds) {
if (history.size() <= keepRecentRounds * 2) {
return formatFullHistory(history);
}
// 超出保留轮数的历史,提取摘要
List<ChatMessage> toCompress = history.subList(
0, history.size() - keepRecentRounds * 2
);
List<ChatMessage> toKeep = history.subList(
history.size() - keepRecentRounds * 2, history.size()
);
String summary = generateSummary(toCompress);
String recentHistory = formatFullHistory(toKeep);
return "【早期对话摘要】\n" + summary + "\n\n【最近对话】\n" + recentHistory;
}
/**
* 移除Prompt中的冗余内容
*/
public String removeRedundancy(String prompt) {
return prompt
// 移除多余空行
.replaceAll("\n{3,}", "\n\n")
// 移除行首行尾多余空格
.replaceAll("(?m)^\\s+|\\s+$", "")
// 移除重复的说明文字(如多次出现的格式要求)
.trim();
}
private String generateSummary(List<ChatMessage> messages) {
// 用便宜的模型来做摘要(比如gpt-4o-mini)
// 这里简化处理,实际可调用LLM生成摘要
return messages.stream()
.filter(m -> "user".equals(m.role()))
.map(ChatMessage::content)
.limit(3)
.collect(Collectors.joining(";"))
+ " 等话题";
}
private String formatFullHistory(List<ChatMessage> messages) {
return messages.stream()
.map(m -> ("user".equals(m.role()) ? "用户" : "助手") + ":" + m.content())
.collect(Collectors.joining("\n"));
}
}策略2:模型路由(按复杂度选模型)
/**
* 智能模型路由
* 根据任务复杂度自动选择最经济的模型
*
* 原则:简单任务用便宜模型,复杂任务才用贵的模型
* 效果:相同业务效果,成本降低40-60%
*/
@Service
@Slf4j
public class ModelRouterService {
/**
* 根据任务类型和内容选择合适的模型
*/
public String selectModel(String taskType, String content) {
return switch (taskType) {
// 简单分类/意图识别 → 最便宜的模型
case "intent-detection", "classification" -> "gpt-4o-mini";
// 格式化/提取/总结 → 便宜模型足够
case "formatting", "extraction", "summary" -> {
// 内容很长时升级到更好的模型
yield content.length() > 5000 ? "gpt-4o" : "gpt-4o-mini";
}
// 创意写作 → 需要好模型
case "creative-writing", "essay" -> "gpt-4o";
// 代码相关 → GPT-4o系列最强
case "code-generation", "code-review" -> "gpt-4o";
// 复杂推理/分析 → 必须用好模型
case "complex-analysis", "reasoning" -> "gpt-4o";
// 默认:便宜模型
default -> "gpt-4o-mini";
};
}
/**
* 估算任务复杂度(0-10分)
* 高复杂度任务才用高级模型
*/
public int estimateComplexity(String prompt) {
int score = 0;
// 长度加分
if (prompt.length() > 1000) score += 2;
if (prompt.length() > 3000) score += 2;
// 包含代码加分
if (prompt.contains("```") || prompt.contains("def ") ||
prompt.contains("class ")) score += 3;
// 包含数学/分析关键词加分
if (prompt.contains("分析") || prompt.contains("推理") ||
prompt.contains("对比")) score += 2;
// 包含多步骤指令加分
long stepCount = prompt.chars().filter(c -> c == '1' || c == '2').count();
if (stepCount > 3) score += 1;
return Math.min(score, 10);
}
}成本优化效果对比
| 优化措施 | 优化前 | 优化后 | 节省比例 |
|---|---|---|---|
| Prompt压缩(去冗余+历史摘要) | 平均2000 tokens/次 | 平均800 tokens/次 | 60% |
| 模型路由(简单任务用mini) | 全用gpt-4o | 80%用gpt-4o-mini | 50% |
| 语义缓存(命中率60%) | 100%调用LLM | 40%调用LLM | 60% |
| 用户限流(防止滥用) | 无限制 | 有配额 | 20-30% |
| 综合优化后 | 月费$23,000 | 月费$6,000 | 74% |
预算告警与降级
/**
* 预算控制器
* 当成本接近预算时自动降级处理策略
*/
@Service
@RequiredArgsConstructor
public class BudgetController {
private final AICostMeterService costMeter;
// 预算阈值配置
private static final double DAILY_BUDGET_USD = 100.0;
private static final double WARNING_THRESHOLD = 0.8; // 80%触发告警
private static final double FALLBACK_THRESHOLD = 0.95; // 95%触发降级
/**
* 获取当前允许的模型(根据预算状态动态调整)
*/
public String getAllowedModel(String preferredModel) {
double todayCost = costMeter.getTodayCost();
double usageRatio = todayCost / DAILY_BUDGET_USD;
if (usageRatio < WARNING_THRESHOLD) {
// 预算充足,使用首选模型
return preferredModel;
} else if (usageRatio < FALLBACK_THRESHOLD) {
// 接近预算,降级到便宜模型
log.warn("[成本告警] 今日已用 {:.1f}%,强制使用mini模型", usageRatio * 100);
return "gpt-4o-mini"; // 不管原来要用什么,都降级
} else {
// 即将超支,只允许使用最便宜的模型
log.error("[预算告警] 今日已用 {:.1f}%,切换到最低成本模式", usageRatio * 100);
return "gpt-4o-mini";
// 更激进的策略:直接返回null,拒绝所有非必要AI调用
}
}
}