高并发 AI 服务设计——万级 QPS 的架构挑战
高并发 AI 服务设计——万级 QPS 的架构挑战
适读人群:做 AI 服务后端的工程师 | 阅读时长:约16分钟 | 核心价值:从一次真实的流量峰值事故出发,完整的高并发 AI 架构设计方案
去年双十一前,我们的 AI 营销文案生成服务迎来了一次预期之外的流量高峰。
起因是一个大客户把我们的服务集成进了他们的活动页面,流量在两小时内从平时的 50 QPS 飙升到接近 800 QPS,然后系统开始掉请求、超时、返回 500。
我盯着监控大屏,看着错误率从 0 涨到 15%,再到 38%,冷汗直流。
那两个小时里,我做了几个紧急操作止住了血,但根本问题没有解决。事故结束之后,我花了一个月重新设计了整个高并发架构。这篇文章是那次设计的完整记录。
事故复盘:哪里垮了
先说当时垮在哪里,因为这直接决定了后续的设计方向。
原始架构(简化):
客户端 → [API 网关] → [AI 服务实例 x3] → [OpenAI API]
|
[PostgreSQL]
[Redis]在 800 QPS 的压力下,按顺序垮掉的:
AI 服务实例被打满:每个请求要等 OpenAI API 返回(平均 2-3 秒),3个实例撑不住 800 并发的积压。
连接池耗尽:每个实例的 HTTP 连接池设了最大 50 个连接,800 QPS 下连接池立刻打满,新请求拿不到连接直接失败。
数据库连接池也耗尽:每次 AI 调用前后要读写数据库,DB 连接池也打满了。
没有降级策略:打满之后系统没有任何限流或降级,直接硬扛,越扛越挂。
重新设计后的架构
[负载均衡]
|
[API 网关 + 限流]
/ \
[非流式接口] [流式接口]
| |
[请求队列 Redis] [长连接池]
/ | \
[Worker Worker Worker] (可水平扩展)
| |
[模型路由层] [缓存层]
/ \ |
[OpenAI] [备用模型] [语义缓存]
|
[数据库连接池]下面逐个讲关键组件的设计。
请求队列:把同步变异步
最关键的改造是引入请求队列。AI 调用天然是耗时操作(2-10秒),不应该让 HTTP 连接在那里阻塞等待。
import asyncio
import redis.asyncio as aioredis
import json
import uuid
from datetime import datetime
class AIRequestQueue:
"""基于 Redis 的 AI 请求队列"""
def __init__(self, redis_url: str, queue_name: str = "ai_requests"):
self.redis = aioredis.from_url(redis_url)
self.queue_name = queue_name
self.result_prefix = "ai_result:"
self.result_ttl = 300 # 结果保留5分钟
async def enqueue(self, prompt: str, params: dict,
user_id: str, priority: int = 0) -> str:
"""
把请求放入队列,立即返回 request_id
priority: 0=普通, 1=高优先级(付费用户等)
"""
request_id = str(uuid.uuid4())
payload = {
"request_id": request_id,
"prompt": prompt,
"params": params,
"user_id": user_id,
"enqueued_at": datetime.utcnow().isoformat(),
"priority": priority
}
queue_key = f"{self.queue_name}:p{priority}" # 按优先级分队列
await self.redis.lpush(queue_key, json.dumps(payload))
# 初始化结果占位
await self.redis.setex(
f"{self.result_prefix}{request_id}",
self.result_ttl,
json.dumps({"status": "queued", "request_id": request_id})
)
return request_id
async def get_result(self, request_id: str) -> dict:
"""轮询获取结果"""
result_raw = await self.redis.get(f"{self.result_prefix}{request_id}")
if not result_raw:
return {"status": "not_found"}
return json.loads(result_raw)
async def set_result(self, request_id: str, result: dict):
"""Worker 写入结果"""
result["completed_at"] = datetime.utcnow().isoformat()
await self.redis.setex(
f"{self.result_prefix}{request_id}",
self.result_ttl,
json.dumps(result)
)
# API 层:接收请求,立刻返回
async def submit_generation_request(
prompt: str,
params: dict,
user_id: str
) -> dict:
# 检查限流
allowed, wait_time = await rate_limiter.check(user_id)
if not allowed:
return {
"error": "rate_limited",
"retry_after": wait_time,
"message": "请求频率过高,请稍后重试"
}
# 语义缓存检查(下面会讲)
cached = await semantic_cache.get(prompt)
if cached:
return {"status": "done", "result": cached, "from_cache": True}
# 放入队列
priority = 1 if await is_premium_user(user_id) else 0
request_id = await request_queue.enqueue(prompt, params, user_id, priority)
return {
"status": "queued",
"request_id": request_id,
"poll_url": f"/api/v1/results/{request_id}"
}客户端用轮询或 WebSocket 来获取结果:
# 客户端轮询示例(前端 JavaScript)
async function waitForResult(requestId, maxWaitSeconds = 30) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitSeconds * 1000) {
const response = await fetch(`/api/v1/results/${requestId}`);
const data = await response.json();
if (data.status === 'done') {
return data.result;
} else if (data.status === 'failed') {
throw new Error(data.error);
}
// 指数退避:先等0.5秒,然后1秒,然后2秒...
const elapsed = (Date.now() - startTime) / 1000;
const waitMs = Math.min(500 * Math.pow(2, Math.floor(elapsed / 2)), 3000);
await new Promise(resolve => setTimeout(resolve, waitMs));
}
throw new Error('Request timed out');
}Worker:可水平扩展的推理单元
class AIWorker:
"""AI 推理 Worker,多实例水平扩展"""
def __init__(
self,
worker_id: str,
queue: AIRequestQueue,
model_router: 'ModelRouter',
concurrency: int = 10 # 每个 Worker 的并发数
):
self.worker_id = worker_id
self.queue = queue
self.model_router = model_router
self.semaphore = asyncio.Semaphore(concurrency)
async def run(self):
"""Worker 主循环"""
print(f"Worker {self.worker_id} started")
while True:
# 按优先级取任务:先取高优先级队列
for priority in [1, 0]:
queue_key = f"{self.queue.queue_name}:p{priority}"
item = await self.queue.redis.brpop(queue_key, timeout=1)
if item:
_, payload_raw = item
payload = json.loads(payload_raw)
# 用 semaphore 控制并发
asyncio.create_task(
self._process_with_semaphore(payload)
)
break
async def _process_with_semaphore(self, payload: dict):
async with self.semaphore:
await self._process_request(payload)
async def _process_request(self, payload: dict):
request_id = payload['request_id']
try:
# 记录开始时间
start_time = datetime.utcnow()
# 通过模型路由层调用 AI
result = await self.model_router.generate(
prompt=payload['prompt'],
params=payload.get('params', {})
)
elapsed = (datetime.utcnow() - start_time).total_seconds()
# 写入结果
await self.queue.set_result(request_id, {
"status": "done",
"result": result,
"latency_seconds": elapsed,
"worker_id": self.worker_id
})
except Exception as e:
await self.queue.set_result(request_id, {
"status": "failed",
"error": str(e),
"worker_id": self.worker_id
})模型路由:故障转移 + Batch 推理
class ModelRouter:
"""智能模型路由:处理故障转移、负载均衡、批处理"""
def __init__(self, primary_client, backup_client, batch_size: int = 5):
self.primary = primary_client # 主模型(如 GPT-4o)
self.backup = backup_client # 备用模型(如 GPT-4o-mini 或自部署模型)
self.primary_failures = 0
self.failure_threshold = 5 # 连续失败5次切到备用
self.batch_queue = asyncio.Queue()
self.batch_size = batch_size
async def generate(self, prompt: str, params: dict) -> str:
"""单次生成,带自动故障转移"""
client = self.primary if self.primary_failures < self.failure_threshold \
else self.backup
try:
result = await asyncio.wait_for(
client.chat(prompt=prompt, **params),
timeout=30.0 # 30秒超时
)
# 成功了,重置失败计数
if client == self.primary:
self.primary_failures = 0
return result
except asyncio.TimeoutError:
self.primary_failures += 1
if client == self.primary:
# 主模型超时,尝试备用模型
return await self.backup.chat(prompt=prompt, **params)
raise
except Exception as e:
self.primary_failures += 1
raise
async def batch_generate(self, prompts: list[str], params: dict) -> list[str]:
"""
批量生成:把多个请求合并成一次 API 调用
适用于支持 batch API 的场景(如 OpenAI Batch API)
"""
# OpenAI Batch API 适合离线批处理,不适合实时
# 对于实时场景,用并发调用模拟"批处理"
tasks = [self.generate(p, params) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
r if not isinstance(r, Exception) else f"ERROR: {r}"
for r in results
]语义缓存:AI 应用专属的缓存层
普通的 key-value 缓存没法用在 AI 应用里,因为同样意思的问题会有无数种表达方式。语义缓存解决这个问题:
class SemanticCache:
"""基于向量相似度的语义缓存"""
def __init__(self, qdrant_client, embedding_service,
similarity_threshold: float = 0.95):
self.qdrant = qdrant_client
self.embedding = embedding_service
self.threshold = similarity_threshold
self.collection = "semantic_cache"
async def get(self, prompt: str) -> str | None:
"""查缓存,返回语义相似的已缓存结果"""
embedding = await self.embedding.embed(prompt)
results = await self.qdrant.search(
collection_name=self.collection,
query_vector=embedding,
limit=1,
score_threshold=self.threshold
)
if results:
return results[0].payload.get('response')
return None
async def set(self, prompt: str, response: str, ttl_hours: int = 24):
"""写入缓存"""
embedding = await self.embedding.embed(prompt)
cache_id = str(uuid.uuid4())
await self.qdrant.upsert(
collection_name=self.collection,
points=[{
"id": cache_id,
"vector": embedding,
"payload": {
"prompt": prompt[:200], # 只存摘要
"response": response,
"cached_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() +
timedelta(hours=ttl_hours)).isoformat()
}
}]
)语义缓存对于一些固定场景效果非常好,比如"生成产品介绍"类的请求,不同用户可能问的是相同的产品,直接走缓存,成本下降明显。我们的命中率在 25% 左右,这意味着每4个请求有1个不需要调用模型 API。
限流设计:保护自己,也保护用户
import time
class RateLimiter:
"""令牌桶限流,支持用户级和全局级"""
def __init__(self, redis):
self.redis = redis
async def check(self, user_id: str) -> tuple[bool, float]:
"""
返回 (是否允许, 需要等待的秒数)
"""
now = time.time()
# 用户级限流:每分钟最多10次
user_key = f"rate_limit:user:{user_id}"
user_count = await self.redis.incr(user_key)
if user_count == 1:
await self.redis.expire(user_key, 60)
if user_count > 10:
ttl = await self.redis.ttl(user_key)
return False, ttl
# 全局限流:系统整体每秒最多500次(保护下游 AI API)
global_key = f"rate_limit:global:{int(now)}"
global_count = await self.redis.incr(global_key)
if global_count == 1:
await self.redis.expire(global_key, 2)
if global_count > 500:
return False, 1.0
return True, 0降级策略
流量高峰时,降级是保证系统可用的最后一道防线:
class DegradationManager:
"""分级降级策略"""
def __init__(self, redis):
self.redis = redis
async def get_current_strategy(self) -> str:
"""获取当前降级级别"""
strategy = await self.redis.get("degradation_strategy")
return strategy.decode() if strategy else "normal"
async def should_degrade(self, user_id: str, feature: str) -> bool:
"""判断当前请求是否需要降级"""
strategy = await self.get_current_strategy()
if strategy == "normal":
return False
elif strategy == "level1":
# 一级降级:把低优先级用户的请求延后处理
is_premium = await is_premium_user(user_id)
return not is_premium
elif strategy == "level2":
# 二级降级:非核心功能全部关闭
core_features = {"generate_main_copy", "answer_question"}
return feature not in core_features
elif strategy == "level3":
# 三级降级:只保留最核心的功能,其余返回静态兜底
critical_features = {"answer_question"}
return feature not in critical_features
return False
async def get_fallback_response(self, feature: str) -> str:
"""降级时的兜底响应"""
fallbacks = {
"generate_marketing_copy": "系统繁忙,您可以稍后重试,或使用我们的模板库",
"generate_product_desc": "系统繁忙,请稍后再试",
"default": "服务繁忙,请稍后重试"
}
return fallbacks.get(feature, fallbacks["default"])架构全貌(ASCII 图)
客户端请求
|
+--------+---------+
| API 网关 + 限流 |
| (Nginx + Lua) |
+--------+---------+
|
+--------+---------+
| API 服务层 |
| (FastAPI x3) |
+---+-------+------+
| |
+---------+ +---------+
| |
+-------+--------+ +----------+--------+
| 语义缓存检查 | | 请求队列入队 |
| (Qdrant) | | (Redis Lists) |
+-------+--------+ +----------+--------+
| 命中 |
+---> 直接返回 |
+---------+----------+
| Worker 池 |
| (可水平扩展) |
| Worker1 Worker2 |
| Worker3 Worker4 |
+---------+----------+
|
+---------+----------+
| 模型路由层 |
| |
+----+----+ +-----+----+
| 主模型 | | 备用模型 |
| GPT-4o | | GPT-4o-m |
+---------+ +----------+改造后的效果(同样 800 QPS 压力):
指标 改造前 改造后
--------------------------------------------
错误率 38% <0.5%
P99 延迟(队列等待+推理)N/A 8.2s(用户感知,异步)
P50 延迟(首次 API 响应)800ms 45ms(放入队列即返回)
系统吞吐上限 约50 QPS 500+ QPS(Worker可扩展)
CPU 使用率(峰值) 98% 65%高并发 AI 服务的核心是:接受 AI 推理慢这个现实,用异步队列解耦请求的接收和处理,用水平扩展的 Worker 提升吞吐,用缓存和降级保护系统稳定性。
这些都不是 AI 独有的技术,但把它们组合起来用在 AI 场景里,需要根据 AI 推理的特点做专门的调优。
