Python AI 应用性能优化——并发推理、缓存策略、Token 成本控制
Python AI 应用性能优化——并发推理、缓存策略、Token 成本控制
适读人群:AI 应用已上生产、面临性能或成本压力的工程师 | 阅读时长:约18分钟 | 核心价值:系统掌握 AI 应用的性能优化方法,把延迟降低50%、成本降低40%
老谢在一家在线教育公司做技术,他们的 AI 辅导老师功能上线半年了,用户越来越多,但问题也开始出现:高峰期接口响应超过10秒,用户投诉体验差;每个月的 OpenAI 账单快到两万美元,老板开始质疑 ROI。
他来找我说:"老张,我们的 AI 功能没有什么特别复杂的逻辑,就是调 OpenAI 接口,为什么会这么慢这么贵?"
我问了他几个问题:你们有缓存吗?你们是串行调用还是并发?你们用的是什么模型?Prompt 有没有优化过?
他一一回答:没有缓存、全是串行、统一用 gpt-4、Prompt 一直没动过。
我说:你找到了4个优化点,每个都能降20-30%的成本或延迟。
今天我们把 AI 应用的性能优化系统地讲一遍,帮你把这些省出来的钱装到口袋里。
一、并发推理——把串行变并行
1.1 asyncio 并发调用
import asyncio
import time
from openai import AsyncOpenAI
from typing import List
aclient = AsyncOpenAI()
async def single_chat(prompt: str, model: str = "gpt-3.5-turbo") -> str:
"""单次异步调用"""
response = await aclient.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=500
)
return response.choices[0].message.content
async def concurrent_chat_requests(prompts: List[str], max_concurrent: int = 10) -> List[str]:
"""
并发执行多个 LLM 请求
max_concurrent:最大并发数(防止触发速率限制)
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def rate_limited_chat(prompt: str) -> str:
async with semaphore:
return await single_chat(prompt)
# 全部并发执行
tasks = [rate_limited_chat(p) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理失败的请求
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"请求 {i} 失败:{result}")
final_results.append(None)
else:
final_results.append(result)
return final_results
# 性能对比
async def performance_benchmark():
prompts = [f"用一句话解释{topic}" for topic in [
"机器学习", "深度学习", "自然语言处理", "计算机视觉",
"强化学习", "迁移学习", "联邦学习", "对比学习"
]]
# 串行执行
start = time.time()
serial_results = []
for p in prompts:
r = await single_chat(p)
serial_results.append(r)
serial_time = time.time() - start
# 并发执行
start = time.time()
concurrent_results = await concurrent_chat_requests(prompts, max_concurrent=8)
concurrent_time = time.time() - start
print(f"串行执行:{serial_time:.1f}s")
print(f"并发执行:{concurrent_time:.1f}s")
print(f"加速比:{serial_time/concurrent_time:.1f}x")
# 实测(8个请求,gpt-3.5-turbo):串行 16s,并发 3.2s,加速 5x
asyncio.run(performance_benchmark())1.2 多阶段并行优化
async def process_document_pipeline(document: str) -> dict:
"""
文档处理管道:并行执行多个分析任务
原本 3 个串行任务 = 9s,改为并行 = 3s
"""
async def extract_keywords():
return await single_chat(f"提取以下文档的5个关键词:\n{document[:500]}")
async def generate_summary():
return await single_chat(f"用100字总结以下文档:\n{document[:1000]}")
async def assess_difficulty():
return await single_chat(f"评估以下内容的难度(初级/中级/高级),只输出难度等级:\n{document[:300]}")
# 并行执行所有分析
keywords, summary, difficulty = await asyncio.gather(
extract_keywords(),
generate_summary(),
assess_difficulty()
)
return {
"keywords": keywords,
"summary": summary,
"difficulty": difficulty.strip()
}二、缓存策略——相同问题不重复调用
2.1 语义缓存(最有价值)
import hashlib
import json
import time
from typing import Optional
import redis
from openai import OpenAI
import numpy as np
client = OpenAI()
cache_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_text_embedding(text: str) -> List[float]:
"""获取文本向量"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def cosine_similarity(v1: List[float], v2: List[float]) -> float:
a, b = np.array(v1), np.array(v2)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
class SemanticCache:
"""
语义缓存:基于向量相似度的缓存
用于缓存"意思相同但表达不同"的问题的答案
如:
- "Python 怎么读文件" ≈ "Python 如何读取文件"
- "LangChain 是什么" ≈ "介绍一下 LangChain"
"""
def __init__(
self,
similarity_threshold: float = 0.92, # 相似度阈值(越高越严格)
ttl_seconds: int = 3600 # 缓存有效期 1 小时
):
self.threshold = similarity_threshold
self.ttl = ttl_seconds
self.cache_key = "semantic_cache"
def _get_all_entries(self) -> List[dict]:
"""获取所有缓存条目"""
data = cache_client.get(self.cache_key)
if data:
return json.loads(data)
return []
def _save_entries(self, entries: List[dict]):
"""保存缓存条目"""
cache_client.setex(
self.cache_key,
self.ttl,
json.dumps(entries, ensure_ascii=False)
)
def search(self, query: str) -> Optional[str]:
"""搜索语义相似的缓存"""
entries = self._get_all_entries()
if not entries:
return None
query_embedding = get_text_embedding(query)
best_score = 0
best_answer = None
for entry in entries:
score = cosine_similarity(query_embedding, entry["embedding"])
if score > best_score:
best_score = score
best_answer = entry["answer"]
if best_score >= self.threshold:
print(f"缓存命中!相似度:{best_score:.3f}")
return best_answer
return None
def store(self, query: str, answer: str):
"""存储查询结果"""
embedding = get_text_embedding(query)
entries = self._get_all_entries()
entries.append({
"query": query,
"answer": answer,
"embedding": embedding,
"timestamp": time.time()
})
# 只保留最近 1000 条(防止无限增长)
if len(entries) > 1000:
entries = entries[-1000:]
self._save_entries(entries)
semantic_cache = SemanticCache(similarity_threshold=0.92)
def cached_chat(user_message: str, system: str = "你是一个AI助手") -> dict:
"""带语义缓存的聊天函数"""
# 先查缓存
cached_answer = semantic_cache.search(user_message)
if cached_answer:
return {
"answer": cached_answer,
"source": "cache",
"cost": 0,
"latency_ms": 5 # 缓存命中极快
}
# 缓存未命中,调用 LLM
start = time.time()
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_message}
],
temperature=0.3 # 缓存场景用低 temperature,结果更稳定
)
answer = response.choices[0].message.content
latency_ms = int((time.time() - start) * 1000)
cost = (
response.usage.prompt_tokens * 0.0015 / 1000 +
response.usage.completion_tokens * 0.002 / 1000
)
# 存入缓存
semantic_cache.store(user_message, answer)
return {
"answer": answer,
"source": "llm",
"cost": cost,
"latency_ms": latency_ms,
"tokens": {
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens
}
}
# 测试缓存效果
print(cached_chat("Python 中的列表和元组有什么区别?"))
print(cached_chat("Python 里 list 和 tuple 的区别是什么?")) # 意思相同,应该命中缓存三、Token 成本控制
3.1 Prompt 压缩
def compress_prompt(system_prompt: str, target_reduction: float = 0.3) -> str:
"""
用 LLM 压缩过长的系统提示词
通常可减少 20-40% 的 Token 而不损失关键信息
"""
if len(system_prompt) < 500:
return system_prompt # 本来就不长,不压缩
compress_instruction = f"""请将以下系统提示词压缩,减少约{int(target_reduction*100)}%的字数。
要求:
- 保留所有关键规则和约束
- 去掉冗余、重复的表达
- 保持专业的语气
- 不改变原意
原始提示词:
{system_prompt}
压缩后的提示词(只输出压缩结果,不要其他文字):"""
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": compress_instruction}],
temperature=0
)
return response.choices[0].message.content
# 示例
original_prompt = """
你是一个专业的客服助手,你的职责是帮助用户解决他们在使用我们产品过程中遇到的各种问题和疑难解答。
你需要保持礼貌、专业、耐心的态度来对待每一位用户,无论用户提出什么样的问题,你都应该认真对待并给出有帮助的回答。
在回答问题的时候,你应该尽量做到简洁明了,避免使用过于复杂或者专业的术语,使普通用户也能够看懂你的回答。
如果用户提出的问题超出了你的能力范围或者你不确定答案,你应该诚实地告诉用户你不知道,而不是编造答案。
对于需要立即人工处理的紧急情况,比如账号被盗、大额资金异常等,你应该第一时间建议用户联系人工客服。
"""
compressed = compress_prompt(original_prompt)
print(f"原始长度:{len(original_prompt)} 字符")
print(f"压缩后:{len(compressed)} 字符")
print(f"压缩率:{(1 - len(compressed)/len(original_prompt))*100:.0f}%")
print(f"\n压缩结果:\n{compressed}")3.2 模型降级策略
from typing import Callable
class ModelRouter:
"""
智能模型路由:根据任务复杂度选择合适的模型
简单任务用便宜模型,复杂任务用强大模型
"""
def __init__(self):
self.models = {
"fast": "gpt-3.5-turbo", # $0.0015/1K input tokens
"balanced": "gpt-4o-mini", # $0.00015/1K input tokens
"powerful": "gpt-4o" # $0.005/1K input tokens
}
self.stats = {"fast": 0, "balanced": 0, "powerful": 0}
def classify_task(self, prompt: str, system: str = "") -> str:
"""
判断任务复杂度
返回 "fast", "balanced", 或 "powerful"
"""
# 简单启发式规则(生产中可以用 LLM 分类)
# 长文本分析、代码生成、推理 → 强模型
complex_signals = [
len(prompt) > 1500,
any(kw in prompt for kw in ["分析", "对比", "推理", "设计", "优化"]),
"```" in prompt, # 包含代码
any(kw in prompt for kw in ["为什么", "如何解决", "最佳方案"])
]
# 简单问答、格式转换、分类 → 快速模型
simple_signals = [
len(prompt) < 200,
any(kw in prompt for kw in ["是什么", "翻译", "分类", "判断"]),
"JSON" in system or "格式" in system # 结构化输出任务
]
complex_count = sum(complex_signals)
simple_count = sum(simple_signals)
if complex_count >= 2:
return "powerful"
elif simple_count >= 2:
return "fast"
else:
return "balanced"
def route(self, prompt: str, system: str = "") -> str:
"""路由到合适的模型"""
tier = self.classify_task(prompt, system)
model = self.models[tier]
self.stats[tier] += 1
return model
def get_cost_savings(self) -> dict:
"""计算相比全用 gpt-4o 的成本节省"""
cost_ratio = {"fast": 0.03, "balanced": 0.01, "powerful": 1.0} # 相对 gpt-4o
total = sum(self.stats.values())
if total == 0:
return {}
actual_cost_ratio = sum(
count * cost_ratio[tier]
for tier, count in self.stats.items()
) / total
return {
"requests": dict(self.stats),
"avg_cost_ratio": actual_cost_ratio,
"savings_vs_gpt4o": f"{(1 - actual_cost_ratio) * 100:.0f}%"
}
router = ModelRouter()
def smart_chat(user_message: str, system: str = "你是一个AI助手") -> str:
"""智能路由的聊天函数"""
model = router.route(user_message, system)
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_message}
]
)
return response.choices[0].message.content
# 使用
smart_chat("Python 是什么语言?") # → fast 模型
smart_chat("帮我设计一个高并发的订单处理系统架构") # → powerful 模型
print(router.get_cost_savings())四、踩坑实录一:缓存热点失效引发雪崩
现象:高峰期缓存大量失效(TTL 到期),大量请求同时打到 LLM,触发速率限制,系统雪崩。
原因:同一批数据的 TTL 是同时设置的,会同时过期。
解法:
import random
def store_with_jitter(key: str, value: str, base_ttl: int = 3600):
"""
带随机抖动的缓存过期时间
避免大量缓存同时过期
"""
jitter = random.randint(-300, 300) # ±5分钟随机抖动
ttl = base_ttl + jitter
cache_client.setex(key, ttl, value)五、踩坑实录二:异步并发超过 Rate Limit
现象:并发请求数过多时,OpenAI 返回 429 Rate limit exceeded,大量请求失败。
解法:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import openai
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=60),
retry=retry_if_exception_type(openai.RateLimitError)
)
async def robust_chat(prompt: str) -> str:
"""带自动重试的 LLM 调用"""
response = await aclient.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content六、踩坑实录三:输出 Token 超出预期
现象:设置了 max_tokens=500,但每次调用账单上的输出 Token 大约只有 300,和预期不符,但费用比预期高。
原因:不是 max_tokens 的问题。是某些问题会触发模型生成非常长的回答(比如"帮我写一份完整的项目方案"),这些偶发的大输出拉高了平均成本。
解法:
# 对不同类型的请求设置不同的 max_tokens 上限
MAX_TOKENS_BY_TASK = {
"classification": 10, # 分类任务(只需要类别名)
"extraction": 200, # 信息提取
"summary": 400, # 摘要
"qa": 500, # 问答
"generation": 1500, # 内容生成
"code": 2000, # 代码生成
}
def cost_controlled_chat(prompt: str, task_type: str = "qa") -> str:
max_tokens = MAX_TOKENS_BY_TASK.get(task_type, 500)
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens
)
# 监控是否触发了截断
if response.choices[0].finish_reason == "length":
print(f"警告:输出被 max_tokens={max_tokens} 截断,考虑调大或换 task_type")
return response.choices[0].message.content七、综合优化效果
在老谢的项目里,我们做了以下优化:
| 优化措施 | 效果 |
|---|---|
| 串行→并发(10并发) | 延迟从平均 8s 降到 1.8s |
| 语义缓存(命中率 35%) | LLM 调用量减少 35% |
| 模型降级(70%请求用 mini) | 单次成本降低 42% |
| Prompt 压缩(30%) | Token 消耗减少 12% |
| 综合效果 | 延迟降低 77%,成本降低 55% |
老谢说,从两万美元降到了不到九千美元,老板终于不再问 ROI 了。
AI 应用的性能优化没有魔法,就是这几件事:缓存高频问题、并发低相关请求、用小模型处理简单任务、控制 Prompt 长度。每件事单独做效果有限,但组合起来效果显著。
