FastAPI 认证鉴权实战——JWT、OAuth2 Password Flow、权限依赖完整实现
2026/4/30大约 6 分钟
FastAPI 认证鉴权实战——JWT、OAuth2 Password Flow、权限依赖完整实现
适读人群:需要在 FastAPI 项目中实现完整认证鉴权体系的后端工程师 | 阅读时长:约 16 分钟 | 核心价值:从 JWT 签发到权限控制,一套可直接用于生产的完整方案
从一次安全漏洞说起
老刘是个做了八年 Java 的工程师,去年转型负责一个 SaaS 产品的 Python 后端。有天下午,公司安全部门突然找上门,说他们的 API 被扫出了越权漏洞——普通用户只要修改请求里的 user_id 参数,就能查到其他用户的订单数据。
老刘一看自己的代码,冷汗都下来了。他在每个接口里把 user_id 做成了查询参数,验证 token 有效就放行,根本没有校验"当前用户是否有权访问这条数据"。
这个问题,在 Java Spring Security 里有一整套框架来处理,大多数人不需要自己想这些。但 FastAPI 相对轻量,认证鉴权的逻辑需要工程师自己用依赖注入来组装。
这篇文章,我们从零到一,把 JWT 生成、OAuth2 Flow、权限控制全链路走一遍。
一、JWT 基础:生成与验证
1.1 安装依赖
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart1.2 JWT 工具函数
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import JWTError, jwt
from passlib.context import CryptContext
# 配置(生产环境从环境变量读取)
SECRET_KEY = "your-super-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""密码哈希"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
"""创建 Access Token"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict[str, Any]) -> str:
"""创建 Refresh Token"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict[str, Any]:
"""解码并验证 Token,失败抛出异常"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError as e:
raise ValueError(f"Token 无效: {e}")二、OAuth2 Password Flow:登录接口
FastAPI 内置了 OAuth2PasswordBearer 和 OAuth2PasswordRequestForm,专门处理 OAuth2 密码模式。
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
router = APIRouter(prefix="/auth", tags=["认证"])
# OAuth2 Bearer Token 提取器
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# 用户数据(实际项目从数据库查)
FAKE_USERS_DB = {
"laoz@example.com": {
"id": 1,
"email": "laoz@example.com",
"name": "老张",
"hashed_password": hash_password("password123"),
"roles": ["admin", "user"],
"is_active": True,
},
"xc@example.com": {
"id": 2,
"email": "xc@example.com",
"name": "小陈",
"hashed_password": hash_password("mypassword"),
"roles": ["user"],
"is_active": True,
},
}
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int = ACCESS_TOKEN_EXPIRE_MINUTES * 60
class UserInfo(BaseModel):
id: int
email: str
name: str
roles: list[str]
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""
OAuth2 密码模式登录
form_data.username 是邮箱,form_data.password 是密码
"""
user = FAKE_USERS_DB.get(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="邮箱或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
if not user["is_active"]:
raise HTTPException(status_code=400, detail="账号已禁用")
token_data = {"sub": str(user["id"]), "email": user["email"], "roles": user["roles"]}
return TokenResponse(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token({"sub": str(user["id"])}),
)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(refresh_token: str):
"""用 Refresh Token 换新的 Access Token"""
try:
payload = decode_token(refresh_token)
if payload.get("type") != "refresh":
raise ValueError("不是 Refresh Token")
user_id = payload.get("sub")
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
# 从数据库查用户(这里用假数据)
user = next((u for u in FAKE_USERS_DB.values() if str(u["id"]) == user_id), None)
if not user:
raise HTTPException(status_code=401, detail="用户不存在")
token_data = {"sub": str(user["id"]), "email": user["email"], "roles": user["roles"]}
return TokenResponse(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token({"sub": str(user["id"])}),
)三、权限依赖:当前用户 + 角色控制
from fastapi import Security
from fastapi.security import SecurityScopes
async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInfo:
"""
核心依赖:从 Token 解析当前用户
所有需要认证的接口都依赖这个函数
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="认证失败,请重新登录",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(token)
if payload.get("type") != "access":
raise credentials_exception
user_id: str = payload.get("sub", "")
if not user_id:
raise credentials_exception
except ValueError:
raise credentials_exception
# 从数据库查用户(这里用假数据)
user = next(
(u for u in FAKE_USERS_DB.values() if str(u["id"]) == user_id), None
)
if user is None:
raise credentials_exception
return UserInfo(
id=user["id"],
email=user["email"],
name=user["name"],
roles=user["roles"],
)
def require_roles(*required_roles: str):
"""
角色控制工厂函数:生成角色验证依赖
用法:current_user: UserInfo = Depends(require_roles("admin"))
"""
async def role_checker(
current_user: UserInfo = Depends(get_current_user),
) -> UserInfo:
for role in required_roles:
if role in current_user.roles:
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要角色: {list(required_roles)},当前用户角色: {current_user.roles}",
)
return role_checker四、完整可运行示例
#!/usr/bin/env python3
"""
FastAPI JWT + OAuth2 认证鉴权完整示例
运行后访问 http://localhost:8000/docs 测试
"""
from datetime import datetime, timedelta, timezone
from typing import Any
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# ===== 配置 =====
SECRET_KEY = "dev-secret-key-do-not-use-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ===== 模拟数据库 =====
USERS_DB = {
"admin@test.com": {
"id": 1, "email": "admin@test.com", "name": "管理员",
"hashed_password": pwd_context.hash("admin123"),
"roles": ["admin", "user"], "is_active": True,
},
"user@test.com": {
"id": 2, "email": "user@test.com", "name": "普通用户",
"hashed_password": pwd_context.hash("user123"),
"roles": ["user"], "is_active": True,
},
}
# ===== 工具函数 =====
def create_token(data: dict[str, Any]) -> str:
payload = {**data, "exp": datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# ===== Pydantic 模型 =====
class TokenOut(BaseModel):
access_token: str
token_type: str = "bearer"
class UserOut(BaseModel):
id: int
email: str
name: str
roles: list[str]
# ===== 依赖函数 =====
async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserOut:
exc = HTTPException(status_code=401, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub", "")
except JWTError:
raise exc
user = next((u for u in USERS_DB.values() if str(u["id"]) == user_id), None)
if not user:
raise exc
return UserOut(**{k: v for k, v in user.items() if k != "hashed_password"})
def require_role(role: str):
async def checker(current_user: UserOut = Depends(get_current_user)) -> UserOut:
if role not in current_user.roles:
raise HTTPException(status_code=403, detail=f"需要 {role} 权限")
return current_user
return checker
# ===== 应用 =====
app = FastAPI(title="FastAPI 认证鉴权示例")
@app.post("/auth/login", response_model=TokenOut)
async def login(form: OAuth2PasswordRequestForm = Depends()):
user = USERS_DB.get(form.username)
if not user or not pwd_context.verify(form.password, user["hashed_password"]):
raise HTTPException(status_code=401, detail="邮箱或密码错误")
token = create_token({"sub": str(user["id"]), "roles": user["roles"]})
return TokenOut(access_token=token)
@app.get("/me", response_model=UserOut)
async def get_me(current_user: UserOut = Depends(get_current_user)):
"""所有已登录用户可访问"""
return current_user
@app.get("/admin/dashboard")
async def admin_dashboard(current_user: UserOut = Depends(require_role("admin"))):
"""仅管理员可访问"""
return {"message": f"欢迎,{current_user.name}!这是管理员面板。"}
@app.get("/users/{user_id}/orders")
async def get_user_orders(
user_id: int,
current_user: UserOut = Depends(get_current_user),
):
"""用户只能查自己的订单,管理员可查所有"""
if current_user.id != user_id and "admin" not in current_user.roles:
raise HTTPException(status_code=403, detail="无权访问其他用户的订单")
return {"user_id": user_id, "orders": ["order-001", "order-002"]}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)五、踩坑实录 1:Token 过期时间用错时区
# 错误:用不带时区的 datetime,和 JWT 库产生时区冲突
from datetime import datetime
expire = datetime.utcnow() + timedelta(minutes=30) # 已过时的写法
# 正确:明确使用 UTC 时区
from datetime import datetime, timezone
expire = datetime.now(timezone.utc) + timedelta(minutes=30)现象:Token 明明还没到期,但 jwt.decode 报 ExpiredSignatureError。原因:datetime.utcnow() 返回的是 naive datetime(无时区信息),jose 库在比较时会出现时区错乱。
六、踩坑实录 2:不要在 Token 里存敏感信息
# 错误:把密码哈希或手机号放进 Token
token_data = {
"sub": user_id,
"phone": "138****1234", # 不要放!Token 只是 Base64,任何人都能解码
"hashed_password": "...", # 绝对不要放!
}
# 正确:Token 只存 user_id(sub)和角色,其他信息每次从数据库查
token_data = {
"sub": str(user.id),
"roles": user.roles,
}七、踩坑实录 3:OAuth2PasswordRequestForm 的字段名陷阱
# 错误:以为 form_data.email 就能取到邮箱
@app.post("/auth/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
email = form_data.email # AttributeError!字段名是 username,不是 email
# 正确:OAuth2 规范要求字段名是 username(哪怕你用的是邮箱)
email = form_data.username # 正确
password = form_data.password总结
FastAPI 认证鉴权的核心架构:
create_access_token+create_refresh_token:JWT 双令牌体系OAuth2PasswordRequestForm:标准登录接口,与 Swagger UI 无缝集成get_current_user:核心依赖,所有认证接口的基础require_role():工厂模式生成角色控制依赖- 资源级权限:在路由函数里对比
current_user.id和资源 owner
