第2166篇:生产LLM的漂移检测——模型行为悄悄变化时如何第一时间发现
2026/4/30大约 6 分钟
第2166篇:生产LLM的漂移检测——模型行为悄悄变化时如何第一时间发现
适读人群:负责生产LLM系统稳定性的工程师 | 阅读时长:约18分钟 | 核心价值:建立系统性的LLM漂移检测机制,第一时间发现输入分布变化和模型行为变化
不久前,我们的客服AI出现了一个奇怪的问题:用户的问题没有变,Prompt没有变,模型没有换,但某类问题的回答质量悄悄变差了。
排查了很久,最后发现是知识库里一篇关键文档被更新了,新版本的措辞更简洁,但信息量少了,导致RAG检索到的内容质量下降,进而影响了生成质量。
这就是漂移(Drift)——系统的某个环节发生了变化,导致整体行为偏离预期。
LLM系统的漂移有多种来源:
1. 输入漂移(Input Drift)
- 用户问题的分布变化(新产品发布,出现新的问题类型)
- 用户语言风格变化(社会热词、新表达方式)
2. 数据漂移(Data Drift)
- 知识库文档被更新/删除
- RAG检索结果质量变化
3. 模型漂移(Model Drift)
- 模型提供商静默更新了基础模型
- API行为变化(温度、token限制等)
4. 概念漂移(Concept Drift)
- 业务规则改变,原来"正确"的答案不再正确
- 法规政策更新输入漂移检测
/**
* 输入漂移检测服务
*
* 监控用户输入的分布变化
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class InputDriftDetector {
private final EmbeddingModel embeddingModel;
private final InputDistributionRepository distributionRepository;
private final AlertService alertService;
/**
* 更新输入分布的基线(在稳定期运行)
*/
public void updateBaseline(List<String> referenceInputs, String baselineName) {
log.info("更新输入基线: {}, 样本数={}", baselineName, referenceInputs.size());
// 计算输入embedding的统计特征
List<float[]> embeddings = referenceInputs.stream()
.map(embeddingModel::embed)
.collect(Collectors.toList());
// 计算均值向量(中心点)
float[] centroid = computeCentroid(embeddings);
// 计算与中心点的平均距离(用于检测分布偏移)
double avgDistFromCentroid = embeddings.stream()
.mapToDouble(e -> euclideanDist(e, centroid))
.average().orElse(0);
double stdDistFromCentroid = computeStd(
embeddings.stream().mapToDouble(e -> euclideanDist(e, centroid)).toArray(),
avgDistFromCentroid
);
InputDistributionBaseline baseline = InputDistributionBaseline.builder()
.baselineName(baselineName)
.centroid(centroid)
.avgDistFromCentroid(avgDistFromCentroid)
.stdDistFromCentroid(stdDistFromCentroid)
.sampleCount(referenceInputs.size())
.createdAt(Instant.now())
.build();
distributionRepository.saveBaseline(baseline);
}
/**
* 检测最新输入与基线的漂移
*/
@Scheduled(cron = "0 0 * * * *") // 每小时检查
public void detectDrift() {
List<InputDistributionBaseline> baselines = distributionRepository.findAllBaselines();
for (InputDistributionBaseline baseline : baselines) {
Instant oneHourAgo = Instant.now().minus(Duration.ofHours(1));
List<String> recentInputs = distributionRepository.findRecentInputs(oneHourAgo, 200);
if (recentInputs.size() < 30) continue; // 样本太少,跳过
DriftScore score = computeDriftScore(baseline, recentInputs);
distributionRepository.saveDriftRecord(DriftRecord.builder()
.baselineName(baseline.getBaselineName())
.timestamp(Instant.now())
.driftScore(score.getScore())
.interpretation(score.getInterpretation())
.sampleCount(recentInputs.size())
.build());
if (score.getScore() > 0.3) { // 漂移分数超过0.3触发告警
alertService.sendWarningAlert(
"输入分布漂移检测",
String.format("检测到输入分布漂移,漂移分数=%.3f(阈值0.3)," +
"过去1小时样本数=%d。%s",
score.getScore(), recentInputs.size(), score.getInterpretation())
);
}
}
}
private DriftScore computeDriftScore(InputDistributionBaseline baseline, List<String> recentInputs) {
List<float[]> recentEmbeddings = recentInputs.stream()
.map(embeddingModel::embed)
.collect(Collectors.toList());
// 计算最近输入与基线中心点的平均距离
double recentAvgDist = recentEmbeddings.stream()
.mapToDouble(e -> euclideanDist(e, baseline.getCentroid()))
.average().orElse(0);
// 归一化漂移分数:当前距离超过基线均值+2σ时开始告警
double threshold = baseline.getAvgDistFromCentroid() + 2 * baseline.getStdDistFromCentroid();
double normalizedDrift = Math.max(0, (recentAvgDist - baseline.getAvgDistFromCentroid()) /
(threshold - baseline.getAvgDistFromCentroid()));
String interpretation = normalizedDrift < 0.3
? "输入分布正常"
: normalizedDrift < 0.6
? "检测到轻微输入漂移,可能有新的问题类型出现"
: "检测到显著输入漂移,建议检查是否有重大业务事件或异常流量";
return new DriftScore(Math.min(1.0, normalizedDrift), interpretation);
}
private float[] computeCentroid(List<float[]> embeddings) {
int dim = embeddings.get(0).length;
float[] centroid = new float[dim];
for (float[] emb : embeddings) {
for (int i = 0; i < dim; i++) centroid[i] += emb[i];
}
for (int i = 0; i < dim; i++) centroid[i] /= embeddings.size();
return centroid;
}
private double euclideanDist(float[] a, float[] b) {
double sum = 0;
for (int i = 0; i < Math.min(a.length, b.length); i++) {
double diff = a[i] - b[i];
sum += diff * diff;
}
return Math.sqrt(sum);
}
private double computeStd(double[] values, double mean) {
double variance = Arrays.stream(values).map(v -> Math.pow(v - mean, 2)).average().orElse(0);
return Math.sqrt(variance);
}
}模型行为漂移检测
/**
* 模型行为漂移检测
*
* 检测模型在相同输入下的输出是否发生了变化
* (用于检测模型提供商的静默更新)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ModelBehaviorDriftDetector {
private final ChatClient llmClient;
private final ModelBehaviorBaselineRepository baselineRepository;
private final AlertService alertService;
/**
* 定期运行行为基准测试,检测模型是否发生变化
*/
@Scheduled(cron = "0 0 4 * * *") // 每天凌晨4点
public void runBehaviorProbe() {
// 行为探针:一组固定的测试输入,预期输出
List<BehaviorProbe> probes = baselineRepository.loadProbes();
int consistentCount = 0;
List<BehaviorChange> changes = new ArrayList<>();
for (BehaviorProbe probe : probes) {
String currentOutput = llmClient.prompt()
.user(probe.getInput())
.call()
.content();
BehaviorConsistency consistency = checkConsistency(probe, currentOutput);
if (consistency.isConsistent()) {
consistentCount++;
} else {
changes.add(BehaviorChange.builder()
.probeId(probe.getId())
.probeInput(probe.getInput())
.expectedOutput(probe.getBaselineOutput())
.actualOutput(currentOutput)
.changeType(consistency.getChangeType())
.severity(consistency.getSeverity())
.build());
}
}
double consistencyRate = (double) consistentCount / probes.size();
// 保存检测结果
baselineRepository.saveDriftReport(ModelDriftReport.builder()
.timestamp(Instant.now())
.probeCount(probes.size())
.consistentCount(consistentCount)
.consistencyRate(consistencyRate)
.changes(changes)
.build());
log.info("模型行为探针完成: 一致率={:.1f}%({}/{})",
consistencyRate * 100, consistentCount, probes.size());
if (consistencyRate < 0.85) {
alertService.sendWarningAlert(
"模型行为漂移检测",
String.format("模型行为一致率仅%.1f%%(阈值85%%),%d个探针发现行为变化。" +
"可能是模型提供商进行了更新。",
consistencyRate * 100, changes.size())
);
}
}
private BehaviorConsistency checkConsistency(BehaviorProbe probe, String currentOutput) {
// 检查输出长度变化
int lengthChange = Math.abs(currentOutput.length() - probe.getBaselineOutput().length());
double lengthChangePct = (double) lengthChange / probe.getBaselineOutput().length();
if (lengthChangePct > 0.5) {
return BehaviorConsistency.inconsistent("输出长度显著变化", BehaviorChangeSeverity.MEDIUM);
}
// 检查关键词是否保留
boolean keywordsPresent = probe.getRequiredKeywords().stream()
.allMatch(kw -> currentOutput.toLowerCase().contains(kw.toLowerCase()));
if (!keywordsPresent) {
return BehaviorConsistency.inconsistent("输出缺失必要关键词", BehaviorChangeSeverity.HIGH);
}
// 检查是否包含了不应有的内容
boolean noForbiddenContent = probe.getForbiddenPatterns().stream()
.noneMatch(fp -> currentOutput.toLowerCase().contains(fp.toLowerCase()));
if (!noForbiddenContent) {
return BehaviorConsistency.inconsistent("输出包含不应有的内容", BehaviorChangeSeverity.CRITICAL);
}
return BehaviorConsistency.consistent();
}
/**
* 建立模型行为基线(系统稳定时调用)
*/
public void establishBehaviorBaseline(List<String> probeInputs) {
List<BehaviorProbe> probes = new ArrayList<>();
for (String input : probeInputs) {
// 多次采样,取一致的输出作为基线
List<String> samples = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String output = llmClient.prompt().user(input).call().content();
samples.add(output);
}
// 找共同关键词
Set<String> commonKeywords = extractCommonKeywords(samples);
probes.add(BehaviorProbe.builder()
.id(UUID.randomUUID().toString())
.input(input)
.baselineOutput(samples.get(0))
.requiredKeywords(new ArrayList<>(commonKeywords))
.forbiddenPatterns(List.of()) // 根据业务规则添加
.createdAt(Instant.now())
.build());
}
baselineRepository.saveProbes(probes);
log.info("建立模型行为基线,探针数={}", probes.size());
}
private Set<String> extractCommonKeywords(List<String> outputs) {
if (outputs.isEmpty()) return new HashSet<>();
// 简化实现:找在所有输出中都出现的词(长度>3)
String[] words = outputs.get(0).split("[\\s,.!?。!?,、]+");
Set<String> common = new HashSet<>();
for (String word : words) {
if (word.length() > 3) {
String finalWord = word;
boolean inAll = outputs.stream().allMatch(o -> o.contains(finalWord));
if (inAll) common.add(word);
}
}
return common;
}
}综合漂移监控看板
/**
* 漂移监控聚合服务
*
* 整合所有类型的漂移信号,给出综合的系统健康状态
*/
@Service
@RequiredArgsConstructor
public class DriftMonitoringAggregator {
private final InputDriftDetector inputDriftDetector;
private final ModelBehaviorDriftDetector behaviorDriftDetector;
private final QualityBaselineService qualityBaseline;
@GetMapping("/drift-status")
public SystemDriftStatus getSystemDriftStatus() {
// 汇总各维度的漂移信号
List<DriftSignal> signals = new ArrayList<>();
// 输入漂移信号
double inputDriftScore = getLatestInputDriftScore();
signals.add(new DriftSignal("input_distribution", inputDriftScore,
getDriftLevel(inputDriftScore)));
// 质量漂移信号
double qualityDriftScore = getLatestQualityDriftScore();
signals.add(new DriftSignal("output_quality", qualityDriftScore,
getDriftLevel(qualityDriftScore)));
// 模型行为一致性信号
double behaviorDriftScore = getLatestBehaviorConsistencyScore();
signals.add(new DriftSignal("model_behavior", 1 - behaviorDriftScore,
getDriftLevel(1 - behaviorDriftScore)));
// 综合健康分数
double healthScore = signals.stream()
.mapToDouble(s -> 1 - s.getDriftScore())
.average().orElse(1.0);
return SystemDriftStatus.builder()
.healthScore(healthScore)
.driftSignals(signals)
.overallStatus(classifyHealth(healthScore))
.lastUpdated(Instant.now())
.build();
}
private DriftLevel getDriftLevel(double score) {
if (score < 0.2) return DriftLevel.NORMAL;
if (score < 0.4) return DriftLevel.WARNING;
return DriftLevel.CRITICAL;
}
private HealthStatus classifyHealth(double score) {
if (score >= 0.85) return HealthStatus.HEALTHY;
if (score >= 0.70) return HealthStatus.DEGRADED;
return HealthStatus.CRITICAL;
}
private double getLatestInputDriftScore() { return 0.1; } // placeholder
private double getLatestQualityDriftScore() { return 0.1; } // placeholder
private double getLatestBehaviorConsistencyScore() { return 0.95; } // placeholder
}漂移检测是LLM系统"保持健康"的长期机制。很多团队把精力放在系统上线上,但上线只是开始。生产环境是动态的,需要持续监控各类漂移信号,才能保证系统长期稳定。
