AI 系统的成本追踪——每一次调用花了多少钱,要能查得到
AI 系统的成本追踪——每一次调用花了多少钱,要能查得到
有一天早上,我收到了财务的一封邮件,主题是:"上个月 AI 接口费用同比增加了 340%,请解释原因。"
我打开邮件的时候,脑子里一片空白。
我知道上个月做了几个新功能,Token 消耗肯定会增加,但 340% 是个什么概念?我没法回答这个问题,因为我根本不知道这些钱花在哪里了。是哪个功能消耗最多?是哪几个用户疯狂调用?还是哪段代码有问题在死循环调用 API?
完全不知道。
花了两天时间翻日志,才勉强拼凑出一个不太准确的答案:主要是新上线的"智能报告生成"功能,因为每次生成报告都要调用好几轮 AI,而且这个功能被内部测试人员大量反复调用。
那两天的经历让我下定决心:AI 系统必须有成本追踪,而且必须能追踪到用户、功能、租户这个粒度。
这篇文章就讲这个事怎么做。
成本追踪的难点
先说清楚为什么这件事不是直接看账单就能解决的。
云厂商的账单告诉你的是:某个 API Key 这个月消耗了多少 Token,花了多少钱。这个粒度完全不够用。
你需要知道的是:
- 这笔钱里,智能客服功能花了多少,代码生成花了多少,报告生成花了多少?
- A 租户的用户花了多少,B 租户的用户花了多少?(SaaS 场景下这个很关键,因为成本要分摊给租户)
- 用户 X 这个月消耗了多少 Token?如果每个用户有 Token 配额,你得知道他用了多少
- 某个具体的 API 接口,平均每次调用花多少 Token,有没有异常高消耗的调用?
要回答这些问题,你需要在每一次 AI 调用时采集上下文信息,持久化到可查询的存储里。
成本归因的核心思路
我们的方案是:在 AI 调用层做埋点,把 Token 消耗和业务上下文绑定,写入时序数据库。
为什么是时序数据库?
因为成本追踪本质上是个时序分析问题。你要回答的问题是"某段时间内,某维度的消耗是多少",这正是时序数据库擅长的。我们用的是 InfluxDB,也可以用 Prometheus(但 Prometheus 更适合指标监控,不适合原始明细查询)。
数据模型设计:
每次 AI 调用,记录一条数据:
measurement: ai_token_usage
tags:
- feature_key: "contract_analysis" // 功能标识
- tenant_id: "tenant_001" // 租户ID
- user_id: "user_12345" // 用户ID
- model: "gpt-4o" // 使用的模型
- api_path: "/api/contract/analyze" // 接口路径
- env: "production" // 环境
fields:
- input_tokens: 1200 // 输入 Token
- output_tokens: 350 // 输出 Token
- total_tokens: 1550 // 总 Token
- cost_usd: 0.0155 // 美元成本
- latency_ms: 2340 // 调用耗时
timestamp: 1714012800000000000有了这个数据,你就能做各种维度的聚合查询。
代码实现:基于 AOP 的 Token 采集
手动在每个 AI 调用点加埋点代码太麻烦,而且容易漏。用 AOP 是更好的选择:加一个注解,自动采集。
自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AiCostTracking {
/**
* 功能标识,用于成本归因
*/
String featureKey();
/**
* 是否从上下文自动获取租户ID和用户ID(默认 true)
*/
boolean autoContext() default true;
}AOP 切面实现
@Aspect
@Component
@Slf4j
public class AiCostTrackingAspect {
@Autowired
private InfluxDBWriter influxDBWriter;
@Autowired
private CostCalculator costCalculator;
@Autowired
private UserContextHolder userContextHolder;
@Autowired
private CostAlertService costAlertService;
@Around("@annotation(costTracking)")
public Object trackCost(ProceedingJoinPoint joinPoint, AiCostTracking costTracking) throws Throwable {
String featureKey = costTracking.featureKey();
long startTime = System.currentTimeMillis();
// 获取用户上下文
String tenantId = userContextHolder.getTenantId();
String userId = userContextHolder.getUserId();
Object result = null;
Exception exception = null;
try {
result = joinPoint.proceed();
return result;
} catch (Exception e) {
exception = e;
throw e;
} finally {
long latencyMs = System.currentTimeMillis() - startTime;
// 从返回值中提取 Token 使用情况
TokenUsage tokenUsage = extractTokenUsage(result);
if (tokenUsage != null) {
// 计算成本
String model = extractModel(result);
double costUsd = costCalculator.calculate(model, tokenUsage);
// 构建数据点
AiCostDataPoint dataPoint = AiCostDataPoint.builder()
.featureKey(featureKey)
.tenantId(tenantId)
.userId(userId)
.model(model)
.apiPath(getApiPath())
.inputTokens(tokenUsage.getInputTokens())
.outputTokens(tokenUsage.getOutputTokens())
.totalTokens(tokenUsage.getTotalTokens())
.costUsd(costUsd)
.latencyMs(latencyMs)
.success(exception == null)
.timestamp(System.currentTimeMillis())
.build();
// 异步写入 InfluxDB
influxDBWriter.writeAsync(dataPoint);
// 检查是否触发告警
costAlertService.checkAndAlert(tenantId, userId, featureKey, costUsd, tokenUsage);
}
}
}
private TokenUsage extractTokenUsage(Object result) {
if (result instanceof UnifiedChatResponse) {
UnifiedChatResponse response = (UnifiedChatResponse) result;
return response.getTokenUsage();
}
// 支持其他响应类型
return null;
}
private String extractModel(Object result) {
if (result instanceof UnifiedChatResponse) {
return ((UnifiedChatResponse) result).getModelVersion();
}
return "unknown";
}
private String getApiPath() {
ServletRequestAttributes attributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
return attributes.getRequest().getRequestURI();
}
return "non-http";
}
}InfluxDB 写入
@Component
@Slf4j
public class InfluxDBWriter {
@Autowired
private InfluxDBClient influxDBClient;
@Value("${influxdb.org}")
private String org;
@Value("${influxdb.bucket}")
private String bucket;
private final ExecutorService asyncExecutor = Executors.newFixedThreadPool(4);
public void writeAsync(AiCostDataPoint dataPoint) {
asyncExecutor.submit(() -> {
try {
write(dataPoint);
} catch (Exception e) {
log.error("Failed to write cost data point to InfluxDB", e);
// 写入失败不影响主业务,但要告警
}
});
}
private void write(AiCostDataPoint dp) {
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
Point point = Point.measurement("ai_token_usage")
.addTag("feature_key", dp.getFeatureKey())
.addTag("tenant_id", safeTag(dp.getTenantId()))
.addTag("user_id", safeTag(dp.getUserId()))
.addTag("model", dp.getModel())
.addTag("api_path", dp.getApiPath())
.addTag("success", String.valueOf(dp.isSuccess()))
.addField("input_tokens", dp.getInputTokens())
.addField("output_tokens", dp.getOutputTokens())
.addField("total_tokens", dp.getTotalTokens())
.addField("cost_usd", dp.getCostUsd())
.addField("latency_ms", dp.getLatencyMs())
.time(dp.getTimestamp(), WritePrecision.MS);
writeApi.writePoint(org, bucket, point);
}
private String safeTag(String value) {
return value != null ? value : "unknown";
}
}成本计算器
@Component
public class CostCalculator {
// 各模型定价(每千 token 美元,2024 年数据,实际使用时按官网最新定价更新)
private static final Map<String, ModelPricing> MODEL_PRICING = new HashMap<>();
static {
// GPT-4o
MODEL_PRICING.put("gpt-4o", new ModelPricing(0.0025, 0.01)); // input/output per 1K tokens
MODEL_PRICING.put("gpt-4o-mini", new ModelPricing(0.00015, 0.0006));
MODEL_PRICING.put("gpt-4-turbo", new ModelPricing(0.01, 0.03));
// Claude
MODEL_PRICING.put("claude-3-5-sonnet-20241022", new ModelPricing(0.003, 0.015));
MODEL_PRICING.put("claude-3-haiku-20240307", new ModelPricing(0.00025, 0.00125));
// 默认
MODEL_PRICING.put("default", new ModelPricing(0.001, 0.002));
}
public double calculate(String model, TokenUsage usage) {
ModelPricing pricing = MODEL_PRICING.getOrDefault(model, MODEL_PRICING.get("default"));
double inputCost = (usage.getInputTokens() / 1000.0) * pricing.getInputPricePerK();
double outputCost = (usage.getOutputTokens() / 1000.0) * pricing.getOutputPricePerK();
return inputCost + outputCost;
}
@Data
@AllArgsConstructor
private static class ModelPricing {
private double inputPricePerK;
private double outputPricePerK;
}
}告警服务
@Service
@Slf4j
public class CostAlertService {
@Autowired
private AlertNotifier alertNotifier;
@Autowired
private BudgetConfigService budgetConfigService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void checkAndAlert(String tenantId, String userId, String featureKey,
double costUsd, TokenUsage tokenUsage) {
// 1. 检查单次调用 Token 异常(防止某个 Prompt 爆炸)
if (tokenUsage.getTotalTokens() > 10000) {
log.warn("Single call token count {} exceeds threshold for feature {}, user {}",
tokenUsage.getTotalTokens(), featureKey, userId);
alertNotifier.sendAlert(AlertType.HIGH_SINGLE_CALL_TOKENS,
buildAlertContext(tenantId, userId, featureKey, costUsd, tokenUsage));
}
// 2. 检查用户日消耗
String userDailyKey = String.format("cost:daily:user:%s:%s", userId, LocalDate.now());
Double userDailyCost = accumulateAndGet(userDailyKey, costUsd);
BudgetConfig userBudget = budgetConfigService.getUserBudget(userId);
if (userBudget != null && userDailyCost > userBudget.getDailyLimitUsd()) {
alertNotifier.sendAlert(AlertType.USER_DAILY_BUDGET_EXCEEDED,
buildAlertContext(tenantId, userId, featureKey, userDailyCost, null));
}
// 3. 检查租户月消耗
String tenantMonthlyKey = String.format("cost:monthly:tenant:%s:%s",
tenantId, YearMonth.now());
Double tenantMonthlyCost = accumulateAndGet(tenantMonthlyKey, costUsd);
BudgetConfig tenantBudget = budgetConfigService.getTenantBudget(tenantId);
if (tenantBudget != null) {
double usageRate = tenantMonthlyCost / tenantBudget.getMonthlyLimitUsd();
// 80% 预警
if (usageRate > 0.8 && usageRate <= 1.0) {
alertNotifier.sendAlert(AlertType.TENANT_BUDGET_WARNING_80PCT,
buildAlertContext(tenantId, userId, featureKey, tenantMonthlyCost, null));
}
// 超过 100% 告警
if (usageRate > 1.0) {
alertNotifier.sendAlert(AlertType.TENANT_BUDGET_EXCEEDED,
buildAlertContext(tenantId, userId, featureKey, tenantMonthlyCost, null));
}
}
}
private Double accumulateAndGet(String key, double delta) {
// 使用 Redis 做实时累加,避免频繁查询 InfluxDB
Double current = (Double) redisTemplate.opsForValue().get(key);
double newValue = (current != null ? current : 0) + delta;
redisTemplate.opsForValue().set(key, newValue, Duration.ofDays(35)); // 保留 35 天
return newValue;
}
}使用方式
在需要追踪的方法上加注解,就这么简单:
@Service
public class ContractAnalysisService {
@Autowired
private UnifiedModelService modelService;
@AiCostTracking(featureKey = "contract_analysis")
public ContractAnalysisResult analyze(String contractText, String userId) {
UnifiedChatRequest request = buildRequest(contractText);
UnifiedChatResponse response = modelService.chat(request);
return parseResult(response.getContent());
}
@AiCostTracking(featureKey = "smart_report")
public ReportResult generateReport(ReportRequest reportRequest) {
// 这里可能多轮调用,每轮都会被追踪
UnifiedChatResponse step1 = modelService.chat(buildStep1Request(reportRequest));
UnifiedChatResponse step2 = modelService.chat(buildStep2Request(step1.getContent()));
return buildReport(step2.getContent());
}
}成本追踪数据流
查询示例
有了 InfluxDB 里的数据,你可以很方便地做各种查询。
Flux 查询语言(InfluxDB 2.x):
查询某租户上月各功能的成本分布:
from(bucket: "ai_metrics")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "ai_token_usage")
|> filter(fn: (r) => r["tenant_id"] == "tenant_001")
|> filter(fn: (r) => r["_field"] == "cost_usd")
|> group(columns: ["feature_key"])
|> sum()
|> sort(columns: ["_value"], desc: true)查询 Token 消耗 Top 10 用户:
from(bucket: "ai_metrics")
|> range(start: -7d)
|> filter(fn: (r) => r["_measurement"] == "ai_token_usage")
|> filter(fn: (r) => r["_field"] == "total_tokens")
|> group(columns: ["user_id"])
|> sum()
|> sort(columns: ["_value"], desc: true)
|> limit(n: 10)检测异常高消耗调用(单次超过 8000 Token):
from(bucket: "ai_metrics")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "ai_token_usage")
|> filter(fn: (r) => r["_field"] == "total_tokens")
|> filter(fn: (r) => r["_value"] > 8000)
|> sort(columns: ["_value"], desc: true)几个实践经验
写入不能阻塞主流程
成本数据写入 InfluxDB 这个操作不能同步阻塞,否则一旦 InfluxDB 慢了或者挂了,AI 服务就全部受影响。一定要异步写入,并且要有降级策略(InfluxDB 写入失败时,至少把关键信息写到日志里,后续可以回溯)。
Redis 做实时计数,InfluxDB 做历史分析
两者各有用途,不要混用。Redis 做实时预算检查(快),InfluxDB 做历史趋势分析和报表(完整)。
Tag 数量要控制
InfluxDB 的 Tag 会形成 series,Tag 值的组合数量决定了 series 数量。如果 userId 有几十万个,不要直接把 userId 作为 Tag,否则 series 数量爆炸会导致查询性能急剧下降。可以在查询时用 Field 过滤代替 Tag 过滤。
成本数据也要有保留策略
线上明细数据不用永久保留,可以设置自动过期:明细数据保留 90 天,按天汇总的数据保留 1 年,按月汇总的数据永久保留。InfluxDB 的 Retention Policy 可以自动处理这个。
最后
财务那封邮件之后,我们花了两周搭建了这套成本追踪体系。现在每天早上我都能看到昨天各个功能、各个租户的成本报表,再也不会被财务问到哑口无言了。
更重要的是,通过成本数据,我们发现了几个 Prompt 设计上的问题——某个功能的平均 Token 消耗异常高,排查后发现是 Prompt 里拼了太多无关的上下文信息,优化后成本降了 40%。
成本追踪不是为了管控,而是为了让你能看清楚钱花在哪里,然后做出正确的决策。
