第2295篇:AI系统的水平扩展——从单实例到多实例的弹性架构设计
第2295篇:AI系统的水平扩展——从单实例到多实例的弹性架构设计
适读人群:正在或即将面临AI系统扩展压力的架构师和工程师 | 阅读时长:约16分钟 | 核心价值:掌握AI系统水平扩展的核心难点和工程解法,避免常见的有状态陷阱
我们的AI客服系统第一个版本只有单实例,跑得挺稳。直到某个大促活动开始,QPS突然从平时的50飙到500,系统直接挂了。
事后复盘发现问题:我们的系统里有几个全局状态:一个用于速率限制的计数器(存在内存里)、一个缓存LLM客户端连接的对象池(也在内存里)、还有一个正在进行的对话的上下文(同样在内存里)。水平扩展加了第二个实例后,这些全局状态就乱了——用户的请求可能被路由到没有其对话上下文的实例上。
这篇文章系统地讲AI系统水平扩展的几个关键问题。
AI系统水平扩展的特殊挑战
普通HTTP服务的水平扩展相对简单,只要服务是无状态的,加机器就好。AI系统有几个独特的有状态点:
对话上下文状态:多轮对话需要维护历史消息,如果历史存在内存里,请求不能随机路由到不同实例。
向量索引状态:大型向量索引(数百万条嵌入)加载到内存需要几分钟,如果每个实例都要加载,扩容很慢;如果共享一份,变成了单点。
速率限制计数器:AI API有速率限制,多实例各自维护计数器会导致超限。
模型缓存:Prompt缓存(Claude的cache_control功能)是会话级别的,切换实例缓存失效。
无状态化改造
解决水平扩展的根本方法是将状态外化,让每个应用实例变成真正的无状态节点:
// 改造前:对话上下文存在内存里
@Service
public class ConversationServiceBad {
// 这个Map存在单个JVM实例的内存里,多实例时不一致
private final Map<String, List<ChatMessage>> inMemoryContexts = new ConcurrentHashMap<>();
public String chat(String conversationId, String userMessage) {
List<ChatMessage> history = inMemoryContexts.getOrDefault(conversationId, new ArrayList<>());
// ... 调用AI
inMemoryContexts.put(conversationId, updatedHistory);
return response;
}
}
// 改造后:对话上下文存在Redis里
@Service
public class ConversationServiceGood {
private final RedisTemplate<String, List<ChatMessage>> redis;
private static final String CONTEXT_KEY = "conv:ctx:";
private static final Duration CONTEXT_TTL = Duration.ofHours(4);
public String chat(String conversationId, String userMessage) {
// 从Redis加载历史(任何实例都能访问)
String key = CONTEXT_KEY + conversationId;
List<ChatMessage> history = redis.opsForValue().get(key);
if (history == null) history = new ArrayList<>();
// 调用AI(带上历史)
String response = llmClient.chat(history, userMessage);
// 更新历史写回Redis
history.add(new ChatMessage("user", userMessage));
history.add(new ChatMessage("assistant", response));
// 只保留最近N条(防止上下文无限增长)
if (history.size() > 40) {
history = history.subList(history.size() - 40, history.size());
}
redis.opsForValue().set(key, history, CONTEXT_TTL);
return response;
}
}分布式速率限制
多实例部署时,需要用Redis实现分布式速率限制,确保整体不超过AI API的速率限制:
@Service
public class DistributedRateLimiter {
private final RedisTemplate<String, Long> redis;
// 使用Redis的Lua脚本实现原子性的令牌桶
private static final String RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0
else
return 1
end
""";
private final RedisScript<Long> rateLimitScript =
RedisScript.of(RATE_LIMIT_SCRIPT, Long.class);
/**
* 全局速率限制:跨所有实例,每分钟最多N次AI API调用
*/
public boolean tryAcquire(String apiKey, int limitPerMinute) {
String key = "ratelimit:" + apiKey + ":" + currentMinuteTimestamp();
Long result = redis.execute(rateLimitScript,
List.of(key),
String.valueOf(limitPerMinute),
"60" // 60秒窗口
);
return Long.valueOf(1).equals(result);
}
private long currentMinuteTimestamp() {
return System.currentTimeMillis() / 60000;
}
}向量索引的共享与分片
大型向量索引的水平扩展是个挑战。几种方案:
方案1:独立向量数据库(推荐)
把向量索引放在专门的向量数据库(Qdrant、Milvus、pgvector),AI应用实例通过HTTP/gRPC访问,实现真正的无状态:
@Configuration
public class VectorStoreConfig {
@Bean
public QdrantClient qdrantClient(
@Value("${qdrant.host}") String host,
@Value("${qdrant.port}") int port) {
// Qdrant本身支持集群模式,可以水平扩展
return QdrantClient.newBuilder()
.address(host + ":" + port)
.build();
}
}
@Service
public class VectorSearchService {
private final QdrantClient qdrantClient;
// 每次调用走网络,比内存慢,但支持水平扩展
public List<ScoredPoint> search(float[] queryVector, int topK) {
return qdrantClient.searchAsync(
SearchPoints.newBuilder()
.setCollectionName("knowledge-base")
.addAllVector(toFloatList(queryVector))
.setLimit(topK)
.setWithPayload(WithPayloadSelector.newBuilder().setEnable(true))
.build()
).join().getResultList();
}
}方案2:分片索引
当单个向量数据库成为瓶颈时,对向量空间分片,不同的查询路由到不同的分片:
@Service
public class ShardedVectorSearch {
private final List<VectorSearchService> shards; // N个向量数据库实例
/**
* 广播查询到所有分片,合并结果
* 适合数据均匀分布的场景
*/
public List<SearchResult> searchAllShards(float[] queryVector, int topK) {
List<CompletableFuture<List<SearchResult>>> futures = shards.stream()
.map(shard -> CompletableFuture.supplyAsync(() ->
shard.search(queryVector, topK)
))
.collect(Collectors.toList());
// 收集所有分片的结果
List<SearchResult> allResults = futures.stream()
.flatMap(f -> f.join().stream())
.collect(Collectors.toList());
// 重新排序,取Top-K
return allResults.stream()
.sorted(Comparator.comparingDouble(SearchResult::getScore).reversed())
.limit(topK)
.collect(Collectors.toList());
}
}弹性伸缩配置
在Kubernetes上实现AI服务的自动弹性伸缩:
# Kubernetes HPA配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ai-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ai-service
minReplicas: 2 # 最少2个副本(高可用)
maxReplicas: 20 # 最多20个副本
metrics:
# 基于CPU扩容(简单可靠)
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# 基于自定义指标扩容(AI请求队列深度)
- type: External
external:
metric:
name: kafka_consumer_lag
selector:
matchLabels:
topic: ai-tasks
target:
type: AverageValue
averageValue: "100" # 每个Pod处理100条消息的积压时开始扩容
behavior:
scaleUp:
stabilizationWindowSeconds: 30 # 扩容快一点(应对突发)
policies:
- type: Percent
value: 100 # 每30秒最多翻倍
periodSeconds: 30
scaleDown:
stabilizationWindowSeconds: 300 # 缩容慢一点(避免频繁抖动)
policies:
- type: Percent
value: 25 # 每5分钟最多缩减25%
periodSeconds: 300优雅启停:处理进行中的AI请求
AI请求延迟长(10-60秒),扩容和缩容时需要优雅处理进行中的请求:
@Component
public class GracefulShutdownHandler {
private final AtomicInteger activeRequests = new AtomicInteger(0);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
@PreDestroy
public void onShutdown() throws InterruptedException {
log.info("收到关闭信号,等待进行中的AI请求完成...");
// 等待所有进行中的请求完成,最多等待90秒
int waitSeconds = 0;
while (activeRequests.get() > 0 && waitSeconds < 90) {
log.info("还有{}个AI请求进行中,继续等待...", activeRequests.get());
Thread.sleep(5000);
waitSeconds += 5;
}
if (activeRequests.get() > 0) {
log.warn("超时,强制关闭,{}个请求未完成", activeRequests.get());
} else {
log.info("所有请求已完成,安全关闭");
}
}
}
// Kubernetes terminationGracePeriodSeconds要与应用的等待时间匹配
// 如果AI请求最长60秒,terminationGracePeriodSeconds至少要设120秒水平扩展AI系统的核心原则:对话上下文、速率计数器、缓存等所有状态都要外化到Redis/数据库,应用实例要真正无状态。这不只是AI系统的要求,是所有需要水平扩展的分布式系统的基础要求,只是AI系统的有状态点更隐蔽、更容易被忽视。
