FastAPI 高性能实战——并发模型、后台任务、WebSocket 完整方案
FastAPI 高性能实战——并发模型、后台任务、WebSocket 完整方案
适读人群:想深入理解 FastAPI 性能机制、需要实现实时推送或后台任务的工程师 | 阅读时长:约 16 分钟 | 核心价值:搞懂异步并发原理,掌握后台任务和 WebSocket 的生产级写法
一次压测暴露的问题
老周在做一个 AI 写作平台,后端用 FastAPI,调用 OpenAI API 生成文章。上线之前做了压测,用 100 个并发用户同时请求,结果让他崩溃了——明明每次调用 OpenAI 只需要 3 秒,但 100 个并发下,后面的请求等了将近 5 分钟才拿到响应。
他跑来问我:FastAPI 不是异步的吗?为什么还这么慢?
我看了他的代码,找到了根本原因:他的路由函数用的是 def(同步),不是 async def(异步)。一个看似微小的区别,导致所有请求都在线程池里排队执行,完全没有发挥出异步的优势。
这篇文章,就从这个坑出发,把 FastAPI 的并发模型彻底讲清楚。
一、FastAPI 并发模型:async def vs def
1.1 核心原理
FastAPI 基于 Starlette,运行在 uvicorn 的事件循环上。理解并发模型,要明白两种路由函数的区别:
| 函数类型 | 执行方式 | 适用场景 |
|---|---|---|
async def | 在事件循环中直接运行 | I/O 密集:调用 API、数据库、Redis |
def | 在线程池中运行 | CPU 密集:图像处理、数学计算、同步库 |
# 正确:I/O 操作用 async def
@app.get("/users/{id}")
async def get_user(id: int):
user = await db.fetch_one(...) # 非阻塞 I/O
return user
# 正确:CPU 密集或同步库用 def(FastAPI 会放入线程池)
@app.get("/process-image")
def process_image(file_path: str):
result = heavy_image_processing(file_path) # 阻塞 CPU 操作
return result
# 错误:async def 里调用同步阻塞操作
@app.get("/bad-example")
async def bad_route():
import time
time.sleep(3) # 阻塞事件循环!所有其他请求被卡住
return {"done": True}踩坑实录 1:在 async def 里用同步库
# 错误:requests 是同步库,在 async def 里会阻塞事件循环
import requests
@app.get("/proxy")
async def proxy():
resp = requests.get("https://api.example.com/data") # 阻塞!
return resp.json()
# 正确:用异步 HTTP 客户端
import httpx
@app.get("/proxy")
async def proxy():
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.example.com/data")
return resp.json()现象:接口能正常返回,但高并发下所有请求都在等待。原因:同步 requests 阻塞了事件循环,导致整个进程"假死"。解法:换成 httpx(异步)。
1.2 并发 vs 并行:Python 的 GIL
从 Java 来的同学要注意:Python 有 GIL(全局解释器锁),多线程无法真正并行执行 CPU 密集任务。
| 场景 | Java | Python |
|---|---|---|
| I/O 密集(网络、数据库) | 多线程 | asyncio 协程(推荐) |
| CPU 密集(计算、加密) | 多线程 | multiprocessing 多进程 |
| 高并发 Web | Tomcat 多线程 | uvicorn 异步事件循环 |
FastAPI 的正确打开方式:I/O 全用 async,CPU 密集用进程池或独立微服务。
二、后台任务(BackgroundTasks)
2.1 基础用法
后台任务适合"不需要等响应"的操作:发送邮件、记录日志、异步推送通知。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def send_welcome_email(email: str, name: str) -> None:
"""模拟发送邮件(同步函数)"""
import time
time.sleep(2) # 模拟邮件发送延迟
print(f"邮件已发送给 {name} <{email}>")
async def update_user_login_stats(user_id: int) -> None:
"""更新登录统计(异步函数)"""
await some_db.execute(
"UPDATE users SET last_login=NOW(), login_count=login_count+1 WHERE id=?",
user_id,
)
@app.post("/register")
async def register(
email: str,
name: str,
background_tasks: BackgroundTasks,
):
# 主流程:创建用户(同步返回)
user_id = await create_user_in_db(email, name)
# 后台任务:不阻塞响应
background_tasks.add_task(send_welcome_email, email, name)
background_tasks.add_task(update_user_login_stats, user_id)
return {"message": "注册成功", "user_id": user_id}2.2 后台任务 vs Celery:怎么选?
| 需求 | BackgroundTasks | Celery |
|---|---|---|
| 任务与请求在同一进程 | 是 | 否(独立 worker) |
| 任务失败重试 | 不支持 | 支持 |
| 任务队列/优先级 | 不支持 | 支持 |
| 定时任务 | 不支持 | 支持(beat) |
| 分布式 | 不支持 | 支持 |
| 适用场景 | 简单异步操作 | 生产级任务队列 |
老张建议:发邮件、记日志、更新缓存——用 BackgroundTasks;订单处理、报表生成、视频转码——用 Celery。
三、WebSocket:实时推送方案
3.1 基础 WebSocket 路由
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await websocket.accept()
try:
while True:
# 接收客户端消息
data = await websocket.receive_text()
# 回显
await websocket.send_text(f"[{client_id}] 收到: {data}")
except WebSocketDisconnect:
print(f"客户端 {client_id} 断开连接")3.2 连接管理器:广播消息
实际项目中需要管理多个连接,实现群发功能:
from typing import DefaultDict
from collections import defaultdict
import asyncio
class ConnectionManager:
"""WebSocket 连接管理器"""
def __init__(self):
# 用 room_id 分组:{room_id: [websocket1, websocket2, ...]}
self.rooms: DefaultDict[str, list[WebSocket]] = defaultdict(list)
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, room_id: str) -> None:
await websocket.accept()
async with self._lock:
self.rooms[room_id].append(websocket)
print(f"连接加入房间 {room_id},当前人数: {len(self.rooms[room_id])}")
async def disconnect(self, websocket: WebSocket, room_id: str) -> None:
async with self._lock:
if websocket in self.rooms[room_id]:
self.rooms[room_id].remove(websocket)
print(f"连接离开房间 {room_id},当前人数: {len(self.rooms[room_id])}")
async def broadcast(self, room_id: str, message: str) -> None:
"""向指定房间的所有连接广播消息"""
if room_id not in self.rooms:
return
dead_connections = []
for websocket in self.rooms[room_id]:
try:
await websocket.send_text(message)
except Exception:
dead_connections.append(websocket)
# 清除死连接
async with self._lock:
for ws in dead_connections:
if ws in self.rooms[room_id]:
self.rooms[room_id].remove(ws)
async def send_personal(self, websocket: WebSocket, message: str) -> None:
"""向单个连接发送消息"""
await websocket.send_text(message)
manager = ConnectionManager()四、完整可运行示例:实时聊天室
#!/usr/bin/env python3
"""
FastAPI WebSocket 聊天室 + 后台任务完整示例
运行后访问 http://localhost:8000/
"""
import asyncio
import json
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import DefaultDict
import uvicorn
from fastapi import BackgroundTasks, FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
# ===== 连接管理器 =====
class ConnectionManager:
def __init__(self):
self.rooms: DefaultDict[str, list[WebSocket]] = defaultdict(list)
async def connect(self, websocket: WebSocket, room: str, user: str):
await websocket.accept()
self.rooms[room].append(websocket)
await self.broadcast(room, f"系统: {user} 加入了房间", exclude=websocket)
async def disconnect(self, websocket: WebSocket, room: str, user: str):
if websocket in self.rooms[room]:
self.rooms[room].remove(websocket)
await self.broadcast(room, f"系统: {user} 离开了房间")
async def broadcast(self, room: str, message: str, exclude: WebSocket | None = None):
for ws in list(self.rooms[room]):
if ws != exclude:
try:
await ws.send_text(message)
except Exception:
self.rooms[room].remove(ws)
manager = ConnectionManager()
# ===== 后台任务:记录消息日志 =====
def log_message(room: str, user: str, message: str):
"""后台任务:将消息写入日志(模拟)"""
print(f"[LOG] Room={room} User={user} Message={message}")
# ===== HTTP 接口 =====
class BroadcastRequest(BaseModel):
room: str
message: str
@asynccontextmanager
async def lifespan(app: FastAPI):
print("聊天室服务启动")
yield
print("聊天室服务关闭")
app = FastAPI(title="FastAPI 聊天室", lifespan=lifespan)
@app.post("/broadcast")
async def broadcast_message(
req: BroadcastRequest,
background_tasks: BackgroundTasks,
):
"""HTTP 接口广播消息到指定房间"""
await manager.broadcast(req.room, f"[系统广播] {req.message}")
background_tasks.add_task(log_message, req.room, "system", req.message)
return {"sent_to": req.room, "online": len(manager.rooms[req.room])}
@app.get("/rooms/{room}/count")
async def room_count(room: str):
return {"room": room, "online": len(manager.rooms[room])}
# ===== WebSocket 接口 =====
@app.websocket("/ws/{room}/{user}")
async def chat(websocket: WebSocket, room: str, user: str, background_tasks: BackgroundTasks):
await manager.connect(websocket, room, user)
try:
while True:
text = await websocket.receive_text()
msg = f"{user}: {text}"
await manager.broadcast(room, msg)
# 后台记录日志
log_message(room, user, text)
except WebSocketDisconnect:
await manager.disconnect(websocket, room, user)
# ===== 前端测试页面 =====
@app.get("/", response_class=HTMLResponse)
async def chat_page():
return """
<!DOCTYPE html>
<html>
<head><title>FastAPI 聊天室</title></head>
<body>
<h2>聊天室测试</h2>
<input id="room" placeholder="房间号" value="general"/>
<input id="user" placeholder="用户名" value="老张"/>
<button onclick="connect()">连接</button>
<div id="messages" style="border:1px solid #ccc;height:300px;overflow-y:auto;padding:10px;margin:10px 0"></div>
<input id="msg" placeholder="输入消息" style="width:80%"/>
<button onclick="send()">发送</button>
<script>
let ws;
function connect() {
const room = document.getElementById('room').value;
const user = document.getElementById('user').value;
ws = new WebSocket(`ws://localhost:8000/ws/${room}/${user}`);
ws.onmessage = e => {
const div = document.createElement('div');
div.textContent = e.data;
document.getElementById('messages').appendChild(div);
};
}
function send() {
const msg = document.getElementById('msg').value;
if (ws) ws.send(msg);
document.getElementById('msg').value = '';
}
</script>
</body>
</html>
"""
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)五、踩坑实录 2:WebSocket 心跳与断线重连
# 问题:网络波动导致 WebSocket 连接"假死"——连接没有断开,但消息发不出去
# 解决:服务端定期发送 ping,客户端没有响应则主动关闭
@app.websocket("/ws/{client_id}")
async def websocket_with_heartbeat(websocket: WebSocket, client_id: str):
await websocket.accept()
async def heartbeat():
"""每 30 秒发送一次 ping"""
while True:
try:
await asyncio.sleep(30)
await websocket.send_text("__ping__")
except Exception:
break
# 并发运行心跳和消息处理
heartbeat_task = asyncio.create_task(heartbeat())
try:
while True:
data = await websocket.receive_text()
if data == "__pong__":
continue # 忽略 pong 响应
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
pass
finally:
heartbeat_task.cancel()六、踩坑实录 3:uvicorn worker 数量与 WebSocket 状态
# 多 worker 时 WebSocket 连接状态不能共享内存
uvicorn main:app --workers 4 # 4 个进程,各自维护自己的连接管理器
# 问题:worker1 里的 WebSocket 连接,worker2 广播时找不到
# 解决方案:用 Redis Pub/Sub 做跨进程广播
import redis.asyncio as aioredis
redis_client = aioredis.from_url("redis://localhost:6379")
async def broadcast_via_redis(channel: str, message: str):
await redis_client.publish(channel, message)
async def subscribe_and_forward(websocket: WebSocket, channel: str):
pubsub = redis_client.pubsub()
await pubsub.subscribe(channel)
async for msg in pubsub.listen():
if msg["type"] == "message":
await websocket.send_text(msg["data"])总结
FastAPI 高性能的关键:
- I/O 全用 async:数据库、HTTP 请求、Redis 都用异步库
- CPU 密集用 def 或进程池:不要用
async def包裹同步阻塞代码 - 后台任务:
BackgroundTasks适合简单异步操作,复杂的上 Celery - WebSocket:用
ConnectionManager管理连接,多 worker 用 Redis Pub/Sub
