AI 应用的限流和降级策略——防止某个用户烧完你一个月的预算
AI 应用的限流和降级策略——防止某个用户烧完你一个月的预算
适读人群:运营 AI 应用的技术负责人、后端工程师 | 阅读时长:约 14 分钟 | 核心价值:完整的 AI 应用成本控制工程体系,从限流到降级全覆盖
有一个月,我们的 API 账单超出预算 340%。
不是被攻击,是被一个用户用的。他在用我们的 AI 代码审查工具,发现可以把整个代码仓库扔进去让 AI 分析,一个仓库十几万行代码,提交了三十几次,消耗了将近 800 万 token。
那个月光他一个用户就花了我们几千块。
我花了一个周末建了一套成本控制体系。之后两年没再出现这种情况。
成本失控的几种情形
在开始讲方案之前,先说清楚成本失控通常怎么发生的:
情形一:单用户超量使用——就是上面说的,某个用户发现了可以大量消耗资源的用法。
情形二:爬虫或脚本批量调用——如果接口没有做好防护,有人写脚本跑批量请求。
情形三:功能设计导致隐性消耗——比如你做了一个「自动总结」功能,每次用户打开对话都触发一次 API 调用,用户不知道,你也没注意,但每天几万次自动调用慢慢把预算吃干净了。
情形四:测试环境没有限制——开发人员在测试环境里跑大量测试,用的是生产 Key,月底账单里有一大块是测试流量。
情形五:Prompt 太长——某个功能的 System Prompt 几千 token,每次调用都带着,用量一大,这部分占比惊人。
成本控制的三个维度
维度一:限流——控制请求频率和数量
维度二:配额——控制 token 用量上限
维度三:降级——在条件触发时自动换用便宜的模型
三个维度组合使用,才是完整的方案。
维度一:限流实现
限流分几个层级:
用户级别限流:每个用户在单位时间内的请求数上限
IP 级别限流:防止未登录的爬虫请求
全局限流:防止整个系统被压垮
import time
import redis
from typing import Optional
from enum import Enum
class LimitResult(Enum):
ALLOWED = "allowed"
RATE_LIMITED = "rate_limited"
QUOTA_EXCEEDED = "quota_exceeded"
class RateLimiter:
"""滑动窗口限流器"""
def __init__(self, redis_client):
self.redis = redis_client
def check_and_increment(
self,
key: str,
limit: int,
window_seconds: int
) -> tuple[LimitResult, dict]:
"""
滑动窗口限流
返回 (结果, 状态信息)
"""
now = time.time()
window_start = now - window_seconds
pipe = self.redis.pipeline()
# 移除窗口外的旧记录
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置key的过期时间
pipe.expire(key, window_seconds * 2)
results = pipe.execute()
current_count = results[1] # zcard的结果
if current_count >= limit:
# 超限,撤销刚才添加的记录
self.redis.zrem(key, str(now))
# 计算距离下一个可用时间
oldest_in_window = self.redis.zrange(key, 0, 0, withscores=True)
if oldest_in_window:
retry_after = int(window_seconds - (now - oldest_in_window[0][1])) + 1
else:
retry_after = window_seconds
return LimitResult.RATE_LIMITED, {
'current_count': current_count,
'limit': limit,
'window_seconds': window_seconds,
'retry_after': retry_after
}
return LimitResult.ALLOWED, {
'current_count': current_count + 1,
'limit': limit,
'remaining': limit - current_count - 1
}
class TokenQuotaManager:
"""Token 用量配额管理"""
# 不同用户等级的每日token配额
TIER_LIMITS = {
'free': 50_000, # 免费用户:5万token/天
'basic': 200_000, # 基础用户:20万token/天
'pro': 1_000_000, # 专业用户:100万token/天
'enterprise': -1 # 企业用户:无限制(有合同保障)
}
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
def get_daily_quota(self, user_id: str, user_tier: str) -> int:
"""获取用户的每日token配额"""
return self.TIER_LIMITS.get(user_tier, self.TIER_LIMITS['free'])
async def check_quota(self, user_id: str, estimated_tokens: int, user_tier: str) -> tuple[bool, dict]:
"""检查用户是否还有足够的token配额"""
quota = self.get_daily_quota(user_id, user_tier)
if quota == -1:
return True, {'quota': -1, 'used': 0, 'remaining': -1}
# 从Redis获取今日已用量
today_key = f"token_quota:{user_id}:{self._today()}"
used_today = int(self.redis.get(today_key) or 0)
remaining = quota - used_today
if remaining < estimated_tokens:
return False, {
'quota': quota,
'used': used_today,
'remaining': remaining,
'needed': estimated_tokens
}
return True, {
'quota': quota,
'used': used_today,
'remaining': remaining
}
async def record_usage(self, user_id: str, tokens_used: int):
"""记录token使用量"""
today_key = f"token_quota:{user_id}:{self._today()}"
pipe = self.redis.pipeline()
pipe.incrby(today_key, tokens_used)
pipe.expire(today_key, 86400 * 2) # 保留2天数据
pipe.execute()
# 异步写入数据库做统计(不影响主流程)
# 这里可以用消息队列异步处理
await self._async_log_to_db(user_id, tokens_used)
def _today(self) -> str:
from datetime import date
return date.today().strftime('%Y%m%d')
async def _async_log_to_db(self, user_id: str, tokens: int):
"""异步记录到数据库(用于账单统计)"""
await self.db.execute(
"""INSERT INTO token_usage_log (user_id, tokens, created_at)
VALUES (%s, %s, NOW())""",
(user_id, tokens)
)维度二:配额 + 部门级别管控
对于 To B 产品,还需要部门/团队级别的配额管理:
class DepartmentQuotaManager:
"""部门级别的配额管理(适用于企业客户)"""
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
async def check_department_quota(
self,
dept_id: str,
estimated_tokens: int
) -> tuple[bool, dict]:
"""检查部门配额"""
# 从数据库获取部门配置
dept_config = await self.db.query_one(
"SELECT monthly_token_quota, alert_threshold FROM departments WHERE dept_id = %s",
(dept_id,)
)
if not dept_config:
return True, {} # 找不到配置,放行
monthly_quota = dept_config['monthly_token_quota']
alert_threshold = dept_config['alert_threshold'] # 比如0.8 = 用了80%时告警
# 从Redis获取本月已用量
month_key = f"dept_quota:{dept_id}:{self._this_month()}"
used_this_month = int(self.redis.get(month_key) or 0)
# 告警检查
if alert_threshold and (used_this_month / monthly_quota) >= alert_threshold:
await self._send_quota_alert(dept_id, used_this_month, monthly_quota)
# 配额检查
if used_this_month + estimated_tokens > monthly_quota:
return False, {
'dept_id': dept_id,
'monthly_quota': monthly_quota,
'used': used_this_month,
'remaining': monthly_quota - used_this_month
}
return True, {
'used': used_this_month,
'remaining': monthly_quota - used_this_month
}
async def _send_quota_alert(self, dept_id: str, used: int, quota: int):
"""发送配额告警(避免重复发送)"""
alert_key = f"quota_alert_sent:{dept_id}:{self._this_month()}"
if not self.redis.exists(alert_key):
# 发送告警...(钉钉/企微/邮件)
self.redis.setex(alert_key, 3600 * 24, '1') # 24小时内不重复告警
def _this_month(self) -> str:
from datetime import date
return date.today().strftime('%Y%m')维度三:模型降级策略
当用量接近配额或成本压力上升时,自动切换到便宜的模型:
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class ModelTier(Enum):
PREMIUM = "premium" # 最好的模型,贵
STANDARD = "standard" # 中等模型,平衡
ECONOMY = "economy" # 最便宜的,适合简单任务
@dataclass
class ModelConfig:
model_id: str
input_cost_per_1k: float # 每1k输入token的成本(美元)
output_cost_per_1k: float
max_context: int
tier: ModelTier
# 模型配置表
MODEL_CONFIGS = {
ModelTier.PREMIUM: ModelConfig(
model_id="gpt-4o",
input_cost_per_1k=0.0025,
output_cost_per_1k=0.01,
max_context=128000,
tier=ModelTier.PREMIUM
),
ModelTier.STANDARD: ModelConfig(
model_id="gpt-4o-mini",
input_cost_per_1k=0.00015,
output_cost_per_1k=0.0006,
max_context=128000,
tier=ModelTier.STANDARD
),
ModelTier.ECONOMY: ModelConfig(
model_id="gpt-3.5-turbo",
input_cost_per_1k=0.0005,
output_cost_per_1k=0.0015,
max_context=16000,
tier=ModelTier.ECONOMY
)
}
class ModelSelector:
"""根据用户状态和任务类型选择合适的模型"""
def __init__(self, quota_manager: TokenQuotaManager, redis_client):
self.quota_manager = quota_manager
self.redis = redis_client
async def select_model(
self,
user_id: str,
user_tier: str,
task_type: str,
estimated_tokens: int,
force_premium: bool = False
) -> ModelConfig:
"""
选择模型的逻辑:
1. 强制使用高端模型(用户明确要求)
2. 根据任务类型决定基础模型
3. 根据配额使用情况降级
4. 根据全局成本压力降级
"""
if force_premium:
return MODEL_CONFIGS[ModelTier.PREMIUM]
# 根据任务类型确定基础等级
task_tier = self._get_task_base_tier(task_type)
# 检查用户配额使用情况
_, quota_info = await self.quota_manager.check_quota(
user_id, estimated_tokens, user_tier
)
usage_ratio = quota_info.get('used', 0) / max(quota_info.get('quota', 1), 1)
# 配额用了超过90%,自动降级
if usage_ratio > 0.9 and task_tier == ModelTier.PREMIUM:
task_tier = ModelTier.STANDARD
# 检查全局成本压力(今天整体花费是否超预算)
if await self._is_global_cost_pressure():
if task_tier == ModelTier.PREMIUM:
task_tier = ModelTier.STANDARD
# 免费用户只能用经济模型
if user_tier == 'free' and task_tier != ModelTier.ECONOMY:
task_tier = ModelTier.ECONOMY
return MODEL_CONFIGS[task_tier]
def _get_task_base_tier(self, task_type: str) -> ModelTier:
"""根据任务类型确定基础模型等级"""
premium_tasks = {
'code_review', 'complex_analysis', 'creative_writing',
'strategic_planning', 'legal_review'
}
economy_tasks = {
'simple_qa', 'classification', 'summarization',
'translation', 'format_conversion'
}
if task_type in premium_tasks:
return ModelTier.PREMIUM
elif task_type in economy_tasks:
return ModelTier.ECONOMY
else:
return ModelTier.STANDARD
async def _is_global_cost_pressure(self) -> bool:
"""检查今日全局成本是否超过预警线"""
today_cost_key = f"global_cost:{self._today()}"
today_cost = float(self.redis.get(today_cost_key) or 0)
# 从配置读取每日预算
daily_budget = 100.0 # $100/天,根据实际情况设置
warning_threshold = 0.8
return today_cost >= daily_budget * warning_threshold
def _today(self) -> str:
from datetime import date
return date.today().strftime('%Y%m%d')整合成一个中间件
把上面三个组件整合成一个请求处理中间件:
from fastapi import Request, HTTPException
from functools import wraps
class AIRequestGuard:
"""AI请求的完整防护层"""
def __init__(self, rate_limiter, quota_manager, model_selector):
self.rate_limiter = rate_limiter
self.quota_manager = quota_manager
self.model_selector = model_selector
async def check_and_prepare(
self,
user_id: str,
user_tier: str,
task_type: str,
estimated_tokens: int,
request_ip: str
) -> dict:
"""
检查所有限制条件,返回本次请求应使用的配置
"""
# 1. IP级别限流(防爬虫)
ip_key = f"rate:ip:{request_ip}"
ip_result, ip_info = self.rate_limiter.check_and_increment(
ip_key, limit=100, window_seconds=60 # 每分钟100次
)
if ip_result == LimitResult.RATE_LIMITED:
raise HTTPException(
status_code=429,
detail={
'error': 'rate_limited',
'message': '请求过于频繁,请稍后再试',
'retry_after': ip_info.get('retry_after', 60)
}
)
# 2. 用户级别限流
user_rate_key = f"rate:user:{user_id}"
# 不同等级的限流上限不同
rate_limits = {'free': 10, 'basic': 30, 'pro': 100, 'enterprise': 500}
user_limit = rate_limits.get(user_tier, 10)
user_result, user_rate_info = self.rate_limiter.check_and_increment(
user_rate_key, limit=user_limit, window_seconds=60
)
if user_result == LimitResult.RATE_LIMITED:
raise HTTPException(
status_code=429,
detail={
'error': 'user_rate_limited',
'message': f'您的请求频率已达上限({user_limit}次/分钟)',
'retry_after': user_rate_info.get('retry_after', 60)
}
)
# 3. Token配额检查
quota_ok, quota_info = await self.quota_manager.check_quota(
user_id, estimated_tokens, user_tier
)
if not quota_ok:
raise HTTPException(
status_code=402, # Payment Required 或 429
detail={
'error': 'quota_exceeded',
'message': f'今日用量已达上限({quota_info["quota"]:,} tokens)',
'quota': quota_info,
'reset_at': '明天 00:00'
}
)
# 4. 选择模型
model_config = await self.model_selector.select_model(
user_id, user_tier, task_type, estimated_tokens
)
return {
'model_config': model_config,
'quota_info': quota_info,
'rate_info': user_rate_info
}
async def record_completion(self, user_id: str, actual_tokens: int, cost_usd: float):
"""请求完成后记录实际用量"""
await self.quota_manager.record_usage(user_id, actual_tokens)
# 更新全局成本计数
today_cost_key = f"global_cost:{self._today()}"
self.quota_manager.redis.incrbyfloat(today_cost_key, cost_usd)
self.quota_manager.redis.expire(today_cost_key, 86400 * 2)
def _today(self) -> str:
from datetime import date
return date.today().strftime('%Y%m%d')成本告警体系
有了限流还不够,要有主动告警:
import asyncio
from datetime import datetime, date
async def daily_cost_monitor(redis_client, alert_webhook_url: str):
"""每小时检查成本状态,超过阈值发告警"""
daily_budget = 100.0 # 每日预算
today = date.today().strftime('%Y%m%d')
today_cost = float(redis_client.get(f"global_cost:{today}") or 0)
# 不同阈值触发不同级别的告警
alert_levels = [
(0.5, "注意", f"今日AI成本已达预算50%:${today_cost:.2f}"),
(0.8, "警告", f"今日AI成本已达预算80%:${today_cost:.2f},已触发降级策略"),
(1.0, "紧急", f"今日AI成本已超出预算:${today_cost:.2f},超预算${today_cost - daily_budget:.2f}"),
]
for threshold, level, message in alert_levels:
if today_cost >= daily_budget * threshold:
alert_key = f"cost_alert:{today}:{threshold}"
if not redis_client.exists(alert_key):
await send_alert(alert_webhook_url, level, message)
redis_client.setex(alert_key, 86400, '1')
break # 只发最高级别的告警
async def send_alert(webhook_url: str, level: str, message: str):
"""发送告警到企业微信/钉钉"""
import aiohttp
payload = {
"msgtype": "text",
"text": {"content": f"[{level}] AI成本告警\n{message}\n时间:{datetime.now().strftime('%H:%M')}"}
}
async with aiohttp.ClientSession() as session:
await session.post(webhook_url, json=payload)那次事故的复盘
回到文章开头说的那次成本事故。事后复盘,有几个系统性问题:
- 没有用户级别的 token 配额,某个功能允许上传任意大小的文件
- 没有文件大小限制,也没有 token 预估
- 没有成本告警,月底才发现超支
现在我们的标准是:每个 AI 功能上线前都要做成本估算,每个用户都有每日配额,超过 50% 预算就开始告警,超过 80% 触发降级策略。
从制度上解决,比一个个补丁靠谱得多。
