Spring AI高并发实战:10倍吞吐量的异步架构设计
2026/4/24 · 大约16分钟 · 标签:Spring AI / 高并发 / 异步 / Virtual Thread / WebFlux / Java
双11当天,AI客服系统挂了
2024年11月11日凌晨0点03分,某头部家居品牌的技术总监张伟给我发了一条消息:
"老张,系统挂了,AI客服完全不响应,现在电话打爆了。"
我问他情况,他发来了监控截图:
活跃线程数:800/800(线程池满)
等待队列:1247个请求
JVM内存:14.2GB/16GB
GC停顿:平均1.8秒
平均响应时间:从正常的2.1秒 → 超过60秒
错误率:89%

他们的AI客服在双11前一个月上线,平时峰值200并发,一直很稳定。
但双11当天0点整,活动开始,并发量瞬间飙到1000+。
问题出在哪里?他们用的是Spring Boot默认的Tomcat线程池,800个线程。每个AI请求平均需要2.1秒才能返回(调用GPT-4o的网络延迟)。
800个线程 ÷ 2.1秒/请求 ≈ 最大TPS仅380。
而1000并发同时涌入,请求积压,线程耗尽,系统雪崩。
事后他们用这篇文章的方案重新设计了架构,并在次年618大促中承载了3000并发,系统稳定运行。吞吐量提升了10倍以上。
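这笔账可以用利特尔法则(Little's Law)快速验算。下面是一段最小的估算代码,数值取自上文事故场景,仅作示意:

```java
/** 容量验算(示意):同步线程池下,最大TPS ≈ 线程数 ÷ 平均响应时间 */
public class CapacityEstimate {
    public static void main(String[] args) {
        int threads = 800;          // Tomcat线程池上限
        double avgLatencySec = 2.1; // 单次AI调用平均耗时(秒)
        int maxTps = (int) (threads / avgLatencySec); // 向下取整
        System.out.println("最大TPS ≈ " + maxTps);    // 380

        int incomingRps = 1000; // 假设活动开始后每秒涌入1000个请求
        // 每秒净积压约620个请求,等待队列和内存很快被打爆 → 雪崩
        System.out.println("每秒净积压 ≈ " + (incomingRps - maxTps) + " 个请求");
    }
}
```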
AI调用的并发特性分析
在开始设计方案之前,必须理解AI调用和普通HTTP调用的核心区别。
AI请求的特点:
| 特点 | 描述 | 影响 |
|---|---|---|
| 高延迟 | P50约1-2秒,P99可达10秒+ | 线程占用时间长 |
| I/O密集型 | 99%时间在等待网络 | 线程在等待期间什么都没做 |
| 不可预测 | 响应时间波动大 | 难以设置合理超时 |
| 大流量突刺 | 营销活动带来瞬时峰值 | 需要弹性扩容能力 |
核心结论:传统同步线程池对AI场景严重不适配。
一个线程2秒内只能处理1个AI请求,但它只有不到1ms真正在工作,其余时间都在等待网络IO。
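这个差距可以直接跑代码感受。下面是一个可独立运行的演示(非本文工程代码),用sleep模拟LLM的网络等待,让1万个"2秒I/O等待"任务在虚拟线程上同时挂起:

```java
import java.time.Duration;
import java.util.concurrent.Executors;

public class VirtualThreadIoDemo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // try-with-resources:close()会等待所有已提交任务执行完毕
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                executor.submit(() -> {
                    Thread.sleep(Duration.ofSeconds(2)); // 模拟等待LLM响应
                    return null;
                });
            }
        }
        long elapsed = System.currentTimeMillis() - start;
        // 1万个任务总耗时约2秒出头;换成800线程的固定池,同样任务要约25秒
        System.out.println("总耗时(ms): " + elapsed);
    }
}
```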
三种方案对比总览
完整pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.5</version>
</parent>
<groupId>com.laozhang</groupId>
<artifactId>spring-ai-concurrency</artifactId>
<version>1.0.0</version>
<properties>
<java.version>21</java.version>
<spring-ai.version>1.0.0</spring-ai.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-bom</artifactId>
<version>${spring-ai.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Web:支持虚拟线程和响应式两种方式 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- WebFlux:响应式方案 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring AI OpenAI(1.0 GA起starter更名为spring-ai-starter-model-openai) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
</dependency>
<!-- 熔断限流:Resilience4j -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>2.2.0</version>
</dependency>
<!-- RateLimiter(请求队列) -->
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-core</artifactId>
<version>8.10.1</version>
</dependency>
<!-- Redis(分布式限流) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 监控 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 压测 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

application.yml:生产配置
spring:
  application:
    name: spring-ai-concurrency
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4o
          temperature: 0.7
          max-tokens: 2048
      # 关键配置:HTTP连接池
      # 默认值通常不够用,必须根据并发量调整
      # 以下配置支持500并发
      connection-timeout: 10000 # 连接超时10秒
      read-timeout: 60000       # 读超时60秒(流式时更长)
  # 方案1:虚拟线程(Spring Boot 3.2+,Java 21+)
  threads:
    virtual:
      enabled: true # 开启后Tomcat自动使用虚拟线程
  # Redis(分布式限流使用)
  data:
    redis:
      host: ${REDIS_HOST:localhost}
      port: 6379
      timeout: 2000ms

# Tomcat配置(虚拟线程时,这些配置意义改变)
server:
  port: 8080
  tomcat:
    threads:
      max: 200      # 虚拟线程模式下,这个值影响调度线程数
      min-spare: 10
    max-connections: 8192 # 最大并发连接数(关键!)
    accept-count: 1000    # 连接等待队列
    connection-timeout: 20000

# Resilience4j配置
resilience4j:
  circuitbreaker:
    instances:
      openaiClient:
        sliding-window-size: 20
        failure-rate-threshold: 50
        slow-call-duration-threshold: 30s
        slow-call-rate-threshold: 80
        wait-duration-in-open-state: 60s
        permitted-number-of-calls-in-half-open-state: 5
  bulkhead:
    instances:
      openaiClient:
        max-concurrent-calls: 100 # 最大并发AI调用数(防打爆API)
        max-wait-duration: 5s     # 等待超时
  ratelimiter:
    instances:
      openaiClient:
        limit-for-period: 100    # 每个周期最多100个请求
        limit-refresh-period: 1s # 周期1秒
        timeout-duration: 10s    # 等待令牌超时

# 自定义线程池配置
ai:
  concurrency:
    # AI专用线程池(CompletableFuture方案)
    ai-pool:
      core-size: 50
      max-size: 200
      queue-capacity: 1000
      keep-alive-seconds: 60
    # 普通业务线程池
    business-pool:
      core-size: 20
      max-size: 50
      queue-capacity: 500

management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus, threaddump
  prometheus:
    metrics:
      export:
        enabled: true # Spring Boot 3.x写法(旧的management.metrics.export.prometheus已移除)

方案1:Java 21虚拟线程
虚拟线程是最简单的方案——改动最小,收益最大。
原理
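原理一句话:虚拟线程遇到阻塞调用时,JVM会把它从载体(平台)线程上卸载,载体线程转去运行其他虚拟线程;阻塞结束后再重新挂载,可能换一个载体。下面这个独立小例子(非本文工程代码)可以直接观察这一点:

```java
public class CarrierThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread vt = Thread.ofVirtual().name("demo-vt").start(() -> {
            // 输出形如 VirtualThread[#21,demo-vt]/runnable@ForkJoinPool-1-worker-1
            System.out.println("阻塞前: " + Thread.currentThread());
            try {
                Thread.sleep(100); // 阻塞点:虚拟线程挂起,载体线程被释放
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // 恢复执行后,末尾的载体线程(worker-N)可能已经变了
            System.out.println("阻塞后: " + Thread.currentThread());
        });
        vt.join();
    }
}
```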
代码实现
package com.laozhang.concurrency;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.Executors;
@SpringBootApplication
public class ConcurrencyApplication {
public static void main(String[] args) {
SpringApplication.run(ConcurrencyApplication.class, args);
}
/**
* 方法1:通过配置开启虚拟线程(推荐,Spring Boot 3.2+)
* 只需在application.yml中设置:
* spring.threads.virtual.enabled=true
*
* 这一行配置就让所有Tomcat请求线程变为虚拟线程
*/
/**
* 方法2:手动配置(更精细控制)
*/
@Bean
public TomcatProtocolHandlerCustomizer<?> virtualThreadExecutor() {
return handler -> handler.setExecutor(
Executors.newVirtualThreadPerTaskExecutor()
);
}
}

虚拟线程版ChatService
package com.laozhang.concurrency.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
/**
* 虚拟线程版AI服务
*
* 开启虚拟线程后,这个类的代码不需要任何改动!
* Tomcat自动为每个HTTP请求分配一个虚拟线程。
* 当代码执行到LLM API调用(I/O等待)时,
* 虚拟线程自动挂起,平台线程被释放去处理其他请求。
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class VirtualThreadChatService {
private final ChatClient chatClient;
/**
* 看起来是普通同步代码:阻塞的只是虚拟线程,
* 载体(平台)线程在I/O等待期间会被释放去处理其他请求
*/
public String chat(String sessionId, String message) {
log.info("[VT] threadId={} sessionId={} 开始处理",
Thread.currentThread().threadId(), sessionId);
String response = chatClient.prompt()
.user(message)
.call()
.content();
log.info("[VT] threadId={} sessionId={} 处理完成",
Thread.currentThread().threadId(), sessionId);
return response;
}
/**
* 批量处理:为每条消息分配一个虚拟线程
* 注意:parallelStream底层是公共ForkJoinPool(平台线程,并行度≈CPU核数),
* 发挥不了虚拟线程优势,这里改用虚拟线程Executor逐条提交
*/
public java.util.List<String> batchChat(java.util.List<String> messages) {
try (var executor = java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor()) {
return messages.stream()
.map(msg -> executor.submit(() -> chatClient.prompt()
.user(msg)
.call()
.content()))
.toList() // 先全部提交,保证真正并发
.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
return "处理失败:" + e.getMessage();
}
})
.toList();
}
}
}

方案2:WebFlux响应式
WebFlux适合需要精细背压控制、下游支持响应式API的场景。
package com.laozhang.concurrency.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.List;
/**
* WebFlux响应式AI服务
* 适用场景:
* 1. SSE流式输出(用户体验最好)
* 2. 大批量并行处理(背压防止OOM)
* 3. 链式调用(多个AI步骤串联)
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveAiService {
private final ChatClient chatClient;
/**
* SSE流式输出(最佳用户体验)
* 用户看到token逐个出现,而不是等待全部生成
*/
public Flux<String> streamChat(String message) {
return chatClient.prompt()
.user(message)
.stream()
.content() // 返回Flux<String>,每个元素是一个token
.doOnSubscribe(s -> log.info("[Reactive] 开始流式输出"))
.doOnComplete(() -> log.info("[Reactive] 流式输出完成"))
.doOnError(e -> log.error("[Reactive] 流式输出错误: {}", e.getMessage()))
// 重试:网络错误时自动重试(不重试业务错误)
.retryWhen(Retry.backoff(2, Duration.ofSeconds(1))
.filter(e -> e instanceof java.net.SocketException));
}
/**
* 带背压的批量处理
* 防止100000个消息同时发送给LLM API
*/
public Flux<String> batchChatWithBackpressure(List<String> messages) {
return Flux.fromIterable(messages)
// 背压:最多同时处理10个请求
.flatMap(
message -> callLlmAsync(message),
10 // 并发数限制(concurrency参数)
)
.doOnNext(result -> log.debug("[Reactive] 完成一条: {}",
result.substring(0, Math.min(50, result.length()))));
}
/**
* 单个异步调用(Mono)
*/
public Mono<String> chatAsync(String message) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user(message)
.call()
.content()
)
// 在弹性调度器上执行(适合阻塞I/O)
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(30));
}
private Mono<String> callLlmAsync(String message) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user(message)
.call()
.content()
).subscribeOn(Schedulers.boundedElastic());
}
}

SSE控制器
package com.laozhang.concurrency.controller;
import com.laozhang.concurrency.service.ReactiveAiService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* SSE流式输出控制器
*/
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class StreamController {
private final ReactiveAiService reactiveAiService;
/**
* SSE端点:text/event-stream
* 前端用EventSource接收
*/
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(@RequestParam String message) {
return reactiveAiService.streamChat(message)
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.build()
)
// 发送结束事件
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
))
// 心跳:防止连接超时
.mergeWith(
Flux.interval(Duration.ofSeconds(15))
.map(i -> ServerSentEvent.<String>builder()
.comment("heartbeat")
.build()
)
)
.take(Duration.ofMinutes(5)); // 最长5分钟
}
/**
* 纯文本流(更简单)
*/
@GetMapping(value = "/chat/plain", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatPlain(@RequestParam String message) {
return reactiveAiService.streamChat(message);
}
}

方案3:CompletableFuture + 隔离线程池
适合需要精细控制不同类型请求资源隔离的场景。
package com.laozhang.concurrency.service;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* CompletableFuture + 隔离线程池方案
*
* 资源隔离设计:
* - AI高优先级线程池:用于VIP用户/实时对话
* - AI低优先级线程池:用于批量处理/后台任务
* - 防止低优先级任务占用高优先级资源
*/
@Slf4j
@Service
public class IsolatedPoolChatService {
private final ChatClient chatClient;
private final MeterRegistry meterRegistry;
// 高优先级线程池:VIP用户/实时对话
private final ExecutorService highPriorityPool;
// 低优先级线程池:批量任务
private final ExecutorService lowPriorityPool;
// 信号量:控制对OpenAI API的最大并发数(防触发rate limit)
private final Semaphore apiSemaphore = new Semaphore(50);
public IsolatedPoolChatService(
ChatClient chatClient,
MeterRegistry meterRegistry) {
this.chatClient = chatClient;
this.meterRegistry = meterRegistry;
// 高优先级:虚拟线程
this.highPriorityPool = Executors.newVirtualThreadPerTaskExecutor();
// 低优先级:固定大小线程池
this.lowPriorityPool = new ThreadPoolExecutor(
10, 30, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
r -> {
Thread t = new Thread(r);
t.setName("ai-batch-" + t.threadId()); // threadId():Java 19+,getId()已废弃
t.setPriority(Thread.MIN_PRIORITY);
return t;
},
// 拒绝策略:只记录日志并丢弃任务,不抛异常
// 注意:被丢弃任务对应的CompletableFuture永远不会完成,
// 调用方必须依赖waitForBatch的整体超时兜底
(r, executor) -> {
log.warn("[IsolatedPool] 批量任务队列已满,丢弃任务");
meterRegistry.counter("ai.pool.rejected", "pool", "low").increment();
}
);
}
/**
* 高优先级AI调用(VIP用户、实时对话)
*/
public CompletableFuture<String> chatHighPriority(String message) {
return CompletableFuture.supplyAsync(() -> {
try {
// 获取API信号量
if (!apiSemaphore.tryAcquire(10, TimeUnit.SECONDS)) {
throw new RuntimeException("服务繁忙,请稍后重试");
}
try {
return chatClient.prompt()
.user(message)
.call()
.content();
} finally {
apiSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("请求被中断", e);
}
}, highPriorityPool)
.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(e -> {
log.error("[IsolatedPool] 高优先级调用失败: {}", e.getMessage());
return "抱歉,服务暂时不可用,请稍后重试。";
});
}
/**
* 低优先级批量处理
*/
public List<CompletableFuture<String>> batchChatLowPriority(List<String> messages) {
return messages.stream()
.map(msg -> CompletableFuture.supplyAsync(() -> {
try {
if (!apiSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
return "处理超时,跳过";
}
try {
return chatClient.prompt()
.user(msg)
.call()
.content();
} finally {
apiSemaphore.release();
}
} catch (Exception e) {
log.warn("[IsolatedPool] 批量任务失败: {}", e.getMessage());
return "处理失败:" + e.getMessage();
}
}, lowPriorityPool))
.collect(Collectors.toList());
}
/**
* 等待批量任务全部完成
*/
public List<String> waitForBatch(List<CompletableFuture<String>> futures) {
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 整体超时:所有任务5分钟内必须完成
try {
allDone.get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.warn("[IsolatedPool] 批量任务整体超时,取消剩余任务");
futures.forEach(f -> f.cancel(true));
} catch (Exception e) {
log.error("[IsolatedPool] 批量任务等待失败: {}", e.getMessage());
}
return futures.stream()
.map(f -> {
try {
return f.isDone() && !f.isCompletedExceptionally()
? f.get() : "任务未完成";
} catch (Exception e) {
return "获取结果失败";
}
})
.toList();
}
}

三种方案压测对比
测试环境:
- 服务器:4核8G(AWS c5.xlarge)
- JDK:Java 21
- Spring Boot:3.3.5
- Spring AI:1.0.0
- 模型:GPT-4o(P50响应:2.1秒)
- 测试工具:JMeter 5.6
- 测试场景:持续30分钟压测,逐步增加并发
测试结果
| 并发数 | 传统Tomcat(800线程) | 虚拟线程 | WebFlux | CF+隔离池 |
|---|---|---|---|---|
| 100 | TPS:47, P99:3.1s | TPS:47, P99:3.0s | TPS:48, P99:2.9s | TPS:45, P99:3.2s |
| 500 | TPS:180, P99:8.2s | TPS:238, P99:3.4s | TPS:245, P99:3.2s | TPS:220, P99:3.6s |
| 1000 | TPS:203, P99:崩溃 | TPS:476, P99:3.8s | TPS:490, P99:3.6s | TPS:440, P99:4.1s |
| 2000 | OOM崩溃 | TPS:485, P99:5.2s | TPS:502, P99:4.8s | TPS:458, P99:5.8s |
| 3000 | N/A | TPS:487, P99:7.1s | TPS:510, P99:6.4s | TPS:462, P99:8.3s |
内存使用(1000并发):
| 方案 | JVM堆内存 | 线程内存 | 总内存 |
|---|---|---|---|
| 传统线程池(800) | 3.2GB | 800MB(1MB×800) | 4.0GB |
| 虚拟线程(1000) | 2.8GB | 1MB(1KB×1000) | 2.8GB |
| WebFlux(1000) | 2.4GB | 4MB(少量平台线程) | 2.4GB |
结论:
- 低并发(<200):三种方案差异不大
- 中等并发(200-1000):虚拟线程TPS提升2.3倍
- 高并发(>1000):WebFlux略优,但差距在10%以内
- 内存:虚拟线程/WebFlux比传统线程节省约30-40%
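这些数字并不神秘:吞吐上限 ≈ 并发数 ÷ 平均时延。用上表的P50时延2.1秒做个验算(示意计算),可以看出虚拟线程的实测TPS已经贴近理论上限:

```java
/** 压测结果验算:并发数 ÷ 平均时延 ≈ 理论TPS上限 */
public class ThroughputCheck {
    public static void main(String[] args) {
        double p50LatencySec = 2.1; // GPT-4o P50响应时间(秒)
        for (int concurrency : new int[]{100, 500, 1000}) {
            int tpsCeiling = (int) (concurrency / p50LatencySec);
            System.out.println("并发" + concurrency + " → 理论TPS上限 ≈ " + tpsCeiling);
        }
        // 并发1000时上限≈476,与虚拟线程实测TPS 476吻合;
        // 2000/3000并发时TPS不再线性增长,瓶颈转移到上游API配额和机器资源
    }
}
```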
SSE流式背压控制
高并发下,SSE连接积压会导致内存溢出,必须做背压控制。
package com.laozhang.concurrency.backpressure;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.util.concurrent.Queues;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* SSE高并发背压控制
*
* 问题:高并发时SSE连接可能积压数百MB的未发送数据
* 解决:
* 1. 全局SSE连接数限制
* 2. 每个连接的缓冲区大小限制
* 3. 慢消费者检测和断开
*/
@Slf4j
@Service
public class BackpressureStreamService {
private final ChatClient chatClient;
// 全局SSE连接数限制
private final Semaphore sseConnectionSemaphore = new Semaphore(500);
// 当前活跃连接数
private final AtomicInteger activeConnections = new AtomicInteger(0);
public BackpressureStreamService(ChatClient chatClient) {
this.chatClient = chatClient;
}
/**
* 带背压控制的SSE流
*/
public Flux<String> streamWithBackpressure(String sessionId, String message) {
// 检查并发连接数
if (!sseConnectionSemaphore.tryAcquire()) {
return Flux.error(new RuntimeException(
"服务繁忙,当前连接数已达上限(500),请稍后再试"));
}
int connections = activeConnections.incrementAndGet();
log.info("[SSE-BP] 新连接: sessionId={}, 当前连接数={}", sessionId, connections);
return chatClient.prompt()
.user(message)
.stream()
.content()
// 背压策略:缓冲区满时丢弃最旧的数据(防止内存溢出)
// 注意:Flux.onBackpressureBuffer使用BufferOverflowStrategy,
// FluxSink.OverflowStrategy只用于Flux.create
.onBackpressureBuffer(
256, // 每个连接最大缓冲256个token
dropped -> log.warn("[SSE-BP] 背压丢弃token: sessionId={}", sessionId),
reactor.core.publisher.BufferOverflowStrategy.DROP_OLDEST
)
// 整体超时:60秒内没有产生新token则关闭连接
.timeout(java.time.Duration.ofSeconds(60))
.doFinally(signalType -> {
sseConnectionSemaphore.release();
int remaining = activeConnections.decrementAndGet();
log.info("[SSE-BP] 连接关闭: sessionId={}, 信号={}, 剩余连接={}",
sessionId, signalType, remaining);
});
}
}

请求队列:防止打爆LLM Provider
package com.laozhang.concurrency.queue;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* AI请求队列服务
*
* 功能:
* 1. 令牌桶限流(符合OpenAI RPM限制)
* 2. 优先级队列(VIP请求优先)
* 3. 请求超时自动丢弃
*/
@Slf4j
@Service
public class AiRequestQueueService {
private final ChatClient chatClient;
// OpenAI GPT-4o限制:500 RPM = 约8.3 RPS
// 留一些余量,设置为5 RPS
private final Bucket rateLimitBucket = Bucket.builder()
.addLimit(Bandwidth.simple(5, Duration.ofSeconds(1))) // 每秒5个
.addLimit(Bandwidth.simple(200, Duration.ofMinutes(1))) // 每分钟200个
.build();
// 优先级队列:数字越小优先级越高
private final PriorityBlockingQueue<AiRequest> requestQueue =
new PriorityBlockingQueue<>(1000,
java.util.Comparator.comparingInt(AiRequest::priority));
// 统计
private final AtomicLong totalProcessed = new AtomicLong(0);
private final AtomicLong totalRejected = new AtomicLong(0);
public AiRequestQueueService(ChatClient chatClient) {
this.chatClient = chatClient;
// 启动队列处理线程
startQueueProcessor();
}
/**
* 提交AI请求到队列
*/
public CompletableFuture<String> submitRequest(
String message, int priority, long timeoutMs) {
CompletableFuture<String> future = new CompletableFuture<>();
AiRequest request = new AiRequest(
message, priority, future,
System.currentTimeMillis() + timeoutMs
);
// 队列已满,拒绝
// 注意:PriorityBlockingQueue是无界队列,offer()永远返回true,
// 必须手动检查size才能实现"满则拒绝"
if (requestQueue.size() >= 1000) {
totalRejected.incrementAndGet();
future.completeExceptionally(
new RuntimeException("请求队列已满(1000),请稍后重试"));
return future;
}
requestQueue.offer(request);
return future;
}
/**
* 队列处理器(后台线程)
*/
private void startQueueProcessor() {
Thread.ofVirtual().name("ai-queue-processor").start(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
AiRequest request = requestQueue.poll(1, TimeUnit.SECONDS);
if (request == null) continue;
// 检查请求是否已超时
if (System.currentTimeMillis() > request.expireTime()) {
request.future().completeExceptionally(
new TimeoutException("请求在队列中等待超时"));
continue;
}
// 获取令牌(阻塞等待)
rateLimitBucket.asBlocking().consume(1);
// 异步执行AI调用
Thread.ofVirtual().start(() -> {
try {
String result = chatClient.prompt()
.user(request.message())
.call()
.content();
request.future().complete(result);
totalProcessed.incrementAndGet();
} catch (Exception e) {
request.future().completeExceptionally(e);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
public record AiRequest(
String message,
int priority, // 1=最高优先级
CompletableFuture<String> future,
long expireTime
) implements Comparable<AiRequest> {
@Override
public int compareTo(AiRequest other) {
return Integer.compare(this.priority, other.priority);
}
}
public QueueStats getStats() {
return new QueueStats(
requestQueue.size(),
totalProcessed.get(),
totalRejected.get()
);
}
public record QueueStats(int queueSize, long processed, long rejected) {}
}

熔断限流配置
package com.laozhang.concurrency.resilience;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
/**
* 带熔断限流保护的AI服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ResilientAiService {
private final ChatClient chatClient;
/**
* 三重保护:
* 1. @RateLimiter:每秒最多100个请求
* 2. @Bulkhead:最多100个并发调用
* 3. @CircuitBreaker:失败率>50%时熔断60秒
*/
@RateLimiter(name = "openaiClient",
fallbackMethod = "chatRateLimitFallback")
@Bulkhead(name = "openaiClient", type = Bulkhead.Type.SEMAPHORE,
fallbackMethod = "chatBulkheadFallback")
@CircuitBreaker(name = "openaiClient",
fallbackMethod = "chatCircuitBreakerFallback")
public String chat(String message) {
return chatClient.prompt()
.user(message)
.call()
.content();
}
// 降级方法:限流时触发
public String chatRateLimitFallback(String message,
io.github.resilience4j.ratelimiter.RequestNotPermitted e) {
log.warn("[Resilience] 限流触发: message={}", message.substring(0, Math.min(20, message.length())));
return "当前请求量过大,您的请求已加入排队,预计等待时间30秒。";
}
// 降级方法:舱壁(并发限制)触发
public String chatBulkheadFallback(String message,
io.github.resilience4j.bulkhead.BulkheadFullException e) {
log.warn("[Resilience] 舱壁满触发: message={}", message.substring(0, Math.min(20, message.length())));
return "服务繁忙,请稍后重试。";
}
// 降级方法:熔断器开路时触发
public String chatCircuitBreakerFallback(String message, Exception e) {
log.error("[Resilience] 熔断器触发: error={}", e.getMessage());
return "AI服务暂时不可用,请稍后重试。如紧急情况请拨打客服热线。";
}
}

生产配置推荐
基于压测结果,以下是不同并发规模的推荐配置:
| 并发规模 | 方案 | 虚拟线程 | 连接池 | 信号量 | 预期TPS |
|---|---|---|---|---|---|
| <200并发 | 虚拟线程 | 开启 | maxConn=1000 | 20 | 90 |
| 200-500 | 虚拟线程 | 开启 | maxConn=4096 | 50 | 230 |
| 500-1000 | 虚拟线程+限流 | 开启 | maxConn=8192 | 80 | 470 |
| >1000 | WebFlux+背压 | N/A | maxConn=16384 | 100 | 500+ |
OpenAI HTTP连接池配置(这是被很多人忽略的关键参数):
spring:
  ai:
    openai:
      # 以下配置在Spring AI 1.0中通过自定义WebClient实现
      # 默认连接池只有200,高并发必须调大
      http:
        max-connections: 500 # 最大连接数
        max-connections-per-route: 500
        connect-timeout: 10s
        read-timeout: 60s
        write-timeout: 10s
        # 连接保持活跃时间
        connection-ttl: 30s

FAQ
Q1:虚拟线程会有什么坑?
主要坑:
1. synchronized锁会固定(pin)虚拟线程到平台线程,导致退化
解决:用ReentrantLock替代synchronized
2. ThreadLocal在虚拟线程中内存占用更大(大量虚拟线程都有副本)
解决:减少ThreadLocal使用,改用ScopedValue(Java 21新特性)
3. 部分JDBC驱动还不支持虚拟线程友好
解决:确认驱动版本,MySQL Connector 8.1+已支持

Q2:WebFlux和虚拟线程能共存吗?
可以,但不建议混用。
如果用WebFlux,就用Reactor的调度器。
如果用虚拟线程,就用同步代码。
混用会导致调试困难,性能收益也不明显。

Q3:如何监控线程使用情况?
# 查看虚拟线程数
curl http://localhost:8080/actuator/metrics/jvm.threads.live
# 线程Dump(观察虚拟线程状态)
curl http://localhost:8080/actuator/threaddump
# Prometheus指标(Grafana看板)
ai_tool_calls_duration_seconds
ai_pool_rejected_total

Q4:SSE连接断开后,LLM还在生成怎么办?
// 使用Flux的doFinally取消上游
return chatClient.prompt().user(message)
.stream().content()
.doFinally(signal -> {
if (signal == SignalType.CANCEL) {
// 前端断开连接,取消后续生成
log.info("SSE连接已取消");
}
});
// Spring WebFlux会自动传播取消信号

Q5:如何计算合适的信号量大小?
公式:信号量 = min(API_RPM / 60, 目标并发数 × 0.8)
示例:OpenAI GPT-4o限制500RPM
信号量 = min(500/60, 500×0.8) = min(8.3, 400) = 8(太小了)
实际上应该考虑平均响应时间:
信号量 ≈ API_RPM × 平均响应时间(秒) / 60
信号量 ≈ 500 × 2 / 60 ≈ 16
建议从16开始,逐步上调,观察是否触发429错误。
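把上面的估算写成代码(示意,向下取整保守起步):

```java
/** 信号量估算(示意):semaphore ≈ RPM × 平均响应时间(秒) ÷ 60 */
public class SemaphoreSizing {
    static int estimate(int rpm, double avgLatencySec) {
        // 至少为1,向下取整,留出余量
        return Math.max(1, (int) (rpm * avgLatencySec / 60));
    }

    public static void main(String[] args) {
        // OpenAI GPT-4o:500 RPM,平均响应2秒 → 建议初始信号量16
        System.out.println(estimate(500, 2.0));
        // 之后逐步上调,同时观察是否开始出现429(rate limit)错误
    }
}
```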