Python 实现 LLM 缓存层——语义缓存,相似问题不重复调用 API
Python 实现 LLM 缓存层——语义缓存,相似问题不重复调用 API
适读人群:在用 LLM API 的 Python 工程师、想降低 AI 调用成本的开发者 | 阅读时长:约13分钟 | 核心价值:从精确缓存到语义缓存,完整实现方案,API 调用量降低 40%+
上个季度,我们一个内部 AI 工具的 OpenAI 账单让我吓了一跳:3.7 万元人民币。
仔细分析调用日志,发现有大量相似问题被重复调用了 API:
- "帮我写一个 Python 读取 CSV 的函数"
- "用 Python 怎么读取 CSV 文件"
- "Python CSV 文件读取代码示例"
这三个问题的意思几乎一样,但都独立调用了 API,各花了一次钱。
语义缓存就是用来解决这个问题的: 当新问题和已有缓存的问题语义相似度超过阈值,直接返回缓存结果,不重新调用 API。
精确缓存 vs 语义缓存
先理解两种缓存的区别:
精确缓存(exact match):
- 只有完全相同的输入才命中缓存
- 实现简单(MD5 哈希作 key)
- 命中率低,因为用户的措辞几乎不会完全相同
语义缓存(semantic cache):
- 语义相似的输入也能命中缓存
- 实现复杂(需要向量相似度搜索)
- 命中率高得多
语义缓存的实现原理
新问题 → 向量化(embedding) → 在向量数据库里搜索相似问题
↓
相似度 > 阈值? → Yes → 返回缓存答案
↓ No
调用 LLM API → 存入缓存(问题向量 + 答案)完整实现代码
# semantic_cache.py
import hashlib
import json
import time
from typing import Optional
import numpy as np
import redis
from openai import AsyncOpenAI
import qdrant_client
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, SearchRequest
client = AsyncOpenAI()
class SemanticCache:
def __init__(
self,
redis_url: str,
qdrant_url: str,
collection_name: str = "llm_cache",
similarity_threshold: float = 0.92, # 相似度阈值,0-1之间
ttl_seconds: int = 86400 * 7, # 缓存7天
embedding_model: str = "text-embedding-3-small",
):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.qdrant = qdrant_client.QdrantClient(url=qdrant_url)
self.collection_name = collection_name
self.threshold = similarity_threshold
self.ttl = ttl_seconds
self.embedding_model = embedding_model
# 确保向量集合存在
self._ensure_collection()
def _ensure_collection(self):
"""创建向量集合(如果不存在)"""
collections = [c.name for c in self.qdrant.get_collections().collections]
if self.collection_name not in collections:
self.qdrant.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=1536, # text-embedding-3-small 的维度
distance=Distance.COSINE,
),
)
async def _get_embedding(self, text: str) -> list[float]:
"""获取文本的向量表示"""
response = await client.embeddings.create(
input=text,
model=self.embedding_model,
)
return response.data[0].embedding
def _make_cache_key(self, question: str) -> str:
"""生成精确匹配的缓存 key(用于存储完整响应)"""
return f"llm_cache:{hashlib.md5(question.encode()).hexdigest()}"
async def get(
self,
question: str,
model: str = "gpt-4o-mini",
) -> Optional[dict]:
"""
查询缓存
返回: {"answer": str, "cache_hit": True, "similarity": float} 或 None
"""
# 第一步:精确匹配(最快)
exact_key = self._make_cache_key(question + model)
cached = self.redis.get(exact_key)
if cached:
return {**json.loads(cached), "cache_hit": True, "similarity": 1.0, "match_type": "exact"}
# 第二步:语义匹配(稍慢,需要向量计算)
question_vector = await self._get_embedding(question)
search_results = self.qdrant.search(
collection_name=self.collection_name,
query_vector=question_vector,
limit=1,
score_threshold=self.threshold,
query_filter=Filter(
must=[
{"key": "model", "match": {"value": model}}
]
) if model else None,
)
if search_results:
best_match = search_results[0]
# 从 Redis 里拿完整的响应内容
cache_key = best_match.payload["cache_key"]
cached_data = self.redis.get(cache_key)
if cached_data:
return {
**json.loads(cached_data),
"cache_hit": True,
"similarity": best_match.score,
"match_type": "semantic",
"matched_question": best_match.payload["question"],
}
return None
async def set(
self,
question: str,
answer: str,
model: str = "gpt-4o-mini",
metadata: dict = None,
):
"""存入缓存"""
cache_key = self._make_cache_key(question + model)
# 存完整响应到 Redis
cache_data = {
"answer": answer,
"model": model,
"question": question,
"cached_at": time.time(),
**(metadata or {}),
}
self.redis.setex(cache_key, self.ttl, json.dumps(cache_data, ensure_ascii=False))
# 存向量到 Qdrant(用于语义搜索)
question_vector = await self._get_embedding(question)
point_id = abs(hash(question + model)) % (2**63) # 生成唯一 ID
self.qdrant.upsert(
collection_name=self.collection_name,
points=[
PointStruct(
id=point_id,
vector=question_vector,
payload={
"question": question,
"model": model,
"cache_key": cache_key,
"cached_at": time.time(),
},
)
],
)
async def get_stats(self) -> dict:
"""获取缓存统计"""
total_keys = self.redis.dbsize()
qdrant_count = self.qdrant.count(collection_name=self.collection_name).count
return {
"redis_keys": total_keys,
"qdrant_vectors": qdrant_count,
}
# 使用示例
cache = SemanticCache(
redis_url="redis://localhost:6379",
qdrant_url="http://localhost:6333",
similarity_threshold=0.92,
)
async def cached_llm_call(
question: str,
model: str = "gpt-4o-mini",
system_prompt: str = None,
) -> dict:
"""带语义缓存的 LLM 调用"""
# 1. 查缓存
cached = await cache.get(question, model=model)
if cached:
print(f"Cache hit! type={cached['match_type']}, similarity={cached.get('similarity', 1.0):.3f}")
return cached
# 2. 调用 LLM
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": question})
response = await client.chat.completions.create(
model=model,
messages=messages,
)
answer = response.choices[0].message.content
usage = response.usage
# 3. 存入缓存
await cache.set(
question=question,
answer=answer,
model=model,
metadata={
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
},
)
return {
"answer": answer,
"cache_hit": False,
"usage": {
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
},
}踩坑实录一:相似度阈值怎么定
现象: 阈值设为 0.85,结果有时候完全不同的问题被判断为相似,返回了错误答案。比如"Python 怎么读取文件"和"Python 怎么写入文件"相似度是 0.88,被当作同一个问题处理了。
原因: 0.85 太低了,"读取"和"写入"在语义空间里确实比较近,但它们的答案完全不同。
解法: 提高阈值,同时做 A/B 测试找到合适的值:
# 测试不同阈值的准确率
test_cases = [
# (question1, question2, should_match)
("Python 读取 CSV", "Python 怎么读取 CSV 文件", True),
("Python 读取文件", "Python 写入文件", False),
("FastAPI 如何添加认证", "FastAPI 如何添加鉴权", True),
("如何安装 pandas", "如何卸载 pandas", False),
]
async def evaluate_threshold(threshold: float) -> float:
"""评估某个阈值的准确率"""
correct = 0
for q1, q2, expected_match in test_cases:
v1 = await get_embedding(q1)
v2 = await get_embedding(q2)
similarity = np.dot(v1, v2) # cosine similarity
actual_match = similarity >= threshold
if actual_match == expected_match:
correct += 1
return correct / len(test_cases)
# 测试结果(不同项目会不同,这只是我们的测试结果):
# threshold=0.85: accuracy=72%
# threshold=0.90: accuracy=88%
# threshold=0.92: accuracy=94% ← 我们选的值
# threshold=0.95: accuracy=96%,但命中率太低,缓存没什么效果建议:先用一批你知道正确答案的问题对测试,找到在你的场景里最合适的阈值,而不是直接用我的值。
踩坑实录二:缓存了错误的答案
现象: 模型升级了(从 gpt-3.5-turbo 换到 gpt-4o),但缓存里还是旧模型的答案,新模型的更好答案没有被使用。
原因: 缓存 key 没有包含模型名称,不同模型的答案混在一起了。
解法: 缓存 key 必须包含模型名称,这在上面的代码里已经体现了:self._make_cache_key(question + model)。
另外,当你想清空某个模型的缓存时:
async def invalidate_model_cache(self, model: str):
"""清空某个模型的所有缓存"""
# 从 Qdrant 里删除对应模型的向量
self.qdrant.delete(
collection_name=self.collection_name,
points_selector=Filter(
must=[{"key": "model", "match": {"value": model}}]
),
)
# Redis 里的 key 会自然过期,或者你可以用 scan 找到并删除
print(f"Cleared cache for model: {model}")踩坑实录三:向量存储费用
现象: Qdrant 存了几十万个向量,磁盘占用很大,而且很多是很久没被访问的"冷缓存"。
解法: 定期清理长期未命中的缓存:
async def cleanup_stale_cache(self, max_age_days: int = 30):
"""清理超过 N 天未被访问的缓存"""
cutoff_time = time.time() - max_age_days * 86400
# 找出旧向量
# 注意:Qdrant 不支持直接按时间删除,需要先搜索再删除
old_points = self.qdrant.scroll(
collection_name=self.collection_name,
scroll_filter=Filter(
must=[
{"key": "cached_at", "range": {"lt": cutoff_time}}
]
),
limit=1000,
)
if old_points[0]:
ids_to_delete = [p.id for p in old_points[0]]
self.qdrant.delete(
collection_name=self.collection_name,
points_selector=ids_to_delete,
)
print(f"Deleted {len(ids_to_delete)} stale cache entries")效果数据
我们上线语义缓存一个月后的数据:
| 指标 | 上线前 | 上线后 |
|---|---|---|
| 月 API 调用量 | 18.7 万次 | 11.2 万次 |
| 月 API 费用 | 3.7 万元 | 2.1 万元 |
| 缓存命中率 | - | 40.1% |
| 平均响应时间 | 1.8s | 0.4s(命中时) |
缓存命中后响应时间从1.8秒降到了0.4秒,体验也大幅提升。
语义缓存不是万能的,有些场景不适合(比如查询实时数据的问题,答案每天都不同)。但对于技术问答、代码生成这类"答案相对稳定"的场景,语义缓存的收益非常明显。
