第2449篇:AI系统的供应商管理——多个LLM服务商的统一管理策略
2026/4/30 · 大约 7 分钟
第2449篇:AI系统的供应商管理——多个LLM服务商的统一管理策略
适读人群:AI平台工程师、技术负责人、采购负责人 | 阅读时长:约13分钟 | 核心价值:建立多LLM服务商的统一管理体系,降低依赖风险并优化成本
我们在一个季度里同时接入了四个LLM服务商:主力用OpenAI,图文理解用Claude,中文场景用通义千问,代码生成用DeepSeek。
每个服务商都很好,但随之而来的管理混乱让团队头疼:API Key散落在各处,有人直接硬编码在代码里;账单分散在四个控制台,没有统一视图;不同服务商的限速策略不同,各自的重试逻辑都不一样;某个服务商出了问题,没有统一的监控能第一时间发现。
这不是供应商选型的问题,而是多供应商管理的问题。
一、多LLM供应商的典型挑战
二、统一API抽象层
解决API异构性的核心是建立统一的抽象层:
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Optional
@dataclass
class Message:
    """One chat message in a provider-agnostic conversation."""

    role: str  # one of "system", "user", "assistant"
    content: str
@dataclass
class CompletionRequest:
    """Provider-agnostic completion request consumed by the router."""

    messages: List["Message"]
    model: Optional[str] = None  # when omitted, routing decides the model
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    stream: bool = False
    # Routing metadata. Previously `dict = None`, which made routing rules
    # like `req.metadata.get("task")` crash on the default; a per-instance
    # fresh dict keeps the old call sites working and removes the None case.
    metadata: dict = field(default_factory=dict)
@dataclass
class CompletionResponse:
    """Normalized completion result returned by every provider adapter."""

    content: str
    provider: str  # provider that actually served the request
    model: str  # model that was actually used
    input_tokens: int
    output_tokens: int
    latency_ms: float
    cost_usd: float
    request_id: str  # provider-side request id, used for tracing
class BaseLLMProvider(ABC):
    """Abstract base for LLM provider adapters.

    Each concrete provider maps one vendor SDK onto the shared
    CompletionRequest/CompletionResponse contract so the router can treat
    all vendors uniformly.
    """

    @property
    @abstractmethod
    def provider_name(self) -> str:
        # Stable identifier; used as the registry / circuit-breaker key.
        pass

    @abstractmethod
    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        """Run one text-generation call and return a normalized response."""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True when the vendor endpoint is reachable and serving."""
        pass

    @abstractmethod
    def estimate_cost(self, input_tokens: int, output_tokens: int,
                      model: str) -> float:
        """Estimate the request cost in USD for the given token counts."""
        pass
class OpenAIProvider(BaseLLMProvider):
    """Adapter for the OpenAI chat-completions API."""

    # USD cost per single token, keyed by model name.
    PRICING = {
        "gpt-4o": {"input": 0.0025 / 1000, "output": 0.010 / 1000},
        "gpt-4o-mini": {"input": 0.00015 / 1000, "output": 0.0006 / 1000},
        "gpt-3.5-turbo": {"input": 0.0005 / 1000, "output": 0.0015 / 1000}
    }

    def __init__(self, api_key: str, default_model: str = "gpt-4o-mini"):
        import openai  # deferred import: SDK only required when this adapter is used
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.default_model = default_model

    @property
    def provider_name(self) -> str:
        return "openai"

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        """Execute one chat completion and normalize the vendor response."""
        chosen_model = request.model or self.default_model
        started = time.time()
        payload = {
            "model": chosen_model,
            "messages": [{"role": m.role, "content": m.content}
                         for m in request.messages],
            "temperature": request.temperature,
        }
        if request.max_tokens:
            payload["max_tokens"] = request.max_tokens
        api_response = await self.client.chat.completions.create(**payload)
        elapsed_ms = (time.time() - started) * 1000
        usage = api_response.usage
        return CompletionResponse(
            content=api_response.choices[0].message.content,
            provider=self.provider_name,
            model=chosen_model,
            input_tokens=usage.prompt_tokens,
            output_tokens=usage.completion_tokens,
            latency_ms=elapsed_ms,
            cost_usd=self.estimate_cost(usage.prompt_tokens,
                                        usage.completion_tokens, chosen_model),
            request_id=api_response.id,
        )

    async def health_check(self) -> bool:
        """Cheap liveness probe: succeed iff listing models succeeds."""
        try:
            await self.client.models.list()
        except Exception:
            return False
        return True

    def estimate_cost(self, input_tokens: int, output_tokens: int,
                      model: str) -> float:
        """Estimate cost in USD; unknown models are priced at zero."""
        rates = self.PRICING.get(model, {"input": 0, "output": 0})
        return input_tokens * rates["input"] + output_tokens * rates["output"]
class AnthropicProvider(BaseLLMProvider):
"""Anthropic Claude服务商实现"""
PRICING = {
"claude-3-5-sonnet-20241022": {"input": 0.003 / 1000, "output": 0.015 / 1000},
"claude-3-haiku-20240307": {"input": 0.00025 / 1000, "output": 0.00125 / 1000}
}
def __init__(self, api_key: str, default_model: str = "claude-3-haiku-20240307"):
import anthropic
self.client = anthropic.AsyncAnthropic(api_key=api_key)
self.default_model = default_model
@property
def provider_name(self) -> str:
return "anthropic"
async def complete(self, request: CompletionRequest) -> CompletionResponse:
model = request.model or self.default_model
start_time = time.time()
# Anthropic的system消息处理方式不同
system_content = ""
user_messages = []
for msg in request.messages:
if msg.role == "system":
system_content = msg.content
else:
user_messages.append({"role": msg.role, "content": msg.content})
params = {
"model": model,
"messages": user_messages,
"max_tokens": request.max_tokens or 1024,
"temperature": request.temperature
}
if system_content:
params["system"] = system_content
response = await self.client.messages.create(**params)
latency_ms = (time.time() - start_time) * 1000
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
return CompletionResponse(
content=response.content[0].text,
provider=self.provider_name,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=latency_ms,
cost_usd=self.estimate_cost(input_tokens, output_tokens, model),
request_id=response.id
)
async def health_check(self) -> bool:
try:
# 发送最小化请求测试
await self.client.messages.create(
model=self.default_model,
messages=[{"role": "user", "content": "hi"}],
max_tokens=5
)
return True
except Exception:
return False
def estimate_cost(self, input_tokens: int, output_tokens: int,
model: str) -> float:
pricing = self.PRICING.get(model, {"input": 0, "output": 0})
return (input_tokens * pricing["input"] +
output_tokens * pricing["output"])三、统一路由器
class LLMRouter:
    """Unified LLM router.

    Picks a provider per request via ordered rules, falls back to a
    secondary provider on failure, and trips a simple per-provider
    circuit breaker after repeated failures.
    """

    def __init__(self):
        self.providers: dict[str, "BaseLLMProvider"] = {}
        self.routing_rules: list[dict] = []
        self.circuit_breakers: dict[str, dict] = {}

    def register_provider(self, provider: "BaseLLMProvider"):
        """Register a provider and initialize its circuit-breaker state."""
        self.providers[provider.provider_name] = provider
        self.circuit_breakers[provider.provider_name] = {
            "failure_count": 0,
            "last_failure": None,
            "is_open": False,  # open = breaker tripped, provider skipped
            "reset_timeout": 60  # seconds before a tripped breaker may retry
        }

    def add_routing_rule(self, rule: dict):
        """
        Append a routing rule (rules are evaluated in insertion order).
        Example rule:
        {
            "condition": lambda req: req.metadata.get("task") == "code_generation",
            "provider": "deepseek",
            "fallback": "openai",
            "reason": "route code-generation tasks to DeepSeek"
        }
        """
        self.routing_rules.append(rule)

    async def route(self, request: "CompletionRequest") -> "CompletionResponse":
        """Route a request: try the selected provider, then its fallback.

        Fix vs. the original: a rule naming an *unregistered* provider used
        to raise KeyError inside the try block and then corrupt breaker
        bookkeeping; unknown names are now skipped like an open breaker.

        Raises RuntimeError when every candidate fails or is unavailable.
        """
        target_provider, fallback_provider = self._select_provider(request)

        # Try the primary provider (skip if unregistered or breaker open).
        if (target_provider in self.providers
                and not self._is_circuit_open(target_provider)):
            try:
                response = await self.providers[target_provider].complete(request)
                self._record_success(target_provider)
                return response
            except Exception as e:
                self._record_failure(target_provider)
                print(f"Primary provider {target_provider} failed: {e}")

        # Try the fallback provider.
        if (fallback_provider and fallback_provider != target_provider
                and fallback_provider in self.providers
                and not self._is_circuit_open(fallback_provider)):
            try:
                response = await self.providers[fallback_provider].complete(request)
                self._record_success(fallback_provider)
                return response
            except Exception as e:
                self._record_failure(fallback_provider)
                print(f"Fallback provider {fallback_provider} failed: {e}")

        raise RuntimeError("All providers failed")

    def _select_provider(self, request: "CompletionRequest") -> tuple[str, Optional[str]]:
        """Return (primary, fallback) provider names for this request."""
        for rule in self.routing_rules:
            if rule.get("condition") and rule["condition"](request):
                return rule["provider"], rule.get("fallback")
        # Default route: first registered provider whose breaker is closed.
        for provider_name in self.providers:
            if not self._is_circuit_open(provider_name):
                return provider_name, None
        raise RuntimeError("No healthy providers available")

    def _is_circuit_open(self, provider_name: str) -> bool:
        """True while the breaker is open; auto-resets after the cool-down."""
        cb = self.circuit_breakers.get(provider_name, {})
        if not cb.get("is_open"):
            return False
        # Allow a retry once the reset window has elapsed (half-open).
        if cb.get("last_failure"):
            elapsed = time.time() - cb["last_failure"]
            if elapsed > cb["reset_timeout"]:
                cb["is_open"] = False
                cb["failure_count"] = 0
                return False
        return True

    def _record_success(self, provider_name: str):
        """Reset breaker state after a successful call."""
        cb = self.circuit_breakers[provider_name]
        cb["failure_count"] = 0
        cb["is_open"] = False
def _record_failure(self, provider_name: str):
cb = self.circuit_breakers[provider_name]
cb["failure_count"] += 1
cb["last_failure"] = time.time()
if cb["failure_count"] >= 5: # 连续5次失败触发熔断
cb["is_open"] = True四、统一API Key管理
class LLMKeyVault:
    """Central store for LLM API keys, backed by a secret manager."""

    def __init__(self, secret_backend):
        """
        secret_backend: the secret-storage backend, e.g. HashiCorp Vault,
        AWS Secrets Manager or Azure Key Vault.
        """
        self.backend = secret_backend
        self._key_cache = {}
        self._cache_ttl = 300  # cache entries stay fresh for 5 minutes

    def get_key(self, provider: str, key_name: str = "default") -> str:
        """Fetch an API key, serving from the in-memory cache while fresh."""
        cache_key = f"{provider}/{key_name}"
        entry = self._key_cache.get(cache_key)
        if entry is not None and time.time() - entry["cached_at"] < self._cache_ttl:
            return entry["value"]
        # Cache miss (or stale entry): pull from the backend and re-cache.
        secret_path = f"ai-keys/{provider}/{key_name}"
        key_value = self.backend.get_secret(secret_path)
        self._key_cache[cache_key] = {
            "value": key_value,
            "cached_at": time.time()
        }
        return key_value
def rotate_key(self, provider: str, new_key: str, key_name: str = "default"):
"""轮换API Key"""
secret_path = f"ai-keys/{provider}/{key_name}"
self.backend.update_secret(secret_path, new_key)
# 清除缓存
cache_key = f"{provider}/{key_name}"
if cache_key in self._key_cache:
del self._key_cache[cache_key]
# 记录轮换日志
print(f"Key rotated for {provider}/{key_name} at {time.time()}")五、统一成本视图
class MultiProviderCostDashboard:
    """Aggregated cost view across all LLM providers."""

    def get_cost_summary(self, start_time: int, end_time: int,
                         group_by: str = "provider") -> dict:
        """
        Aggregate usage records into a cost summary.
        group_by: "provider" | "team" | "model" | "date"
        """
        records = self._query_usage_records(start_time, end_time)
        summary = {}
        for record in records:
            bucket_key = record.get(group_by, "unknown")
            bucket = summary.setdefault(bucket_key, {
                "total_requests": 0,
                "total_input_tokens": 0,
                "total_output_tokens": 0,
                "total_cost_usd": 0.0
            })
            bucket["total_requests"] += 1
            bucket["total_input_tokens"] += record.get("input_tokens", 0)
            bucket["total_output_tokens"] += record.get("output_tokens", 0)
            bucket["total_cost_usd"] += record.get("cost_usd", 0)
        # Add a CNY column (illustrative fixed exchange rate).
        usd_to_cny = 7.2
        for bucket in summary.values():
            bucket["total_cost_cny"] = round(
                bucket["total_cost_usd"] * usd_to_cny, 2
            )
        return summary

    def detect_cost_anomalies(self, lookback_days: int = 7) -> list:
        """Flag a spike: today's spend above mean + 2 std of prior days."""
        anomalies = []
        end_time = int(time.time())
        start_time = end_time - lookback_days * 86400
        daily_costs = self._get_daily_costs(start_time, end_time)
        if len(daily_costs) < 3:
            return anomalies
        # Baseline statistics over every day except today.
        history = [day["cost"] for day in daily_costs[:-1]]
        if not history:
            return anomalies
        mean_cost = sum(history) / len(history)
        std_cost = (sum((c - mean_cost) ** 2 for c in history) / len(history)) ** 0.5
        today_cost = daily_costs[-1]["cost"] if daily_costs else 0
        if mean_cost > 0 and today_cost > mean_cost + 2 * std_cost:
            anomalies.append({
                "type": "cost_spike",
                "today_cost": today_cost,
                "avg_cost": mean_cost,
                "deviation_factor": today_cost / mean_cost if mean_cost > 0 else 0,
                "recommendation": "检查是否有异常大量调用或某个业务的成本暴增"
            })
        return anomalies
    def _query_usage_records(self, start_time: int, end_time: int) -> list:
        # Stub: wire this to the usage-records store; returns [] until then.
        return []
def _get_daily_costs(self, start_time: int, end_time: int) -> list:
return []六、供应商健康监控
class ProviderHealthMonitor:
    """Health monitoring for registered LLM providers."""

    async def check_all_providers(self, providers: dict) -> dict:
        """Check every provider's health concurrently.

        providers: mapping of provider name -> object exposing an async
        ``health_check()`` method.
        Returns name -> {"status": ..., "checked_at": ...} (plus "error"
        when the check raised).

        Fix vs. the original: the coroutines were created up front but then
        awaited one by one, so checks ran sequentially despite the stated
        intent; ``asyncio.gather`` now runs them truly in parallel.
        """
        import asyncio

        async def _probe(name: str, provider) -> tuple:
            # 10s cap per provider so one hung vendor can't stall the sweep.
            try:
                is_healthy = await asyncio.wait_for(provider.health_check(),
                                                    timeout=10)
                return name, {
                    "status": "healthy" if is_healthy else "unhealthy",
                    "checked_at": int(time.time())
                }
            except asyncio.TimeoutError:
                return name, {
                    "status": "timeout",
                    "checked_at": int(time.time())
                }
            except Exception as e:
                return name, {
                    "status": "error",
                    "error": str(e),
                    "checked_at": int(time.time())
                }

        pairs = await asyncio.gather(
            *(_probe(name, provider) for name, provider in providers.items())
        )
        return dict(pairs)
def get_external_status_pages(self) -> dict:
"""主要LLM服务商的官方状态页"""
return {
"openai": "https://status.openai.com",
"anthropic": "https://status.anthropic.com",
"azure": "https://azure.status.microsoft.com",
"google_ai": "https://status.cloud.google.com"
}多LLM供应商管理的核心思想是:不要让业务代码感知到有多少个供应商。通过统一的抽象层、路由器、密钥管理和成本视图,把供应商的多样性封装在基础设施层,让业务团队只需要提出需求,不需要了解背后用的是哪家的API。
