Part 1949: Standardizing AI Infrastructure: MLOps Platform Component Selection and Integration
Last year I worked with a company whose AI team looked like this: training data scattered across five different places, model versions managed entirely by folder-naming conventions, deployments done by manually SSH-ing files up, monitoring limited to the occasional glance at the logs, and rollbacks performed by re-uploading the old files.
This isn't an isolated case; it's a snapshot of the state of AI engineering at many small and mid-sized companies.
An MLOps platform exists to solve exactly these problems: bring the entire lifecycle of an AI application, from data through training to deployment and monitoring, under engineering discipline.
Core Components of an MLOps Platform
First, let's lay out the capabilities a complete MLOps platform needs:
- Experiment tracking: parameters, metrics, and artifacts for every training run
- Model registry: version management, stage transitions, release and rollback
- Model serving: turning model files into a production inference API
- Monitoring: system-, model-, and business-level observability
- A/B testing and automated deployment: safely iterating on live models
Each layer has its own tools and selection trade-offs; the rest of this piece goes through them layer by layer.
Experiment Tracking: How MLflow Works
The problem experiment tracking solves: recording each run's parameters, metrics, and model artifacts, so you can compare how different configurations perform.
MLflow is the most mature open-source option today: it's simple to deploy, and it also ships a reasonably complete Java client (the org.mlflow:mlflow-client artifact).
Standing up the MLflow server (via Docker):
# docker-compose.yml
version: '3'
services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.11.0
    ports:
      - "5000:5000"
    volumes:
      - ./mlflow-data:/mlflow
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5000
      --default-artifact-root /mlflow/artifacts
      --backend-store-uri sqlite:////mlflow/mlflow.db
  # For production, swap SQLite for PostgreSQL:
  # db:
  #   image: postgres:15
  #   ...

Recording an experiment from your training code (Python):
import mlflow

# Point the client at the tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")

with mlflow.start_run(run_name="qwen-lora-v2-customer-service"):
    # Log hyperparameters
    mlflow.log_params({
        "model_name": "Qwen2.5-7B-Instruct",
        "lora_r": 16,
        "lora_alpha": 32,
        "learning_rate": 2e-4,
        "num_epochs": 3,
        "batch_size": 16,
        "training_data_size": 5000,
    })

    # Log metrics during training; run_epoch is your own training
    # loop, assumed here to return train loss, eval loss, and eval accuracy
    for epoch in range(3):
        train_loss, eval_loss, eval_acc = run_epoch(epoch)
        mlflow.log_metrics({
            "train_loss": train_loss,
            "eval_loss": eval_loss,
            "eval_accuracy": eval_acc,
        }, step=epoch)

    # Log the final headline metric (last epoch's eval accuracy)
    mlflow.log_metric("final_eval_accuracy", eval_acc)

    # Save the model directory as an artifact
    # (log_artifacts, plural, is the directory variant)
    mlflow.log_artifacts("./model_output/", artifact_path="model")

    # Tag the run
    mlflow.set_tags({
        "team": "ai-platform",
        "task": "customer-service",
        "base_model": "qwen2.5-7b",
        "status": "candidate",
    })

Querying experiment records from the Java side (to drive deployment decisions):
import java.util.Comparator;
import java.util.List;
import org.mlflow.api.proto.Service.Experiment;
import org.mlflow.api.proto.Service.RunInfo;
import org.mlflow.tracking.MlflowClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MlflowIntegrationService {

    private final MlflowClient mlflowClient;

    public MlflowIntegrationService(
            @Value("${mlflow.tracking-uri}") String trackingUri) {
        this.mlflowClient = new MlflowClient(trackingUri);
    }

    /** Small value object used by the rest of the pipeline. */
    public record ModelVersion(String runId, String artifactUri) {}

    /**
     * Fetch the best model version: picks the finished run with the
     * highest value of the given metric (e.g. eval_accuracy).
     */
    public ModelVersion getBestModel(String experimentName, String metricName) {
        Experiment experiment = mlflowClient.getExperimentByName(experimentName)
            .orElseThrow(() -> new RuntimeException("Experiment not found: " + experimentName));

        List<RunInfo> runs = mlflowClient.listRunInfos(experiment.getExperimentId());

        return runs.stream()
            .map(run -> mlflowClient.getRun(run.getRunId()))
            .filter(run -> "FINISHED".equals(run.getInfo().getStatus().toString()))
            .max(Comparator.comparingDouble(run ->
                run.getData().getMetricsList().stream()
                    .filter(m -> m.getKey().equals(metricName))
                    .mapToDouble(m -> m.getValue())
                    .findFirst()
                    .orElse(-1.0)
            ))
            .map(run -> new ModelVersion(
                run.getInfo().getRunId(),
                run.getInfo().getArtifactUri() + "/model"
            ))
            .orElseThrow(() -> new RuntimeException("No qualifying model found"));
    }

    /**
     * Register a good run in the Model Registry.
     */
    public RegisteredModelVersion registerModel(
            String runId,
            String modelName,
            String stage) {
        // Register the run's artifact in the Model Registry
        String artifactPath = runId + "/artifacts/model";
        RegisteredModelVersion version = mlflowClient.createModelVersion(
            modelName,
            artifactPath,
            runId
        );
        // Transition to the target stage (Staging/Production)
        mlflowClient.transitionModelVersionStage(
            modelName,
            version.getVersion(),
            stage
        );
        return version;
    }
}

Model Registration and Version Management
The problem a model registry solves: managing model versions in one place, so you always know exactly which version is running in production and can do releases and rollbacks cleanly.
MLflow ships a built-in Model Registry that pairs with the experiment tracking above:
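One note before the code: the promotion service below references a couple of small helper types (ModelVersionDetails, EvaluationMetrics) and methods like getModelVersion and transitionToProduction on our MlflowIntegrationService. None of these are MLflow API; they're thin wrappers this article assumes exist. A minimal sketch of the two DTOs, just so the snippet has something concrete to compile against:

// Hypothetical helper DTOs (this article's own, not MLflow classes).
// In a real project each record would live in its own file.
record EvaluationMetrics(double evalAccuracy, long latencyP99Ms) {
    double getEvalAccuracy() { return evalAccuracy; }
    long getLatencyP99Ms()   { return latencyP99Ms; }
}

record ModelVersionDetails(String modelName, String version, EvaluationMetrics metrics) {
    EvaluationMetrics getMetrics() { return metrics; }
}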
@Service
public class ModelRegistryService {

    @Autowired
    private MlflowIntegrationService mlflowService;

    @Autowired
    private VllmDeploymentService deploymentService;

    /**
     * Model promotion flow: Staging → Production
     */
    public void promoteToProduction(
            String modelName,
            String version,
            String approver) {
        // Check the acceptance metrics from the Staging environment
        ModelVersionDetails details = mlflowService.getModelVersion(modelName, version);
        EvaluationMetrics metrics = details.getMetrics();

        if (metrics.getEvalAccuracy() < 0.85) {
            throw new ModelPromotionException(
                "Model accuracy below threshold: " + metrics.getEvalAccuracy() + " < 0.85");
        }
        if (metrics.getLatencyP99Ms() > 2000) {
            throw new ModelPromotionException(
                "P99 latency too high: " + metrics.getLatencyP99Ms() + "ms > 2000ms");
        }

        // Record the approval trail
        mlflowService.addTag(modelName, version, "approved_by", approver);
        mlflowService.addTag(modelName, version, "promotion_time",
            LocalDateTime.now().toString());

        // Execute the blue-green deployment
        deploymentService.blueGreenDeploy(modelName, version);

        // Update the registry stage
        mlflowService.transitionToProduction(modelName, version);
    }
}

Model Serving: From Model Files to an Inference API
vLLM as an inference server has already been covered; the focus here is the Kubernetes deployment configuration, which you'll need for production:
# vllm-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qwen-inference
  namespace: ai-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: qwen-inference
  template:
    metadata:
      labels:
        app: qwen-inference
    spec:
      containers:
        - name: vllm
          image: vllm/vllm-openai:v0.4.0
          args:
            - "--model"
            - "/models/Qwen2.5-7B-Instruct-Merged"
            - "--served-model-name"
            - "qwen2.5-7b-v2"
            - "--dtype"
            - "float16"
            - "--max-model-len"
            - "32768"
            - "--gpu-memory-utilization"
            - "0.90"
          ports:
            - containerPort: 8000
          resources:
            limits:
              nvidia.com/gpu: 1    # one GPU per Pod
              memory: "32Gi"
              cpu: "8"
            requests:
              nvidia.com/gpu: 1
              memory: "24Gi"
              cpu: "4"
          volumeMounts:
            - name: model-storage
              mountPath: /models
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 120    # model loading takes a while
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 180
            periodSeconds: 30
      volumes:
        - name: model-storage
          persistentVolumeClaim:
            claimName: model-storage-pvc
      nodeSelector:
        node-type: gpu    # schedule onto GPU nodes
---
apiVersion: v1
kind: Service
metadata:
  name: qwen-inference-svc
  namespace: ai-platform
spec:
  selector:
    app: qwen-inference
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
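Once this is applied, the model is reachable inside the cluster at qwen-inference-svc.ai-platform on port 80, speaking vLLM's OpenAI-compatible API. A minimal smoke test in plain Java (JDK 17+; the hostname and served model name come from the manifests above, everything else is an illustrative sketch):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class VllmSmokeTest {
    public static void main(String[] args) throws Exception {
        // In-cluster DNS name of the Service above (namespace ai-platform)
        String endpoint = "http://qwen-inference-svc.ai-platform/v1/chat/completions";
        // "model" must match --served-model-name in the Deployment args
        String body = """
                {
                  "model": "qwen2.5-7b-v2",
                  "messages": [{"role": "user", "content": "ping"}],
                  "max_tokens": 8
                }""";
        HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}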
Monitoring: Observing Model Behavior in Production

This is the weakest link for many teams. The model is deployed and live, but when something goes wrong it's often hard to pinpoint the cause quickly. Monitoring needs to cover three levels:
System level: GPU utilization, VRAM usage, request latency, error rate. This is infrastructure monitoring; Prometheus + Grafana collects it out of the box.

Model level: token throughput, request token distributions, cache hit rate. vLLM exposes Prometheus metrics natively.

Business level: user feedback rate, answer quality scores, refusal rate. This takes custom instrumentation:
@Component
public class LlmMetricsCollector {

    private final MeterRegistry meterRegistry;
    // Request latency histogram
    private final Timer requestTimer;
    // Distribution of output token counts
    private final DistributionSummary outputTokens;
    // User satisfaction (0-5 rating)
    private final DistributionSummary userRating;
    // Failed request count
    private final Counter failureCounter;
    // Persists feedback for offline analysis
    private final FeedbackRepository feedbackRepository;

    public LlmMetricsCollector(MeterRegistry meterRegistry,
                               FeedbackRepository feedbackRepository) {
        this.meterRegistry = meterRegistry;
        this.feedbackRepository = feedbackRepository;
        this.requestTimer = Timer.builder("llm.request.duration")
            .description("LLM request latency")
            // export histogram buckets so Prometheus can compute quantiles
            .publishPercentileHistogram()
            .register(meterRegistry);
        this.outputTokens = DistributionSummary.builder("llm.output.tokens")
            .description("Distribution of output token counts")
            .register(meterRegistry);
        this.userRating = DistributionSummary.builder("llm.user.rating")
            .description("User satisfaction rating")
            .register(meterRegistry);
        this.failureCounter = Counter.builder("llm.request.failures")
            .description("Total failed requests")
            .register(meterRegistry);
    }

    public void recordRequest(
            String model,
            long durationMs,
            int outputTokenCount,
            boolean success) {
        // NOTE: these meters are global; to break metrics down per model,
        // register them dynamically with a "model" tag instead
        requestTimer.record(durationMs, TimeUnit.MILLISECONDS);
        outputTokens.record(outputTokenCount);
        if (!success) {
            failureCounter.increment();
        }
    }

    public void recordUserFeedback(String requestId, int rating) {
        userRating.record(rating);
        // Also persist to the business database for later analysis
        feedbackRepository.save(new UserFeedback(requestId, rating));
    }
}

Core panels for the Grafana dashboard (the key queries):
# Example Prometheus queries
# P99 request latency (seconds)
histogram_quantile(0.99, rate(llm_request_duration_seconds_bucket[5m]))

# Requests per second (derived from the timer's count series)
rate(llm_request_duration_seconds_count[1m])

# Error rate
rate(llm_request_failures_total[5m]) / rate(llm_request_duration_seconds_count[5m])

# KV-cache usage (from vLLM; cache usage, not raw GPU utilization)
vllm:gpu_cache_usage_perc

# Waiting-queue length (from vLLM)
vllm:num_requests_waiting
An A/B Testing Framework

Once a model is live, you'll regularly want A/B tests comparing the new version against the old:
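The service below depends on an AbTestConfig that holds the split percentage and the variant-to-model mapping. The article doesn't define it, so here is a hypothetical sketch (names and defaults are illustrative):

import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical configuration class assumed by the A/B service below;
// register it via @EnableConfigurationProperties(AbTestConfig.class).
@ConfigurationProperties(prefix = "abtest")
public class AbTestConfig {
    private int controlGroupPercent = 50;           // % of users in the control group
    private Map<String, String> variantModels =     // variant -> model bean name
            Map.of("control", "qwen2.5-7b-v1", "treatment", "qwen2.5-7b-v2");

    public int getControlGroupPercent() { return controlGroupPercent; }
    public void setControlGroupPercent(int p) { this.controlGroupPercent = p; }
    public String getModelForVariant(String variant) { return variantModels.get(variant); }
    public void setVariantModels(Map<String, String> m) { this.variantModels = m; }
}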
@Service
public class ModelAbTestService {

    @Autowired
    private Map<String, ChatClient> modelClients;   // one ChatClient bean per model

    @Autowired
    private AbTestConfig abTestConfig;

    @Autowired
    private LlmMetricsCollector metricsCollector;

    public AbTestResult complete(String userId, String prompt) {
        // Assign a variant by user ID (sticky: the same user
        // always lands in the same group)
        String variant = assignVariant(userId);
        String modelName = abTestConfig.getModelForVariant(variant);
        ChatClient client = modelClients.get(modelName);

        long startTime = System.currentTimeMillis();
        String result;
        boolean success = true;
        try {
            result = client.prompt().user(prompt).call().content();
        } catch (Exception e) {
            result = "Request failed";
            success = false;
        }
        long duration = System.currentTimeMillis() - startTime;
        metricsCollector.recordRequest(modelName, duration, estimateTokens(result), success);

        return new AbTestResult(result, variant, modelName);
    }

    private String assignVariant(String userId) {
        // Stable hash-based bucketing on the user ID
        // (floorMod avoids the negative-hash edge case)
        int bucket = Math.floorMod(userId.hashCode(), 100);
        return bucket < abTestConfig.getControlGroupPercent() ? "control" : "treatment";
    }

    private int estimateTokens(String text) {
        // Rough heuristic; swap in a real tokenizer if accuracy matters
        return text.length() / 4;
    }
}
An Automated Deployment Pipeline

Wiring the components above together gives you a complete CI/CD pipeline:
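The pipeline delegates all cluster operations to a KubernetesDeploymentService, whose implementation isn't shown in this article. Here's a sketch of the interface the pipeline assumes; in practice you would back it with the Fabric8 or official Kubernetes Java client plus whatever traffic-splitting layer you run (Istio VirtualService, weighted ingress, etc.):

import java.time.Duration;
import java.util.Map;

// Hedged sketch of the operations the pipeline below relies on;
// method names are taken directly from how the pipeline calls them.
public interface KubernetesDeploymentService {
    String getCurrentVersion(String modelName);
    void deployVersion(String modelName, String version, String color);
    void waitForReady(String modelName, String color, Duration timeout);
    void setTrafficSplit(String modelName, Map<String, Integer> weights);
    void cleanup(String modelName, String color);
    void renameDeployment(String modelName, String from, String to);
}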
@Slf4j   // Lombok logger
@Service
public class ModelDeploymentPipeline {

    @Autowired
    private MlflowIntegrationService mlflowService;

    @Autowired
    private KubernetesDeploymentService k8sService;

    @Autowired
    private LlmMetricsCollector metricsCollector;

    /**
     * Blue-green deployment: switch model versions with zero downtime.
     */
    public void blueGreenDeploy(String modelName, String newVersion) {
        String currentVersion = k8sService.getCurrentVersion(modelName);

        // 1. Deploy the new version (green)
        log.info("Deploying new version {} -> {}", currentVersion, newVersion);
        k8sService.deployVersion(modelName, newVersion, "green");

        // 2. Wait for the new version to become ready
        k8sService.waitForReady(modelName, "green", Duration.ofMinutes(5));

        // 3. Shift 5% of traffic to the new version (canary);
        //    assumes a traffic-splitting layer such as a service mesh
        k8sService.setTrafficSplit(modelName, Map.of("blue", 95, "green", 5));
        log.info("Canary release: 5% of traffic on the new version");

        // 4. Observe for 5 minutes
        sleep(Duration.ofMinutes(5));

        // 5. Check the error rate (getErrorRate is assumed to query the
        //    metrics backend for the green deployment's recent window)
        double errorRate = metricsCollector.getErrorRate(modelName + "-green", Duration.ofMinutes(5));
        if (errorRate > 0.05) {
            log.error("Canary error rate too high: {}%, rolling back", errorRate * 100);
            k8sService.setTrafficSplit(modelName, Map.of("blue", 100, "green", 0));
            k8sService.cleanup(modelName, "green");
            throw new DeploymentException("Deployment failed, rolled back");
        }

        // 6. Full cutover
        k8sService.setTrafficSplit(modelName, Map.of("blue", 0, "green", 100));
        k8sService.cleanup(modelName, "blue");
        k8sService.renameDeployment(modelName, "green", "blue");
        log.info("Deployment complete, new version {} fully live", newVersion);
    }
}

Pragmatic Advice for Small Teams
A lot of people will look at all of the above and find it too much: their team is small and can't possibly build it all.
My advice is to build it in stages, not all at once:
Stage 1 (a 2-3 person team):
- MLflow for experiment tracking + model files managed in S3/OSS. Just those two things.
- It beats having nothing, and the cost is close to zero.
Stage 2 (a 5-10 person team):
- Add Kubernetes deployment + basic Prometheus monitoring.
- That is already enough to support most business needs.
Stage 3 (a mature team):
- Full CI/CD pipeline + A/B testing + automatic rollback.
- By this stage the team has dedicated AI platform engineers.
Don't standardize for the sake of "standardization"; solve whatever hurts most right now. For most small and mid-sized teams, stage 1 alone clears up 90% of the chaos.
