Python 缓存实战——functools.lru_cache、Redis 缓存装饰器、缓存策略
Python 缓存实战——functools.lru_cache、Redis 缓存装饰器、缓存策略
适读人群:需要优化接口性能的后端开发者、AI 工程师 | 阅读时长:约15分钟 | 核心价值:从进程内缓存到分布式缓存的完整工程方案
一个接口从 3 秒优化到 50 毫秒的故事
去年我在做一个 AI 推荐系统,有个接口是根据用户 ID 返回推荐商品列表,背后要做相似度计算,平均耗时约2.8秒。
上线第一周用户还可以接受,第二周运营搞了个活动,流量涨了5倍,API Gateway 开始报大量超时,P99 延迟飙到了20秒多。
紧急排查后发现,同样的用户,在同一天内会多次请求这个接口(每次打开 App 首页都会调用),而每次请求都重新算一遍推荐,毫无意义。推荐结果其实可以缓存15分钟。
加了 Redis 缓存之后,缓存命中率达到了85%,接口平均响应时间从2.8秒降到了180毫秒;对于缓存命中的请求,响应时间不到50毫秒。这是一次代价很小但收益极大的优化。
今天来系统讲讲 Python 缓存的完整工程实践。
一、进程内缓存——lru_cache 和 cache
from functools import lru_cache, cache
import time
# lru_cache:LRU 淘汰策略,限制缓存条目数
@lru_cache(maxsize=128)
def get_user_config(user_id: int) -> dict:
"""用户配置,变化不频繁,适合缓存"""
print(f"[DB查询] 获取用户配置: user_id={user_id}")
# 模拟数据库查询
time.sleep(0.1)
return {"user_id": user_id, "theme": "dark", "language": "zh-CN"}
# Python 3.9+ cache 等同于 lru_cache(maxsize=None)
@cache
def compute_fibonacci(n: int) -> int:
"""纯计算函数,结果永久缓存(适合数学计算)"""
if n < 2:
return n
return compute_fibonacci(n - 1) + compute_fibonacci(n - 2)
# 测试 lru_cache
start = time.time()
for _ in range(10):
config = get_user_config(123)
print(f"10次调用耗时: {time.time()-start:.3f}s(只有第1次真正查询)")
# 查看缓存统计
info = get_user_config.cache_info()
print(f"缓存: hits={info.hits}, misses={info.misses}, maxsize={info.maxsize}")
# 清除缓存(需要刷新时手动清除)
get_user_config.cache_clear()
# 带 TTL 的 lru_cache 封装
import threading
from datetime import datetime, timedelta
class TTLCache:
"""带过期时间的进程内缓存"""
def __init__(self, maxsize: int = 128, ttl: int = 300):
self.maxsize = maxsize
self.ttl = ttl
self._cache: dict = {}
self._timestamps: dict = {}
self._lock = threading.Lock()
def get(self, key):
with self._lock:
if key not in self._cache:
return None
if time.time() - self._timestamps[key] > self.ttl:
del self._cache[key]
del self._timestamps[key]
return None
return self._cache[key]
def set(self, key, value):
with self._lock:
if len(self._cache) >= self.maxsize:
# 淘汰最老的条目
oldest_key = min(self._timestamps, key=self._timestamps.get)
del self._cache[oldest_key]
del self._timestamps[oldest_key]
self._cache[key] = value
self._timestamps[key] = time.time()
def __call__(self, func):
"""作为装饰器使用"""
def wrapper(*args, **kwargs):
cache_key = str(args) + str(sorted(kwargs.items()))
cached = self.get(cache_key)
if cached is not None:
return cached
result = func(*args, **kwargs)
self.set(cache_key, result)
return result
wrapper.cache_clear = lambda: self._cache.clear()
return wrapper
# 使用 TTL 缓存
user_cache = TTLCache(maxsize=1000, ttl=60)
@user_cache
def get_user_info(user_id: int) -> dict:
print(f"[DB查询] user_id={user_id}")
return {"id": user_id, "name": "张三"}二、Redis 缓存装饰器——分布式缓存核心方案
import json
import hashlib
import functools
import logging
from typing import Any, Callable, Optional
from datetime import timedelta
import redis
logger = logging.getLogger(__name__)
class RedisCache:
"""Redis 缓存管理器"""
def __init__(
self,
redis_url: str = "redis://localhost:6379/0",
key_prefix: str = "cache",
default_ttl: int = 300,
):
self.redis = redis.from_url(
redis_url,
decode_responses=True,
socket_timeout=2, # 连接超时2秒
socket_connect_timeout=2,
retry_on_timeout=True,
)
self.key_prefix = key_prefix
self.default_ttl = default_ttl
def _make_key(self, namespace: str, *args, **kwargs) -> str:
"""生成缓存 Key"""
# 对参数做哈希,避免 Key 太长
params = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
param_hash = hashlib.md5(params.encode()).hexdigest()[:8]
return f"{self.key_prefix}:{namespace}:{param_hash}"
def get(self, key: str) -> Optional[Any]:
try:
value = self.redis.get(key)
if value is None:
return None
return json.loads(value)
except (redis.RedisError, json.JSONDecodeError) as e:
logger.warning(f"缓存读取失败: {key}, {e}")
return None
def set(self, key: str, value: Any, ttl: int = None) -> bool:
try:
serialized = json.dumps(value, ensure_ascii=False, default=str)
self.redis.setex(key, ttl or self.default_ttl, serialized)
return True
except (redis.RedisError, TypeError) as e:
logger.warning(f"缓存写入失败: {key}, {e}")
return False
def delete(self, key: str) -> bool:
try:
return bool(self.redis.delete(key))
except redis.RedisError:
return False
def delete_pattern(self, pattern: str) -> int:
"""批量删除匹配 pattern 的 key(小心用,生产慎用 KEYS 命令)"""
try:
keys = list(self.redis.scan_iter(f"{self.key_prefix}:{pattern}:*"))
if keys:
return self.redis.delete(*keys)
return 0
except redis.RedisError:
return 0
def cached(
self,
namespace: str,
ttl: int = None,
cache_none: bool = False,
fallback_on_error: bool = True,
):
"""
缓存装饰器
:param namespace: 命名空间,用于缓存 Key 分组
:param ttl: 缓存过期时间(秒)
:param cache_none: 是否缓存 None 值(防穿透)
:param fallback_on_error: Redis 异常时是否 fallback 到直接调用
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
cache_key = self._make_key(namespace, *args, **kwargs)
# 尝试读缓存
cached_value = self.get(cache_key)
if cached_value is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached_value["data"] if cache_none else cached_value
# 执行函数
try:
result = func(*args, **kwargs)
except Exception:
raise
# 写缓存(None 值处理)
if result is None and not cache_none:
return result
store_value = {"data": result} if cache_none else result
self.set(cache_key, store_value, ttl or self.default_ttl)
return result
# 提供手动清除缓存的方法
def invalidate(*args, **kwargs):
cache_key = self._make_key(namespace, *args, **kwargs)
return self.delete(cache_key)
wrapper.invalidate = invalidate
wrapper.cache_namespace = namespace
return wrapper
return decorator
# 全局缓存实例
cache = RedisCache(
redis_url="redis://localhost:6379/0",
key_prefix="myapp",
default_ttl=300,
)
# 使用示例
@cache.cached("recommendation", ttl=900) # 推荐结果缓存15分钟
def get_recommendations(user_id: int, category: str = "all") -> list[dict]:
"""AI 推荐接口(计算密集)"""
print(f"[计算] 生成推荐: user_id={user_id}")
# 复杂的推荐计算
return [{"item_id": i, "score": 0.9 - i * 0.1} for i in range(10)]
@cache.cached("user", ttl=60) # 用户信息缓存1分钟
def get_user(user_id: int) -> dict:
print(f"[DB] 查询用户: user_id={user_id}")
return {"id": user_id, "name": "张三", "level": "VIP"}
# 使用示例
result = get_recommendations(123, "electronics")
result2 = get_recommendations(123, "electronics") # 命中缓存
print(f"两次调用结果一致: {result == result2}")
# 手动清除某个用户的推荐缓存
get_recommendations.invalidate(123, "electronics")三、缓存策略——这才是难点
踩坑实录1:缓存穿透——查询不存在的数据把 DB 打死
现象:某批恶意请求,查询不存在的 user_id,全部穿透到数据库,DB 扛不住。
原因:不存在的数据没有缓存,每次都要去 DB 查,结果还是没有,缓存没有价值。
解法:对 None 结果也缓存(设置较短的 TTL),用 cache_none=True。
踩坑实录2:缓存雪崩——大量缓存同时失效,DB 被瞬间打垮
现象:系统高峰期突然变慢,DB 连接池耗尽,原来是大批缓存同时过期。
原因:批量设置缓存时用了相同的 TTL,导致大量 key 在同一时刻过期。
解法:TTL 加随机抖动,避免同时过期。
import random
def set_with_jitter(key: str, value: Any, base_ttl: int = 300):
"""TTL 加随机抖动,范围 ±10%"""
jitter = random.randint(-base_ttl // 10, base_ttl // 10)
actual_ttl = base_ttl + jitter
cache.set(key, value, actual_ttl)踩坑实录3:缓存击穿——热点缓存失效瞬间大量并发打到 DB
现象:某个热门商品的缓存失效的瞬间,数百个并发请求同时打到 DB,DB 短暂压力剧增。
原因:热点数据缓存失效后,多个请求同时发现缓存不存在,都去 DB 查询,回写缓存。
解法:互斥锁,只让第一个请求去 DB 查,其他请求等待。
import redis
import time
def get_with_mutex_lock(
redis_client,
cache_key: str,
fetch_func,
ttl: int = 300,
lock_timeout: int = 5,
) -> Any:
"""防缓存击穿:互斥锁方案"""
# 先读缓存
value = redis_client.get(cache_key)
if value:
return json.loads(value)
# 尝试获取互斥锁
lock_key = f"lock:{cache_key}"
acquired = redis_client.set(lock_key, "1", nx=True, ex=lock_timeout)
if acquired:
# 获取到锁,去 DB 查询
try:
result = fetch_func()
redis_client.setex(cache_key, ttl, json.dumps(result))
return result
finally:
redis_client.delete(lock_key)
else:
# 未获取到锁,等待后重试(最多等5秒)
for _ in range(50):
time.sleep(0.1)
value = redis_client.get(cache_key)
if value:
return json.loads(value)
# 仍然没有,降级直接查 DB
return fetch_func()四、选型建议
| 场景 | 推荐方案 |
|---|---|
| 纯计算函数(无副作用) | functools.cache / lru_cache |
| 单进程,需要 TTL | TTLCache 封装 |
| 多进程/多机分布式 | Redis 缓存 |
| 查询结果缓存(API/DB) | Redis + 缓存装饰器 |
| 复杂缓存策略 | 专用缓存库(dogpile.cache) |
缓存是性能优化的最强武器,但也是一把双刃剑。用好了接口响应时间下降90%,用错了缓存数据过期不一致、数据不一致 bug 排查困难。先把业务逻辑做对,再来考虑缓存,不要为了性能提前引入不必要的复杂度。
