第1694篇:高并发AI服务的连接池优化——HTTP客户端与模型API的连接管理
第1694篇:高并发AI服务的连接池优化——HTTP客户端与模型API的连接管理
一个月前我们的AI服务在流量高峰时出现了一个奇怪的现象:监控显示CPU和内存都很正常,但请求延迟突然飙升,有时候甚至出现大量超时。
查了很久,最终找到原因:HTTP连接池满了。
大量请求在等待获取连接,等待时间加上实际的大模型响应时间,总延迟就超过了我们设置的超时阈值,触发了超时告警。但监控面板上CPU和内存显示正常,所以第一时间没往连接池这个方向想。
这个经历让我意识到,AI服务的连接池优化是个容易被忽视但非常关键的问题。今天把这块讲透。
为什么AI服务的连接池比普通服务更复杂
普通的REST API调用,一个请求可能就几十毫秒,连接池里的连接很快就能被释放归还,池子里的连接周转很快。
大模型API完全不同:
- 一次非流式请求:2秒到30秒不等
- 一次流式请求:可能持续10到60秒(长对话)
- 连接在这段时间内被占用,无法被其他请求复用
这意味着同样的连接池大小,在AI场景下能支撑的并发请求数远比普通场景少。
数学计算一下:假设你的连接池大小是50,传统API平均响应100ms,理论吞吐量是50/0.1 = 500 QPS。但如果是大模型API,平均响应5秒,理论吞吐量是50/5 = 10 QPS。差了50倍。
所以AI服务的连接池要根据这个特点来设计,不能照搬普通服务的经验。
Spring Boot里的HTTP客户端选型
在Java生态里,用于调用大模型API的HTTP客户端主要有几个选择:
- OkHttp:流行、支持SSE、连接池设计优秀
- Apache HttpClient 5:老牌稳健、配置灵活
- Java原生HttpClient(Java 11+):无依赖、支持HTTP/2
- WebClient(Spring WebFlux):响应式、背压支持
Spring AI默认用的是哪个?取决于你引入的starter。Spring AI的RestClient实现默认用Spring的RestTemplate底层,但可以切换。
我在生产上测试下来,OkHttp 在AI流式场景下表现最好,主要原因是它的连接池实现比较智能,对长连接的维护也做得好。
OkHttp连接池的深度配置
OkHttp的连接池是线程安全的,核心参数:
maxIdleConnections:最大空闲连接数keepAliveDuration:空闲连接保持时间
但对于AI服务,这两个参数远不够,需要更细粒度的控制。
@Configuration
public class AIHttpClientConfig {
@Bean
public OkHttpClient aiHttpClient(
@Value("${ai.http.maxIdleConnections:20}") int maxIdleConnections,
@Value("${ai.http.keepAliveSeconds:120}") int keepAliveSeconds,
@Value("${ai.http.connectTimeoutMs:5000}") int connectTimeout,
@Value("${ai.http.readTimeoutMs:120000}") int readTimeout,
@Value("${ai.http.writeTimeoutMs:30000}") int writeTimeout) {
// 连接池:空闲连接数根据并发请求数估算
// 如果最大并发100个AI请求,设置100个空闲连接
ConnectionPool connectionPool = new ConnectionPool(
maxIdleConnections, // 最大空闲连接数
keepAliveSeconds, // 空闲连接保持存活时间(秒)
TimeUnit.SECONDS
);
// 自定义Dispatcher控制并发
Dispatcher dispatcher = new Dispatcher();
// 最大并发请求数(所有host)
dispatcher.setMaxRequests(200);
// 单个host的最大并发(对同一个大模型API端点)
dispatcher.setMaxRequestsPerHost(100);
return new OkHttpClient.Builder()
.connectionPool(connectionPool)
.dispatcher(dispatcher)
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
// AI流式输出场景:readTimeout要足够长
.readTimeout(readTimeout, TimeUnit.MILLISECONDS)
.writeTimeout(writeTimeout, TimeUnit.MILLISECONDS)
// 添加连接失败重试(透明重试,只对幂等请求)
.retryOnConnectionFailure(true)
// 添加监控拦截器
.addInterceptor(new ConnectionMetricsInterceptor())
// HTTP/2支持(如果大模型API支持)
// .protocols(List.of(Protocol.HTTP_2, Protocol.HTTP_1_1))
.build();
}
}连接池监控拦截器
连接池的状态是非常有价值的监控数据,但很多团队没有把它暴露出来。
public class ConnectionMetricsInterceptor implements Interceptor {
private final Counter requestCounter;
private final Timer requestTimer;
private final Gauge idleConnectionsGauge;
public ConnectionMetricsInterceptor(MeterRegistry registry, ConnectionPool pool) {
this.requestCounter = Counter.builder("ai.http.requests")
.register(registry);
this.requestTimer = Timer.builder("ai.http.request.duration")
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry);
// 注册连接池指标
Gauge.builder("ai.http.connections.idle", pool, ConnectionPool::idleConnectionCount)
.description("空闲连接数")
.register(registry);
Gauge.builder("ai.http.connections.total", pool, ConnectionPool::connectionCount)
.description("总连接数")
.register(registry);
}
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
long startTime = System.currentTimeMillis();
requestCounter.increment();
try {
Response response = chain.proceed(request);
long duration = System.currentTimeMillis() - startTime;
requestTimer.record(duration, TimeUnit.MILLISECONDS);
// 记录连接重用情况
if (response.cacheResponse() != null) {
log.debug("使用缓存响应");
}
return response;
} catch (IOException e) {
log.error("HTTP请求失败: url={}, duration={}ms",
request.url(), System.currentTimeMillis() - startTime, e);
throw e;
}
}
}Apache HttpClient 5 的连接池配置
如果你的项目用的是Apache HttpClient(很多老项目),连接池配置更复杂但也更灵活。
@Bean
public CloseableHttpClient aiApacheHttpClient() {
// 连接池管理器
PoolingHttpClientConnectionManager connectionManager =
PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnTotal(200) // 总最大连接数
.setMaxConnPerRoute(100) // 每个路由(host)的最大连接数
.setConnectionTimeToLive(TimeValue.ofMinutes(5)) // 连接最大存活时间
.setDefaultSocketConfig(SocketConfig.custom()
.setSoTimeout(Timeout.ofSeconds(120)) // Socket读超时
.setSoKeepAlive(true) // TCP Keep-Alive
.build())
.build();
// 连接有效性检查策略
// LAZY_VALIDATION:使用前验证连接(有额外开销但更安全)
connectionManager.setDefaultConnectionConfig(ConnectionConfig.custom()
.setValidateAfterInactivity(TimeValue.ofSeconds(30))
.build());
// 请求重试处理
HttpRequestRetryStrategy retryStrategy = new DefaultHttpRequestRetryStrategy(
3, // 最大重试次数
TimeValue.ofMilliseconds(500) // 重试间隔
);
// 自动清理过期连接
IdleConnectionEvictor evictor = new IdleConnectionEvictor(
connectionManager,
TimeValue.ofSeconds(30) // 每30秒清理一次
);
evictor.start();
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setRetryStrategy(retryStrategy)
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectionRequestTimeout(Timeout.ofSeconds(10)) // 从池获取连接的超时
.setConnectTimeout(Timeout.ofSeconds(5))
.setResponseTimeout(Timeout.ofSeconds(120))
.build())
.build();
}一个重要配置:ConnectionRequestTimeout
这是从连接池获取连接的等待超时时间,很多人不知道这个配置项。如果连接池满了,请求需要等待其他请求释放连接,这个超时控制等待的最长时间。
在AI服务里,这个值要合理设置:太短会导致连接池满时大量请求快速失败;太长会导致请求队列积压,延迟不可控。通常设置10-30秒比较合适。
流式请求的特殊处理
流式请求(SSE)和普通请求有本质区别:连接建立后,服务端会持续推送数据,连接要维持整个推送过程。
这对连接池的压力很大——每个流式请求会长时间占用一个连接。
策略一:流式请求使用独立的连接池
@Configuration
public class AIHttpClientConfig {
// 普通请求的HTTP客户端(短连接为主)
@Bean("normalHttpClient")
public OkHttpClient normalHttpClient() {
return new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(50, 60, TimeUnit.SECONDS))
.readTimeout(30, TimeUnit.SECONDS)
.build();
}
// 流式请求的HTTP客户端(长连接为主)
@Bean("streamHttpClient")
public OkHttpClient streamHttpClient() {
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(500);
dispatcher.setMaxRequestsPerHost(500);
return new OkHttpClient.Builder()
// 流式场景:更多空闲连接,更长的keepalive
.connectionPool(new ConnectionPool(200, 300, TimeUnit.SECONDS))
// 流式读超时要很长(整个推送过程的时间)
.readTimeout(600, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.dispatcher(dispatcher)
.build();
}
}策略二:流式请求完成后主动归还连接
OkHttp在流式场景下,Response body需要被完全消费或者显式关闭,连接才会被归还到池里。这一点很容易忽视!
public void streamCompletion(String prompt, Consumer<String> tokenConsumer) {
Request request = new Request.Builder()
.url(apiUrl)
.post(buildRequestBody(prompt))
.addHeader("Accept", "text/event-stream")
.build();
// 使用try-with-resources确保连接归还
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new ApiException("API返回错误: " + response.code());
}
ResponseBody body = response.body();
if (body == null) return;
// 使用BufferedReader逐行读取SSE数据
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(body.byteStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data: ")) {
String data = line.substring(6);
if ("[DONE]".equals(data)) break;
// 处理token
tokenConsumer.accept(parseToken(data));
}
}
}
// body和response在try块结束时自动关闭,连接归还到池
} catch (IOException e) {
throw new RuntimeException("流式请求失败", e);
}
// 如果没有用try-with-resources,而是忘记关闭response,连接就泄漏了!
}这是一个非常常见的连接泄漏场景。我见过的代码里有相当大比例在这里有问题——忘记关闭Response Body,导致连接越来越少,最终连接池耗尽。
连接池大小的计算方法
这是大家最关心的问题:连接池到底配多少?
理论计算公式:
连接池大小 = 目标并发请求数 × (平均响应时间 / 1000)示例:
- 目标并发:100个同时进行的AI对话
- 平均响应时间:10秒(非流式)
所需连接数 = 100 × (10000ms / 1000) = 1000
等等,这不对——1000个连接太多了。
实际上,我们不需要为每个"并发对话"都保持一个连接。AI推理系统里有几层缓冲:
- 等待进入处理的请求(在队列里等)
- 正在处理的请求(占用连接)
- 已经在流式输出的连接(占用连接)
如果你的系统设计允许100ms内的排队等待,那么连接池只需要支撑"同时在等模型响应"的那些请求。
实际建议:从小配置开始,监控连接池利用率,逐步增大。
// 暴露连接池监控端点
@RestController
@RequestMapping("/internal/metrics")
public class ConnectionPoolMetricsController {
private final OkHttpClient httpClient;
@GetMapping("/connection-pool")
public Map<String, Object> getConnectionPoolMetrics() {
ConnectionPool pool = httpClient.connectionPool();
return Map.of(
"idleConnectionCount", pool.idleConnectionCount(),
"connectionCount", pool.connectionCount(),
"utilization", String.format("%.1f%%",
(1.0 - (double)pool.idleConnectionCount() / pool.connectionCount()) * 100)
);
}
}当利用率持续超过80%时,说明连接池需要扩容。
多个大模型API的连接隔离
很多AI服务同时对接多个大模型(主用GPT-4,降级用国产模型),这时候要做连接隔离,防止一个API的问题拖垮另一个API的连接。
@Configuration
public class MultiModelHttpConfig {
@Bean
public Map<String, OkHttpClient> modelHttpClients() {
Map<String, OkHttpClient> clients = new HashMap<>();
// OpenAI专用连接池
clients.put("openai", new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(50, 120, TimeUnit.SECONDS))
.readTimeout(120, TimeUnit.SECONDS)
.build());
// Claude专用连接池
clients.put("claude", new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(30, 120, TimeUnit.SECONDS))
.readTimeout(180, TimeUnit.SECONDS)
.build());
// 阿里通义专用连接池(延迟更低,响应更快)
clients.put("qwen", new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(80, 60, TimeUnit.SECONDS))
.readTimeout(60, TimeUnit.SECONDS)
.build());
return clients;
}
@Bean
public LLMClientRouter llmClientRouter(Map<String, OkHttpClient> modelHttpClients) {
return new LLMClientRouter(modelHttpClients);
}
}
@Component
public class LLMClientRouter {
private final Map<String, OkHttpClient> clients;
public OkHttpClient getClient(String model) {
// 根据模型名称选择对应的HTTP客户端
if (model.startsWith("gpt")) return clients.get("openai");
if (model.startsWith("claude")) return clients.get("claude");
if (model.startsWith("qwen")) return clients.get("qwen");
// 默认用一个通用客户端
return clients.get("openai");
}
}连接池健康检查与自动恢复
大模型API有时候会进入一种状态:连接可以建立,但请求挂起不返回。这种"僵尸连接"会占用连接池资源,让其他请求无法获取连接。
需要有一个后台任务定期检查和清理:
@Component
public class ConnectionPoolHealthChecker {
private final OkHttpClient httpClient;
private final LLMApiConfig apiConfig;
@Scheduled(fixedDelay = 60_000) // 每分钟执行一次
public void checkAndEvictStaleConnections() {
ConnectionPool pool = httpClient.connectionPool();
int before = pool.connectionCount();
pool.evictAll(); // 清理所有空闲连接,让它们重新建立
int after = pool.connectionCount();
if (before != after) {
log.info("连接池清理: before={}, after={}", before, after);
}
}
@Scheduled(fixedDelay = 30_000) // 每30秒发一个探测请求
public void probeConnection() {
// 发一个轻量级请求,测试连接是否正常
Request probe = new Request.Builder()
.url(apiConfig.getHealthCheckUrl())
.get()
.build();
try (Response response = httpClient.newCall(probe).execute()) {
if (!response.isSuccessful()) {
log.warn("大模型API健康检查失败: statusCode={}", response.code());
// 触发告警或者熔断
}
} catch (IOException e) {
log.error("大模型API健康检查异常", e);
}
}
}一个完整的连接池监控看板
最后分享一下我们的连接池监控配置,用Micrometer + Prometheus:
@Configuration
public class ConnectionPoolMetricsConfig {
@Bean
public MeterBinder connectionPoolMetrics(OkHttpClient httpClient) {
return registry -> {
ConnectionPool pool = httpClient.connectionPool();
Gauge.builder("okhttp.connection.pool.idle", pool, ConnectionPool::idleConnectionCount)
.description("空闲连接数")
.register(registry);
Gauge.builder("okhttp.connection.pool.total", pool, ConnectionPool::connectionCount)
.description("总连接数")
.register(registry);
// 连接池利用率
Gauge.builder("okhttp.connection.pool.utilization", pool, p -> {
int total = p.connectionCount();
if (total == 0) return 0.0;
return (double)(total - p.idleConnectionCount()) / total;
})
.description("连接池利用率")
.register(registry);
};
}
}配合Grafana告警规则:
# Grafana告警规则
- name: connection-pool-alert
rules:
- alert: ConnectionPoolUtilizationHigh
expr: okhttp_connection_pool_utilization > 0.85
for: 2m
annotations:
summary: "连接池利用率超过85%"
description: "AI服务连接池利用率持续超过85%,可能导致请求排队"总结
AI服务的连接池优化,核心要抓住几个关键点:
- 流式请求和非流式请求要分离,因为它们对连接的占用时间完全不同
- 连接泄漏是主要故障原因,要确保Response Body被正确关闭
- 连接池大小要根据实际负载计算,不能照搬普通服务的经验值
- 多模型API要做连接隔离,防止单个API故障影响全局
- 监控连接池利用率,而不是只看CPU和内存
下次你的AI服务出现不明原因的延迟升高,记得看一眼连接池状态,很可能就是这里。
