第2299篇:Multi-Cloud AI部署——在多个云厂商之间分发AI工作负载
第2299篇:Multi-Cloud AI部署——在多个云厂商之间分发AI工作负载
适读人群:需要考虑AI服务高可用和多云战略的架构师 | 阅读时长:约14分钟 | 核心价值:理解多云AI架构的真实驱动因素,掌握工作负载分发的工程实现
我接触过一家中型企业,他们的AI系统同时用了两个AI API提供商:主用Claude,备用GPT-4o。不是因为追求技术先进,是因为他们吃过一次苦头。
2024年某天早上,Anthropic的API出现了大规模故障,持续了两个多小时。他们的AI客服系统完全停摆,损失相当可观。那次事故之后,他们花了两周时间把系统改成了双提供商模式。
这就是多云AI的最朴素的驱动力:单一提供商是单点,任何提供商都会有故障,分散就是保险。
多云AI的几种模式
主备模式(Active-Passive):正常用主提供商,主提供商故障时切到备用。实现简单,但备用平时不干活,资源有些浪费。
主从模式(Active-Active):两个提供商同时处理流量,按比例分配(比如7:3)。可用性更高,也能比较两个提供商的质量。
成本驱动路由:根据不同提供商的价格,把不同类型的请求路由到最便宜的提供商:简单查询用国产低成本模型,复杂分析用Claude。
能力驱动路由:根据任务类型选最适合的模型:代码用Claude,数学推理用o3,中文内容用Qwen。
统一的多云AI客户端
核心是设计一个屏蔽底层差异的统一接口:
// 统一接口
public interface UnifiedAiClient {
String complete(String prompt, CompletionConfig config);
Flux<String> streamComplete(String prompt, CompletionConfig config);
float[] embed(String text);
}
// Claude实现
@Service("claudeClient")
public class ClaudeUnifiedClient implements UnifiedAiClient {
private final AnthropicClient anthropicClient;
@Override
public String complete(String prompt, CompletionConfig config) {
MessageResponse response = anthropicClient.messages().create(
MessageCreateParams.builder()
.model(config.getModelHint() != null ? config.getModelHint() : "claude-sonnet-4-5")
.maxTokens(config.getMaxTokens())
.messages(List.of(MessageParam.user(prompt)))
.build()
);
return extractText(response);
}
@Override
public Flux<String> streamComplete(String prompt, CompletionConfig config) {
return Flux.create(sink -> {
anthropicClient.messages().stream(/* ... */)
.on(ContentBlockDeltaEvent.class, e -> {
if (e.getDelta() instanceof TextDelta td) {
sink.next(td.getText());
}
})
.on(MessageStopEvent.class, e -> sink.complete())
.execute();
});
}
@Override
public float[] embed(String text) {
throw new UnsupportedOperationException("Claude不支持embedding,使用其他提供商");
}
}
// OpenAI实现
@Service("openaiClient")
public class OpenAIUnifiedClient implements UnifiedAiClient {
private final OpenAIClient openAIClient;
@Override
public String complete(String prompt, CompletionConfig config) {
ChatCompletion completion = openAIClient.chat().completions().create(
ChatCompletionCreateParams.builder()
.model(config.getModelHint() != null ? config.getModelHint() : "gpt-4o")
.maxTokens(config.getMaxTokens())
.addUserMessage(prompt)
.build()
);
return completion.choices().get(0).message().content().orElse("");
}
@Override
public float[] embed(String text) {
// OpenAI embedding
CreateEmbeddingResponse response = openAIClient.embeddings().create(
EmbeddingCreateParams.builder()
.model("text-embedding-3-small")
.input(text)
.build()
);
return toFloatArray(response.data().get(0).embedding());
}
}多云路由器:智能分发请求
@Service
public class MultiCloudAiRouter {
private final Map<String, UnifiedAiClient> clients;
private final ProviderHealthChecker healthChecker;
private final RoutingConfig routingConfig;
/**
* 根据路由策略选择合适的AI提供商
*/
public String route(AiRequest request) {
// 1. 基于任务类型的能力路由
String preferredProvider = routingConfig.getPreferredProvider(request.getTaskType());
// 2. 检查首选提供商是否健康
if (healthChecker.isHealthy(preferredProvider)) {
try {
return clients.get(preferredProvider).complete(
request.getPrompt(), request.getConfig()
);
} catch (ProviderException e) {
log.warn("首选提供商{}调用失败,尝试备用", preferredProvider, e);
// 更新健康状态
healthChecker.recordFailure(preferredProvider);
}
}
// 3. 故障转移:找一个健康的备用提供商
List<String> healthyProviders = clients.keySet().stream()
.filter(p -> !p.equals(preferredProvider))
.filter(healthChecker::isHealthy)
.collect(Collectors.toList());
if (healthyProviders.isEmpty()) {
throw new AllProvidersUnavailableException("所有AI提供商均不可用");
}
String fallbackProvider = healthyProviders.get(0);
log.warn("使用备用提供商: {}", fallbackProvider);
return clients.get(fallbackProvider).complete(
adaptPromptForProvider(request.getPrompt(), fallbackProvider),
request.getConfig()
);
}
/**
* 部分场景下,不同模型对prompt格式有偏好,做适当调整
*/
private String adaptPromptForProvider(String prompt, String provider) {
// 大多数情况下不需要特殊处理
// 某些模型对特定格式响应更好,可以在这里做轻微调整
return prompt;
}
}
@Component
public class ProviderHealthChecker {
private final Map<String, AtomicInteger> failureCounters = new ConcurrentHashMap<>();
private final Map<String, Long> circuitOpenTime = new ConcurrentHashMap<>();
// 熔断参数
private static final int FAILURE_THRESHOLD = 5; // 连续5次失败触发熔断
private static final long CIRCUIT_OPEN_DURATION = 60000; // 熔断持续60秒
public boolean isHealthy(String provider) {
Long openTime = circuitOpenTime.get(provider);
if (openTime == null) return true;
// 检查熔断是否过期
if (System.currentTimeMillis() - openTime > CIRCUIT_OPEN_DURATION) {
// 半开状态:尝试恢复
circuitOpenTime.remove(provider);
failureCounters.remove(provider);
return true;
}
return false; // 熔断中
}
public void recordFailure(String provider) {
int failures = failureCounters
.computeIfAbsent(provider, k -> new AtomicInteger(0))
.incrementAndGet();
if (failures >= FAILURE_THRESHOLD) {
circuitOpenTime.put(provider, System.currentTimeMillis());
log.error("提供商{}熔断触发,60秒内停止路由到该提供商", provider);
}
}
public void recordSuccess(String provider) {
failureCounters.getOrDefault(provider, new AtomicInteger(0)).set(0);
}
// 定期探活
@Scheduled(fixedRate = 30000)
public void periodicHealthCheck() {
for (String provider : allProviders) {
if (!isHealthy(provider)) continue;
try {
clients.get(provider).complete("hello", quickConfig);
recordSuccess(provider);
} catch (Exception e) {
log.warn("提供商{}探活失败", provider);
recordFailure(provider);
}
}
}
}成本路由:基于价格分发请求
不同提供商的价格差异很大,合理利用可以显著降低成本:
@Service
public class CostOptimizedRouter {
// 价格表(每百万token,美元)
private static final Map<String, ProviderPricing> PRICING = Map.of(
"claude-haiku-3", new ProviderPricing(0.25, 1.25), // input, output
"claude-sonnet-3-5", new ProviderPricing(3.0, 15.0),
"gpt-4o-mini", new ProviderPricing(0.15, 0.6),
"gpt-4o", new ProviderPricing(5.0, 15.0),
"qwen-turbo", new ProviderPricing(0.14, 0.28) // 国产模型
);
public String routeByCost(AiRequest request, double maxCostUsd) {
int estimatedPromptTokens = tokenCounter.estimate(request.getPrompt());
int estimatedOutputTokens = request.getConfig().getMaxTokens();
// 找到价格在预算内、且有足够能力处理此任务的最便宜模型
return PRICING.entrySet().stream()
.filter(e -> {
double cost = (e.getValue().inputPrice * estimatedPromptTokens +
e.getValue().outputPrice * estimatedOutputTokens) / 1_000_000;
return cost <= maxCostUsd;
})
.filter(e -> isCapable(e.getKey(), request.getTaskType()))
.min(Comparator.comparingDouble(e ->
e.getValue().inputPrice * estimatedPromptTokens +
e.getValue().outputPrice * estimatedOutputTokens
))
.map(e -> clients.get(e.getKey()).complete(request.getPrompt(), request.getConfig()))
.orElseThrow(() -> new NoBudgetProviderException("没有符合预算的提供商"));
}
private boolean isCapable(String model, TaskType taskType) {
// 定义每个模型能处理的任务类型
Map<TaskType, Set<String>> capabilityMap = Map.of(
TaskType.SIMPLE_QA, Set.of("claude-haiku-3", "gpt-4o-mini", "qwen-turbo"),
TaskType.CODE_REVIEW, Set.of("claude-sonnet-3-5", "gpt-4o"),
TaskType.CREATIVE_WRITING, Set.of("claude-sonnet-3-5", "claude-opus-4-5")
);
return capabilityMap.getOrDefault(taskType, Set.of()).contains(model);
}
}多云架构的运营考量
多云架构增加了复杂度,有几个运营问题要提前考虑:
统一计费:多个提供商的账单要汇总,便于成本分析。建议在AI网关层统一记录每次调用的成本,不管用的是哪个提供商。
数据留存合规:不同国家的法规对数据出境有要求。路由策略要考虑数据主权:中国用户的数据可能不能发送到AWS us-east-1的Claude API。
Prompt一致性:不同模型对同一问题的回答风格不同。如果用户一会儿被路由到Claude,一会儿被路由到GPT-4o,体验会不一致。建议在高一致性要求的场景里固定提供商,只在故障时才切换。
多云AI不是每个项目都需要的。评估标准:如果你的业务对AI服务的可用性要求很高(比如SLA 99.9%),且单一提供商无法保证这个SLA,多云才值得引入。
