Python 项目结构最佳实践——Clean Architecture 在 Python 项目中的落地
Python 项目结构最佳实践——Clean Architecture 在 Python 项目中的落地
适读人群:Python 中高级工程师、想提升项目可维护性的团队 | 阅读时长:约17分钟 | 核心价值:掌握 Clean Architecture 的 Python 落地方案,让项目结构清晰、可测试、可扩展
那个谁都不敢动的"祖传"项目
我加入过一个团队,有个核心 Python 项目已经跑了三年,代码量有八九万行。每次有新需求,大家都会互相对视,然后默默叹气——因为这个项目的代码,已经没有人能完全搞清楚它的结构了。
utils.py有4000行,什么都往里堆- 数据库操作代码散落在 routes、services、models 各处
- 业务逻辑、框架代码、数据库代码搅在一起
- 想写单元测试?对不起,每个函数都依赖数据库连接,根本没法 mock
最后这个项目成了团队最大的技术债,大家估算过,要完全重构需要半年时间,但又不敢重构,因为怕改一处坏一片。
这件事让我下定决心,认真研究项目结构,专门学了 Clean Architecture,在后续几个新项目里落地实践。今天来分享这套方案。
一、Clean Architecture 核心思想
Clean Architecture 是 Robert C. Martin(Uncle Bob)提出的,核心思想用一句话概括:依赖只能从外层指向内层,内层对外层一无所知。
它把系统分为四层(从内到外):
内层(稳定)
Entities(实体/领域对象) ← 纯 Python 类,没有任何框架依赖
Use Cases(用例/业务逻辑) ← 业务规则,依赖 Entities
Interface Adapters(接口适配器)← 将数据格式在内外层之间转换
Frameworks & Drivers(外层) ← 数据库、Web框架、第三方服务
外层(易变)最关键的规则:内层不能导入外层的任何东西,依赖是单向的,从外向内。
二、Python 项目目录结构设计
myproject/
├── pyproject.toml
├── .env.example
├── Makefile
├── tests/
│ ├── unit/ # 纯单元测试,不需要任何基础设施
│ ├── integration/ # 集成测试(需要数据库/Redis)
│ └── e2e/ # 端到端测试
└── src/
└── myproject/
├── __init__.py
├── main.py # 应用入口,组装所有依赖
│
├── domain/ # 领域层(最内层,最稳定)
│ ├── entities/ # 领域实体
│ ├── value_objects/ # 值对象
│ ├── events/ # 领域事件
│ └── exceptions.py # 领域异常
│
├── application/ # 应用层(用例)
│ ├── use_cases/ # 每个业务场景一个类/函数
│ ├── ports/ # 抽象接口(依赖反转的关键)
│ └── dtos/ # 数据传输对象
│
├── infrastructure/ # 基础设施层
│ ├── database/ # 数据库实现(SQLAlchemy/MongoDB)
│ ├── cache/ # 缓存实现(Redis)
│ ├── external/ # 第三方 API 客户端
│ └── messaging/ # 消息队列
│
└── presentation/ # 表示层
├── api/ # FastAPI/Flask 路由
│ ├── v1/
│ └── dependencies.py # 依赖注入
└── cli/ # 命令行接口三、完整代码示例——AI 文档分析服务
让我们用一个真实的场景来演示:一个接收文档、调用 AI 分析、返回结果的服务。
领域层(Domain Layer)
# src/myproject/domain/entities/document.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid
class DocumentStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Document:
"""
文档实体——纯 Python 类,零框架依赖
这里只有业务规则,没有数据库逻辑
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
title: str = ""
content: str = ""
owner_id: str = ""
status: DocumentStatus = DocumentStatus.PENDING
analysis_result: Optional[str] = None
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
def __post_init__(self):
if not self.title:
raise ValueError("文档标题不能为空")
if len(self.content) > 100_000:
raise ValueError("文档内容超过100,000字符限制")
def start_processing(self):
"""开始处理(封装状态转换逻辑)"""
if self.status != DocumentStatus.PENDING:
raise ValueError(f"只有 PENDING 状态的文档才能开始处理,当前: {self.status}")
self.status = DocumentStatus.PROCESSING
self.updated_at = datetime.utcnow()
def complete(self, result: str):
self.status = DocumentStatus.COMPLETED
self.analysis_result = result
self.updated_at = datetime.utcnow()
def fail(self, reason: str):
self.status = DocumentStatus.FAILED
self.analysis_result = f"失败原因: {reason}"
self.updated_at = datetime.utcnow()
@property
def is_finished(self) -> bool:
return self.status in (DocumentStatus.COMPLETED, DocumentStatus.FAILED)
# src/myproject/domain/exceptions.py
class DomainError(Exception):
"""领域异常基类"""
pass
class DocumentNotFoundError(DomainError):
def __init__(self, doc_id: str):
super().__init__(f"文档不存在: {doc_id}")
class PermissionDeniedError(DomainError):
pass应用层(Application Layer)——端口定义
# src/myproject/application/ports/document_repo.py
from abc import ABC, abstractmethod
from typing import Optional, List
from myproject.domain.entities.document import Document
class DocumentRepository(ABC):
"""
文档仓储接口(Port)
应用层定义接口,基础设施层实现
这是依赖反转的关键!
"""
@abstractmethod
async def save(self, document: Document) -> Document:
"""保存文档"""
raise NotImplementedError
@abstractmethod
async def find_by_id(self, doc_id: str) -> Optional[Document]:
"""按 ID 查找文档"""
raise NotImplementedError
@abstractmethod
async def find_by_owner(self, owner_id: str, limit: int = 20) -> List[Document]:
raise NotImplementedError
@abstractmethod
async def update(self, document: Document) -> Document:
raise NotImplementedError
# src/myproject/application/ports/ai_service.py
from abc import ABC, abstractmethod
class AIAnalysisPort(ABC):
"""AI 分析服务接口(Port)"""
@abstractmethod
async def analyze(self, content: str) -> str:
"""分析文档内容,返回分析结果"""
raise NotImplementedError应用层——用例实现
# src/myproject/application/use_cases/analyze_document.py
import logging
from dataclasses import dataclass
from myproject.domain.entities.document import Document, DocumentStatus
from myproject.domain.exceptions import DocumentNotFoundError, PermissionDeniedError
from myproject.application.ports.document_repo import DocumentRepository
from myproject.application.ports.ai_service import AIAnalysisPort
logger = logging.getLogger(__name__)
@dataclass
class AnalyzeDocumentInput:
document_id: str
requester_id: str
@dataclass
class AnalyzeDocumentOutput:
document_id: str
status: str
result: str = ""
class AnalyzeDocumentUseCase:
"""
分析文档用例
这个类只依赖抽象接口,不依赖任何具体实现
极易单元测试(mock 两个接口即可)
"""
def __init__(
self,
document_repo: DocumentRepository,
ai_service: AIAnalysisPort,
):
self._repo = document_repo
self._ai = ai_service
async def execute(self, input_data: AnalyzeDocumentInput) -> AnalyzeDocumentOutput:
# 1. 获取文档
doc = await self._repo.find_by_id(input_data.document_id)
if doc is None:
raise DocumentNotFoundError(input_data.document_id)
# 2. 权限检查
if doc.owner_id != input_data.requester_id:
raise PermissionDeniedError("没有权限分析此文档")
# 3. 开始处理
doc.start_processing()
await self._repo.update(doc)
# 4. 调用 AI 分析
try:
result = await self._ai.analyze(doc.content)
doc.complete(result)
logger.info(f"文档分析成功: {doc.id}")
except Exception as e:
doc.fail(str(e))
logger.error(f"文档分析失败: {doc.id}, {e}")
# 5. 保存结果
await self._repo.update(doc)
return AnalyzeDocumentOutput(
document_id=doc.id,
status=doc.status.value,
result=doc.analysis_result or "",
)基础设施层——接口实现
# src/myproject/infrastructure/database/document_repo_impl.py
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from myproject.application.ports.document_repo import DocumentRepository
from myproject.domain.entities.document import Document, DocumentStatus
class SQLDocumentRepository(DocumentRepository):
"""DocumentRepository 的 SQLAlchemy 实现"""
def __init__(self, session: AsyncSession):
self._session = session
async def save(self, document: Document) -> Document:
# 将领域对象转为 ORM 模型并保存
db_doc = DocumentModel.from_domain(document)
self._session.add(db_doc)
await self._session.flush()
return document
async def find_by_id(self, doc_id: str) -> Optional[Document]:
from sqlalchemy import select
result = await self._session.execute(
select(DocumentModel).where(DocumentModel.id == doc_id)
)
db_doc = result.scalar_one_or_none()
return db_doc.to_domain() if db_doc else None
async def find_by_owner(self, owner_id: str, limit: int = 20) -> List[Document]:
from sqlalchemy import select
result = await self._session.execute(
select(DocumentModel)
.where(DocumentModel.owner_id == owner_id)
.limit(limit)
)
return [row.to_domain() for row in result.scalars()]
async def update(self, document: Document) -> Document:
db_doc = await self._session.get(DocumentModel, document.id)
if db_doc:
db_doc.update_from_domain(document)
await self._session.flush()
return document
# src/myproject/infrastructure/external/openai_ai_service.py
from openai import AsyncOpenAI
from myproject.application.ports.ai_service import AIAnalysisPort
class OpenAIAnalysisService(AIAnalysisPort):
"""AIAnalysisPort 的 OpenAI 实现"""
def __init__(self, api_key: str, model: str = "gpt-4"):
self._client = AsyncOpenAI(api_key=api_key)
self._model = model
async def analyze(self, content: str) -> str:
response = await self._client.chat.completions.create(
model=self._model,
messages=[
{"role": "system", "content": "你是一个专业的文档分析助手。"},
{"role": "user", "content": f"请分析以下文档内容:\n\n{content[:8000]}"},
],
max_tokens=1000,
)
return response.choices[0].message.content依赖注入——组装所有层
# src/myproject/presentation/api/dependencies.py
from functools import lru_cache
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from myproject.infrastructure.database.document_repo_impl import SQLDocumentRepository
from myproject.infrastructure.external.openai_ai_service import OpenAIAnalysisService
from myproject.application.use_cases.analyze_document import AnalyzeDocumentUseCase
def get_document_repo(session: AsyncSession = Depends(get_db_session)):
return SQLDocumentRepository(session)
def get_ai_service():
from myproject.config import get_settings
settings = get_settings()
return OpenAIAnalysisService(api_key=settings.openai_api_key)
def get_analyze_use_case(
repo = Depends(get_document_repo),
ai = Depends(get_ai_service),
):
return AnalyzeDocumentUseCase(repo, ai)
# src/myproject/presentation/api/v1/documents.py
from fastapi import APIRouter, Depends, HTTPException, status
from myproject.application.use_cases.analyze_document import (
AnalyzeDocumentUseCase,
AnalyzeDocumentInput,
)
from myproject.domain.exceptions import DocumentNotFoundError, PermissionDeniedError
router = APIRouter(prefix="/documents", tags=["documents"])
@router.post("/{doc_id}/analyze")
async def analyze_document(
doc_id: str,
current_user_id: str, # 实际项目里从 JWT 解析
use_case: AnalyzeDocumentUseCase = Depends(get_analyze_use_case),
):
try:
result = await use_case.execute(
AnalyzeDocumentInput(
document_id=doc_id,
requester_id=current_user_id,
)
)
return {"status": result.status, "result": result.result}
except DocumentNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except PermissionDeniedError:
raise HTTPException(status_code=403, detail="权限不足")四、测试——Clean Architecture 的最大红利
# tests/unit/use_cases/test_analyze_document.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from myproject.application.use_cases.analyze_document import (
AnalyzeDocumentUseCase,
AnalyzeDocumentInput,
)
from myproject.domain.entities.document import Document, DocumentStatus
@pytest.fixture
def mock_repo():
repo = AsyncMock()
doc = Document(
id="doc-123",
title="测试文档",
content="测试内容",
owner_id="user-456",
)
repo.find_by_id.return_value = doc
repo.update.return_value = doc
return repo, doc
@pytest.fixture
def mock_ai():
ai = AsyncMock()
ai.analyze.return_value = "这是 AI 分析结果"
return ai
@pytest.mark.asyncio
async def test_analyze_document_success(mock_repo, mock_ai):
repo, doc = mock_repo
use_case = AnalyzeDocumentUseCase(repo, mock_ai)
result = await use_case.execute(
AnalyzeDocumentInput(document_id="doc-123", requester_id="user-456")
)
assert result.status == DocumentStatus.COMPLETED.value
assert result.result == "这是 AI 分析结果"
mock_ai.analyze.assert_awaited_once_with("测试内容")这就是 Clean Architecture 的最大红利:单元测试极其简单,只需要 mock 接口,不需要真实数据库,不需要真实 AI 服务,测试运行速度飞快。
五、踩坑实录
踩坑实录1:过度拆分,小项目变成了噩梦
现象:一个三人团队的小项目,引入了完整 Clean Architecture,每加一个功能要改7个文件。
原因:小项目不需要这种程度的分层,带来的复杂度超过了收益。
解法:根据项目规模选择分层深度。5人以下的小项目,用两层(业务逻辑 + 基础设施)足够了。
踩坑实录2:跨层直接访问,架构逐渐腐化
现象:刚开始遵守架构规范,慢慢地 routes 开始直接调用 repository,bypassing use cases。
原因:赶进度的时候贪图方便。
解法:CI 里加 import-linter 检查跨层依赖,自动拦截违规提交。
六、选型建议
| 项目规模 | 推荐结构 |
|---|---|
| 个人脚本/工具 | 单文件或简单平铺 |
| 小型 Web API(<10个接口) | MVC 两层(routes + services) |
| 中型项目 | 简化 Clean Architecture(3层) |
| 大型复杂业务 | 完整 Clean Architecture + DDD |
| 微服务 | 每个服务内部 Clean Architecture |
好的项目结构不是目的,能让团队高效协作、代码可测试可维护,才是目的。架构是工具,不是信仰。
