Part 2087: Distributed LLM Serving - Multi-Model Load Balancing and Failover in Practice
Audience: engineers responsible for AI service reliability | Reading time: about 19 minutes | Core value: designing a production-grade LLM serving layer, covering multi-provider load balancing, intelligent failover, rate limiting, and cost control
One production incident sticks in my memory: in the early hours of a weekend, OpenAI suffered a large-scale outage. Every feature in our system that depended on GPT-4 went down at once, and customer complaints jumped from zero to dozens within minutes. After that incident we spent two weeks building a multi-provider LLM serving layer, and we have not had a single outage caused by one point of failure since.

This article walks through the core design of that system.
Architecture Design
The Provider Abstraction Layer
/**
 * Unified abstraction over LLM providers.
 * Hides the differences between vendor SDKs and exposes one interface upward.
 */
public interface LlmProvider {

    String getName();

    /**
     * Synchronous completion call.
     */
    LlmResponse complete(LlmRequest request);

    /**
     * Streaming call (returns a Reactor Flux<String>).
     */
    Flux<String> stream(LlmRequest request);

    /**
     * Current health state.
     */
    ProviderHealth getHealth();

    /**
     * Requests currently in flight (a load signal for the load balancer).
     */
    int getCurrentQps();

    enum ProviderHealth { HEALTHY, DEGRADED, DOWN }
}

/**
 * Unified request format.
 */
@Data
@Builder
public class LlmRequest {

    private String systemPrompt;
    private List<Message> messages;
    private String model;              // may be set explicitly, or left to the gateway
    private double temperature;
    private int maxTokens;
    private Map<String, Object> extra; // provider-specific parameters

    // Routing hint (helps the load balancer pick a provider)
    private RoutingHint routingHint;

    public enum RoutingHint {
        PREFER_OPENAI,
        PREFER_ANTHROPIC,
        LOWEST_COST,
        LOWEST_LATENCY,
        HIGHEST_QUALITY
    }
}
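
Before the implementations, a quick look at how a caller sees this layer. This is a minimal usage sketch, assuming the LlmGateway defined at the end of this article; the "user-42" id and the Message(role, content) constructor are illustrative assumptions, not code from the system.

// Sketch: building a request and sending it through the gateway defined later.
// "user-42" and the Message(role, content) constructor are assumptions.
LlmRequest request = LlmRequest.builder()
        .systemPrompt("You are a helpful assistant.")
        .messages(List.of(new Message("user", "Summarize our SLA policy.")))
        .temperature(0.2)
        .maxTokens(512)
        .routingHint(LlmRequest.RoutingHint.LOWEST_COST) // let the gateway pick a cheap provider
        .build();

LlmResponse response = llmGateway.complete("user-42", request);
log.info("answered by {}", response.getProviderName());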

OpenAI Provider Implementation

/**
 * OpenAI provider implementation.
 * Includes retry, timeout, and failure-tracking protection.
 */
@Component
@Slf4j
public class OpenAiProvider implements LlmProvider {

    private final OpenAiChatModel chatModel;
    private final OpenAiStreamingChatModel streamingChatModel;
    // Requests currently in flight (used as a load signal by the load balancer)
    private final AtomicInteger currentQps = new AtomicInteger(0);
    private volatile ProviderHealth health = ProviderHealth.HEALTHY;
    // Consecutive failure count (drives the health state)
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private static final int FAILURE_THRESHOLD = 5;

    public OpenAiProvider(OpenAiConfig config) {
        this.chatModel = OpenAiChatModel.builder()
                .apiKey(config.getApiKey())
                .modelName(config.getDefaultModel())
                .timeout(Duration.ofSeconds(30))
                .maxRetries(2)
                .build();
        // Streaming uses a separate model class in langchain4j
        this.streamingChatModel = OpenAiStreamingChatModel.builder()
                .apiKey(config.getApiKey())
                .modelName(config.getDefaultModel())
                .timeout(Duration.ofSeconds(30))
                .build();
    }

    @Override
    public String getName() { return "openai"; }

    @Override
    public LlmResponse complete(LlmRequest request) {
        long startTime = System.currentTimeMillis();
        currentQps.incrementAndGet();
        try {
            String result = chatModel.generate(
                    SystemMessage.from(request.getSystemPrompt()),
                    UserMessage.from(getLastUserMessage(request))
            ).content().text();
            consecutiveFailures.set(0);
            health = ProviderHealth.HEALTHY;
            return LlmResponse.success(result, getName(),
                    System.currentTimeMillis() - startTime);
        } catch (RateLimitException e) {
            log.warn("OpenAI rate limited: {}", e.getMessage());
            recordFailure();
            throw new ProviderRateLimitException(getName(), e);
        } catch (Exception e) {
            log.error("OpenAI call failed: {}", e.getMessage());
            recordFailure();
            throw new ProviderException(getName(), e);
        } finally {
            currentQps.decrementAndGet();
        }
    }

    @Override
    public Flux<String> stream(LlmRequest request) {
        return Flux.create(sink -> {
            try {
                // Bridge langchain4j's streaming callback into a Flux
                streamingChatModel.generate(
                        List.of(SystemMessage.from(request.getSystemPrompt()),
                                UserMessage.from(getLastUserMessage(request))),
                        new StreamingResponseHandler<AiMessage>() {
                            @Override
                            public void onNext(String token) {
                                sink.next(token);
                            }

                            @Override
                            public void onComplete(Response<AiMessage> response) {
                                sink.complete();
                            }

                            @Override
                            public void onError(Throwable error) {
                                sink.error(error);
                            }
                        }
                );
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    @Override
    public ProviderHealth getHealth() { return health; }

    @Override
    public int getCurrentQps() { return currentQps.get(); }

    private void recordFailure() {
        int failures = consecutiveFailures.incrementAndGet();
        if (failures >= FAILURE_THRESHOLD) {
            health = ProviderHealth.DEGRADED;
            log.warn("OpenAI failed {} times in a row, marking it DEGRADED", failures);
        }
    }

    private String getLastUserMessage(LlmRequest request) {
        if (request.getMessages() == null || request.getMessages().isEmpty()) return "";
        return request.getMessages().stream()
                .filter(m -> "user".equals(m.getRole()))
                .reduce((first, second) -> second)   // keep the last user message
                .map(Message::getContent)
                .orElse("");
    }
}
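
Adding a second vendor is mostly boilerplate once the abstraction exists. Here is a hedged sketch of an Anthropic implementation following the same pattern; it assumes langchain4j's AnthropicChatModel and a hypothetical AnthropicConfig holder, and stubs out streaming to stay short. It is an illustration, not the production class.

/**
 * Sketch of a second provider behind the same interface, assuming the
 * langchain4j Anthropic module. Health tracking mirrors OpenAiProvider.
 */
@Component
@Slf4j
public class AnthropicProvider implements LlmProvider {

    private static final int FAILURE_THRESHOLD = 5;

    private final AnthropicChatModel chatModel;
    private final AtomicInteger currentQps = new AtomicInteger(0);
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private volatile ProviderHealth health = ProviderHealth.HEALTHY;

    public AnthropicProvider(AnthropicConfig config) { // AnthropicConfig is hypothetical
        this.chatModel = AnthropicChatModel.builder()
                .apiKey(config.getApiKey())
                .modelName(config.getDefaultModel())
                .build();
    }

    @Override
    public String getName() { return "anthropic"; }

    @Override
    public LlmResponse complete(LlmRequest request) {
        long startTime = System.currentTimeMillis();
        currentQps.incrementAndGet();
        try {
            String result = chatModel.generate(
                    SystemMessage.from(request.getSystemPrompt()),
                    UserMessage.from(lastUserMessage(request))
            ).content().text();
            consecutiveFailures.set(0);
            health = ProviderHealth.HEALTHY;
            return LlmResponse.success(result, getName(),
                    System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            if (consecutiveFailures.incrementAndGet() >= FAILURE_THRESHOLD) {
                health = ProviderHealth.DEGRADED;
            }
            throw new ProviderException(getName(), e);
        } finally {
            currentQps.decrementAndGet();
        }
    }

    @Override
    public Flux<String> stream(LlmRequest request) {
        // Streaming omitted in this sketch; the real implementation would
        // mirror OpenAiProvider with Anthropic's streaming model.
        return Flux.error(new UnsupportedOperationException("not implemented in this sketch"));
    }

    @Override
    public ProviderHealth getHealth() { return health; }

    @Override
    public int getCurrentQps() { return currentQps.get(); }

    private String lastUserMessage(LlmRequest request) {
        if (request.getMessages() == null) return "";
        return request.getMessages().stream()
                .filter(m -> "user".equals(m.getRole()))
                .reduce((a, b) -> b)
                .map(Message::getContent)
                .orElse("");
    }
}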

Load Balancing Strategies

/**
 * Multi-strategy load balancer.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class LlmLoadBalancer {

    private final List<LlmProvider> providers;

    // Cost tiers: lower is cheaper. Self-hosted models are cheapest;
    // Azure is slightly below list price thanks to a volume agreement.
    private static final Map<String, Integer> COST_TIER = Map.of(
            "local", 1,
            "azure", 2,
            "openai", 3,
            "anthropic", 3
    );

    // Quality tiers: higher is better. GPT-4 and Claude are roughly on par.
    private static final Map<String, Integer> QUALITY_TIER = Map.of(
            "openai", 3,
            "anthropic", 3,
            "azure", 2,
            "local", 1
    );

    /**
     * Pick the best provider for this request.
     */
    public LlmProvider select(LlmRequest request) {
        // Drop providers that are completely down
        List<LlmProvider> healthyProviders = providers.stream()
                .filter(p -> p.getHealth() != LlmProvider.ProviderHealth.DOWN)
                .toList();
        if (healthyProviders.isEmpty()) {
            throw new NoAvailableProviderException("No LLM provider is currently available");
        }

        // Choose a strategy based on the routing hint
        LlmRequest.RoutingHint hint = request.getRoutingHint();
        if (hint == null) hint = LlmRequest.RoutingHint.LOWEST_LATENCY;

        return switch (hint) {
            case LOWEST_COST -> selectLowestCost(healthyProviders);
            case LOWEST_LATENCY -> selectLowestLatency(healthyProviders);
            case HIGHEST_QUALITY -> selectHighestQuality(healthyProviders);
            case PREFER_OPENAI -> preferProvider(healthyProviders, "openai");
            case PREFER_ANTHROPIC -> preferProvider(healthyProviders, "anthropic");
        };
    }

    /**
     * Lowest latency: pick the fully healthy provider with the fewest
     * in-flight requests. The in-flight count is a proxy for latency;
     * a variant that tracks measured latency is sketched below.
     */
    private LlmProvider selectLowestLatency(List<LlmProvider> providers) {
        return providers.stream()
                .filter(p -> p.getHealth() == LlmProvider.ProviderHealth.HEALTHY)
                .min(Comparator.comparingInt(LlmProvider::getCurrentQps))
                .orElse(providers.get(0));
    }

    /**
     * Lowest cost: prefer the cheapest tier (e.g. a self-hosted model),
     * falling back to paid APIs when nothing cheaper is available.
     */
    private LlmProvider selectLowestCost(List<LlmProvider> providers) {
        return providers.stream()
                .min(Comparator.comparingInt(p ->
                        COST_TIER.getOrDefault(p.getName(), 5)))
                .orElse(providers.get(0));
    }

    private LlmProvider selectHighestQuality(List<LlmProvider> providers) {
        return providers.stream()
                .max(Comparator.comparingInt(p ->
                        QUALITY_TIER.getOrDefault(p.getName(), 1)))
                .orElse(providers.get(0));
    }

    private LlmProvider preferProvider(List<LlmProvider> providers, String preferred) {
        return providers.stream()
                .filter(p -> p.getName().equals(preferred))
                .findFirst()
                .orElseGet(() -> selectLowestLatency(providers));
    }
}
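
The in-flight count is only an indirect latency signal. A variant worth considering, sketched here rather than taken from the original system, keeps an exponentially weighted moving average (EWMA) of measured latency per provider; selectLowestLatency could then compare tracker.estimatedLatencyMs(p.getName()) instead of getCurrentQps(). The ProviderLatencyTracker class and its ALPHA value are assumptions.

/**
 * Hypothetical helper: per-provider EWMA of observed latency, fed after
 * every completed call. ALPHA controls how fast old samples decay.
 */
@Component
public class ProviderLatencyTracker {

    private static final double ALPHA = 0.2; // weight of the newest sample

    private final Map<String, Double> ewmaMs = new ConcurrentHashMap<>();

    /** Call after every completed request with the observed latency. */
    public void record(String providerName, long latencyMs) {
        ewmaMs.merge(providerName, (double) latencyMs,
                (old, fresh) -> old * (1 - ALPHA) + fresh * ALPHA);
    }

    /** Providers with no samples get an optimistic default so they still get tried. */
    public double estimatedLatencyMs(String providerName) {
        return ewmaMs.getOrDefault(providerName, 0.0);
    }
}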

The Failover Chain

/**
 * Failover executor.
 * Tries providers in priority order until one succeeds.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class FailoverExecutor {

    private final LlmLoadBalancer loadBalancer;
    private final List<LlmProvider> providers;

    // Failover priority (primary → backup 1 → backup 2 → local degraded mode)
    private final List<String> failoverChain = List.of(
            "openai", "anthropic", "azure", "local"
    );

    /**
     * Execute a request with failover.
     */
    public LlmResponse executeWithFailover(LlmRequest request) {
        // Let the load balancer pick the primary provider first
        LlmProvider primary = loadBalancer.select(request);
        try {
            return primary.complete(request);
        } catch (ProviderRateLimitException e) {
            log.warn("Primary provider {} rate limited, failing over", primary.getName());
            return failover(request, primary.getName());
        } catch (ProviderException e) {
            log.warn("Primary provider {} failed, failing over: {}",
                    primary.getName(), e.getMessage());
            return failover(request, primary.getName());
        }
    }

    private LlmResponse failover(LlmRequest request, String excludedProvider) {
        // Try candidates in failover-chain order
        List<LlmProvider> fallbackCandidates = failoverChain.stream()
                .filter(name -> !name.equals(excludedProvider))
                .map(this::findProvider)
                .filter(Objects::nonNull)
                .filter(p -> p.getHealth() != LlmProvider.ProviderHealth.DOWN)
                .toList();

        for (LlmProvider provider : fallbackCandidates) {
            try {
                log.info("Failing over to: {}", provider.getName());
                return provider.complete(request)
                        .withFallbackProvider(provider.getName());
            } catch (Exception e) {
                log.warn("Failover to {} also failed: {}", provider.getName(), e.getMessage());
            }
        }

        // Every provider failed
        throw new AllProvidersFailedException("No LLM provider is available, please retry later");
    }

    private LlmProvider findProvider(String name) {
        return providers.stream()
                .filter(p -> p.getName().equals(name))
                .findFirst()
                .orElse(null);
    }
}

Rate Limiting: Protecting Downstream Services and the Budget

/**
 * LLM request rate limiter.
 * Two layers: a global limit plus per-user limits.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class LlmRateLimiter {

    private final RedisTemplate<String, String> redisTemplate;

    // Global QPS cap (keeps us inside the API quota)
    private static final int GLOBAL_QPS_LIMIT = 100;
    // Per-user requests per minute
    private static final int USER_RPM_LIMIT = 20;
    // Per-user token budget per hour
    private static final int USER_TPH_LIMIT = 100_000;

    /**
     * Check whether a request may proceed.
     * Uses fixed-window counters in Redis, one key per window.
     */
    public RateLimitResult checkLimit(String userId, int estimatedTokens) {
        long now = System.currentTimeMillis();

        // 1. Global QPS check (1-second window, short TTL)
        String globalKey = "llm:global:qps:" + (now / 1000);
        long globalCount = incrementAndExpire(globalKey, 1, 5);
        if (globalCount > GLOBAL_QPS_LIMIT) {
            log.warn("Global QPS limit hit: current={}", globalCount);
            return RateLimitResult.rejected("Service busy, please retry shortly", RetryAfter.SECONDS_1);
        }

        // 2. Per-user RPM check (1-minute window)
        String userRpmKey = "llm:user:" + userId + ":rpm:" + (now / 60000);
        long userRpm = incrementAndExpire(userRpmKey, 1, 120);
        if (userRpm > USER_RPM_LIMIT) {
            log.debug("User RPM limit hit: userId={}, rpm={}", userId, userRpm);
            return RateLimitResult.rejected("Too many requests, please retry in a minute", RetryAfter.MINUTE_1);
        }

        // 3. Per-user hourly token budget (1-hour window; the TTL must outlive
        //    the window, otherwise the hourly count silently resets)
        if (estimatedTokens > 0) {
            String userTphKey = "llm:user:" + userId + ":tph:" + (now / 3600000);
            long usedTokens = incrementAndExpire(userTphKey, estimatedTokens, 7200);
            if (usedTokens > USER_TPH_LIMIT) {
                log.debug("User token limit hit: userId={}, tokens={}", userId, usedTokens);
                return RateLimitResult.rejected("Token budget exceeded, please retry in an hour",
                        RetryAfter.HOUR_1);
            }
        }

        return RateLimitResult.allowed();
    }

    /**
     * Atomically increment a counter and set its TTL on first write.
     */
    private long incrementAndExpire(String key, long delta, long ttlSeconds) {
        // A Lua script keeps INCRBY + EXPIRE atomic
        String script = """
                local current = redis.call('INCRBY', KEYS[1], ARGV[1])
                if current == tonumber(ARGV[1]) then
                    redis.call('EXPIRE', KEYS[1], ARGV[2])
                end
                return current
                """;
        Long result = redisTemplate.execute(
                RedisScript.of(script, Long.class),
                List.of(key),
                String.valueOf(delta),
                String.valueOf(ttlSeconds)
        );
        return result != null ? result : 0L;
    }

    public enum RetryAfter { SECONDS_1, MINUTE_1, HOUR_1 }

    public record RateLimitResult(boolean allowed, String message, RetryAfter retryAfter) {
        public static RateLimitResult allowed() {
            return new RateLimitResult(true, "", null);
        }
        public static RateLimitResult rejected(String message, RetryAfter retryAfter) {
            return new RateLimitResult(false, message, retryAfter);
        }
    }
}
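
The limiter only returns a verdict; something still has to translate a rejection into an HTTP response. Here is a minimal sketch of that glue, assuming a Spring MVC controller advice and a hypothetical RateLimitRejectedException that carries the RateLimitResult; neither appears in the original code.

/**
 * Hypothetical glue: converts a rate-limit rejection into HTTP 429 with a
 * Retry-After header, so well-behaved clients know when to come back.
 */
@RestControllerAdvice
public class RateLimitExceptionHandler {

    @ExceptionHandler(RateLimitRejectedException.class)
    public ResponseEntity<String> handle(RateLimitRejectedException e) {
        LlmRateLimiter.RateLimitResult result = e.getResult();
        long retryAfterSeconds = switch (result.retryAfter()) {
            case SECONDS_1 -> 1;
            case MINUTE_1 -> 60;
            case HOUR_1 -> 3600;
        };
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
                .header(HttpHeaders.RETRY_AFTER, String.valueOf(retryAfterSeconds))
                .body(result.message());
    }
}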

Circuit Breaking: Preventing Cascading Failures

/**
 * Per-provider circuit breakers, built on Resilience4j.
 */
@Service
@Slf4j
public class LlmCircuitBreakerManager {

    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

    public LlmCircuitBreakerManager() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .slidingWindowSize(20)                               // last 20 calls
                .failureRateThreshold(50)                            // open at 50% failures
                .waitDurationInOpenState(Duration.ofSeconds(30))     // stay open for 30s
                .permittedNumberOfCallsInHalfOpenState(5)            // 5 probe calls when half-open
                .slowCallDurationThreshold(Duration.ofSeconds(10))   // calls over 10s count as slow
                .slowCallRateThreshold(80)                           // 80% slow calls also opens it
                .build();
        this.circuitBreakerRegistry = CircuitBreakerRegistry.of(config);
    }

    /**
     * Get or create the breaker for a provider.
     */
    public CircuitBreaker getBreaker(String providerName) {
        return breakers.computeIfAbsent(providerName, name -> {
            CircuitBreaker breaker = circuitBreakerRegistry.circuitBreaker(name);
            // Log every state transition
            breaker.getEventPublisher()
                    .onStateTransition(event ->
                            log.warn("LLM provider {} breaker state change: {} → {}",
                                    name,
                                    event.getStateTransition().getFromState(),
                                    event.getStateTransition().getToState()));
            return breaker;
        });
    }

    /**
     * Run an LLM call through the breaker.
     */
    public LlmResponse executeWithBreaker(String providerName,
                                          Supplier<LlmResponse> operation) {
        CircuitBreaker breaker = getBreaker(providerName);
        try {
            return breaker.executeSupplier(operation);
        } catch (CallNotPermittedException e) {
            // Breaker is open: reject immediately
            log.warn("LLM provider {} breaker is open, request rejected", providerName);
            throw new ProviderCircuitOpenException(providerName);
        }
    }

    /**
     * Breaker states for every provider (for monitoring).
     */
    public Map<String, CircuitBreaker.State> getAllBreakerStates() {
        return breakers.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().getState()
                ));
    }
}
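
As written above, FailoverExecutor calls providers directly, so the breaker never actually sits in the request path. One way to close that gap, sketched here rather than taken from the original code, is to route each attempt through executeWithBreaker and treat an open breaker like any other provider failure. It assumes an LlmCircuitBreakerManager field injected into FailoverExecutor.

// Sketch: FailoverExecutor.executeWithFailover with the breaker in the path.
// Assumes an injected field `circuitBreakerManager` of type LlmCircuitBreakerManager.
public LlmResponse executeWithFailover(LlmRequest request) {
    LlmProvider primary = loadBalancer.select(request);
    try {
        // Every attempt runs inside the provider's breaker, so the failures
        // counted by Resilience4j are the same ones that trigger failover.
        return circuitBreakerManager.executeWithBreaker(
                primary.getName(), () -> primary.complete(request));
    } catch (ProviderCircuitOpenException | ProviderRateLimitException e) {
        log.warn("Primary provider {} unavailable, failing over", primary.getName());
        return failover(request, primary.getName());
    } catch (ProviderException e) {
        log.warn("Primary provider {} failed, failing over: {}",
                primary.getName(), e.getMessage());
        return failover(request, primary.getName());
    }
}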

The Unified Gateway Entry Point

/**
 * Unified LLM gateway.
 * Combines rate limiting, circuit breaking, load balancing, and failover.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class LlmGateway {

    private final LlmRateLimiter rateLimiter;
    private final LlmCircuitBreakerManager circuitBreakerManager;
    private final FailoverExecutor failoverExecutor;
    private final LlmCostTracker costTracker; // cost tracking

    /**
     * Single entry point: every LLM call must go through here.
     */
    public LlmResponse complete(String userId, LlmRequest request) {
        // 1. Rate-limit check
        int estimatedTokens = estimateTokens(request);
        LlmRateLimiter.RateLimitResult rateLimit =
                rateLimiter.checkLimit(userId, estimatedTokens);
        if (!rateLimit.allowed()) {
            throw new RateLimitException(rateLimit.message());
        }

        // 2. Execute (load balancing + failover + circuit breaking)
        long startTime = System.currentTimeMillis();
        try {
            LlmResponse response = failoverExecutor.executeWithFailover(request);

            // 3. Record cost
            costTracker.record(userId, response.getProviderName(),
                    response.getUsage().getInputTokens(),
                    response.getUsage().getOutputTokens());

            long latencyMs = System.currentTimeMillis() - startTime;
            log.info("LLM call succeeded: userId={}, provider={}, latency={}ms, tokens={}",
                    userId, response.getProviderName(), latencyMs,
                    response.getUsage().getTotalTokens());
            return response;
        } catch (AllProvidersFailedException e) {
            log.error("All providers failed: userId={}", userId);
            // Degraded response
            return LlmResponse.degraded("The service is temporarily unavailable, please retry later. For urgent issues, contact support.");
        }
    }

    private int estimateTokens(LlmRequest request) {
        // Rough estimate for Chinese text: characters / 3 ≈ tokens.
        // A tokenizer-based alternative is sketched below.
        int chars = 0;
        if (request.getSystemPrompt() != null) chars += request.getSystemPrompt().length();
        if (request.getMessages() != null) {
            chars += request.getMessages().stream()
                    .mapToInt(m -> m.getContent().length())
                    .sum();
        }
        return chars / 3;
    }
}
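
Character-count estimates drift a lot across languages and models. If tighter budgeting matters, a tokenizer library gives exact counts for OpenAI-style models; this is a sketch assuming the jtokkit library (com.knuddels.jtokkit), which is not part of the original stack.

import com.knuddels.jtokkit.Encodings;
import com.knuddels.jtokkit.api.Encoding;
import com.knuddels.jtokkit.api.EncodingRegistry;
import com.knuddels.jtokkit.api.ModelType;

/**
 * Tokenizer-based estimation (sketch). Exact for OpenAI models; for other
 * providers it is still a much closer approximation than characters / 3.
 */
public class TokenEstimator {

    private static final EncodingRegistry REGISTRY = Encodings.newDefaultEncodingRegistry();
    private static final Encoding ENCODING = REGISTRY.getEncodingForModel(ModelType.GPT_4);

    public static int countTokens(String text) {
        if (text == null || text.isEmpty()) return 0;
        return ENCODING.countTokens(text);
    }
}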

Monitoring Metrics

/**
 * LLM gateway metrics, exposed for Prometheus scraping.
 */
@Component
@RequiredArgsConstructor
public class LlmGatewayMetrics {

    private final MeterRegistry meterRegistry;

    @PostConstruct
    public void registerMetrics() {
        // Per-provider request counter
        Counter.builder("llm.requests.total")
                .tag("provider", "openai")
                .register(meterRegistry);
        // Latency distribution (per provider)
        Timer.builder("llm.request.duration")
                .publishPercentiles(0.5, 0.9, 0.99)
                .register(meterRegistry);
        // Token consumption
        Counter.builder("llm.tokens.total")
                .tag("type", "input")
                .register(meterRegistry);
    }

    public void recordRequest(String provider, long latencyMs,
                              boolean success, int inputTokens, int outputTokens) {
        Tags tags = Tags.of("provider", provider, "success", String.valueOf(success));
        meterRegistry.counter("llm.requests.total", tags).increment();
        meterRegistry.timer("llm.request.duration", tags)
                .record(latencyMs, TimeUnit.MILLISECONDS);
        if (success) {
            meterRegistry.counter("llm.tokens.total",
                            Tags.of("provider", provider, "type", "input"))
                    .increment(inputTokens);
            meterRegistry.counter("llm.tokens.total",
                            Tags.of("provider", provider, "type", "output"))
                    .increment(outputTokens);
        }
    }
}
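
Nothing in LlmGateway calls recordRequest yet; the success and failure paths in complete() would each report once. A sketch of the missing calls, assuming a hypothetical `metrics` field of type LlmGatewayMetrics injected into the gateway:

// Inside LlmGateway.complete(), after a successful call (sketch):
metrics.recordRequest(response.getProviderName(), latencyMs, true,
        response.getUsage().getInputTokens(),
        response.getUsage().getOutputTokens());

// ...and in the AllProvidersFailedException handler, before returning the
// degraded response (token counts are zero because no provider answered):
metrics.recordRequest("none", System.currentTimeMillis() - startTime, false, 0, 0);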
Three months after this system went live, we pulled the numbers: failover had triggered 47 times, 43 of them caused by OpenAI rate limiting (HTTP 429) and 4 by network problems. Interruptions visible to users: zero.

Multiple providers are not a luxury; for production they are table stakes. Nor do they have to be expensive: route simple requests to a self-hosted model, reserve the paid APIs for complex ones, and sensible routing keeps the cost well under control. The sketch below shows one way such routing could look.
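
This is a minimal sketch of that idea, not code from the production system: a hypothetical heuristic that picks a RoutingHint from request length plus an optional complexity flag in the request's extra map.

/**
 * Hypothetical complexity-based router: the cheapest tier for short, simple
 * requests; quality-first routing for everything else. The threshold is
 * illustrative, not a tuned value from the original system.
 */
public class ComplexityRouter {

    private static final int SIMPLE_REQUEST_MAX_CHARS = 500;

    public static LlmRequest.RoutingHint chooseHint(LlmRequest request) {
        // Explicit override wins, e.g. extra.put("complex", true) set by the caller
        Object complexFlag = request.getExtra() == null ? null : request.getExtra().get("complex");
        if (Boolean.TRUE.equals(complexFlag)) {
            return LlmRequest.RoutingHint.HIGHEST_QUALITY;
        }
        // Otherwise, short prompts go to the cheapest provider tier
        int totalChars = request.getMessages() == null ? 0 :
                request.getMessages().stream()
                        .mapToInt(m -> m.getContent().length())
                        .sum();
        return totalChars <= SIMPLE_REQUEST_MAX_CHARS
                ? LlmRequest.RoutingHint.LOWEST_COST
                : LlmRequest.RoutingHint.HIGHEST_QUALITY;
    }
}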
