Python 构建 AI 工具平台——给团队用的内部 AI 工具的完整架构
Python 构建 AI 工具平台——给团队用的内部 AI 工具的完整架构
适读人群:想给团队搭建内部 AI 工具的工程师、做 AI 基础设施的开发者 | 阅读时长:约15分钟 | 核心价值:一个真实的内部 AI 工具平台从 0 到 1 的完整架构设计与实现
去年我们团队有大概47个人,每个人都在用 ChatGPT 处理各种工作任务:写代码、写文档、数据分析、邮件润色。
但问题是:每个人各用各的账号,没有统一管理,成本不透明;每次都要把公司内部数据粘贴到 ChatGPT,有数据安全隐患;各种重复的提示词没有沉淀,每次都从头写。
我花了大概三个月,从零搭了一个内部 AI 工具平台,解决了这三个问题。
今天把整个架构写出来。
平台目标
这个平台要解决的核心问题:
- 统一 API 代理:所有 AI 调用走同一个网关,统一鉴权、计费、日志
- 提示词库:团队积累的好提示词可以保存、共享、版本管理
- 工具集:几个高频使用场景封装成工具,不用每次手写 prompt
- 内部知识库问答:基于公司内部文档的 RAG(检索增强生成)
- 使用统计:谁用了多少 token,成本多少,一目了然
整体架构
用户(浏览器/API client)
↓
FastAPI 网关层
├── 认证(JWT/SSO)
├── 限流(按用户/部门)
├── 计费记录
└── 请求日志
↓
AI Provider 适配层
├── OpenAI
├── Claude (Anthropic)
└── 内部部署模型
↓
功能模块层
├── 提示词库
├── 工具集
└── 知识库 RAG
↓
存储层
├── PostgreSQL(用户、提示词、日志)
├── Redis(缓存、会话、限流)
└── Qdrant(向量数据库)核心代码:API 网关层
# app/gateway/router.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, AsyncIterator
import httpx
import json
import time
from app.core.auth import get_current_user
from app.core.billing import record_usage
from app.core.rate_limit import check_rate_limit
app = FastAPI(title="AI Tools Platform")
class ChatRequest(BaseModel):
model: str = "gpt-4o-mini"
messages: list[dict]
stream: bool = False
temperature: float = 0.7
max_tokens: Optional[int] = None
# 平台自定义字段
tool_id: Optional[str] = None # 使用哪个预设工具
prompt_id: Optional[str] = None # 使用哪个提示词模板
async def stream_openai_response(
client: httpx.AsyncClient,
request_data: dict,
) -> AsyncIterator[str]:
"""流式转发 OpenAI 响应"""
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
json=request_data,
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
yield line + "\n\n"
if line == "data: [DONE]":
break
@app.post("/v1/chat/completions")
async def chat_completions(
request: ChatRequest,
raw_request: Request,
current_user=Depends(get_current_user),
):
# 1. 限流检查
await check_rate_limit(current_user.id, request.model)
# 2. 如果指定了工具或提示词模板,加载并合并
if request.tool_id:
tool = await load_tool(request.tool_id)
request.messages = tool.apply(request.messages)
elif request.prompt_id:
prompt = await load_prompt(request.prompt_id)
request.messages = prompt.apply(request.messages)
# 3. 记录请求开始
request_id = await log_request_start(current_user, request)
start_time = time.time()
# 4. 转发请求
openai_payload = request.model_dump(exclude={'tool_id', 'prompt_id'})
if request.stream:
async def generate():
total_tokens = 0
async with httpx.AsyncClient(timeout=120) as client:
async for chunk in stream_openai_response(client, openai_payload):
# 统计 token(从流式响应里解析)
if '"usage"' in chunk:
try:
data = json.loads(chunk.replace("data: ", ""))
total_tokens = data.get("usage", {}).get("total_tokens", 0)
except Exception:
pass
yield chunk
# 流式完成后记录用量
await record_usage(
user_id=current_user.id,
model=request.model,
tokens=total_tokens,
duration=time.time() - start_time,
request_id=request_id,
)
return StreamingResponse(generate(), media_type="text/event-stream")
else:
async with httpx.AsyncClient(timeout=120) as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json=openai_payload,
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
)
response_data = response.json()
# 记录用量
usage = response_data.get("usage", {})
await record_usage(
user_id=current_user.id,
model=request.model,
tokens=usage.get("total_tokens", 0),
duration=time.time() - start_time,
request_id=request_id,
)
return response_data提示词库
# app/prompts/models.py
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from datetime import datetime
class PromptTemplate(Base):
__tablename__ = "prompt_templates"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
description = Column(Text)
category = Column(String(50)) # coding / writing / analysis / etc.
system_prompt = Column(Text) # 系统 prompt
user_prompt_template = Column(Text) # 用户消息模板,支持变量 {{variable}}
variables = Column(JSON) # 变量定义,如 [{"name": "language", "default": "Python"}]
created_by = Column(Integer, ForeignKey("users.id"))
is_public = Column(Integer, default=0) # 是否对团队公开
usage_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
creator = relationship("User", back_populates="prompts")
versions = relationship("PromptVersion", back_populates="template")
def apply(self, user_message: str, variables: dict = None) -> list[dict]:
"""把模板应用到用户消息,返回 messages 列表"""
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
if self.user_prompt_template:
# 替换变量
content = self.user_prompt_template
if variables:
for k, v in variables.items():
content = content.replace(f"{{{{{k}}}}}", str(v))
content = content.replace("{{user_input}}", user_message)
messages.append({"role": "user", "content": content})
else:
messages.append({"role": "user", "content": user_message})
return messages踩坑实录一:token 统计不准确
现象: 计费账单里的 token 数和 OpenAI 账单不匹配,差了大概12%。
原因: 流式响应里,usage 信息不是每个 chunk 都有,要在最后一个 chunk 里才能拿到。我之前的代码在流式响应中间就在尝试解析 usage,很多时候没解析到,就按0记录了。
解法:
async def stream_with_usage_tracking(
client,
payload,
on_usage: callable
) -> AsyncIterator[str]:
completion_tokens = 0
prompt_tokens = 0
async with client.stream("POST", url, json=payload) as resp:
async for line in resp.aiter_lines():
if not line.startswith("data: "):
continue
data_str = line[6:] # 去掉 "data: "
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
# usage 在最后一个非 [DONE] 的 chunk 里
if usage := data.get("usage"):
prompt_tokens = usage.get("prompt_tokens", 0)
completion_tokens = usage.get("completion_tokens", 0)
except json.JSONDecodeError:
pass
yield line + "\n\n"
# 流结束后回调
await on_usage(prompt_tokens, completion_tokens)另外需要在 OpenAI 请求里加 "stream_options": {"include_usage": true},这样流式响应里才会包含 usage 信息。
踩坑实录二:不同用户的 API Key 隔离
现象: 早期我们只有一个 OpenAI API Key,一个部门用量暴增导致触发了速率限制,影响了所有人。
解法: 按部门/团队配置不同的 API Key,按路由分发:
class APIKeyPool:
def __init__(self, keys: list[dict]):
# keys = [{"key": "sk-...", "department": "engineering", "monthly_limit": 100}]
self.keys = {k["department"]: k for k in keys}
self.usage = {} # 本月用量统计
def get_key(self, department: str) -> str:
if department in self.keys:
return self.keys[department]["key"]
return self.keys["default"]["key"]
async def check_limit(self, department: str, tokens: int) -> bool:
current = self.usage.get(department, 0)
limit = self.keys.get(department, self.keys.get("default", {})).get("monthly_limit", float("inf"))
return current + tokens <= limit踩坑实录三:内部 RAG 的分块策略
现象: 知识库里有很多 PDF 文档,RAG 效果很差,经常检索到不相关的片段。
原因: 按固定字符数分块(每块512字符),会把一个完整的概念切断,导致每个块的语义不完整,向量搜索找到的片段上下文不够。
解法: 改用语义分块:
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 不好的分块:固定字符数
# splitter = CharacterTextSplitter(chunk_size=512, chunk_overlap=50)
# 好的分块:按语义边界(段落、句子)
splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=100,
separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " "],
# 优先在段落边界切,其次句子边界,最后才是字符
)
# 另外,给每个 chunk 加元数据,检索时一起返回
def prepare_chunks(document_text: str, doc_id: str, title: str) -> list[dict]:
chunks = splitter.split_text(document_text)
return [
{
"text": chunk,
"metadata": {
"doc_id": doc_id,
"title": title,
"chunk_index": i,
"chunk_total": len(chunks),
}
}
for i, chunk in enumerate(chunks)
]部署方案
# docker-compose.yml(简化版)
version: '3.8'
services:
api:
build: .
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=postgresql://...
- REDIS_URL=redis://redis:6379
- QDRANT_URL=http://qdrant:6333
ports:
- "8000:8000"
postgres:
image: postgres:16
volumes:
- pg-data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
qdrant:
image: qdrant/qdrant:latest
volumes:
- qdrant-data:/qdrant/storage
ports:
- "6333:6333"
volumes:
pg-data:
qdrant-data:这个平台上线之后,团队反馈最好的两个功能是:提示词库(减少了大量重复工作)和使用统计(让管理层第一次清楚地看到 AI 工具的实际ROI)。
