Redis 在 AI 应用中的 5 个关键用法
Redis 在 AI 应用中的 5 个关键用法
适读人群:有 Redis 基础的后端/AI 应用工程师 | 阅读时长:约 13 分钟 | 核心价值:用 Redis 把 AI 应用的 API 成本削减 40%+,同时提升响应速度
我有一段时间对 OpenAI 账单上的数字很敏感。
某个月底一看,账单 $847,比上个月涨了 60%。我去查了一下,发现有一个功能每天被调用 3000 次,而这 3000 次里,至少有 1200 次的 Prompt 内容完全重复——同样的问题,同样的上下文,每次都老老实实调用 API,每次都花钱。
加了 Prompt 缓存之后,这个功能的 API 成本直接降了 40%。
这篇文章把我在 AI 应用里用 Redis 的 5 个真实场景都写出来,每个都有代码,有些场景的成本节省数据我也一起列出来。
用法一:Prompt 缓存——最暴力的省钱方式
场景: 相同的 Prompt + 输入,重复调用 LLM。
典型情况: 用户问"Python 怎么写 for 循环",这个问题每天被问 100 次,每次都调用 GPT-4。
做法: 对 Prompt 内容做哈希,缓存结果。命中缓存直接返回,不碰 LLM。
# prompt_cache.py
import redis
import hashlib
import json
from typing import Optional
from openai import OpenAI
class CachedLLMClient:
def __init__(self, openai_client: OpenAI, redis_client: redis.Redis,
default_ttl: int = 3600):
self.client = openai_client
self.redis = redis_client
self.default_ttl = default_ttl
self._hit_count = 0
self._miss_count = 0
def _cache_key(self, model: str, messages: list, **kwargs) -> str:
"""生成缓存键,基于模型+消息内容"""
content = {
"model": model,
"messages": messages,
# temperature 为 0 时缓存有意义,其他值不缓存
}
raw = json.dumps(content, sort_keys=True, ensure_ascii=False)
return f"prompt_cache:{hashlib.sha256(raw.encode()).hexdigest()}"
def chat_complete(self, model: str, messages: list,
temperature: float = 0,
use_cache: bool = True,
cache_ttl: Optional[int] = None, **kwargs) -> dict:
"""
带缓存的 chat completion
注意:temperature > 0 时不缓存,因为结果本来就有随机性
"""
# 随机性调用不缓存
if not use_cache or temperature > 0:
return self._call_api(model, messages, temperature=temperature, **kwargs)
cache_key = self._cache_key(model, messages)
# 检查缓存
cached = self.redis.get(cache_key)
if cached:
self._hit_count += 1
result = json.loads(cached)
result["_cache_hit"] = True
return result
# 缓存未命中,调用 API
self._miss_count += 1
result = self._call_api(model, messages, temperature=temperature, **kwargs)
# 存入缓存
ttl = cache_ttl or self.default_ttl
cache_data = {k: v for k, v in result.items() if k != "_cache_hit"}
self.redis.setex(cache_key, ttl, json.dumps(cache_data, ensure_ascii=False))
result["_cache_hit"] = False
return result
def _call_api(self, model: str, messages: list, **kwargs) -> dict:
response = self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
return {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
}
}
@property
def cache_hit_rate(self) -> float:
total = self._hit_count + self._miss_count
return self._hit_count / total if total > 0 else 0
# 使用示例
client = CachedLLMClient(
openai_client=OpenAI(),
redis_client=redis.Redis(host="localhost", decode_responses=True),
default_ttl=7200 # 2 小时过期
)
result = client.chat_complete(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "你是一个 Python 编程助手"},
{"role": "user", "content": "Python 怎么写 for 循环?"}
],
temperature=0
)
print(result["content"])
print(f"缓存命中: {result['_cache_hit']}")
print(f"命中率: {client.cache_hit_rate:.1%}")实际效果: 我的一个 FAQ 助手,高频问题缓存命中率 65%,这块的 API 成本直接腰斩。
注意事项:
temperature=0时结果确定,适合缓存temperature>0的随机调用不应该缓存- 缓存 TTL 要根据内容时效性设置,编程知识可以缓存 24 小时,时事新闻 1 小时内可能就过期了
用法二:语义缓存——比精确匹配更聪明
Prompt 缓存有个问题:用户问"Python 写 for 循环"和"Python 怎么写 for loop"是两个不同的字符串,Prompt 缓存会认为这是两个不同的 Prompt,各自调用一次 API。
语义缓存解决这个问题:用向量相似度找语义上相近的缓存内容。
# semantic_cache.py
import redis
import numpy as np
from openai import OpenAI
from typing import Optional, Tuple
import json
import hashlib
class SemanticCache:
"""
语义缓存:
- 用 text-embedding 模型把问题转成向量
- 在 Redis 里存储向量和对应答案
- 查询时找相似度最高的缓存,超过阈值就直接返回
注意:这里用的是简单的线性搜索,数据量大时应该用 Redis Vector Search 或 Qdrant 等
"""
def __init__(self, openai_client: OpenAI, redis_client: redis.Redis,
similarity_threshold: float = 0.92, ttl: int = 86400):
self.client = openai_client
self.redis = redis_client
self.threshold = similarity_threshold
self.ttl = ttl
self.index_key = "semantic_cache:index" # 存所有缓存 key 的集合
def _embed(self, text: str) -> np.ndarray:
"""获取文本的 embedding 向量"""
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return np.array(response.data[0].embedding)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def _cache_key(self, query_hash: str) -> str:
return f"semantic_cache:entry:{query_hash}"
def get(self, query: str) -> Tuple[Optional[str], float]:
"""
查找语义相似的缓存
返回: (cached_answer, similarity_score)
如果未命中,返回 (None, 0.0)
"""
query_vector = self._embed(query)
# 获取所有缓存 key
all_keys = self.redis.smembers(self.index_key)
best_similarity = 0.0
best_answer = None
for key in all_keys:
entry_raw = self.redis.get(key)
if not entry_raw:
continue
entry = json.loads(entry_raw)
cached_vector = np.array(entry["vector"])
similarity = self._cosine_similarity(query_vector, cached_vector)
if similarity > best_similarity:
best_similarity = similarity
best_answer = entry["answer"]
if best_similarity >= self.threshold:
return best_answer, best_similarity
return None, best_similarity
def set(self, query: str, answer: str) -> None:
"""存入缓存"""
vector = self._embed(query)
query_hash = hashlib.md5(query.encode()).hexdigest()
key = self._cache_key(query_hash)
entry = {
"query": query,
"answer": answer,
"vector": vector.tolist(),
}
self.redis.setex(key, self.ttl, json.dumps(entry, ensure_ascii=False))
self.redis.sadd(self.index_key, key)
# 集成到 LLM 调用中
class SemanticCachedClient:
def __init__(self, openai_client: OpenAI, semantic_cache: SemanticCache):
self.client = openai_client
self.cache = semantic_cache
def ask(self, question: str, system_prompt: str = "") -> dict:
# 先查语义缓存
cached_answer, similarity = self.cache.get(question)
if cached_answer:
return {
"answer": cached_answer,
"cache_hit": True,
"similarity": similarity
}
# 缓存未命中,调用 LLM
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": question})
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0
)
answer = response.choices[0].message.content
# 存入语义缓存
self.cache.set(question, answer)
return {
"answer": answer,
"cache_hit": False,
"similarity": 0.0
}语义缓存的 embedding 本身也要钱(不过比 GPT-4 调用便宜得多,text-embedding-3-small 极其廉价)。适合高频的问答场景,比如 FAQ 机器人、知识库问答。
用法三:会话上下文管理
多轮对话要维护历史消息,这些数据放哪里?内存里丢失,数据库里太重。Redis 是最合适的中间层:
# conversation_manager.py
import redis
import json
from typing import List
from dataclasses import dataclass
@dataclass
class Message:
role: str # "user" | "assistant" | "system"
content: str
class ConversationManager:
def __init__(self, redis_client: redis.Redis,
max_messages: int = 20,
ttl_seconds: int = 1800): # 30 分钟无活动自动过期
self.redis = redis_client
self.max_messages = max_messages
self.ttl = ttl_seconds
def _key(self, session_id: str) -> str:
return f"conversation:{session_id}"
def get_history(self, session_id: str) -> List[Message]:
"""获取会话历史"""
key = self._key(session_id)
raw_messages = self.redis.lrange(key, 0, -1)
messages = []
for raw in raw_messages:
data = json.loads(raw)
messages.append(Message(role=data["role"], content=data["content"]))
return messages
def add_message(self, session_id: str, role: str, content: str) -> None:
"""添加消息,自动截断超长历史"""
key = self._key(session_id)
message = json.dumps({"role": role, "content": content}, ensure_ascii=False)
pipe = self.redis.pipeline()
pipe.rpush(key, message)
# 只保留最近 max_messages 条
pipe.ltrim(key, -self.max_messages, -1)
# 刷新过期时间
pipe.expire(key, self.ttl)
pipe.execute()
def build_messages_for_api(self, session_id: str,
system_prompt: str = "") -> list:
"""构建 OpenAI API 的 messages 格式"""
history = self.get_history(session_id)
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
for msg in history:
messages.append({"role": msg.role, "content": msg.content})
return messages
def clear(self, session_id: str) -> None:
self.redis.delete(self._key(session_id))
def get_token_estimate(self, session_id: str) -> int:
"""粗略估算当前历史的 token 数(用字符数 / 4 估算)"""
history = self.get_history(session_id)
total_chars = sum(len(m.content) for m in history)
return total_chars // 4
# 使用
r = redis.Redis(host="localhost", decode_responses=True)
conv_manager = ConversationManager(r)
openai_client = OpenAI()
def chat(session_id: str, user_message: str) -> str:
# 记录用户消息
conv_manager.add_message(session_id, "user", user_message)
# 构建完整消息列表
messages = conv_manager.build_messages_for_api(
session_id,
system_prompt="你是一个专业的技术助手,回答简洁准确。"
)
# 调用 LLM
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=messages
)
assistant_reply = response.choices[0].message.content
# 记录 AI 回复
conv_manager.add_message(session_id, "assistant", assistant_reply)
return assistant_reply关键点是 ltrim——只保留最近 N 条消息,防止上下文无限增长导致 token 超限和成本失控。我的经验是保留最近 20 条消息对大多数对话场景够用。
用法四:Token 配额和速率限制
如果你的 AI 应用有多个用户,你需要控制每个用户每天/每月的 token 消耗,防止单个用户把你的 API quota 用光:
# quota_manager.py
import redis
from datetime import datetime
from typing import Tuple
class TokenQuotaManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def _daily_key(self, user_id: str) -> str:
today = datetime.now().strftime("%Y%m%d")
return f"quota:daily:{user_id}:{today}"
def _monthly_key(self, user_id: str) -> str:
month = datetime.now().strftime("%Y%m")
return f"quota:monthly:{user_id}:{month}"
def check_and_consume(self, user_id: str, estimated_tokens: int,
daily_limit: int = 100000,
monthly_limit: int = 2000000) -> Tuple[bool, dict]:
"""
检查配额并消耗
返回: (is_allowed, quota_info)
"""
daily_key = self._daily_key(user_id)
monthly_key = self._monthly_key(user_id)
pipe = self.redis.pipeline(transaction=True)
pipe.get(daily_key)
pipe.get(monthly_key)
results = pipe.execute()
daily_used = int(results[0] or 0)
monthly_used = int(results[1] or 0)
# 检查是否超限
if daily_used + estimated_tokens > daily_limit:
return False, {
"reason": "daily_limit_exceeded",
"daily_used": daily_used,
"daily_limit": daily_limit,
"monthly_used": monthly_used,
"monthly_limit": monthly_limit,
}
if monthly_used + estimated_tokens > monthly_limit:
return False, {
"reason": "monthly_limit_exceeded",
"daily_used": daily_used,
"daily_limit": daily_limit,
"monthly_used": monthly_used,
"monthly_limit": monthly_limit,
}
# 消耗配额(用 INCRBY,原子操作)
pipe = self.redis.pipeline()
pipe.incrby(daily_key, estimated_tokens)
pipe.expire(daily_key, 86400 * 2) # 2 天过期(跨天保险)
pipe.incrby(monthly_key, estimated_tokens)
pipe.expire(monthly_key, 86400 * 35) # 35 天过期
pipe.execute()
return True, {
"daily_used": daily_used + estimated_tokens,
"daily_limit": daily_limit,
"monthly_used": monthly_used + estimated_tokens,
"monthly_limit": monthly_limit,
}
def get_usage(self, user_id: str) -> dict:
daily_key = self._daily_key(user_id)
monthly_key = self._monthly_key(user_id)
pipe = self.redis.pipeline()
pipe.get(daily_key)
pipe.get(monthly_key)
results = pipe.execute()
return {
"daily_used": int(results[0] or 0),
"monthly_used": int(results[1] or 0),
}
# 使用示例
quota_manager = TokenQuotaManager(redis.Redis(host="localhost", decode_responses=True))
def protected_llm_call(user_id: str, prompt: str) -> str:
# 粗略估算 token 数
estimated_tokens = len(prompt) // 4 + 500 # 加上预期输出
allowed, quota_info = quota_manager.check_and_consume(
user_id=user_id,
estimated_tokens=estimated_tokens,
daily_limit=50000, # 免费用户每天 5 万 token
monthly_limit=500000
)
if not allowed:
reason = quota_info["reason"]
if reason == "daily_limit_exceeded":
raise Exception(f"今日 token 配额已用完,明天再来")
else:
raise Exception(f"本月 token 配额已用完")
# 调用 LLM
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content用法五:任务状态存储
这个在上一篇异步工作流里已经写了,但这里补充一个细节——用 Redis Hash 存储任务状态比用 String 更灵活,可以原子地更新单个字段:
# task_status_with_hash.py
import redis
import json
import time
class TaskStatusStore:
def __init__(self, redis_client: redis.Redis, ttl: int = 86400):
self.redis = redis_client
self.ttl = ttl
def _key(self, task_id: str) -> str:
return f"task:{task_id}"
def create(self, task_id: str, task_type: str, payload: dict) -> None:
key = self._key(task_id)
pipe = self.redis.pipeline()
pipe.hset(key, mapping={
"task_id": task_id,
"task_type": task_type,
"payload": json.dumps(payload),
"status": "pending",
"progress": "0",
"step": "",
"created_at": str(time.time()),
"updated_at": str(time.time()),
})
pipe.expire(key, self.ttl)
pipe.execute()
def update_progress(self, task_id: str, progress: float, step: str) -> None:
"""原子更新进度,不影响其他字段"""
key = self._key(task_id)
pipe = self.redis.pipeline()
pipe.hset(key, mapping={
"status": "processing",
"progress": str(progress),
"step": step,
"updated_at": str(time.time()),
})
pipe.expire(key, self.ttl)
pipe.execute()
def complete(self, task_id: str, result: dict) -> None:
key = self._key(task_id)
pipe = self.redis.pipeline()
pipe.hset(key, mapping={
"status": "completed",
"progress": "100",
"result": json.dumps(result, ensure_ascii=False),
"updated_at": str(time.time()),
})
pipe.expire(key, self.ttl)
pipe.execute()
def fail(self, task_id: str, error: str) -> None:
key = self._key(task_id)
pipe = self.redis.pipeline()
pipe.hset(key, mapping={
"status": "failed",
"error": error,
"updated_at": str(time.time()),
})
pipe.expire(key, self.ttl)
pipe.execute()
def get(self, task_id: str) -> dict:
data = self.redis.hgetall(self._key(task_id))
if not data:
return {}
result = dict(data)
if "payload" in result:
result["payload"] = json.loads(result["payload"])
if "result" in result:
result["result"] = json.loads(result["result"])
if "progress" in result:
result["progress"] = float(result["progress"])
return result用 Hash 比 String + JSON 的好处是:更新单个字段不需要先读再写,Pipeline + HSET 是原子操作,在高并发场景下更安全。
成本数据汇总
我在几个不同项目上用这些方案的实际效果:
FAQ 机器人(高频重复问题):
- Prompt 缓存命中率:65%
- API 成本降低:约 62%
- 响应时间从平均 1.2 秒降到 0.05 秒(命中缓存时)
客服系统(语义缓存):
- 语义相似命中率:38%(比精确匹配高约 15 个百分点)
- 配合精确缓存,综合命中率 58%
多用户 SaaS(配额管理):
- 消除了"一个高使用量用户把所有 API quota 用完"的情况
- 月度账单波动从 ±40% 降到 ±12%
不是每个场景都适合所有这 5 个用法,按需用,别为了用而用。但如果你的 AI 应用还没有 Prompt 缓存,几乎可以确定你在白花钱。
