FastAPI + SQLAlchemy 2.0 实战——异步 ORM、关联查询、事务管理
FastAPI + SQLAlchemy 2.0 实战——异步 ORM、关联查询、事务管理
适读人群:会基本 FastAPI、想用 SQLAlchemy 2.0 做持久层的 Python 后端工程师 | 阅读时长:约 17 分钟 | 核心价值:掌握异步 ORM 的完整使用方式,避开 N+1 查询和事务泄漏
那次慢查询告警
小梅在公司维护一个内部数据平台,用 FastAPI + SQLAlchemy 1.4 做的,跑了大半年没什么问题。有一天监控突然报警,某个接口响应时间从 200ms 飙到 8 秒,数据库连接数耗尽。
我帮她看了代码,问题很快找到了——她的关联查询触发了经典的 N+1 问题:查出 100 个用户,然后循环里每个用户再去查一次部门信息,总共发了 101 条 SQL。加上连接没有异步,阻塞了事件循环。
那次我帮她升级到 SQLAlchemy 2.0,改成异步模式,加上 selectinload 预加载,接口响应时间回到了 180ms。
这篇文章,就把那次经历整理成一套完整的异步 ORM 实战方案。
一、SQLAlchemy 2.0 vs 1.x:核心变化
SQLAlchemy 1.4 开始引入异步支持,2.0 正式稳定。对 Java 工程师来说,SQLAlchemy 2.0 有点像 JPA/Hibernate 的 Python 版本,但语法更简洁。
主要变化:
| 维度 | SQLAlchemy 1.x | SQLAlchemy 2.0 |
|---|---|---|
| 查询语法 | session.query(User).filter(...) | select(User).where(...) |
| 异步支持 | 1.4 引入,部分限制 | 完全稳定 |
| ORM 映射 | declarative_base() | DeclarativeBase 类 |
| 类型注解 | 不支持 | Mapped[T] 完整支持 |
| 2.0 风格执行 | session.execute(stmt) | 统一入口 |
二、模型定义:SQLAlchemy 2.0 风格
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Integer, Float, ForeignKey, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
"""所有模型的基类"""
pass
class Department(Base):
__tablename__ = "departments"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), nullable=False, unique=True)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
# 反向关联:一个部门有多个用户
users: Mapped[list["User"]] = relationship("User", back_populates="department")
def __repr__(self) -> str:
return f"Department(id={self.id}, name={self.name!r})"
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
email: Mapped[str] = mapped_column(String(200), nullable=False, unique=True)
score: Mapped[float] = mapped_column(Float, default=0.0)
department_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("departments.id"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
# 正向关联:多个用户属于一个部门
department: Mapped[Optional[Department]] = relationship(
"Department", back_populates="users"
)
def __repr__(self) -> str:
return f"User(id={self.id}, name={self.name!r})"注意 Mapped[T] 注解——SQLAlchemy 2.0 的 ORM 映射和 Python 类型系统完全融合,IDE 可以正确推断类型,不再是运行时魔法。
三、异步引擎与会话配置
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
# 创建异步引擎(需要安装 asyncpg 驱动)
engine: AsyncEngine = create_async_engine(
"postgresql+asyncpg://user:password@localhost:5432/mydb",
echo=False, # 生产环境关掉 SQL 日志
pool_size=10, # 连接池大小
max_overflow=20, # 超出 pool_size 时允许的额外连接
pool_pre_ping=True, # 使用前检查连接是否存活
pool_recycle=3600, # 连接最长复用 1 小时,防止数据库断连
)
# 会话工厂
AsyncSessionLocal = async_sessionmaker(
engine,
expire_on_commit=False, # 提交后不自动过期,避免访问属性时重新查询
class_=AsyncSession,
)
# FastAPI 依赖函数
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise四、CRUD 操作完整示例
4.1 创建
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(db: AsyncSession, name: str, email: str, dept_id: int | None = None) -> User:
user = User(name=name, email=email, department_id=dept_id)
db.add(user)
await db.flush() # 写入但不提交,让 user.id 有值
await db.refresh(user) # 刷新对象,获取数据库生成的字段
return user4.2 查询
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def list_users(db: AsyncSession, page: int = 1, size: int = 20) -> list[User]:
stmt = (
select(User)
.order_by(User.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
result = await db.execute(stmt)
return list(result.scalars().all())4.3 更新
async def update_user_score(db: AsyncSession, user_id: int, score: float) -> User | None:
user = await get_user_by_id(db, user_id)
if user is None:
return None
user.score = score
await db.flush()
return user
# 批量更新(不加载对象,直接 UPDATE)
from sqlalchemy import update
async def bulk_reset_scores(db: AsyncSession) -> int:
stmt = update(User).values(score=0.0)
result = await db.execute(stmt)
return result.rowcount4.4 删除
from sqlalchemy import delete
async def delete_user(db: AsyncSession, user_id: int) -> bool:
stmt = delete(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.rowcount > 0五、关联查询:消灭 N+1 问题
踩坑实录 1:懒加载在异步环境下直接报错
# 错误:异步环境下不能用懒加载
async def get_users_wrong(db: AsyncSession):
result = await db.execute(select(User))
users = result.scalars().all()
for user in users:
print(user.department.name) # MissingGreenlet 错误!
# 异步 SQLAlchemy 不支持在协程外触发懒加载现象:MissingGreenlet: greenlet_spawn has not been called。原因:异步 ORM 默认不允许懒加载,必须显式预加载。解法:用 selectinload 或 joinedload。
from sqlalchemy.orm import selectinload, joinedload
# selectinload:发两条 SQL,适合一对多(推荐)
async def get_users_with_dept(db: AsyncSession) -> list[User]:
stmt = (
select(User)
.options(selectinload(User.department))
.order_by(User.id)
)
result = await db.execute(stmt)
return list(result.scalars().all())
# joinedload:JOIN 查询,适合多对一(小结果集)
async def get_user_with_dept(db: AsyncSession, user_id: int) -> User | None:
stmt = (
select(User)
.options(joinedload(User.department))
.where(User.id == user_id)
)
result = await db.execute(stmt)
return result.unique().scalar_one_or_none()selectinload vs joinedload 怎么选?
| 场景 | 推荐 |
|---|---|
| 查询多个用户,每个用户有一个部门(多对一) | joinedload |
| 查询多个用户,每个用户有多个订单(一对多) | selectinload |
| 结果集大,JOIN 会导致行数膨胀 | selectinload |
六、事务管理
6.1 基础事务
# 在 get_db 依赖里,yield 后自动 commit,异常自动 rollback
# 这是最常见的场景,路由函数不需要手动管理事务
@app.post("/transfer")
async def transfer_score(
from_id: int,
to_id: int,
amount: float,
db: AsyncSession = Depends(get_db),
):
from_user = await get_user_by_id(db, from_id)
to_user = await get_user_by_id(db, to_id)
if from_user.score < amount:
raise HTTPException(status_code=400, detail="余额不足")
from_user.score -= amount
to_user.score += amount
# 依赖函数会在路由返回后自动 commit
return {"message": "转账成功"}6.2 嵌套事务(Savepoint)
async def create_order_with_items(
db: AsyncSession,
order_data: dict,
items_data: list[dict],
) -> Order:
async with db.begin_nested(): # 创建 SAVEPOINT
order = Order(**order_data)
db.add(order)
await db.flush()
for item_data in items_data:
item = OrderItem(order_id=order.id, **item_data)
db.add(item)
await db.flush()
return order踩坑实录 2:expire_on_commit 的坑
# 默认 expire_on_commit=True 的坑
async with AsyncSessionLocal() as session:
user = User(name="老张")
session.add(user)
await session.commit()
# 此时 user 对象被"过期",访问任何属性都会触发懒加载
print(user.name) # 在异步环境下报 MissingGreenlet!
# 解法1:创建会话时设置 expire_on_commit=False
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# 解法2:commit 后 refresh
await session.commit()
await session.refresh(user) # 重新从数据库加载
print(user.name) # 正常七、完整可运行示例
#!/usr/bin/env python3
"""
FastAPI + SQLAlchemy 2.0 异步 ORM 完整示例
使用 SQLite(演示用,无需额外安装)
"""
import asyncio
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import ForeignKey, String, Float, Integer, select
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, selectinload
# ===== 模型 =====
class Base(DeclarativeBase):
pass
class Department(Base):
__tablename__ = "departments"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True)
users: Mapped[list["User"]] = relationship("User", back_populates="department")
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(200), unique=True)
score: Mapped[float] = mapped_column(Float, default=0.0)
department_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("departments.id"), nullable=True
)
department: Mapped[Optional[Department]] = relationship(
"Department", back_populates="users"
)
# ===== 数据库配置 =====
engine = create_async_engine("sqlite+aiosqlite:///./test.db", echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# ===== Pydantic 模型 =====
class UserCreate(BaseModel):
name: str
email: str
department_id: Optional[int] = None
class UserResponse(BaseModel):
id: int
name: str
email: str
score: float
department_name: Optional[str] = None
model_config = {"from_attributes": True}
# ===== 生命周期 =====
@asynccontextmanager
async def lifespan(app: FastAPI):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 插入测试数据
async with AsyncSessionLocal() as session:
dept = Department(name="研发部")
session.add(dept)
await session.commit()
await session.refresh(dept)
users = [
User(name="老张", email="laoz@example.com", score=95.0, department_id=dept.id),
User(name="小陈", email="xc@example.com", score=88.0, department_id=dept.id),
]
session.add_all(users)
await session.commit()
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
app = FastAPI(lifespan=lifespan)
# ===== 路由 =====
@app.get("/users", response_model=list[UserResponse])
async def list_users(db: AsyncSession = Depends(get_db)):
stmt = select(User).options(selectinload(User.department))
result = await db.execute(stmt)
users = result.scalars().all()
return [
UserResponse(
id=u.id, name=u.name, email=u.email, score=u.score,
department_name=u.department.name if u.department else None,
)
for u in users
]
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(body: UserCreate, db: AsyncSession = Depends(get_db)):
user = User(**body.model_dump())
db.add(user)
await db.flush()
await db.refresh(user)
return UserResponse(id=user.id, name=user.name, email=user.email, score=user.score)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)八、踩坑实录 3:异步环境下 alembic 迁移
很多人在 FastAPI 项目里用 alembic 做数据库迁移,但异步引擎需要额外配置,直接用同步配置会报错。
# alembic/env.py 异步配置
import asyncio
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
def run_migrations_online():
connectable = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb"
)
async def run():
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
asyncio.run(run())
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()总结
FastAPI + SQLAlchemy 2.0 的核心要点:
- 用
Mapped[T]+mapped_column定义模型,类型安全 - 用
async_sessionmaker+expire_on_commit=False配置会话 - 关联查询必须显式用
selectinload/joinedload,禁止懒加载 - 事务管理交给
get_db依赖,路由函数专注业务逻辑 - alembic 迁移需要用异步连接方式
