第2145篇:AI服务的生产部署——从单机Demo到高可用集群的工程实践
第2145篇:AI服务的生产部署——从单机Demo到高可用集群的工程实践
适读人群:负责AI服务生产部署的DevOps工程师和后端架构师 | 阅读时长:约18分钟 | 核心价值:掌握AI服务的生产部署要点,解决模型服务、向量数据库和LLM调用的高可用和扩容问题
"在我本地跑得好好的,一上生产就出问题。"
这是AI工程师比传统后端工程师更频繁遇到的情况。AI服务的生产部署有几个独特挑战:本地Embedding模型要加载几百MB的模型文件;向量数据库需要大内存;LLM调用有速率限制;批量任务和实时查询的资源竞争激烈。
这篇文章讲AI服务从单机到生产集群的关键工程决策,不是通用的部署教程,而是AI服务特有的那些坑。
AI服务的资源特征
/**
* AI服务和传统Web服务的资源差异
*
* ===== 内存压力 =====
*
* 传统Web服务:每个实例2-4GB内存通常够用
*
* AI服务的额外内存消耗:
* - 本地Embedding模型:768MB - 2GB(ONNX模型文件)
* - 向量缓存:每百万条768维向量 ≈ 3GB内存
* - LLM上下文:每个活跃对话0.5-2MB
* - JVM本身:1-2GB
*
* 典型配置:每个AI服务实例需要8-16GB内存
*
* ===== CPU vs GPU =====
*
* LLM推理(OpenAI API):无本地GPU需求,但有网络IO
*
* 本地Embedding模型推理:
* - CPU:单核约5-20 QPS(取决于模型大小)
* - GPU(T4):100-500 QPS
*
* 结论:
* - 低QPS(<50/s):CPU足够,成本低
* - 高QPS(>100/s):考虑GPU或水平扩容
*
* ===== 冷启动问题 =====
*
* AI服务启动慢(加载模型需要10-30秒)
* Kubernetes的liveness和readiness probe要配置合理,
* 避免模型还没加载好就开始接收流量
*
* ===== 批处理 vs 实时查询的竞争 =====
*
* 批量文档入库(高CPU/内存需求)
* 和实时用户查询(要求低延迟)
* 跑在同一个实例上会互相干扰
*
* 解决方案:分开部署或使用不同的资源池
*/Spring Boot AI服务的生产配置
/**
* AI服务的生产级配置
*
* 覆盖:
* 1. JVM内存配置
* 2. 线程池隔离(批处理 vs 实时查询)
* 3. 超时和熔断配置
* 4. 健康检查
*/
@Configuration
@Slf4j
public class AiServiceProductionConfig {
/**
* LLM调用线程池
*
* 实时查询线程池:核心20线程,最大50线程,响应优先
* 批处理线程池:核心5线程,队列深度1000,吞吐优先
*/
@Bean("llmRealTimeExecutor")
public ThreadPoolTaskExecutor llmRealTimeExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100); // 队列小,超过快速返回503
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("llm-rt-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("llmBatchExecutor")
public ThreadPoolTaskExecutor llmBatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(1000); // 队列大,允许积压
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("llm-batch-");
executor.initialize();
return executor;
}
/**
* Embedding模型的延迟初始化和预热
*
* 在应用启动后异步预热,不阻塞主线程
*/
@Bean
public LocalEmbeddingWarmup embeddingWarmup(LocalEmbeddingService embeddingService) {
return new LocalEmbeddingWarmup(embeddingService);
}
/**
* 向量Store连接池配置
*/
@Bean
public VectorStoreConnectionPool vectorStoreConnectionPool(
@Value("${vectorstore.host}") String host,
@Value("${vectorstore.port:6333}") int port) {
return VectorStoreConnectionPool.builder()
.host(host).port(port)
.maxConnections(20)
.connectionTimeout(Duration.ofSeconds(5))
.idleTimeout(Duration.ofMinutes(5))
.build();
}
}
/**
* Embedding服务预热
*
* 服务启动后,发送几条测试请求"预热"模型,
* 使JIT编译和内存分配在第一个真实请求前完成
*/
@Component
@Slf4j
public class LocalEmbeddingWarmup implements ApplicationListener<ApplicationReadyEvent> {
private final LocalEmbeddingService embeddingService;
public LocalEmbeddingWarmup(LocalEmbeddingService embeddingService) {
this.embeddingService = embeddingService;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("开始Embedding服务预热...");
CompletableFuture.runAsync(() -> {
try {
// 发送几条预热请求
List<String> warmupTexts = List.of(
"这是预热文本",
"This is a warmup text",
"Warming up the embedding model"
);
for (String text : warmupTexts) {
embeddingService.embedDocument(text);
}
log.info("Embedding服务预热完成");
} catch (Exception e) {
log.error("Embedding服务预热失败(不影响启动): {}", e.getMessage());
}
});
}
}健康检查与就绪检测
/**
* AI服务健康检查
*
* Kubernetes需要两类探针:
* 1. Liveness Probe:服务是否还活着(是否需要重启)
* 2. Readiness Probe:服务是否准备好接收流量
*
* AI服务的特殊处理:
* - Embedding模型加载完毕才就绪
* - 向量数据库连接正常才就绪
* - LLM API可达才就绪
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class AiServiceHealthIndicator {
private final LocalEmbeddingService embeddingService;
private final VectorStore vectorStore;
private volatile boolean isReady = false;
private volatile String notReadyReason = "服务初始化中";
/**
* Readiness检查
*
* 所有关键依赖都就绪才返回UP
*/
@GetMapping("/health/ready")
public ResponseEntity<HealthStatus> readiness() {
List<ComponentHealth> components = new ArrayList<>();
// 检查Embedding模型
ComponentHealth embeddingHealth = checkEmbeddingService();
components.add(embeddingHealth);
// 检查向量数据库
ComponentHealth vectorStoreHealth = checkVectorStore();
components.add(vectorStoreHealth);
boolean allHealthy = components.stream().allMatch(ComponentHealth::healthy);
if (allHealthy) {
isReady = true;
return ResponseEntity.ok(new HealthStatus("UP", components));
} else {
isReady = false;
String reason = components.stream()
.filter(c -> !c.healthy())
.map(ComponentHealth::reason)
.collect(Collectors.joining(", "));
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new HealthStatus("DOWN", components));
}
}
/**
* Liveness检查(轻量,只看进程是否存活)
*/
@GetMapping("/health/live")
public ResponseEntity<Map<String, String>> liveness() {
return ResponseEntity.ok(Map.of("status", "UP"));
}
private ComponentHealth checkEmbeddingService() {
try {
long startMs = System.currentTimeMillis();
float[] vector = embeddingService.embedDocument("health check");
long latencyMs = System.currentTimeMillis() - startMs;
if (vector.length == 0) {
return new ComponentHealth("embedding", false, "向量维度为0");
}
if (latencyMs > 5000) {
return new ComponentHealth("embedding", false,
"响应过慢: " + latencyMs + "ms");
}
return new ComponentHealth("embedding", true, null);
} catch (Exception e) {
return new ComponentHealth("embedding", false, e.getMessage());
}
}
private ComponentHealth checkVectorStore() {
try {
// 向量库连接检查:执行一个简单的查询
vectorStore.count();
return new ComponentHealth("vectorStore", true, null);
} catch (Exception e) {
return new ComponentHealth("vectorStore", false, e.getMessage());
}
}
public record ComponentHealth(String component, boolean healthy, String reason) {}
public record HealthStatus(String status, List<ComponentHealth> components) {}
}水平扩容与状态共享
/**
* 多实例部署时的状态共享
*
* AI服务的几个状态需要跨实例共享:
* 1. 速率限制计数器(各实例共享LLM配额)
* 2. 断路器状态(一个实例发现LLM故障,其他实例应知道)
* 3. 语义缓存(各实例共享缓存,提高命中率)
* 4. 会话状态(用户请求可能路由到不同实例)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DistributedAiStateManager {
private final RedisTemplate<String, String> redis;
private final ObjectMapper objectMapper;
/**
* 分布式速率限制
*
* 全局Token消耗跟踪,防止超过LLM配额
*/
public boolean checkAndConsumeTokenQuota(String modelId, int tokenCount) {
String key = "llm:quota:" + modelId + ":" + getCurrentMinuteKey();
// 原子性增加计数并检查
Long current = redis.opsForValue().increment(key, tokenCount);
if (current == tokenCount) {
// 第一次设置,添加过期时间
redis.expire(key, Duration.ofMinutes(2));
}
// 每分钟最大Token限制(从模型配置读取)
long maxTokensPerMinute = getMaxTokensPerMinute(modelId);
if (current > maxTokensPerMinute) {
redis.opsForValue().decrement(key, tokenCount); // 回滚
log.warn("速率限制触发: model={}, current={}, max={}",
modelId, current, maxTokensPerMinute);
return false;
}
return true;
}
/**
* 分布式断路器状态
*
* 某个实例记录的故障,对所有实例可见
*/
public void publishCircuitBreakerState(String modelId,
LlmCircuitBreaker.Status status) {
String key = "circuit-breaker:" + modelId;
String value = status.name();
redis.opsForValue().set(key, value, Duration.ofMinutes(5));
// 发布变更事件,让其他实例的断路器缓存失效
redis.convertAndSend("circuit-breaker-events", modelId + ":" + value);
log.info("断路器状态已发布: model={}, status={}", modelId, status);
}
public LlmCircuitBreaker.Status getCircuitBreakerState(String modelId) {
String value = redis.opsForValue().get("circuit-breaker:" + modelId);
if (value == null) return LlmCircuitBreaker.Status.CLOSED;
try {
return LlmCircuitBreaker.Status.valueOf(value);
} catch (IllegalArgumentException e) {
return LlmCircuitBreaker.Status.CLOSED;
}
}
/**
* 分布式会话状态
*
* 用户的对话历史存Redis,让任意实例都能处理同一用户的请求
*/
public void saveConversationSession(String sessionId, ConversationSession session) {
try {
String json = objectMapper.writeValueAsString(session);
redis.opsForValue().set(
"session:" + sessionId, json, Duration.ofHours(24));
} catch (Exception e) {
log.error("保存会话失败: sessionId={}", sessionId, e);
}
}
public Optional<ConversationSession> loadConversationSession(String sessionId) {
String json = redis.opsForValue().get("session:" + sessionId);
if (json == null) return Optional.empty();
try {
return Optional.of(objectMapper.readValue(json, ConversationSession.class));
} catch (Exception e) {
log.error("加载会话失败: sessionId={}", sessionId, e);
return Optional.empty();
}
}
private String getCurrentMinuteKey() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmm"));
}
private long getMaxTokensPerMinute(String modelId) {
// 从配置或数据库读取
return switch (modelId) {
case "gpt-4o" -> 300_000;
case "gpt-4o-mini" -> 2_000_000;
default -> 100_000;
};
}
public record ConversationSession(String sessionId, String userId,
List<Map<String, String>> messages,
LocalDateTime lastActiveAt) {}
}生产环境的关键指标监控
/**
* AI服务生产监控指标
*
* 要监控的关键指标及告警阈值
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class AiServiceProductionMetrics {
private final MeterRegistry meterRegistry;
/**
* 注册AI服务的核心监控指标
*/
@PostConstruct
public void registerMetrics() {
// 1. LLM调用的P50/P95/P99延迟
// Micrometer会自动统计百分位
Timer.builder("ai.llm.call.duration")
.description("LLM调用耗时")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
// 2. Embedding推理延迟
Timer.builder("ai.embedding.duration")
.description("Embedding推理耗时")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
// 3. 语义缓存命中率(Counter方式)
Counter.builder("ai.cache.hits")
.description("语义缓存命中次数")
.register(meterRegistry);
Counter.builder("ai.cache.misses")
.description("语义缓存未命中次数")
.register(meterRegistry);
// 4. Token消耗量(成本监控)
DistributionSummary.builder("ai.tokens.consumed")
.description("LLM Token消耗量")
.publishPercentiles(0.5, 0.95)
.register(meterRegistry);
// 5. 速率限制触发次数
Counter.builder("ai.rate.limit.triggered")
.description("速率限制触发次数")
.register(meterRegistry);
log.info("AI服务监控指标注册完成");
}
/**
* 告警规则建议(在Prometheus/Grafana中配置)
*
* 1. LLM调用P99延迟 > 10秒:触发告警
* 2. 错误率 > 5%(5分钟内):触发告警
* 3. 速率限制触发次数 > 100次/分钟:触发告警
* 4. 内存使用 > 80%:提前预警
* 5. 断路器状态为OPEN:立即告警
*/
public void recordLlmCall(String modelId, long durationMs, boolean success,
int promptTokens, int completionTokens) {
Tags tags = Tags.of("model", modelId, "success", String.valueOf(success));
meterRegistry.timer("ai.llm.call.duration", tags)
.record(durationMs, TimeUnit.MILLISECONDS);
meterRegistry.summary("ai.tokens.consumed", tags)
.record(promptTokens + completionTokens);
if (!success) {
meterRegistry.counter("ai.llm.errors", tags).increment();
}
}
public void recordEmbedding(int batchSize, long durationMs) {
meterRegistry.timer("ai.embedding.duration",
Tags.of("batchSize", String.valueOf(batchSize)))
.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordCacheHit(double similarity) {
meterRegistry.counter("ai.cache.hits").increment();
meterRegistry.summary("ai.cache.similarity").record(similarity);
}
public void recordCacheMiss() {
meterRegistry.counter("ai.cache.misses").increment();
}
}实践建议
AI服务的JVM配置要和模型内存一起规划
常见的错误:把AI服务直接部署到2GB内存的容器里,然后OOM。正确的容量规划:先测量模型加载后的内存基线(jmap -heap),加上JVM堆内存(通常1-4GB),加上Vector Cache的预期大小,再留20%的余量。一个运行768维Embedding模型的服务,最少需要4GB内存,建议配置8GB。Java参数参考:-Xmx4g -Xms2g -XX:MaxMetaspaceSize=512m,配合-XX:+UseG1GC减少GC暂停。
冷启动慢是Kubernetes弹性扩容的最大障碍
AI服务启动慢(加载模型30秒),Kubernetes的默认探针配置(10秒一次,3次失败重启)会让服务在加载完之前就被反复重启。生产配置:initialDelaySeconds: 60(留足加载时间),readinessProbe返回DOWN时不重启、只是摘除流量,livenessProbe用更宽松的阈值。同时,提前启动额外实例(Scale ahead),不要等流量高峰来了再扩。
批处理和实时查询要隔离资源
大批量文档入库时,Embedding推理会占满CPU,导致实时用户查询延迟飙升。隔离方案:把批处理任务放到专用的Job Pod里(独立的K8s Job,不在主服务Pod里),主服务Pod只处理实时查询。如果只有一套服务,至少要在代码层面隔离线程池:批处理用独立的低优先级线程池,实时查询用高优先级线程池。我们用这个方案把实时查询的P99延迟从7秒降到了1.2秒。
