第2159篇:LLM的持续评估管道——自动检测质量下滑的工程机制
2026/4/30大约 6 分钟
第2159篇:LLM的持续评估管道——自动检测质量下滑的工程机制
适读人群:负责LLM系统长期运维的工程师 | 阅读时长:约17分钟 | 核心价值:建立全自动的LLM质量持续评估管道,在质量问题影响用户前自动发现并告警
系统上线后,很多团队的评估工作就停了。
"已经测过了,上线就好了。"
但LLM系统的质量不是静态的。模型提供商会静默地更新基础模型(如果你调用的是 gpt-4o 这类别名而不是固定的快照版本,它今天背后的模型和三个月前未必相同);你的知识库在更新;用户的输入分布在变化;业务规则在演进。所有这些变化都可能悄悄影响输出质量。
持续评估管道就是用来解决这个问题的:像CI/CD一样,让评估变成持续运行的自动化流程。
持续评估管道的设计
评估管道的调度与执行
/**
 * Continuous evaluation pipeline.
 *
 * <p>Samples production traffic on a schedule (hourly and daily) and after deployments,
 * scores the samples with {@link LlmEvaluationService}, publishes quality gauges, raises
 * alerts when quality falls below thresholds, and compares the daily run with a baseline.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ContinuousEvaluationPipeline {

    /** Maximum number of evaluation calls in flight at once (keeps the judge model under its rate limit). */
    private static final int MAX_CONCURRENT_EVALUATIONS = 5;

    /** Overall time budget for a single batch evaluation, in minutes. */
    private static final long BATCH_TIMEOUT_MINUTES = 10;

    /** Warm-up delay between a deployment event and the post-deployment evaluation, in minutes. */
    private static final long POST_DEPLOYMENT_WARMUP_MINUTES = 5;

    private final ProductionSampler productionSampler;
    private final LlmEvaluationService evaluationService;
    private final QualityBaselineService baselineService;
    private final EvaluationResultRepository resultRepository;
    private final AlertService alertService;
    private final MeterRegistry meterRegistry;

    /**
     * Strongly-referenced, mutable holders backing the quality gauges, keyed by gauge identity.
     * Micrometer keeps only a weak reference to a gauge's state object and binds a gauge to the
     * first object registered under its id, so a fresh boxed value per call would be garbage
     * collected (gauge reads NaN) and never update. Being initialized, this field is not part of
     * the Lombok-generated constructor.
     */
    private final Map<String, java.util.concurrent.atomic.AtomicReference<Double>> gaugeHolders =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Hourly lightweight evaluation: randomly samples up to 50 interactions from the last hour,
     * publishes the scores as gauges and checks the hourly alert thresholds. Failures are logged
     * and counted in {@code eval.pipeline.error}; they never propagate to the scheduler.
     */
    @Scheduled(cron = "0 0 * * * *")
    public void runHourlyEvaluation() {
        log.info("开始小时级持续评估");
        try {
            List<ProductionInteraction> samples = productionSampler.sample(
                    Duration.ofHours(1), 50, SamplingStrategy.RANDOM
            );
            if (samples.isEmpty()) {
                log.warn("过去1小时没有生产流量可评估");
                return;
            }
            EvaluationBatchResult result = executeBatchEvaluation(samples, "hourly");
            recordMetrics(result);
            checkAlerts(result, AlertLevel.HOURLY);
        } catch (Exception e) {
            log.error("小时级评估失败", e);
            meterRegistry.counter("eval.pipeline.error", "schedule", "hourly").increment();
        }
    }

    /**
     * Daily deep evaluation: samples up to 200 interactions from the last 24 hours, stratified
     * by intent, compares the batch with the quality baseline, persists the daily report and
     * sends it. Any failure raises a critical alert.
     */
    @Scheduled(cron = "0 30 6 * * *") // 06:30 every morning
    public void runDailyEvaluation() {
        log.info("开始日级深度评估");
        try {
            List<ProductionInteraction> samples = productionSampler.sample(
                    Duration.ofHours(24), 200, SamplingStrategy.STRATIFIED_BY_INTENT
            );
            EvaluationBatchResult result = executeBatchEvaluation(samples, "daily");
            QualityDriftReport driftReport = baselineService.compareWithBaseline(result);
            generateDailyReport(result, driftReport);
            sendDailyReport(result, driftReport);
        } catch (Exception e) {
            log.error("日级评估失败", e);
            alertService.sendCriticalAlert("持续评估管道失败", "日级评估执行异常: " + e.getMessage());
        }
    }

    /**
     * Event-triggered post-deployment verification.
     *
     * <p>The evaluation is scheduled to run after a warm-up delay instead of sleeping in the
     * listener. Spring invokes {@code @EventListener} methods synchronously on the publishing
     * thread unless they are made asynchronous, so sleeping here would stall the deployment
     * publisher for minutes. Because the work now runs asynchronously, its failures are logged
     * and counted here rather than propagated to the publisher.
     */
    @EventListener
    public void onDeploymentCompleted(DeploymentCompletedEvent event) {
        log.info("检测到新版本部署,触发部署后评估: {}", event.getConfigName());
        CompletableFuture.runAsync(
                () -> runPostDeploymentEvaluation(event),
                CompletableFuture.delayedExecutor(POST_DEPLOYMENT_WARMUP_MINUTES, TimeUnit.MINUTES));
    }

    /** Samples up to 100 requests served since the deployment and checks the stricter post-deployment thresholds. */
    private void runPostDeploymentEvaluation(DeploymentCompletedEvent event) {
        try {
            List<ProductionInteraction> postDeploymentSamples = productionSampler.sampleSince(
                    event.getDeployedAt(), 100
            );
            if (postDeploymentSamples.size() < 20) {
                log.warn("部署后样本不足20个,跳过评估");
                return;
            }
            EvaluationBatchResult result = executeBatchEvaluation(postDeploymentSamples, "post-deployment");
            checkAlerts(result, AlertLevel.POST_DEPLOYMENT);
        } catch (Exception e) {
            log.error("部署后评估失败", e);
            meterRegistry.counter("eval.pipeline.error", "schedule", "post-deployment").increment();
        }
    }

    /**
     * Evaluates every sample concurrently (at most {@link #MAX_CONCURRENT_EVALUATIONS} at a
     * time) and aggregates the results.
     *
     * <p>A dedicated, bounded executor is used so that blocking evaluation calls do not occupy
     * the shared {@code ForkJoinPool.commonPool}. A sample whose evaluation throws is counted
     * as failed and does not abort the batch. If the whole batch exceeds
     * {@link #BATCH_TIMEOUT_MINUTES}, {@code join()} throws a {@code CompletionException} and
     * the executor is shut down, which interrupts the stragglers.
     */
    private EvaluationBatchResult executeBatchEvaluation(
            List<ProductionInteraction> samples, String evalType) {
        long startTime = System.currentTimeMillis();
        List<EvaluationReport> reports = Collections.synchronizedList(new ArrayList<>());
        java.util.concurrent.atomic.AtomicInteger failedCount =
                new java.util.concurrent.atomic.AtomicInteger();
        java.util.concurrent.ExecutorService executor =
                java.util.concurrent.Executors.newFixedThreadPool(MAX_CONCURRENT_EVALUATIONS);
        try {
            List<CompletableFuture<Void>> futures = samples.stream()
                    .map(sample -> CompletableFuture.runAsync(() -> {
                        try {
                            EvaluationReport report = evaluationService.evaluate(
                                    EvaluationRequest.builder()
                                            .requestId(sample.getId())
                                            .userInput(sample.getUserInput())
                                            .llmOutput(sample.getLlmOutput())
                                            .context(sample.getContext())
                                            .build()
                            );
                            reports.add(report);
                        } catch (Exception e) {
                            log.warn("评估样本失败: {}", sample.getId(), e);
                            failedCount.incrementAndGet();
                        }
                    }, executor))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .orTimeout(BATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES)
                    .join();
        } finally {
            executor.shutdownNow();
        }
        // All tasks have completed here, so the snapshot is final.
        List<EvaluationReport> completed = new ArrayList<>(reports);
        long duration = System.currentTimeMillis() - startTime;
        log.info("批量评估完成: evalType={}, 成功={}, 失败={}, 耗时={}ms",
                evalType, completed.size(), failedCount.get(), duration);
        return computeBatchResult(completed, evalType, failedCount.get(), samples.size());
    }

    /**
     * Aggregates per-sample reports into a batch result: mean overall score, pass rate, and the
     * mean of every dimension that appears in any report (reports missing a dimension are
     * skipped for that dimension's average). Returns {@code EvaluationBatchResult.empty} when
     * no report succeeded.
     */
    private EvaluationBatchResult computeBatchResult(List<EvaluationReport> reports,
                                                     String evalType,
                                                     int failedCount,
                                                     int totalSamples) {
        if (reports.isEmpty()) {
            return EvaluationBatchResult.empty(evalType);
        }
        double avgScore = reports.stream().mapToDouble(EvaluationReport::getOverallScore).average().orElse(0);
        double passRate = reports.stream().mapToDouble(r -> r.isPassed() ? 1.0 : 0.0).average().orElse(0);

        // Union of dimension names across all reports, not just the first one.
        Set<String> dimensions = new HashSet<>();
        for (EvaluationReport report : reports) {
            if (report.getDimensionScores() != null) {
                dimensions.addAll(report.getDimensionScores().keySet());
            }
        }
        Map<String, Double> dimensionAvgs = new HashMap<>();
        for (String dim : dimensions) {
            double dimAvg = reports.stream()
                    .map(r -> r.getDimensionScores() != null ? r.getDimensionScores().get(dim) : null)
                    .filter(Objects::nonNull)
                    .mapToDouble(DimensionScore::getScore)
                    .average().orElse(0);
            dimensionAvgs.put(dim, dimAvg);
        }
        return EvaluationBatchResult.builder()
                .evalType(evalType)
                .timestamp(Instant.now())
                .totalSamples(totalSamples)
                .evaluatedCount(reports.size())
                .failedCount(failedCount)
                .avgScore(avgScore)
                .passRate(passRate)
                .dimensionAvgs(dimensionAvgs)
                .build();
    }

    /**
     * Sends an alert when either the pass rate or the average score falls below the threshold
     * for the given level. Post-deployment runs use stricter thresholds and CRITICAL severity;
     * other levels use WARNING.
     */
    private void checkAlerts(EvaluationBatchResult result, AlertLevel level) {
        boolean postDeployment = level == AlertLevel.POST_DEPLOYMENT;
        double passRateThreshold = postDeployment ? 0.70 : 0.65;
        double scoreThreshold = postDeployment ? 0.72 : 0.68;
        boolean passRateTooLow = result.getPassRate() < passRateThreshold;
        boolean scoreTooLow = result.getAvgScore() < scoreThreshold;
        if (!passRateTooLow && !scoreTooLow) {
            return;
        }
        String severity = postDeployment ? "CRITICAL" : "WARNING";
        alertService.sendAlert(
                severity,
                "LLM质量低于阈值 [" + result.getEvalType() + "]",
                String.format("通过率=%.1f%%(阈值=%.1f%%),综合分=%.3f(阈值=%.3f),样本数=%d",
                        result.getPassRate() * 100, passRateThreshold * 100,
                        result.getAvgScore(), scoreThreshold, result.getEvaluatedCount())
        );
    }

    /** Publishes the batch's average score, pass rate and per-dimension averages as gauges. */
    private void recordMetrics(EvaluationBatchResult result) {
        publishGauge("avg_score", "llm.eval.avg_score", Tags.empty(), result.getAvgScore());
        publishGauge("pass_rate", "llm.eval.pass_rate", Tags.empty(), result.getPassRate());
        result.getDimensionAvgs().forEach((dim, score) ->
                publishGauge("dimension:" + dim, "llm.eval.dimension_score",
                        Tags.of("dimension", dim), score)
        );
    }

    /**
     * Sets a gauge value. On first use, registers the gauge against a long-lived holder; later
     * calls only update the holder in place.
     *
     * @param holderKey unique key for this gauge (name plus tag values)
     */
    private void publishGauge(String holderKey, String name, Tags tags, double value) {
        gaugeHolders.computeIfAbsent(holderKey, key -> {
            java.util.concurrent.atomic.AtomicReference<Double> holder =
                    new java.util.concurrent.atomic.AtomicReference<>(Double.NaN);
            meterRegistry.gauge(name, tags, holder, ref -> ref.get());
            return holder;
        }).set(value);
    }

    /** Renders the daily report text and sends it through {@link AlertService#sendDailyReport}. */
    private void sendDailyReport(EvaluationBatchResult result, QualityDriftReport drift) {
        String report = buildDailyReportText(result, drift);
        alertService.sendDailyReport("LLM质量日报", report);
    }

    /** Builds the human-readable daily report; the baseline section is omitted when {@code drift} is null. */
    private String buildDailyReportText(EvaluationBatchResult result, QualityDriftReport drift) {
        StringBuilder sb = new StringBuilder();
        sb.append("=== LLM质量日报 ===\n\n");
        sb.append(String.format("评估时间: %s\n", LocalDate.now()));
        sb.append(String.format("评估样本: %d条\n\n", result.getEvaluatedCount()));
        sb.append(String.format("综合得分: %.3f\n", result.getAvgScore()));
        sb.append(String.format("通过率: %.1f%%\n\n", result.getPassRate() * 100));
        sb.append("各维度得分:\n");
        result.getDimensionAvgs().forEach((dim, score) ->
                sb.append(String.format("  %s: %.3f\n", dim, score)));
        if (drift != null) {
            sb.append("\n与基线对比:\n");
            sb.append(String.format("  通过率变化: %+.1f%%\n", drift.getPassRateDelta() * 100));
            sb.append(String.format("  综合分变化: %+.3f\n", drift.getScoreDelta()));
        }
        return sb.toString();
    }

    /** Persists the daily result and drift report through the result repository. */
    private void generateDailyReport(EvaluationBatchResult result, QualityDriftReport drift) {
        resultRepository.saveDailyReport(result, drift);
    }
}
生产采样器
/**
 * Production traffic sampler.
 *
 * <p>Draws evaluation samples from the production interaction log using one of several
 * {@link SamplingStrategy sampling strategies}.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductionSampler {
    private final InteractionLogRepository logRepository;
    private final IntentClassifier intentClassifier;

    /**
     * Samples production interactions.
     *
     * @param window   how far back from now to sample
     * @param count    target number of samples; fewer are returned when the window does not
     *                 contain enough interactions
     * @param strategy sampling strategy
     */
    public List<ProductionInteraction> sample(Duration window, int count, SamplingStrategy strategy) {
        Instant since = Instant.now().minus(window);
        return switch (strategy) {
            case RANDOM -> randomSample(since, count);
            case STRATIFIED_BY_INTENT -> stratifiedSampleByIntent(since, count);
            case PRIORITIZE_ERRORS -> prioritizeErrorSample(since, count);
        };
    }

    /** Delegates random sampling to the repository. */
    private List<ProductionInteraction> randomSample(Instant since, int count) {
        return logRepository.findRandomSince(since, count);
    }

    /**
     * Intent-stratified sample so that every intent is represented.
     *
     * <p>Each intent first gets a quota of {@code max(5, count / #intents)} randomly chosen
     * interactions. Quota left unused by small intents is then filled with random leftovers from
     * the larger ones, so the result reaches {@code count} whenever enough traffic exists. When
     * there are more intents than the quota allows, the final random truncation to
     * {@code count} may drop some intents entirely.
     *
     * <p>NOTE(review): loads every interaction in the window and classifies each one. That may
     * be expensive if {@code IntentClassifier} calls a model — confirm the cost for 24h windows.
     */
    private List<ProductionInteraction> stratifiedSampleByIntent(Instant since, int count) {
        Map<String, List<ProductionInteraction>> byIntent = logRepository.findSince(since)
                .stream()
                .collect(Collectors.groupingBy(i ->
                        intentClassifier.classify(i.getUserInput())
                ));
        if (byIntent.isEmpty()) {
            // No traffic in the window; also avoids dividing by zero below.
            return new ArrayList<>();
        }
        int perIntentQuota = Math.max(5, count / byIntent.size());
        List<ProductionInteraction> result = new ArrayList<>();
        List<ProductionInteraction> leftovers = new ArrayList<>();
        for (List<ProductionInteraction> interactions : byIntent.values()) {
            List<ProductionInteraction> shuffled = new ArrayList<>(interactions);
            Collections.shuffle(shuffled);
            int take = Math.min(perIntentQuota, shuffled.size());
            result.addAll(shuffled.subList(0, take));
            leftovers.addAll(shuffled.subList(take, shuffled.size()));
        }
        if (result.size() < count) {
            // Redistribute quota that small intents could not use.
            Collections.shuffle(leftovers);
            result.addAll(leftovers.subList(0, Math.min(count - result.size(), leftovers.size())));
        }
        Collections.shuffle(result);
        return new ArrayList<>(result.subList(0, Math.min(count, result.size())));
    }

    /**
     * Error-prioritized sample. Up to half of {@code count} is a random subset of interactions
     * with negative user feedback; the rest is filled from a random sample of all traffic.
     * Interactions are de-duplicated by id so nothing is evaluated twice. The result may hold
     * fewer than {@code count} items when the window has too little traffic.
     */
    private List<ProductionInteraction> prioritizeErrorSample(Instant since, int count) {
        List<ProductionInteraction> errors =
                new ArrayList<>(logRepository.findWithNegativeFeedbackSince(since));
        // Shuffle so the error half is random rather than the first rows the repository returns.
        Collections.shuffle(errors);
        List<ProductionInteraction> combined =
                new ArrayList<>(errors.subList(0, Math.min(count / 2, errors.size())));
        Set<Object> chosenIds = new HashSet<>();
        for (ProductionInteraction chosen : combined) {
            chosenIds.add(chosen.getId());
        }
        for (ProductionInteraction candidate : logRepository.findRandomSince(since, count)) {
            if (combined.size() >= count) {
                break;
            }
            if (chosenIds.add(candidate.getId())) {
                combined.add(candidate);
            }
        }
        Collections.shuffle(combined);
        return combined;
    }
}
管道健康监控
持续评估管道本身也需要被监控——一旦管道因调度故障或异常而静默停止,就不会再产生任何告警,而“没有告警”很容易被误读为“质量正常”:
/**
 * Watchdog for the continuous evaluation pipeline itself.
 *
 * <p>Every 30 minutes (fixed delay after the previous run) it counts the evaluation records
 * written during the last two hours. If there are none, it raises a critical alert, because
 * the pipeline appears to have stopped producing results.
 *
 * <p>NOTE(review): in the pipeline code shown alongside this class, the only visible write to
 * {@code EvaluationResultRepository} is {@code saveDailyReport(...)} from the daily run.
 * This check could false-alarm outside the two hours after that run, and during quiet hours
 * when the hourly run skips for lack of traffic, unless hourly or per-sample results are also
 * persisted somewhere (for example inside the evaluation service) and counted by
 * {@code countSince}. Confirm what {@code countSince} actually counts.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class PipelineHealthMonitor {

    /** Polling period: 30 minutes, expressed in milliseconds for {@code @Scheduled}. */
    private static final long CHECK_INTERVAL_MS = 30L * 60 * 1000;

    /** A record must exist within this window for the pipeline to count as alive. */
    private static final Duration LIVENESS_WINDOW = Duration.ofHours(2);

    private final EvaluationResultRepository resultRepository;
    private final AlertService alertService;

    /** Raises a critical alert when no evaluation record was written within {@link #LIVENESS_WINDOW}. */
    @Scheduled(fixedDelay = CHECK_INTERVAL_MS)
    public void checkPipelineHealth() {
        Instant windowStart = Instant.now().minus(LIVENESS_WINDOW);
        if (resultRepository.countSince(windowStart) != 0) {
            return; // recent records exist: the pipeline is running
        }
        alertService.sendCriticalAlert(
                "持续评估管道疑似停止",
                "过去2小时没有评估记录,请检查管道是否正常运行"
        );
    }
}
持续评估管道的核心价值是:让质量变成可观测的指标,而不是靠用户投诉来发现问题。这是生产LLM系统从"凭感觉运维"到"数据驱动运维"的关键基础设施。
