第1780篇:AI预算管理系统设计——预警、限流与成本异常检测
2026/4/30大约 10 分钟
第1780篇:AI预算管理系统设计——预警、限流与成本异常检测
做AI成本管理系统,最后这块是最有挑战性的:预算管理。
前几篇讲了成本归因、分摊、计费——这些都是"记账"。预算管理是"管账",要在钱还没花完之前就介入,防止失控。
一个典型场景:某个工程师在测试时写了个循环,不小心对同一份文档调用了10000次GPT-4o,一晚上烧了几千美元。等第二天早上看到账单,已经晚了。
好的预算管理系统应该在这种情况发生时,实时发现并干预,而不是事后复盘。
这篇文章,我来讲AI预算管理系统的完整设计,覆盖预警、限流和异常检测三个核心能力。
预算管理的三道防线
三道防线缺一不可:
- 第一道:事前拦截,直接不让超预算的请求通过
- 第二道:事中告警,提前发现预算消耗过快的趋势
- 第三道:模式识别,发现非正常的消耗行为
第一道防线:预算检查拦截器
@Component
@Order(1) // 最高优先级
@Slf4j
public class BudgetGuardFilter implements Filter {
@Autowired
private BudgetGuardService budgetGuard;
@Autowired
private TenantContextResolver tenantResolver;
/**
 * Enforces the tenant's AI budget before a request reaches an AI endpoint.
 *
 * <p>Requests that are not on an AI path, or for which no tenant can be resolved, are passed
 * down the chain untouched. For all other requests the budget level decides the outcome:
 * NORMAL passes, WARNING passes with an {@code X-Budget-Warning} header, EXCEEDED is rejected
 * with HTTP 429 and SUSPENDED with HTTP 403. Any other level is passed through (fail-open) so a
 * level this filter does not know about never produces a silent empty response.
 *
 * @param request  incoming request; cast to {@link HttpServletRequest}
 * @param response outgoing response; cast to {@link HttpServletResponse} when it is modified
 * @param chain    remaining filter chain
 * @throws IOException      if writing the rejection body fails or the chain throws it
 * @throws ServletException if the chain throws it
 */
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest httpRequest = (HttpServletRequest) request;
    // Only AI call paths are subject to budget enforcement.
    if (!isAIEndpoint(httpRequest.getRequestURI())) {
        chain.doFilter(request, response);
        return;
    }
    TenantContext tenant = tenantResolver.resolve(httpRequest);
    if (tenant == null) {
        // No tenant, nothing to charge against: let the request through.
        chain.doFilter(request, response);
        return;
    }
    // Cache-first budget lookup; runs on every AI request.
    BudgetStatus status = budgetGuard.quickCheck(tenant.getTenantId());
    switch (status.getLevel()) {
        case NORMAL: {
            chain.doFilter(request, response);
            break;
        }
        case WARNING: {
            // Still allowed, but the caller is told how much budget is gone.
            // NOTE(review): the header value contains non-ASCII text; many containers encode
            // header values as ISO-8859-1, so clients may see it garbled - confirm.
            ((HttpServletResponse) response).setHeader(
                    "X-Budget-Warning",
                    String.format("预算已使用%.1f%%,请注意控制用量", status.getUsedPercent()));
            chain.doFilter(request, response);
            break;
        }
        case EXCEEDED: {
            // Over budget: reject without invoking the chain.
            writeJsonError((HttpServletResponse) response, 429, Map.of(
                    "code", "BUDGET_EXCEEDED",
                    "message", "月度AI预算已达上限,如需继续使用请联系管理员提升预算",
                    "budgetCny", status.getBudgetCny(),
                    "usedCny", status.getUsedCny()));
            log.warn("预算超限,拒绝请求: tenant={}, used={}, budget={}",
                    tenant.getTenantId(), status.getUsedCny(), status.getBudgetCny());
            break;
        }
        case SUSPENDED: {
            // Account suspended: reject without invoking the chain.
            writeJsonError((HttpServletResponse) response, 403, Map.of(
                    "code", "ACCOUNT_SUSPENDED",
                    "message", "账户已被暂停,请联系管理员"));
            break;
        }
        default: {
            // Unknown level: fail open instead of returning an empty 200 with no body.
            log.warn("未知的预算状态,放行请求: tenant={}, level={}",
                    tenant.getTenantId(), status.getLevel());
            chain.doFilter(request, response);
            break;
        }
    }
}

/**
 * Writes a JSON rejection body. The content type (with UTF-8 charset) is set before the writer
 * is obtained so the Chinese messages are not encoded with the container's default charset.
 */
private void writeJsonError(HttpServletResponse resp, int httpStatus, Map<String, ?> body)
        throws IOException {
    resp.setStatus(httpStatus);
    resp.setContentType("application/json;charset=UTF-8");
    resp.getWriter().write(JSON.toJSONString(body));
}
/**
 * Tells whether a request URI belongs to an AI endpoint, i.e. starts with
 * {@code /api/ai/} or {@code /v1/}.
 */
private boolean isAIEndpoint(String uri) {
    if (uri.startsWith("/api/ai/")) {
        return true;
    }
    return uri.startsWith("/v1/");
}
}
快速预算检查服务
预算检查是每次请求的必经之路,必须极快(< 1ms):热路径只读 Redis,只有在预算配置缓存未命中时才回源查询一次数据库。
@Service
public class BudgetGuardService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private BudgetConfigRepository budgetConfigRepository;
/**
 * Fast budget check for one tenant.
 *
 * <p>Reads the month-to-date spend and the monthly limit from Redis. Only when the limit is
 * not cached is it loaded from {@code budgetConfigRepository} and cached with the TTL returned
 * by {@code getExpireToEndOfMonth()}. A tenant without a configured monthly budget gets
 * {@code BudgetStatus.noLimit()}.
 *
 * <p>Levels: usage of 100% or more is EXCEEDED, 80% or more is WARNING, otherwise NORMAL. A
 * budget of exactly zero is reported as EXCEEDED instead of dividing by zero.
 *
 * <p>NOTE(review): this method never returns SUSPENDED and a missing budget config is not
 * cached, so such tenants hit the repository on every call - confirm both are intended.
 *
 * @param tenantId tenant whose budget is checked
 * @return the current budget status
 */
public BudgetStatus quickCheck(String tenantId) {
    String monthKey = getCurrentMonthKey(tenantId);
    // Month-to-date spend, stored as a decimal string and compared directly against the CNY
    // budget below; a missing key means nothing has been recorded yet.
    String usedStr = redisTemplate.opsForValue().get("budget:used:" + monthKey);
    BigDecimal usedCny = usedStr != null ? new BigDecimal(usedStr) : BigDecimal.ZERO;
    // Budget limit is cached in Redis to avoid a repository lookup per request.
    String budgetStr = redisTemplate.opsForValue().get("budget:limit:" + monthKey);
    if (budgetStr == null) {
        // Cache miss: load from the repository.
        BudgetConfig config = budgetConfigRepository.findByTenantId(tenantId);
        if (config == null || config.getMonthlyBudgetCny() == null) {
            return BudgetStatus.noLimit(); // no budget configured: let the request pass
        }
        budgetStr = config.getMonthlyBudgetCny().toPlainString();
        redisTemplate.opsForValue().set(
                "budget:limit:" + monthKey,
                budgetStr,
                getExpireToEndOfMonth());
    }
    BigDecimal budgetCny = new BigDecimal(budgetStr);
    if (budgetCny.signum() == 0) {
        // A zero budget allows no spend at all; dividing by it would throw
        // ArithmeticException on every request.
        return BudgetStatus.builder()
                .level(BudgetLevel.EXCEEDED)
                .usedCny(usedCny)
                .budgetCny(budgetCny)
                .usedPercent(100.0)
                .build();
    }
    BigDecimal usedPercent = usedCny.divide(budgetCny, 4, RoundingMode.HALF_UP)
            .multiply(new BigDecimal(100));
    BudgetLevel level;
    if (usedPercent.compareTo(new BigDecimal("100")) >= 0) {
        level = BudgetLevel.EXCEEDED;
    } else if (usedPercent.compareTo(new BigDecimal("80")) >= 0) {
        level = BudgetLevel.WARNING;
    } else {
        level = BudgetLevel.NORMAL;
    }
    return BudgetStatus.builder()
            .level(level)
            .usedCny(usedCny)
            .budgetCny(budgetCny)
            .usedPercent(usedPercent.doubleValue())
            .build();
}
/**
 * Adds the cost of a finished AI call to the tenant's month-to-date spend in Redis.
 *
 * <p>The tenant counter, and the department counter when {@code getDeptMonthKey} returns a key,
 * are each increased with a single {@code INCRBYFLOAT}, which is atomic on the Redis side and
 * creates the key when it does not exist. A {@code null} or zero cost is ignored.
 *
 * <p>NOTE(review): the increment goes through {@code double}; if exact decimal accounting is
 * required, consider storing an integer amount in minor units instead.
 *
 * @param tenantId tenant that made the call
 * @param costCny  cost of the call in CNY
 */
public void recordConsumption(String tenantId, BigDecimal costCny) {
    if (costCny == null || costCny.signum() == 0) {
        return; // nothing to add
    }
    double delta = costCny.doubleValue();
    String monthKey = getCurrentMonthKey(tenantId);
    // increment(key, double) issues INCRBYFLOAT; the previous increment(key, 0) added nothing
    // and, being an integer INCRBY, fails once the stored value has a fractional part.
    redisTemplate.opsForValue().increment("budget:used:" + monthKey, delta);
    // Roll the same amount up to the department-level counter, when there is one.
    String deptKey = getDeptMonthKey(tenantId);
    if (deptKey != null) {
        redisTemplate.opsForValue().increment("budget:used:" + deptKey, delta);
    }
}
}
第二道防线:实时预警系统
@Service
@Slf4j
public class BudgetAlertService {
// 告警阈值配置
private static final List<Integer> ALERT_THRESHOLDS = List.of(50, 70, 80, 90, 95, 100);
@Autowired
private NotificationService notificationService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
 * Checks, after a consumption update, whether a budget alert has to be sent.
 *
 * <p>Every threshold in {@code ALERT_THRESHOLDS} that the usage has reached is marked as sent
 * for the current month through a Redis set-if-absent key with a 31-day TTL. When usage crosses
 * several thresholds at once, only one alert is sent, for the highest newly crossed threshold,
 * instead of one notification per threshold. If sending fails, the marker of that threshold is
 * removed again so a later call can retry, and the exception is rethrown.
 *
 * <p>Nothing is done when {@code budgetCny} is {@code null} or not positive, because a usage
 * percentage cannot be computed.
 *
 * @param tenantId  tenant being checked
 * @param usedCny   month-to-date spend
 * @param budgetCny monthly budget
 */
public void checkAndAlert(String tenantId, BigDecimal usedCny, BigDecimal budgetCny) {
    if (budgetCny == null || budgetCny.signum() <= 0) {
        return;
    }
    double usedPercent = usedCny.divide(budgetCny, 4, RoundingMode.HALF_UP)
            .doubleValue() * 100;
    // The month part of the key is the same for every threshold; build it once.
    String keyPrefix = "alert:sent:" + tenantId + ":"
            + LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")) + ":";
    int highestNewThreshold = -1;
    for (int threshold : ALERT_THRESHOLDS) {
        if (usedPercent < threshold) {
            continue;
        }
        // setIfAbsent returns a nullable Boolean; compare explicitly instead of unboxing.
        Boolean firstTime = redisTemplate.opsForValue()
                .setIfAbsent(keyPrefix + threshold, "1", Duration.ofDays(31));
        if (Boolean.TRUE.equals(firstTime)) {
            highestNewThreshold = Math.max(highestNewThreshold, threshold);
        }
    }
    if (highestNewThreshold < 0) {
        return; // every reached threshold was already alerted this month
    }
    try {
        sendBudgetAlert(tenantId, highestNewThreshold, usedCny, budgetCny);
    } catch (RuntimeException e) {
        // Do not let a failed notification permanently consume the one-shot marker.
        redisTemplate.delete(keyPrefix + highestNewThreshold);
        throw e;
    }
    log.info("发送预算告警: tenant={}, threshold={}%, used={}, budget={}",
            tenantId, highestNewThreshold, usedCny, budgetCny);
}
/**
 * Builds the alert message for a crossed threshold and pushes it to the tenant's e-mail list
 * and WeCom webhook. When the threshold is 100 or higher the admin group is notified as well.
 *
 * @param tenantId         tenant the alert is about
 * @param thresholdPercent threshold that was crossed, in percent
 * @param used             month-to-date spend
 * @param budget           monthly budget
 */
private void sendBudgetAlert(String tenantId, int thresholdPercent,
        BigDecimal used, BigDecimal budget) {
    TenantInfo tenantInfo = tenantRepository.findById(tenantId);
    BigDecimal left = budget.subtract(used);
    AlertMessage alert = AlertMessage.builder()
            .tenantId(tenantId)
            .tenantName(tenantInfo.getName())
            .thresholdPercent(thresholdPercent)
            .usedCny(used)
            .budgetCny(budget)
            .remainingCny(left)
            .remainingDaysInMonth(getRemainingDaysInMonth())
            .estimatedDailySpend(getEstimatedDailySpend(tenantId))
            .build();

    // Tenant-facing channels.
    notificationService.sendEmail(tenantInfo.getEmailList(), buildAlertEmail(alert));
    notificationService.sendWecom(tenantInfo.getWecomWebhook(), buildAlertWecom(alert));

    // Below 100% the tenant channels are enough.
    if (thresholdPercent < 100) {
        return;
    }
    // Budget fully used: escalate to the administrators too.
    notificationService.sendToAdminGroup(buildCriticalAlert(alert));
}
/**
 * Projects the tenant's spend at month end by linear extrapolation.
 *
 * <p>The average of the daily spends returned for the last 7 days is multiplied by the number
 * of days left in the month and added to the current month-to-date spend. The prediction is
 * flagged as exceeding the budget when a monthly budget is configured and the projected total
 * is greater than it.
 *
 * @param tenantId tenant to predict for
 * @return the prediction, or {@code BudgetPrediction.noData()} when no daily spend is available
 */
public BudgetPrediction predictMonthEnd(String tenantId) {
    List<BigDecimal> recentDailySpends = getDailySpends(tenantId, 7);
    if (recentDailySpends.isEmpty()) {
        return BudgetPrediction.noData();
    }

    // Average daily spend over the returned days.
    BigDecimal windowTotal = BigDecimal.ZERO;
    for (BigDecimal daySpend : recentDailySpends) {
        windowTotal = windowTotal.add(daySpend);
    }
    BigDecimal avgDailySpend = windowTotal.divide(
            new BigDecimal(recentDailySpends.size()), 4, RoundingMode.HALF_UP);

    // Spend so far plus the average rate applied to the rest of the month.
    int daysLeft = getRemainingDaysInMonth();
    BigDecimal currentSpend = getMonthlySpend(tenantId);
    BigDecimal predictedTotal = currentSpend.add(
            avgDailySpend.multiply(new BigDecimal(daysLeft)));

    BudgetConfig budget = getBudgetConfig(tenantId);
    boolean willExceed = false;
    if (budget != null && budget.getMonthlyBudgetCny() != null) {
        willExceed = predictedTotal.compareTo(budget.getMonthlyBudgetCny()) > 0;
    }

    return BudgetPrediction.builder()
            .currentSpend(currentSpend)
            .avgDailySpend(avgDailySpend)
            .predictedMonthTotal(predictedTotal)
            .budgetCny(budget != null ? budget.getMonthlyBudgetCny() : null)
            .willExceedBudget(willExceed)
            .daysUntilBudgetExhausted(willExceed
                    ? calculateDaysToBudgetExhaust(
                            currentSpend, avgDailySpend, budget.getMonthlyBudgetCny())
                    : null)
            .build();
}
}
第三道防线:成本异常检测
这是最有技术含量的部分。我们要用统计方法检测"非正常的"消耗模式。
什么是异常
AI成本的异常模式主要有几种:
- 突增型:某个时间点成本突然大幅增加(如循环调用bug)
- 渐进型:成本缓慢但持续上升,最终超出预期(如某功能被滥用)
- 时段型:非工作时间(如凌晨3点)出现大量调用
@Service
@Slf4j
public class CostAnomalyDetector {
/**
 * Detects sudden cost surges per tenant.
 *
 * <p>The cost of the last hour is compared with the costs returned by
 * {@code getHistoricalHourlyCosts} for the same hour and day of week over the last 4 weeks.
 * A tenant is reported when its z-score against that history is greater than 3. Tenants with
 * fewer than 2 historical data points are skipped.
 *
 * <p>NOTE(review): when the history has zero variance (for example all values are 0) the
 * z-score is forced to 0, so a surge on top of a perfectly flat baseline is not reported here.
 * Closing that gap needs an absolute or relative floor chosen by the business.
 *
 * @return the surge anomalies found, possibly empty
 */
public List<CostAnomaly> detectSurgeAnomalies() {
    LocalDateTime now = LocalDateTime.now();
    // Cost per tenant over the last hour.
    Map<String, BigDecimal> currentHourCosts = getCostsByTenant(
            now.minusHours(1), now
    );
    List<CostAnomaly> anomalies = new ArrayList<>();
    for (Map.Entry<String, BigDecimal> entry : currentHourCosts.entrySet()) {
        String tenantId = entry.getKey();
        BigDecimal currentCost = entry.getValue();
        // Same hour and weekday over the last 4 weeks (up to 4 data points).
        List<BigDecimal> historicalCosts = getHistoricalHourlyCosts(
                tenantId, now.getHour(), now.getDayOfWeek(), 4
        );
        if (historicalCosts.size() < 2) {
            continue; // not enough history to estimate a spread
        }
        double mean = historicalCosts.stream()
                .mapToDouble(BigDecimal::doubleValue)
                .average().orElse(0);
        double stdDev = calculateStdDev(historicalCosts, mean);
        // Anomalous when the current value is above mean + 3 standard deviations.
        double zScore = stdDev > 0 ? (currentCost.doubleValue() - mean) / stdDev : 0;
        if (zScore > 3.0) {
            CostAnomaly anomaly = CostAnomaly.builder()
                    .tenantId(tenantId)
                    .detectedAt(now)
                    .anomalyType("SURGE")
                    .currentCost(currentCost)
                    // valueOf uses the canonical decimal form of the double; the
                    // BigDecimal(double) constructor would store its exact binary expansion.
                    .historicalAvg(BigDecimal.valueOf(mean))
                    .zScore(zScore)
                    .severity(classifySeverity(zScore))
                    .description(String.format(
                            "过去1小时成本 %.2f元,是历史均值 %.2f元的 %.1f 倍",
                            currentCost.doubleValue(), mean, currentCost.doubleValue() / mean
                    ))
                    .build();
            anomalies.add(anomaly);
            log.warn("检测到成本异常突增: tenant={}, current={}元, avg={}元, zScore={}",
                    tenantId, currentCost, mean, zScore);
        }
    }
    return anomalies;
}
/**
 * Detects spend during off hours (00:00-07:00).
 *
 * <p>Outside that window the method returns an empty list immediately. Inside it, every tenant
 * whose cost over the last hour is greater than its {@code getOffHoursThreshold} value is
 * reported with severity HIGH. Each anomaly carries the detection time, consistent with the
 * surge detector.
 *
 * <p>NOTE(review): during the first hour after midnight the one-hour look-back also covers
 * time before 00:00 - confirm whether that is acceptable.
 *
 * @return the off-hours anomalies found, possibly empty
 */
public List<CostAnomaly> detectOffHoursAnomalies() {
    LocalDateTime now = LocalDateTime.now();
    int currentHour = now.getHour();
    // Off hours are 00:00-07:00; nothing to check otherwise.
    if (currentHour >= 7) {
        return Collections.emptyList();
    }
    // Cost per tenant over the last hour.
    Map<String, BigDecimal> offHoursCosts = getCostsByTenant(now.minusHours(1), now);
    List<CostAnomaly> anomalies = new ArrayList<>();
    for (Map.Entry<String, BigDecimal> entry : offHoursCosts.entrySet()) {
        String tenantId = entry.getKey();
        BigDecimal offHoursCost = entry.getValue();
        BigDecimal offHoursThreshold = getOffHoursThreshold(tenantId);
        if (offHoursCost.compareTo(offHoursThreshold) > 0) {
            anomalies.add(CostAnomaly.builder()
                    .tenantId(tenantId)
                    .detectedAt(now)
                    .anomalyType("OFF_HOURS")
                    .currentCost(offHoursCost)
                    .severity("HIGH")
                    .description(String.format(
                            "非工作时间(%d:00)检测到AI调用,金额 %.2f元,超过阈值 %.2f元",
                            currentHour, offHoursCost.doubleValue(), offHoursThreshold.doubleValue()
                    ))
                    .build());
        }
    }
    return anomalies;
}
/**
 * Detects abnormally high call frequency per tenant.
 *
 * <p>Uses the per-tenant value returned by {@code getCallCountsPerMinute(5)} and reports a
 * tenant, with severity MEDIUM, when that value is more than 5 times its
 * {@code getNormalCallRate} baseline. Each anomaly carries the detection time, consistent with
 * the surge detector.
 *
 * <p>NOTE(review): a baseline of 0 makes any positive call count anomalous - confirm this is
 * the intended behaviour for tenants without history.
 *
 * @return the high-frequency anomalies found, possibly empty
 */
public List<CostAnomaly> detectHighFrequencyAnomalies() {
    LocalDateTime now = LocalDateTime.now();
    // Call counts per tenant for the last 5 minutes, expressed per minute.
    Map<String, Long> callCounts = getCallCountsPerMinute(5);
    List<CostAnomaly> anomalies = new ArrayList<>();
    for (Map.Entry<String, Long> entry : callCounts.entrySet()) {
        String tenantId = entry.getKey();
        long callsPerMinute = entry.getValue();
        // Baseline call rate for this tenant.
        double normalCallRate = getNormalCallRate(tenantId);
        if (callsPerMinute > normalCallRate * 5) {
            anomalies.add(CostAnomaly.builder()
                    .tenantId(tenantId)
                    .detectedAt(now)
                    .anomalyType("HIGH_FREQUENCY")
                    .severity("MEDIUM")
                    .description(String.format(
                            "调用频率异常: %d次/分钟,正常基准 %.0f次/分钟",
                            callsPerMinute, normalCallRate
                    ))
                    .build());
        }
    }
    return anomalies;
}
/**
 * Population standard deviation of {@code values} around the supplied {@code mean}
 * (the squared deviations are divided by the number of values, not by n - 1).
 */
private double calculateStdDev(List<BigDecimal> values, double mean) {
    double variance = values.stream()
            .mapToDouble(BigDecimal::doubleValue)
            .map(sample -> Math.pow(sample - mean, 2))
            .sum() / values.size();
    return Math.sqrt(variance);
}
/**
 * Maps a z-score to a severity label: above 10 is CRITICAL, above 6 is HIGH, above 3 is MEDIUM,
 * anything else is LOW.
 */
private String classifySeverity(double zScore) {
    String severity = "LOW";
    if (zScore > 3) {
        severity = "MEDIUM";
    }
    if (zScore > 6) {
        severity = "HIGH";
    }
    if (zScore > 10) {
        severity = "CRITICAL";
    }
    return severity;
}
}
异常响应自动化
检测到异常后,需要自动响应,不能只靠人工处理。
@Service
public class AnomalyResponseService {
@Autowired
private BudgetGuardService budgetGuard;
@Autowired
private NotificationService notificationService;
/**
 * Runs the automatic response for a detected anomaly, chosen by its severity.
 *
 * <ul>
 *   <li>CRITICAL: suspend the tenant and send an urgent alert</li>
 *   <li>HIGH: throttle the tenant to 20% of its normal rate and send a high-priority alert</li>
 *   <li>MEDIUM: send a normal alert only</li>
 *   <li>LOW: no action beyond the record</li>
 * </ul>
 *
 * <p>Every anomaly is saved to {@code anomalyRepository}, not only LOW ones, so the more severe
 * anomalies are also recorded. A failure to save is logged and does not stop the response. A
 * {@code null} or unrecognised severity is logged and otherwise ignored.
 *
 * <p>NOTE(review): the class as shown declares neither a logger nor {@code anomalyRepository} /
 * {@code redisTemplate} fields - confirm they are in scope.
 *
 * @param anomaly the anomaly to respond to
 */
public void respond(CostAnomaly anomaly) {
    try {
        anomalyRepository.save(anomaly);
    } catch (RuntimeException e) {
        // Persisting is bookkeeping; it must not block suspension or throttling.
        log.error("异常记录保存失败: tenant={}", anomaly.getTenantId(), e);
    }
    String severity = anomaly.getSeverity();
    if (severity == null) {
        log.warn("异常缺少严重级别,跳过自动响应: tenant={}", anomaly.getTenantId());
        return;
    }
    switch (severity) {
        case "CRITICAL":
            // Suspend the account immediately.
            budgetGuard.suspendTenant(anomaly.getTenantId(),
                    "成本异常检测:" + anomaly.getDescription());
            notificationService.sendUrgentAlert(anomaly);
            log.error("CRITICAL异常,已暂停租户: {}", anomaly.getTenantId());
            break;
        case "HIGH":
            // Throttle to 20% of the normal rate, using the rate-limit implementation
            // defined in this class.
            applyRateLimit(anomaly.getTenantId(), 0.2);
            notificationService.sendHighPriorityAlert(anomaly);
            log.warn("HIGH异常,已限速: {}", anomaly.getTenantId());
            break;
        case "MEDIUM":
            // Alert only, no restriction.
            notificationService.sendNormalAlert(anomaly);
            log.info("MEDIUM异常,发送告警: {}", anomaly.getTenantId());
            break;
        case "LOW":
            // Already recorded above; nothing else to do.
            break;
        default:
            log.warn("未知的异常严重级别,跳过自动响应: tenant={}, severity={}",
                    anomaly.getTenantId(), severity);
            break;
    }
}
/**
 * Temporarily throttles a tenant by storing a rate-limit factor in Redis under
 * {@code ratelimit:factor:<tenantId>}. The entry is written with a one-hour TTL, so it
 * disappears on its own after that.
 *
 * @param tenantId tenant to throttle
 * @param factor   factor to store, written as its string form
 */
public void applyRateLimit(String tenantId, double factor) {
    Duration ttl = Duration.ofHours(1);
    String factorValue = String.valueOf(factor);
    redisTemplate.opsForValue().set("ratelimit:factor:" + tenantId, factorValue, ttl);
    log.info("限速应用: tenantId={}, factor={}", tenantId, factor);
}
}
整合:异常检测调度
@Service
@Slf4j
public class AnomalyDetectionScheduler {
@Autowired
private CostAnomalyDetector detector;
@Autowired
private AnomalyResponseService responseService;
/**
 * Runs all anomaly detectors every 5 minutes and hands the results to the response service.
 *
 * <p>The three detectors are started concurrently and each result is awaited for up to 30
 * seconds. A detector that fails or times out is logged and contributes no anomalies; the
 * results of the other detectors are still processed. Anomalies are then de-duplicated per
 * tenant and anomaly type, keeping the most severe one, before being responded to.
 *
 * <p>NOTE(review): {@code supplyAsync} without an executor runs on the common fork-join pool;
 * if the detectors block on I/O, a dedicated executor may be preferable.
 */
@Scheduled(fixedRate = 300000)
public void runAnomalyDetection() {
    log.debug("开始异常检测...");
    // Start all detectors before waiting on any of them.
    CompletableFuture<List<CostAnomaly>> surge = CompletableFuture.supplyAsync(
            detector::detectSurgeAnomalies
    );
    CompletableFuture<List<CostAnomaly>> offHours = CompletableFuture.supplyAsync(
            detector::detectOffHoursAnomalies
    );
    CompletableFuture<List<CostAnomaly>> highFreq = CompletableFuture.supplyAsync(
            detector::detectHighFrequencyAnomalies
    );
    // Await each detector on its own so one failure does not discard the others' results.
    List<CostAnomaly> allAnomalies = new ArrayList<>();
    allAnomalies.addAll(awaitDetection("SURGE", surge));
    allAnomalies.addAll(awaitDetection("OFF_HOURS", offHours));
    allAnomalies.addAll(awaitDetection("HIGH_FREQUENCY", highFreq));
    // One response per tenant and anomaly type; on a clash keep the more severe anomaly.
    allAnomalies.stream()
            .collect(Collectors.toMap(
                    a -> a.getTenantId() + ":" + a.getAnomalyType(),
                    a -> a,
                    (a1, a2) -> severityRank(a1.getSeverity()) >= severityRank(a2.getSeverity())
                            ? a1 : a2
            ))
            .values()
            .forEach(responseService::respond);
    log.debug("异常检测完成,发现{}个异常", allAnomalies.size());
}

/**
 * Waits up to 30 seconds for one detector. Returns an empty list when the detector fails,
 * times out or the wait is interrupted; in those cases the future is cancelled.
 */
private List<CostAnomaly> awaitDetection(String detectorName,
        CompletableFuture<List<CostAnomaly>> future) {
    try {
        List<CostAnomaly> found = future.get(30, TimeUnit.SECONDS);
        if (found != null) {
            return found;
        }
    } catch (InterruptedException e) {
        // Restore the flag so the scheduler thread can observe the interruption.
        Thread.currentThread().interrupt();
        future.cancel(true);
        log.error("异常检测被中断: detector={}", detectorName, e);
    } catch (Exception e) {
        future.cancel(true);
        log.error("异常检测执行失败: detector={}", detectorName, e);
    }
    return new ArrayList<>();
}

/**
 * Orders severity labels by seriousness. Comparing the labels as strings would rank
 * "MEDIUM" above "CRITICAL", so an explicit rank is used; unknown or null labels rank lowest.
 */
private static int severityRank(String severity) {
    if (severity == null) {
        return -1;
    }
    switch (severity) {
        case "CRITICAL":
            return 3;
        case "HIGH":
            return 2;
        case "MEDIUM":
            return 1;
        case "LOW":
            return 0;
        default:
            return -1;
    }
}
}
管理后台:预算管理页面
@RestController
@RequestMapping("/admin/budget")
@PreAuthorize("hasRole('ADMIN')")
public class BudgetManagementController {
/**
 * Returns the budget overview across all tenants: the total number of tenants, how many are
 * at or above 100% usage, how many are in the 80%-100% warning band, the per-tenant statuses
 * and the 20 most recent anomalies returned by the anomaly repository.
 */
@GetMapping("/overview")
public ApiResponse<BudgetOverviewVO> getOverview() {
    List<TenantBudgetStatus> statuses = budgetService.getAllTenantStatuses();

    // Single pass: classify each tenant as exceeded, warning, or neither.
    long exceededCount = 0;
    long warningCount = 0;
    for (TenantBudgetStatus tenantStatus : statuses) {
        if (tenantStatus.getUsedPercent() >= 100) {
            exceededCount++;
        } else if (tenantStatus.getUsedPercent() >= 80) {
            warningCount++;
        }
    }

    BudgetOverviewVO overview = BudgetOverviewVO.builder()
            .totalTenants(statuses.size())
            .exceededCount((int) exceededCount)
            .warningCount((int) warningCount)
            .tenantStatuses(statuses)
            .recentAnomalies(anomalyRepository.findRecent(20))
            .build();
    return ApiResponse.success(overview);
}
/**
 * Updates a tenant's monthly budget and writes a {@code BUDGET_UPDATE} audit record naming
 * the authenticated caller.
 *
 * <p>NOTE(review): the budget limit is also cached in Redis by the guard service; confirm that
 * {@code budgetService.updateBudget} refreshes that cache so the new budget applies at once.
 *
 * @param tenantId tenant whose budget is changed
 * @param req      request body carrying the new monthly budget
 * @param auth     authentication of the caller
 */
@PutMapping("/{tenantId}/budget")
public ApiResponse<Void> updateBudget(
        @PathVariable String tenantId,
        @RequestBody UpdateBudgetRequest req,
        Authentication auth) {
    budgetService.updateBudget(tenantId, req.getMonthlyBudgetCny(), auth.getName());

    // Audit trail: who changed which tenant's budget to what.
    String auditDetail =
            String.format("更新租户[%s]预算为%s元", tenantId, req.getMonthlyBudgetCny());
    auditLog.record("BUDGET_UPDATE", auth.getName(), auditDetail);

    return ApiResponse.success();
}
/**
 * Lifts a tenant's throttling or suspension and writes a {@code TENANT_RESUME} audit record
 * naming the authenticated caller and the supplied reason.
 *
 * @param tenantId tenant to resume
 * @param req      request body carrying the reason
 * @param auth     authentication of the caller
 */
@PostMapping("/{tenantId}/resume")
public ApiResponse<Void> resumeTenant(
        @PathVariable String tenantId,
        @RequestBody ResumeRequest req,
        Authentication auth) {
    budgetService.resumeTenant(tenantId, req.getReason());

    // Audit trail: who resumed which tenant and why.
    String auditDetail = String.format("恢复租户[%s],原因:%s", tenantId, req.getReason());
    auditLog.record("TENANT_RESUME", auth.getName(), auditDetail);

    return ApiResponse.success();
}
}
系统整体架构回顾
这个系列讲了10篇,我们把AI成本管理从0到1建立起来:
每一个环节都是真实项目中碰到的问题,不是凑字数的知识点堆砌。
最后说几句实在话
搭这套系统的核心挑战不在于技术,而在于推动组织行为改变。
技术上,一个有经验的团队2-3个月能把这套东西搭起来。但真正发挥价值,需要:
- 业务部门接受成本分摊(有时候会有抵触)
- 管理层认可用ROI来评估AI投入(而不是"感觉有用")
- 工程师团队把成本意识纳入日常开发习惯
技术只是基础,组织文化才是让成本管理真正落地的关键。
工程师能做的最大贡献,是把数据做准、可视化做好,让决策者能看到清晰的成本和价值数据,剩下的就是业务和管理的事了。
