Python 微服务实战——Flask/FastAPI 微服务架构的完整落地方案
Python 微服务实战——Flask/FastAPI 微服务架构的完整落地方案
适读人群:从单体应用向微服务迁移的 Python 工程师、主导过或即将主导微服务拆分的技术负责人 | 阅读时长:约16分钟 | 核心价值:不讲理论,讲我们真实落地过的完整方案,包括踩过的坑
2022年底,我们把一个跑了3年的 Python 单体应用拆成了微服务。整个过程历时大概5个月,中间出了不少问题,也学到了不少东西。
拆之前,我在网上看了很多微服务最佳实践的文章,感觉都说得很好,但真正做起来,发现那些文章没写的细节才是最难的部分。
今天按照我们真实的实施路径把这个过程写出来。
为什么要拆,什么时候拆
先说一个我的明确立场:不要为了微服务而微服务。
我们当时拆分的原因是:
- 单体代码库 42 万行,3个团队共同维护,代码冲突频繁,发布互相阻塞
- 不同模块的性能要求差异极大(实时接口 vs 批处理),混在一起无法独立扩展
- 技术债严重,需要借拆分的机会重写几个核心模块
如果你只有一个团队,代码量在10万行以内,没有明显的独立扩展需求——先别拆,微服务的运维成本比你想象的大得多。
拆分原则:按业务能力,不按技术层
常见的错误拆法是按技术层拆:
- 数据访问层服务
- 业务逻辑层服务
- API 网关服务
这样拆出来,每个功能改动都要跨越多个服务,服务间调用极其频繁,网络开销比单体还大,而且根本谈不上独立部署。
正确的拆法是按业务能力(Business Capability)拆:
- 用户服务(User Service)
- 订单服务(Order Service)
- 商品服务(Product Service)
- 通知服务(Notification Service)
每个服务对应一个明确的业务领域,自己管自己的数据库,独立部署。
技术选型:Flask vs FastAPI
我的立场是:新项目用 FastAPI,旧项目迁移视情况而定(这里破例用"视情况"是因为迁移成本确实差异很大)。
具体比较:
| 维度 | Flask | FastAPI |
|---|---|---|
| 性能 | 同步,较低 | 异步,高 |
| 类型安全 | 弱(需要手动) | 强(Pydantic 内置) |
| 文档生成 | 需要插件 | 自动生成 OpenAPI |
| 学习曲线 | 低 | 中 |
| 生态成熟度 | 非常成熟 | 较新但快速增长 |
| 适合场景 | CRUD、传统 web | API服务、AI应用 |
我们的选择:现有的 Flask 服务迁移时保持 Flask,新写的服务全部用 FastAPI。
服务基础模板
每个微服务都应该有一致的结构,不然多服务维护起来是灾难:
my-service/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app 入口
│ ├── api/
│ │ ├── v1/
│ │ │ ├── routes.py
│ │ │ └── schemas.py
│ ├── core/
│ │ ├── config.py # 配置管理
│ │ ├── database.py # 数据库连接
│ │ └── dependencies.py # FastAPI 依赖注入
│ ├── models/ # ORM 模型
│ ├── services/ # 业务逻辑
│ └── repositories/ # 数据访问层
├── tests/
├── Dockerfile
├── docker-compose.yml
├── gunicorn.conf.py
├── pyproject.toml
└── .env.example核心文件示例:
# app/core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# 服务基本信息
SERVICE_NAME: str = "user-service"
SERVICE_VERSION: str = "1.0.0"
DEBUG: bool = False
# 数据库
DATABASE_URL: str
DB_POOL_SIZE: int = 20
DB_MAX_OVERFLOW: int = 10
# Redis
REDIS_URL: str
# 服务发现(其他服务的地址)
ORDER_SERVICE_URL: str
PRODUCT_SERVICE_URL: str
# 认证
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.core.config import get_settings
from app.api.v1 import routes
from app.core.database import engine, Base
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print(f"{settings.SERVICE_NAME} started")
yield
# 关闭时
await engine.dispose()
print(f"{settings.SERVICE_NAME} stopped")
app = FastAPI(
title=settings.SERVICE_NAME,
version=settings.SERVICE_VERSION,
lifespan=lifespan,
docs_url="/docs" if settings.DEBUG else None, # 生产环境关掉文档
)
app.include_router(routes.router, prefix="/api/v1")
@app.get("/health")
async def health():
return {"status": "ok", "service": settings.SERVICE_NAME}服务间通信方案
这是微服务最复杂的部分,我的方案是同步调用 + 异步事件组合使用:
同步调用(HTTP/gRPC): 用于需要立即得到结果的场景
# app/core/http_client.py
import httpx
from typing import Optional, Any
import logging
logger = logging.getLogger(__name__)
class ServiceClient:
"""微服务间 HTTP 调用客户端,带重试和熔断"""
def __init__(self, base_url: str, service_name: str, timeout: float = 10.0):
self.base_url = base_url.rstrip('/')
self.service_name = service_name
self.client = httpx.AsyncClient(
base_url=self.base_url,
timeout=httpx.Timeout(connect=3.0, read=timeout, write=5.0, pool=3.0),
headers={"X-Source-Service": "user-service"},
)
async def get(self, path: str, **kwargs) -> Optional[dict]:
try:
response = await self.client.get(path, **kwargs)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
logger.error(f"Timeout calling {self.service_name}{path}")
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error from {self.service_name}{path}: {e.response.status_code}")
return None
except Exception as e:
logger.error(f"Unexpected error calling {self.service_name}: {e}")
return None
async def close(self):
await self.client.aclose()
# 使用示例
order_client = ServiceClient(
base_url=settings.ORDER_SERVICE_URL,
service_name="order-service",
)
@app.get("/api/v1/users/{user_id}/orders")
async def get_user_orders(user_id: int):
orders = await order_client.get(f"/api/v1/orders?user_id={user_id}")
if orders is None:
# 服务降级:返回空列表而不是报错
return {"orders": [], "degraded": True}
return orders异步事件(消息队列): 用于不需要即时结果、可以接受延迟的场景
# app/events/publisher.py
import json
from datetime import datetime
import redis.asyncio as redis
class EventPublisher:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def publish(self, event_type: str, payload: dict):
event = {
"event_type": event_type,
"payload": payload,
"timestamp": datetime.utcnow().isoformat(),
"source_service": "user-service",
}
# 发布到 Redis Stream
await self.redis.xadd(
f"events:{event_type}",
{"data": json.dumps(event)},
maxlen=10000, # 保留最近10000条事件
)
# 用户注册后发布事件,通知邮件服务、分析服务等
publisher = EventPublisher(settings.REDIS_URL)
@app.post("/api/v1/users")
async def create_user(user_data: UserCreate):
user = await user_service.create(user_data)
# 异步发布事件,不阻塞当前请求
await publisher.publish("user.registered", {
"user_id": user.id,
"email": user.email,
})
return user踩坑实录一:分布式事务噩梦
现象: 用户下单流程涉及三个服务(订单服务创建订单、库存服务减库存、积分服务加积分),订单服务成功了,但库存服务因为网络超时失败了,导致数据不一致。
原因: 微服务之间没有跨服务的事务保证。传统单体里一个 transaction.commit() 能保证的事,在微服务里根本做不到。
解法: 使用 Saga 模式 + 补偿事务。
简化实现:
# 创建订单 Saga
async def create_order_saga(order_data: dict) -> dict:
compensations = [] # 需要回滚的操作列表
try:
# Step 1: 创建订单
order = await order_service.create(order_data)
compensations.append(lambda: order_service.cancel(order['id']))
# Step 2: 减库存
await inventory_service.reduce(order_data['product_id'], order_data['quantity'])
compensations.append(
lambda: inventory_service.restore(order_data['product_id'], order_data['quantity'])
)
# Step 3: 扣积分
await points_service.deduct(order_data['user_id'], order_data['points_used'])
# 所有步骤成功
return order
except Exception as e:
logger.error(f"Order saga failed: {e}, running compensations")
# 逆序执行补偿操作
for compensation in reversed(compensations):
try:
await compensation()
except Exception as comp_error:
logger.error(f"Compensation failed: {comp_error}")
# 补偿也失败的话,写入死信队列,人工处理
raise这不是完美方案,但比什么都不做强得多。完整的 Saga 实现建议看 Temporal 或者 Conductor。
踩坑实录二:服务间调用链路追踪
现象: 一个请求进来,经过了 API Gateway → User Service → Order Service → Inventory Service,最终返回慢,但不知道慢在哪个服务里。
解法: 加 TraceID,每个服务的日志和响应里都带上。
# middleware/tracing.py
import uuid
from fastapi import Request
async def tracing_middleware(request: Request, call_next):
# 从上游传过来的 trace_id,或者生成新的
trace_id = request.headers.get("X-Trace-ID") or str(uuid.uuid4())
# 注入到请求上下文
request.state.trace_id = trace_id
response = await call_next(request)
# 在响应头里带回去,方便前端追踪
response.headers["X-Trace-ID"] = trace_id
return response
# 调用下游服务时透传 trace_id
headers = {"X-Trace-ID": request.state.trace_id}
result = await order_client.get("/api/v1/orders", headers=headers)配合 Jaeger 或者 Zipkin 可以把整个调用链路可视化,一眼看出慢在哪里。
踩坑实录三:配置管理混乱
现象: 7个微服务,每个都有自己的 .env 文件,有些配置是重复的(如 Redis 地址),改一处要改7个地方。有次运维只改了5个,忘了另外2个,导致这2个服务连的是旧的 Redis。
解法: 引入配置中心。我们用的是 Consul,也可以用 Vault、Apollo 等。
# app/core/config_center.py
import consul
import json
from functools import lru_cache
class ConfigCenter:
def __init__(self, host: str, port: int = 8500):
self.client = consul.Consul(host=host, port=port)
def get(self, key: str, default=None):
try:
index, data = self.client.kv.get(f"myapp/{key}")
if data:
return json.loads(data['Value'].decode())
return default
except Exception:
return default
# 优先从配置中心读,不存在则降级到环境变量
config_center = ConfigCenter(host="consul")
class Settings(BaseSettings):
@property
def redis_url(self) -> str:
return config_center.get("shared/redis_url") or os.getenv("REDIS_URL")最终架构图(文字描述)
Client → Nginx → API Gateway (FastAPI)
↓
┌─────────┬─────────┬──────────┐
↓ ↓ ↓ ↓
User Svc Order Svc Product Svc Notification Svc
↓ ↓ ↓
User DB Order DB Product DB
↓
Redis (事件总线)
↓
┌─────────┴──────────┐
Analytics Svc Email Svc每个服务独立数据库,通过 Redis Stream 发事件,同步调用走 HTTP。
微服务不是银弹,但用对了确实能解决团队协作和独立扩展的问题。关键是要想清楚你的痛点是什么,拆分方案要服务于这个痛点,而不是为了架构好看。
