FastAPI 深度实战——依赖注入、中间件、生命周期事件完整指南
FastAPI 深度实战——依赖注入、中间件、生命周期事件完整指南
适读人群:有基础 FastAPI 了解、想深入掌握框架核心机制的 Python 后端工程师 | 阅读时长:约 18 分钟 | 核心价值:彻底搞懂 FastAPI 三大核心机制,构建可维护的生产级 API
一次代码审查引发的深思
小林是我带过的一个 Java 转 Python 的同事,写了半年 FastAPI 之后,技术自我感觉还不错。有一天他的代码在 review 被叫停了——代码能跑,但代码里数据库连接的创建分散在每个路由函数里,日志记录也是手动在每个函数开头加的,连接池根本没有统一管理。
"你这代码跑起来没问题,但上了生产要出事的。"架构师说。
小林很困惑:FastAPI 不是说很好用吗?我按文档写的呀。
我看了他的代码,问题很明显:他只学了 FastAPI 的路由和响应这两块,完全没碰依赖注入、中间件和生命周期。这三个东西,是 FastAPI 从"能用"到"好用"的分水岭。
这篇文章,就是为了帮你彻底搞清楚这三个机制。
一、依赖注入(Depends):FastAPI 最强武器
1.1 依赖注入是什么
如果你写过 Spring Boot,依赖注入(DI)就是 @Autowired。FastAPI 的 Depends 本质上做的是同一件事——把"公共逻辑"从路由函数里抽出来,统一管理,按需注入。
没用依赖注入前:
@app.get("/users/{user_id}")
async def get_user(user_id: int, token: str = Header(...)):
# 每个路由都要验证 token
if not verify_token(token):
raise HTTPException(status_code=401)
db = get_database_connection()
user = db.query(User).filter(User.id == user_id).first()
return user
@app.get("/orders")
async def get_orders(token: str = Header(...)):
# 又要重复验证 token
if not verify_token(token):
raise HTTPException(status_code=401)
...用了依赖注入后:
from fastapi import Depends, HTTPException, Header
async def verify_token(authorization: str = Header(...)) -> str:
"""依赖函数:验证 token,返回用户信息"""
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid token format")
token = authorization[7:]
# 实际项目中这里做 JWT 解析
if token != "valid_token":
raise HTTPException(status_code=401, detail="Invalid token")
return token
@app.get("/users/{user_id}")
async def get_user(user_id: int, token: str = Depends(verify_token)):
... # 路由函数不再关心认证逻辑
@app.get("/orders")
async def get_orders(token: str = Depends(verify_token)):
...1.2 数据库会话依赖:生产级写法
这是最常用的依赖注入场景——管理数据库连接的生命周期:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from fastapi import Depends
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncSession:
"""数据库会话依赖,自动处理提交/回滚/关闭"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# finally 中 async with 会自动关闭 session
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: str = Depends(verify_token),
):
...注意这里用了 yield——这让依赖函数变成一个生成器,yield 前的代码是"前置处理",yield 后的代码是"后置清理"。等价于 Java 的 try-finally 或 Spring 的 @Around AOP。
1.3 依赖嵌套与缓存
FastAPI 支持依赖嵌套,而且默认同一请求内的相同依赖只执行一次:
async def get_settings() -> dict:
"""每个请求只执行一次,结果被缓存"""
return {"max_items": 100, "debug": False}
async def get_db(settings: dict = Depends(get_settings)) -> AsyncSession:
"""依赖于 settings 的数据库依赖"""
...
@app.get("/items")
async def get_items(
db: AsyncSession = Depends(get_db),
settings: dict = Depends(get_settings), # 这里不会重复执行 get_settings
):
...如果你明确需要每次都重新执行,用 Depends(get_settings, use_cache=False)。
1.4 类作为依赖
复杂依赖逻辑可以用类封装:
from fastapi import Query
class PaginationParams:
"""分页参数依赖,复用于所有列表接口"""
def __init__(
self,
page: int = Query(default=1, ge=1, description="页码"),
page_size: int = Query(default=20, ge=1, le=100, description="每页条数"),
):
self.page = page
self.page_size = page_size
self.offset = (page - 1) * page_size
@app.get("/users")
async def list_users(pagination: PaginationParams = Depends(PaginationParams)):
print(f"第{pagination.page}页,每页{pagination.page_size}条")
...二、中间件(Middleware):请求的守门人
2.1 中间件 vs 依赖注入:选哪个?
| 场景 | 推荐方式 |
|---|---|
| 认证/权限(部分路由需要) | 依赖注入 |
| 全局日志记录 | 中间件 |
| 全局请求耗时统计 | 中间件 |
| CORS | 内置 CORSMiddleware |
| 请求 ID 追踪 | 中间件 |
| 响应压缩 | 内置 GZipMiddleware |
2.2 自定义中间件
import time
import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""请求日志中间件:记录请求ID、方法、路径、耗时、状态码"""
async def dispatch(self, request: Request, call_next) -> Response:
# 请求前:生成 request_id,记录开始时间
request_id = str(uuid.uuid4())[:8]
start_time = time.time()
# 将 request_id 存到请求状态,路由函数可以读取
request.state.request_id = request_id
# 调用下一个处理器(路由函数)
try:
response = await call_next(request)
except Exception as e:
print(f"[{request_id}] 请求异常: {e}")
raise
# 请求后:计算耗时,添加响应头
duration = (time.time() - start_time) * 1000
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{duration:.2f}ms"
print(
f"[{request_id}] {request.method} {request.url.path} "
f"→ {response.status_code} ({duration:.1f}ms)"
)
return response
app.add_middleware(RequestLoggingMiddleware)2.3 内置中间件配置
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# CORS(前后端分离必配)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-frontend.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 响应体超过 1000 字节时自动 gzip 压缩
app.add_middleware(GZipMiddleware, minimum_size=1000)
# 只允许指定域名的请求(防 SSRF)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["your-api.com", "*.your-api.com"]
)踩坑实录 1:中间件顺序很重要
中间件是洋葱模型,最后 add_middleware 的最先执行。如果你把 CORS 中间件放在认证中间件后面,OPTIONS 预检请求会先被认证中间件拦截,导致 CORS 失败。
# 正确顺序:CORS 最后添加(最先执行)
app.add_middleware(RequestLoggingMiddleware)
app.add_middleware(CORSMiddleware, ...) # 最后添加 = 最先被请求触达三、生命周期事件(Lifespan):资源的正确打开方式
3.1 为什么需要生命周期
数据库连接池、Redis 客户端、ML 模型——这些资源:
- 初始化代价高,不应该每次请求都重建
- 应用退出时需要优雅关闭,否则会丢数据或报错
FastAPI 0.93+ 推荐用 lifespan 上下文管理器替代旧的 @app.on_event。
3.2 lifespan 标准写法
from contextlib import asynccontextmanager
from fastapi import FastAPI
import redis.asyncio as aioredis
# 全局资源(在 lifespan 中初始化)
db_engine = None
redis_client = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理器"""
# ===== 启动阶段 =====
print("应用启动,初始化资源...")
global db_engine, redis_client
# 初始化数据库连接池
db_engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # 自动检测断开的连接
)
# 初始化 Redis 客户端
redis_client = aioredis.from_url(
"redis://localhost:6379",
encoding="utf-8",
decode_responses=True,
)
# 预热:测试连接是否正常
async with db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
await redis_client.ping()
print("资源初始化完成,开始接受请求")
yield # 应用运行中...
# ===== 关闭阶段 =====
print("应用关闭,释放资源...")
await db_engine.dispose()
await redis_client.close()
print("资源释放完成")
app = FastAPI(lifespan=lifespan)3.3 把资源挂到 app.state
在 lifespan 里把资源挂到 app.state,可以在路由函数中通过 request.app.state 访问,比全局变量更干净:
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.db_engine = create_async_engine(DATABASE_URL)
app.state.redis = aioredis.from_url("redis://localhost:6379")
yield
await app.state.db_engine.dispose()
await app.state.redis.close()
# 依赖函数中使用
async def get_redis(request: Request) -> aioredis.Redis:
return request.app.state.redis
@app.get("/cache/{key}")
async def get_cache(key: str, redis: aioredis.Redis = Depends(get_redis)):
value = await redis.get(key)
return {"key": key, "value": value}四、完整可运行示例:生产级 FastAPI 应用骨架
#!/usr/bin/env python3
"""
FastAPI 生产级骨架
演示:lifespan、依赖注入、中间件的完整集成
"""
import time
import uuid
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
# ========== 数据模型 ==========
class Item(BaseModel):
id: int
name: str
price: float
class CreateItemRequest(BaseModel):
name: str
price: float
# ========== 模拟数据库 ==========
FAKE_DB: dict[int, Item] = {
1: Item(id=1, name="键盘", price=299.0),
2: Item(id=2, name="鼠标", price=199.0),
}
# ========== 中间件 ==========
class RequestTracingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
request_id = str(uuid.uuid4())[:8]
request.state.request_id = request_id
start = time.perf_counter()
response = await call_next(request)
elapsed = (time.perf_counter() - start) * 1000
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{elapsed:.1f}ms"
print(f"[{request_id}] {request.method} {request.url.path} {response.status_code} {elapsed:.1f}ms")
return response
# ========== 生命周期 ==========
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动:初始化资源
print("启动:加载配置、初始化连接池...")
app.state.config = {"env": "production", "version": "1.0.0"}
print("启动完成")
yield
# 关闭:释放资源
print("关闭:释放连接池...")
print("关闭完成")
# ========== 应用实例 ==========
app = FastAPI(
title="老张的 FastAPI 骨架",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(RequestTracingMiddleware)
# ========== 依赖函数 ==========
async def verify_api_key(x_api_key: Annotated[str | None, Header()] = None) -> str:
"""API Key 验证依赖"""
if x_api_key != "secret-key-123":
raise HTTPException(status_code=401, detail="Invalid API Key")
return x_api_key
class Pagination:
def __init__(
self,
page: int = Query(default=1, ge=1),
size: int = Query(default=10, ge=1, le=100),
):
self.page = page
self.size = size
self.offset = (page - 1) * size
# ========== 路由 ==========
@app.get("/health")
async def health_check(request: Request):
return {
"status": "ok",
"config": request.app.state.config,
"request_id": request.state.request_id,
}
@app.get("/items", response_model=list[Item])
async def list_items(
pagination: Pagination = Depends(Pagination),
_: str = Depends(verify_api_key),
):
items = list(FAKE_DB.values())
return items[pagination.offset : pagination.offset + pagination.size]
@app.get("/items/{item_id}", response_model=Item)
async def get_item(
item_id: int,
_: str = Depends(verify_api_key),
):
if item_id not in FAKE_DB:
raise HTTPException(status_code=404, detail="Item not found")
return FAKE_DB[item_id]
@app.post("/items", response_model=Item, status_code=201)
async def create_item(
body: CreateItemRequest,
_: str = Depends(verify_api_key),
):
new_id = max(FAKE_DB.keys()) + 1
item = Item(id=new_id, name=body.name, price=body.price)
FAKE_DB[new_id] = item
return item
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)五、踩坑实录 2:依赖函数中的异步陷阱
# 错误:同步依赖函数里调用异步代码
def get_user_sync(token: str = Depends(verify_token)):
user = await db.fetch_user(token) # SyntaxError: 同步函数不能 await
return user
# 正确:依赖函数必须和它调用的代码保持异步一致性
async def get_user_async(token: str = Depends(verify_token)):
user = await db.fetch_user(token)
return userFastAPI 支持同步和异步依赖函数,但同步依赖会在线程池中执行(有额外开销),如果你的依赖函数需要做 I/O,一定要用 async def。
六、踩坑实录 3:中间件里读 request body 后路由收不到
# 危险写法:中间件消费了请求体
class BadMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
body = await request.body() # 读了一次
print(f"请求体: {body}")
response = await call_next(request) # 路由函数里 body 已经被消费,读到空
return response现象:中间件打印了请求体,但路由函数里 await request.json() 返回空。原因:HTTP 请求体是流,读一次就没了。解法:
class SafeLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# 方案1:只记录 headers 和路径,不读 body
print(f"{request.method} {request.url.path}")
# 方案2:读完后重新封装流
body = await request.body()
# 重新设置 body 流,让后续处理器可以再次读取
async def receive():
return {"type": "http.request", "body": body}
request._receive = receive
return await call_next(request)总结
FastAPI 的三大核心机制:
- 依赖注入(Depends):抽取公共逻辑,支持嵌套、缓存、生命周期管理
- 中间件(Middleware):全局横切关注点,注意洋葱顺序和 body 消费问题
- 生命周期(Lifespan):统一管理应用级资源,替代
@app.on_event
这三个机制组合使用,才能写出结构清晰、可测试、可维护的生产级 FastAPI 应用。
