第2016篇:微调模型的版本管理——MLflow在LLM实验追踪中的应用
2026/4/30大约 4 分钟
第2016篇:微调模型的版本管理——MLflow在LLM实验追踪中的应用
适读人群:正在进行多轮LLM微调迭代的工程师 | 阅读时长:约17分钟 | 核心价值:用MLflow建立微调实验的完整追踪体系,告别"这个版本是用什么数据训练的?"的困惑
微调项目做到第三个版本的时候,我发现自己已经不记得每个版本用了哪些训练参数了。
v1是用500条数据训的,还是1000条?v2改过学习率吗?v3为什么比v2差,是因为数据不同还是超参不同?
这些问题在没有实验追踪工具的情况下几乎无法回答。我开始认真把MLflow用起来了。
MLflow在LLM微调中的核心价值
MLflow本来是为传统ML实验设计的,但用来追踪LLM微调实验同样合适:
- 实验记录:每次训练的超参数、数据集、环境都有记录
- 指标追踪:训练loss、评估分数随时间的变化曲线
- 模型注册:不同版本的模型文件统一管理
- 比较分析:一键对比多个实验的结果差异
集成MLflow的微调脚本
import mlflow
import mlflow.transformers
from transformers import TrainerCallback
class MLflowCallback(TrainerCallback):
"""将Hugging Face训练过程的指标记录到MLflow"""
def __init__(self):
self.run_id = None
def on_train_begin(self, args, state, control, **kwargs):
mlflow.start_run()
self.run_id = mlflow.active_run().info.run_id
# 记录超参数
mlflow.log_params({
"learning_rate": args.learning_rate,
"num_train_epochs": args.num_train_epochs,
"per_device_train_batch_size": args.per_device_train_batch_size,
"gradient_accumulation_steps": args.gradient_accumulation_steps,
"warmup_ratio": args.warmup_ratio,
})
def on_log(self, args, state, control, logs=None, **kwargs):
if logs is None:
return
# 记录每步的训练指标
step = state.global_step
for key, value in logs.items():
if isinstance(value, (int, float)):
mlflow.log_metric(key, value, step=step)
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
if metrics is None:
return
# 记录评估指标
for key, value in metrics.items():
mlflow.log_metric(key, value, step=state.global_step)
def on_train_end(self, args, state, control, **kwargs):
# 记录最终的训练指标
mlflow.log_metric("final_train_loss", state.log_history[-1].get("loss", 0))
mlflow.end_run()
# 在训练脚本中使用
def run_finetuning(config: dict):
# 记录数据集信息
with mlflow.start_run(run_name=config.get("run_name", "finetuning")):
# 记录数据集元信息
mlflow.log_params({
"dataset_name": config["dataset_name"],
"train_samples": config["train_samples"],
"test_samples": config["test_samples"],
"base_model": config["base_model"],
"lora_rank": config.get("lora_rank", 16),
"lora_alpha": config.get("lora_alpha", 32),
})
# 记录数据集版本(SHA或版本号)
mlflow.log_param("dataset_version", compute_dataset_hash(config["dataset_path"]))
# 记录训练环境
mlflow.log_param("gpu_type", get_gpu_info())
mlflow.log_param("transformers_version", transformers.__version__)
# 执行训练
trainer = build_trainer(config)
trainer.add_callback(MLflowCallback())
trainer.train()
# 评估并记录结果
eval_results = evaluate_model(trainer.model, config["eval_dataset"])
for metric, value in eval_results.items():
mlflow.log_metric("eval_" + metric, value)
# 保存模型到MLflow
mlflow.transformers.log_model(
transformers_model={"model": trainer.model, "tokenizer": tokenizer},
artifact_path="model",
registered_model_name=config.get("model_name", "custom_llm")
)
return mlflow.active_run().info.run_idJava端调用MLflow Model Registry
模型注册后,Java服务可以通过MLflow的REST API获取最新版本:
@Service
@RequiredArgsConstructor
public class ModelVersionManager {
private final RestTemplate restTemplate;
@Value("${mlflow.tracking-uri}")
private String mlflowUri;
/**
* 获取指定模型的生产版本信息
*/
public ModelVersionInfo getProductionVersion(String modelName) {
String url = mlflowUri + "/api/2.0/mlflow/registered-models/get-latest-versions";
Map<String, Object> request = Map.of(
"name", modelName,
"stages", List.of("Production")
);
ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
// 解析返回的版本信息
List<Map> modelVersions = (List<Map>) response.getBody().get("model_versions");
if (modelVersions == null || modelVersions.isEmpty()) {
throw new IllegalStateException("没有找到模型 " + modelName + " 的生产版本");
}
Map versionData = modelVersions.get(0);
return ModelVersionInfo.builder()
.version((String) versionData.get("version"))
.runId((String) versionData.get("run_id"))
.artifactPath(extractArtifactPath(versionData))
.createdAt((Long) versionData.get("creation_timestamp"))
.build();
}
/**
* 获取某个run的关键指标,用于比较不同版本的效果
*/
public Map<String, Double> getRunMetrics(String runId) {
String url = mlflowUri + "/api/2.0/mlflow/runs/get?run_id=" + runId;
ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);
Map runData = (Map) response.getBody().get("run");
Map data = (Map) runData.get("data");
Map<String, Double> metrics = new HashMap<>();
List<Map> metricList = (List<Map>) data.get("metrics");
if (metricList != null) {
for (Map metric : metricList) {
metrics.put((String) metric.get("key"),
((Number) metric.get("value")).doubleValue());
}
}
return metrics;
}
}版本晋升流程
建立"实验版本→候选版本→生产版本"的晋升机制:
@Service
@RequiredArgsConstructor
public class ModelPromotionService {
private final ModelVersionManager versionManager;
private final ModelEvaluationPipeline evaluationPipeline;
private final AlertService alertService;
/**
* 将候选版本晋升到生产
* 只有通过评估的版本才能晋升
*/
public void promoteToProduction(String modelName, String runId) {
// 1. 获取该版本的评估指标
Map<String, Double> metrics = versionManager.getRunMetrics(runId);
// 2. 检查是否达到上线标准
double coreScore = metrics.getOrDefault("eval_core_score", 0.0);
double generalScore = metrics.getOrDefault("eval_general_score", 0.0);
if (coreScore < 0.85) {
throw new PromotionException("核心任务分数不足(" + coreScore + " < 0.85),拒绝晋升");
}
if (generalScore < 0.70) {
throw new PromotionException("通用能力分数不足(" + generalScore + " < 0.70),可能存在灾难性遗忘");
}
// 3. 调用MLflow API晋升版本
transitionModelStage(modelName, runId, "Production");
// 4. 发送通知
alertService.sendNotification(
"模型 " + modelName + " 版本 " + runId.substring(0, 8) +
" 已晋升到生产环境\n核心分数: " + coreScore + "\n通用分数: " + generalScore
);
log.info("模型版本晋升成功: model={}, runId={}", modelName, runId);
}
}MLflow的价值不在于单次实验,而在于积累了多次实验后,你能清楚地看到:哪次训练效果最好、为什么好、用了什么配置。这让微调从"碰运气"变成了"有章可循"。
