AI 应用的灾备和降级——你的 AI 系统如果 OpenAI 宕机了会怎样
适读人群:正在生产环境运行 AI 应用的工程师 | 阅读时长:约 14 分钟 | 核心价值:一次真实 API 宕机事故的完整复盘,以及之后建立的高可用方案:主备切换 + 降级策略 + 熔断机制
2024 年某天下午两点,我的手机开始不停地振动。
告警信息:API 调用失败率突然飙到 80%。生产环境,实时在线用户。我打开监控面板,满屏红色,所有调用 OpenAI 的请求都在报 503。
我查了一下 OpenAI 的状态页:Investigating - We are currently investigating elevated error rates on the API.
就是这么平淡的一行字,但它意味着我的系统对用户来说基本上废了。
那次事故持续了 47 分钟。复盘之后,我花了两周建立了一套完整的高可用方案。这篇文章把整个方案写出来。
事故复盘:47 分钟里发生了什么
那次宕机让我意识到我的系统有多脆弱:
第一层问题:没有主备切换
我所有的 LLM 调用都打向 OpenAI。OpenAI 一挂,没有任何 fallback,直接全线失败。
第二层问题:没有降级策略
AI 功能挂了,整个功能直接不可用,用户看到的是报错页面。没有"AI 挂了,用规则兜底"的逻辑。
第三层问题:没有熔断
在 OpenAI 已经在报错的情况下,我的系统还在不停地重试,每次重试都等 timeout,导致请求堆积,连带影响了不依赖 AI 的功能。
三层问题叠加,47 分钟里用户体验几乎是 0。
完整方案设计
复盘结束,我设计了三层防御:
第一层:主备模型切换
- 主:OpenAI GPT-4o
- 备:Anthropic Claude 3.5 Sonnet
- 策略:主模型连续失败 N 次,自动切换
第二层:功能降级
- AI 功能不可用时,降级到规则引擎
- 部分功能接受质量下降,但不接受不可用
第三层:熔断保护
- 快速失败,不等 timeout
- 防止错误级联

第一层:主备模型切换
# llm_failover.py
import time
import logging
from enum import Enum
from typing import Optional
from dataclasses import dataclass, field
from openai import OpenAI, APIError, RateLimitError, APITimeoutError
import anthropic
logger = logging.getLogger(__name__)


class ModelProvider(Enum):
    """LLM backends the failover client can route to."""
    OPENAI = "openai"
    ANTHROPIC = "anthropic"


@dataclass
class ProviderHealth:
    """Track consecutive failures for one provider and decide when it is usable.

    A provider is marked unhealthy after `failure_threshold` consecutive
    failures and becomes eligible for a retry probe once `recovery_time`
    seconds have elapsed since the most recent failure.
    """
    provider: ModelProvider
    failure_count: int = 0        # consecutive failures since the last success
    last_failure_time: float = 0  # time.time() of the most recent failure
    is_healthy: bool = True
    recovery_time: float = 300    # seconds before an unhealthy provider may be probed again
    failure_threshold: int = 3    # consecutive failures that trip the unhealthy flag

    def record_failure(self) -> None:
        """Count one failure; flip to unhealthy once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.is_healthy = False
            logger.warning(f"Provider {self.provider.value} 标记为不健康")

    def record_success(self) -> None:
        """Any success fully resets the failure streak and restores health."""
        self.failure_count = 0
        self.is_healthy = True

    def should_retry(self) -> bool:
        """Healthy providers are always usable; unhealthy ones only after the recovery window."""
        if self.is_healthy:
            return True
        return time.time() - self.last_failure_time > self.recovery_time
class FailoverLLMClient:
    """LLM client with automatic primary/backup failover.

    OpenAI is the primary provider and Anthropic the backup; a provider that
    fails `failure_threshold` consecutive times (see ProviderHealth) is
    skipped until its recovery window elapses.
    """

    def __init__(self):
        self.openai_client = OpenAI()
        self.anthropic_client = anthropic.Anthropic()
        # Per-provider health bookkeeping.
        self.providers = {
            ModelProvider.OPENAI: ProviderHealth(ModelProvider.OPENAI),
            ModelProvider.ANTHROPIC: ProviderHealth(ModelProvider.ANTHROPIC),
        }
        # Failover order: first entry is the primary.
        self.priority = [ModelProvider.OPENAI, ModelProvider.ANTHROPIC]

    def _get_available_provider(self) -> Optional[ModelProvider]:
        """Return the highest-priority provider that is healthy or due a retry probe."""
        for provider in self.priority:
            health = self.providers[provider]
            if health.is_healthy or health.should_retry():
                return provider
        return None

    def _call_openai(self, messages: list, model: str = "gpt-4o-mini", **kwargs) -> str:
        response = self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

    def _call_anthropic(self, messages: list, model: str = "claude-3-5-haiku-20241022", **kwargs) -> str:
        # Anthropic takes the system prompt as a separate argument, so split
        # it out of the OpenAI-style message list.
        system_message = None
        anthropic_messages = []
        for msg in messages:
            if msg["role"] == "system":
                system_message = msg["content"]
            else:
                anthropic_messages.append({
                    "role": msg["role"],
                    "content": msg["content"]
                })
        # NOTE: only max_tokens is forwarded here; other kwargs such as
        # temperature are dropped on the Anthropic path.
        response = self.anthropic_client.messages.create(
            model=model,
            max_tokens=kwargs.get("max_tokens", 2048),
            system=system_message or "You are a helpful assistant.",
            messages=anthropic_messages
        )
        return response.content[0].text

    def _dispatch(self, provider: ModelProvider, messages: list, kwargs: dict):
        """Call one provider; return (content, model_name_for_reporting)."""
        if provider == ModelProvider.OPENAI:
            return self._call_openai(messages, **kwargs), kwargs.get("model", "gpt-4o-mini")
        return self._call_anthropic(messages, **kwargs), kwargs.get("model", "claude-3-5-haiku-20241022")

    def chat(self, messages: list, **kwargs) -> dict:
        """Chat call with automatic primary/backup failover.

        Returns: {"content": str, "provider": str, "model": str}
        Raises Exception when no provider is available or both fail.
        """
        provider = self._get_available_provider()
        if provider is None:
            raise Exception("所有 LLM provider 均不可用")
        try:
            content, model = self._dispatch(provider, messages, kwargs)
            self.providers[provider].record_success()
            return {
                "content": content,
                "provider": provider.value,
                "model": model
            }
        except Exception as e:
            # A tuple like (APIError, ..., Exception) is redundant: Exception
            # already subsumes the specific API errors.
            logger.error(f"Provider {provider.value} 调用失败: {e}")
            self.providers[provider].record_failure()
            fallback_provider = next(
                (p for p in self.priority if p != provider), None
            )
            if fallback_provider is None:
                raise
            logger.info(f"切换到备用 provider: {fallback_provider.value}")
            # Drop any caller-supplied model name: it targets the failed
            # provider and would be invalid for the fallback one.
            fb_kwargs = {k: v for k, v in kwargs.items() if k != "model"}
            try:
                content, _ = self._dispatch(fallback_provider, messages, fb_kwargs)
            except Exception as e2:
                self.providers[fallback_provider].record_failure()
                raise Exception(f"主备 provider 均失败。主: {e}; 备: {e2}") from e2
            self.providers[fallback_provider].record_success()
            return {
                "content": content,
                "provider": fallback_provider.value,
                "model": "fallback"
            }
# Module-level singleton shared by all call sites.
llm_client = FailoverLLMClient()

第二层:功能降级
当 AI 不可用时,不是直接返回报错,而是降级到规则兜底。以意图分类为例:
# degradation.py
from enum import Enum
from typing import Optional
import re
import logging
logger = logging.getLogger(__name__)


class Intent(Enum):
    """Closed set of user intents the classifiers can emit."""
    COMPLAINT = "complaint"
    INQUIRY = "inquiry"
    REFUND = "refund"
    OTHER = "other"


class RuleBasedIntentClassifier:
    """Keyword rule engine: the fallback used when the AI classifier is unavailable."""

    # "退款投诉" is unreachable: any text containing it also contains "退款",
    # which the higher-priority refund check matches first. Kept for fidelity.
    COMPLAINT_KEYWORDS = ["投诉", "差评", "不满", "太差", "垃圾", "坑", "骗", "退款投诉"]
    REFUND_KEYWORDS = ["退款", "退货", "退钱", "退回", "申请退"]
    INQUIRY_KEYWORDS = ["如何", "怎么", "怎样", "是什么", "能不能", "可以吗", "请问", "查询", "了解"]

    def classify(self, text: str) -> Intent:
        """Map text to an Intent by substring match.

        Priority: refund > complaint > inquiry > other.
        """
        # Lowercasing only matters for Latin input; the keywords are Chinese.
        text_lower = text.lower()
        if any(keyword in text_lower for keyword in self.REFUND_KEYWORDS):
            return Intent.REFUND
        if any(keyword in text_lower for keyword in self.COMPLAINT_KEYWORDS):
            return Intent.COMPLAINT
        if any(keyword in text_lower for keyword in self.INQUIRY_KEYWORDS):
            return Intent.INQUIRY
        return Intent.OTHER
class IntentClassifierWithDegradation:
"""带降级的意图分类器"""
def __init__(self, llm_client: FailoverLLMClient):
self.llm_client = llm_client
self.rule_classifier = RuleBasedIntentClassifier()
self._use_ai = True
self._consecutive_ai_failures = 0
self._ai_failure_threshold = 5
def classify(self, text: str) -> dict:
"""
返回: {
"intent": Intent,
"method": "ai" | "rule",
"confidence": float
}
"""
if self._use_ai:
try:
result = self.llm_client.chat(
messages=[
{"role": "system", "content": "分类用户意图,只返回: complaint, inquiry, refund, 或 other"},
{"role": "user", "content": text}
],
temperature=0
)
raw = result["content"].strip().lower()
try:
intent = Intent(raw)
except ValueError:
intent = Intent.OTHER
self._consecutive_ai_failures = 0
return {"intent": intent, "method": "ai", "confidence": 0.9}
except Exception as e:
logger.error(f"AI 分类失败: {e}")
self._consecutive_ai_failures += 1
if self._consecutive_ai_failures >= self._ai_failure_threshold:
logger.warning("AI 连续失败过多,暂时切换到规则引擎")
self._use_ai = False
# 规则兜底
intent = self.rule_classifier.classify(text)
logger.info(f"使用规则引擎分类: {intent}")
return {"intent": intent, "method": "rule", "confidence": 0.7}
def restore_ai(self):
"""手动或定时恢复 AI 模式"""
self._use_ai = True
self._consecutive_ai_failures = 0
logger.info("已恢复 AI 分类模式")关键是:用规则兜底的结果质量会下降,但系统不会报错,用户体验的是"差一点的服务",而不是"完全坏掉的服务"。对用户来说,差一点但能用,远比完全不能用好。
第三层:熔断机制
熔断是为了防止级联故障。当 LLM 持续失败时,快速失败,不让请求堆积:
# circuit_breaker.py
import time
from enum import Enum
from typing import Callable, Any
import logging
logger = logging.getLogger(__name__)


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"        # normal: calls pass through
    OPEN = "open"            # tripped: reject immediately
    HALF_OPEN = "half_open"  # probing: allow calls to test recovery


class CircuitBreaker:
    """Classic three-state circuit breaker.

    - CLOSED: calls pass through; consecutive failures are counted.
    - OPEN: fail fast without issuing the real call.
    - HALF_OPEN: after `timeout` seconds OPEN, let calls through to probe recovery.
    """

    def __init__(self,
                 failure_threshold: int = 5,
                 success_threshold: int = 2,
                 timeout: float = 60.0,
                 name: str = "default"):
        self.failure_threshold = failure_threshold  # failures that trip the breaker
        self.success_threshold = success_threshold  # successes needed to close from HALF_OPEN
        self.timeout = timeout                      # seconds to stay OPEN before probing
        self.name = name
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0

    def _should_attempt(self) -> bool:
        """Decide whether the wrapped call may be issued in the current state."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # NOTE(review): time.time() can jump on clock adjustment;
            # time.monotonic() would be safer for measuring the OPEN window.
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                logger.info(f"熔断器 [{self.name}] 进入 HALF_OPEN 状态")
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return True
        return False

    def _record_success(self):
        """Successful call: progress HALF_OPEN toward CLOSED, or clear the streak."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                logger.info(f"熔断器 [{self.name}] 恢复为 CLOSED 状态")
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def _record_failure(self):
        """Failed call: re-open immediately from HALF_OPEN, or trip at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            logger.warning(f"熔断器 [{self.name}] 从 HALF_OPEN 重新触发 OPEN")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning(f"熔断器 [{self.name}] 触发 OPEN,失败次数: {self.failure_count}")

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Invoke func through the breaker; raises immediately while OPEN.

        Exceptions from func are recorded as failures and re-raised unchanged.
        """
        if not self._should_attempt():
            raise Exception(f"熔断器 [{self.name}] 处于 OPEN 状态,拒绝请求")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        self._record_success()
        return result

    @property
    def status(self) -> dict:
        """Snapshot of the breaker's counters for monitoring dashboards."""
        return {
            "name": self.name,
            "state": self.state.value,
            "failure_count": self.failure_count,
            "success_count": self.success_count,
        }
# Global breaker instance guarding the OpenAI-facing call path.
openai_breaker = CircuitBreaker(
    failure_threshold=5,
    success_threshold=2,
    timeout=60,
    name="openai_api"
)
# Integration point: route FailoverLLMClient.chat through the circuit breaker.
def call_with_circuit_breaker(messages: list, **kwargs) -> dict:
    """Run llm_client.chat guarded by the OpenAI circuit breaker.

    Breaker rejections are re-raised untouched so callers can distinguish a
    fast-fail from a real provider error.
    """
    try:
        return openai_breaker.call(llm_client.chat, messages, **kwargs)
    except Exception as e:
        if "熔断器" in str(e):
            # Breaker is OPEN: propagate as-is, skip the degradation path.
            raise
        # NOTE(review): both branches currently re-raise, so this wrapper adds
        # no handling beyond the breaker itself — presumably degradation logic
        # is meant to hook in here; confirm before relying on it.
        raise

监控告警:必须有
这套防护机制要配合监控才有用,不然你不知道它有没有在工作:
# monitoring.py
import time
from collections import deque
from typing import Deque
class LLMHealthMonitor:
    """Sliding-window health monitor for LLM calls with threshold alerts."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        # Most recent call records; the deque drops the oldest automatically.
        self.call_results: Deque[dict] = deque(maxlen=window_size)

    def record_call(self, provider: str, success: bool,
                    latency_ms: float, used_fallback: bool = False):
        """Append one call result and re-evaluate the alert thresholds."""
        self.call_results.append({
            "timestamp": time.time(),
            "provider": provider,
            "success": success,
            "latency_ms": latency_ms,
            "used_fallback": used_fallback,
        })
        self._check_alerts()

    def _check_alerts(self):
        """Alert when failure/fallback rates over the last 20 calls cross thresholds."""
        if len(self.call_results) < 10:
            return  # too few samples to be meaningful
        recent = list(self.call_results)[-20:]
        failure_rate = sum(1 for r in recent if not r["success"]) / len(recent)
        fallback_rate = sum(1 for r in recent if r["used_fallback"]) / len(recent)
        if failure_rate > 0.3:
            self._send_alert(f"LLM 调用失败率过高: {failure_rate:.0%}")
        if fallback_rate > 0.5:
            self._send_alert(f"LLM 备用 provider 使用率过高: {fallback_rate:.0%},主 provider 可能有问题")

    def _send_alert(self, message: str):
        """Hook up your alerting system here (DingTalk, Feishu, PagerDuty, ...)."""
        import logging
        logging.getLogger("alert").critical(f"[LLM ALERT] {message}")

    def get_stats(self) -> dict:
        """Aggregate stats over the whole window; empty dict when no data yet."""
        if not self.call_results:
            return {}
        recent = list(self.call_results)
        success_rate = sum(1 for r in recent if r["success"]) / len(recent)
        avg_latency = sum(r["latency_ms"] for r in recent) / len(recent)
        fallback_count = sum(1 for r in recent if r["used_fallback"])
        return {
            "total_calls": len(recent),
            "success_rate": f"{success_rate:.1%}",
            "avg_latency_ms": f"{avg_latency:.0f}",
            "fallback_count": fallback_count,
        }

事故之后的变化
那次 47 分钟的事故,让我意识到 AI 应用的高可用和传统应用的高可用本质上一样,只是细节不同。
传统应用要做数据库主备、服务多副本;AI 应用要做 LLM 主备、功能降级、熔断保护。
建完这套方案之后,我们又经历了两次 OpenAI 的故障(可以从 status 页看到),每次对我们的影响都很有限:流量自动切到 Anthropic,降级逻辑保证了基本可用性,熔断防止了请求堆积。
这些机制写完之后不占用太多维护成本,但在关键时刻能救命。如果你的 AI 应用已经在生产环境跑着,而且你没有 fallback,建议今天就把主备切换加上,这是最基础的防护。
