第2333篇:Java AI的并发控制策略——防止LLM调用风暴的工程设计
2026/4/30大约 6 分钟
第2333篇:Java AI的并发控制策略——防止LLM调用风暴的工程设计
适读人群:运维或开发Java AI服务,关注LLM API稳定性和成本控制的工程师 | 阅读时长:约18分钟 | 核心价值:掌握防止LLM调用风暴的多层防御架构,保护下游API同时保证服务可用性
上线AI服务之前,我从没认真想过"调用风暴"的问题。结果有一天,我们的一个批量处理任务出了bug——一个循环里忘记加break条件,对同一份文档反复调用LLM。
两分钟之内,OpenAI的API返回429(Too Many Requests),我们的账单涨了300美元,生产环境的其他AI功能全部503。
那次之后,我花了两周时间重新设计了整个并发控制体系。这篇文章就是那次教训的总结。
LLM调用的风险画像
LLM调用和普通HTTP调用的区别:
| 维度 | 普通HTTP调用 | LLM API调用 |
|---|---|---|
| 延迟 | 毫秒级 | 秒级(2-10s) |
| 费用 | 近乎为零 | 按Token计费 |
| 限流 | 少见 | 普遍(RPM/TPM限制) |
| 重试代价 | 低 | 高(费钱且慢) |
| 并发危害 | 一般 | 可能打穿账单 |
所以LLM调用需要更严格的并发控制。
第一层:信号量(Semaphore)限制全局并发
最基础的防护——限制同时进行的LLM调用数量:
@Component
@Slf4j
public class LlmConcurrencyGuard {
// 全局最多20个并发LLM调用
// 根据你的API tier调整这个数字
private final Semaphore globalSemaphore = new Semaphore(20, true);
// 按用户的并发限制(防止单个用户占满所有名额)
private final ConcurrentHashMap<String, Semaphore> userSemaphores = new ConcurrentHashMap<>();
/**
* 受保护的LLM调用
* @param userId 用户ID(用于用户级限流)
* @param task LLM调用逻辑
*/
public <T> T execute(String userId, Callable<T> task) throws Exception {
// 获取用户级信号量(每个用户最多3个并发)
Semaphore userSemaphore = userSemaphores.computeIfAbsent(
userId, id -> new Semaphore(3, true));
// 先尝试获取用户信号量
boolean userPermit = userSemaphore.tryAcquire(5, TimeUnit.SECONDS);
if (!userPermit) {
throw new TooManyRequestsException("您的请求太频繁,请稍后再试");
}
try {
// 再尝试获取全局信号量
boolean globalPermit = globalSemaphore.tryAcquire(10, TimeUnit.SECONDS);
if (!globalPermit) {
throw new ServiceUnavailableException("服务繁忙,请稍后重试");
}
try {
return task.call();
} finally {
globalSemaphore.release();
}
} finally {
userSemaphore.release();
}
}
// 获取当前并发状态(用于监控)
public ConcurrencyStats getStats() {
return new ConcurrencyStats(
20 - globalSemaphore.availablePermits(),
20,
userSemaphores.size()
);
}
public record ConcurrencyStats(int activeCalls, int maxConcurrency, int activeUsers) {}
}第二层:令牌桶限流(Token Bucket)
Semaphore控制并发数,但无法控制速率。令牌桶可以控制每分钟/每秒的调用频率:
@Component
public class LlmRateLimiter {
// 使用Guava的RateLimiter(令牌桶实现)
// 每秒最多10个请求
private final RateLimiter globalRateLimiter = RateLimiter.create(10.0);
// 更精细的Token级速率限制(对应LLM API的TPM限制)
// 假设API限制是100K TPM = 每秒约1667 tokens
private final TokenBucket tokenBucket;
public LlmRateLimiter() {
// 每秒补充1000个token的令牌桶(保守估计,留有余量)
this.tokenBucket = new TokenBucket(1000, 5000); // 每秒1000,桶容量5000
}
/**
* 等待获取请求许可
*/
public void acquireRequest() {
// 等待获取1个请求名额(最多等3秒)
boolean acquired = globalRateLimiter.tryAcquire(3, TimeUnit.SECONDS);
if (!acquired) {
throw new RateLimitExceededException("请求频率超限,请降低调用频率");
}
}
/**
* 等待获取token消耗许可
* @param estimatedTokens 预估的token消耗量
*/
public void acquireTokens(int estimatedTokens) {
if (!tokenBucket.tryConsume(estimatedTokens)) {
throw new RateLimitExceededException("Token消耗速率超限,请稍后重试");
}
}
// 简单的Token桶实现
static class TokenBucket {
private final int refillRate;
private final int capacity;
private long tokens;
private long lastRefillTime;
public TokenBucket(int refillRate, int capacity) {
this.refillRate = refillRate;
this.capacity = capacity;
this.tokens = capacity;
this.lastRefillTime = System.currentTimeMillis();
}
public synchronized boolean tryConsume(int amount) {
refill();
if (tokens >= amount) {
tokens -= amount;
return true;
}
return false;
}
private void refill() {
long now = System.currentTimeMillis();
long elapsed = now - lastRefillTime;
long tokensToAdd = elapsed * refillRate / 1000;
tokens = Math.min(capacity, tokens + tokensToAdd);
lastRefillTime = now;
}
}
}第三层:熔断器(Circuit Breaker)
当LLM API持续报错(429/500/503),需要快速失败而不是继续发请求:
@Component
@Slf4j
public class LlmCircuitBreaker {
// 用Resilience4j的熔断器
private final CircuitBreaker circuitBreaker;
public LlmCircuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
// 滑动窗口:最近10次调用
.slidingWindowSize(10)
// 失败率超过50%时熔断
.failureRateThreshold(50)
// 慢调用阈值:超过8秒算慢调用
.slowCallDurationThreshold(Duration.ofSeconds(8))
// 慢调用比例超过50%也触发熔断
.slowCallRateThreshold(50)
// 熔断后等待30秒再尝试
.waitDurationInOpenState(Duration.ofSeconds(30))
// 半开状态允许3次测试调用
.permittedNumberOfCallsInHalfOpenState(3)
// 定义哪些异常算作失败
.recordExceptions(
IOException.class,
TimeoutException.class,
LlmApiException.class
)
// 429限流异常不算熔断的"失败"(是正常的限流信号)
.ignoreExceptions(RateLimitExceededException.class)
.build();
this.circuitBreaker = CircuitBreaker.of("llm-api", config);
// 监听熔断状态变化
circuitBreaker.getEventPublisher()
.onStateTransition(event -> {
log.warn("熔断器状态变化:{} -> {}",
event.getStateTransition().getFromState(),
event.getStateTransition().getToState());
// 可以在这里发告警通知
sendAlert(event);
});
}
public <T> T execute(Supplier<T> llmCall) {
return circuitBreaker.executeSupplier(llmCall);
}
public CircuitBreaker.State getState() {
return circuitBreaker.getState();
}
private void sendAlert(CircuitBreakerOnStateTransitionEvent event) {
// 接入告警系统(钉钉、企微等)
// 这里只打日志示意
if (event.getStateTransition().getToState() == CircuitBreaker.State.OPEN) {
log.error("[告警] LLM熔断器已开启,LLM调用将快速失败直到恢复");
}
}
}整合:多层防御的门面类
把三层防护整合到一个统一入口,调用方只需要关心这一个类:
@Service
@RequiredArgsConstructor
@Slf4j
public class ProtectedLlmService {
private final ChatClient chatClient;
private final LlmConcurrencyGuard concurrencyGuard;
private final LlmRateLimiter rateLimiter;
private final LlmCircuitBreaker circuitBreaker;
/**
* 受多层保护的LLM调用
*/
public String chat(String userId, String message) {
// 第一层:速率限制检查
rateLimiter.acquireRequest();
// 粗略估算token数(字符数 / 2 ≈ token数,用于TPM控制)
int estimatedTokens = message.length() / 2 + 500; // 500是预估的输出token
rateLimiter.acquireTokens(estimatedTokens);
try {
// 第二层:并发控制
return concurrencyGuard.execute(userId, () -> {
// 第三层:熔断器保护
return circuitBreaker.execute(() -> {
long start = System.currentTimeMillis();
try {
String result = chatClient.prompt()
.user(message)
.call()
.content();
long duration = System.currentTimeMillis() - start;
log.debug("LLM调用成功:userId={}, 耗时={}ms", userId, duration);
return result;
} catch (Exception e) {
log.error("LLM调用失败:userId={}", userId, e);
throw new LlmCallException("AI服务调用失败", e);
}
});
});
} catch (TooManyRequestsException e) {
// 用户级限流:友好提示
return "您的请求太频繁,请等待片刻后再试";
} catch (ServiceUnavailableException e) {
// 系统级限流:排队等待或降级
return "服务繁忙,您的请求已加入等待队列";
} catch (CallNotPermittedException e) {
// 熔断器开启:快速失败
log.warn("熔断器开启,快速失败:userId={}", userId);
return "AI服务暂时不可用,请稍后重试(工程师正在处理中)";
} catch (Exception e) {
log.error("未预期的异常:userId={}", userId, e);
return "系统出现异常,请联系客服";
}
}
// 批量调用:带队列和速率控制
public List<String> batchChat(String userId, List<String> messages) {
List<String> results = new ArrayList<>();
for (String message : messages) {
try {
results.add(chat(userId, message));
// 批量调用间加间隔,避免触发限流
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("批量调用失败:message={}", message, e);
results.add("处理失败:" + e.getMessage());
}
}
return results;
}
}监控端点:实时查看并发状态
@RestController
@RequestMapping("/admin/llm")
@RequiredArgsConstructor
public class LlmMonitorController {
private final LlmConcurrencyGuard concurrencyGuard;
private final LlmCircuitBreaker circuitBreaker;
@GetMapping("/status")
public Map<String, Object> getStatus() {
LlmConcurrencyGuard.ConcurrencyStats stats = concurrencyGuard.getStats();
return Map.of(
"activeCalls", stats.activeCalls(),
"maxConcurrency", stats.maxConcurrency(),
"activeUsers", stats.activeUsers(),
"circuitBreakerState", circuitBreaker.getState().name(),
"utilization", String.format("%.1f%%",
(double) stats.activeCalls() / stats.maxConcurrency() * 100)
);
}
}这套多层防御体系,核心思想是:让问题在最外层被挡住,不要让它传播到更内层。速率限制挡调用风暴,并发控制挡资源耗尽,熔断器挡连锁故障。任何一层防护失效时,下一层还在。
