供应链/运营异常检测——AI 怎么做时序数据的智能监控
供应链/运营异常检测——AI 怎么做时序数据的智能监控
去年帮一个做供应链的客户做 AI 项目,他们遇到一个让人头疼的问题:某个仓库每天的出货量,最近两周突然比正常值低了 30%,但没有任何报警。
仓储系统有报警,但报警规则是手工配的:出货量低于 X 就报警。问题是 X 是按历史均值设的,但那个仓库到了旺季出货量会翻倍,均值根本不够参考,季节性波动把异常掩盖了。
排查出来的根因是:一台叉车故障,导致某个库区的货物取不出来,但因为出货总量只降了 30%(其他库区补了一部分),没有触发报警阈值。拖了两周才发现,损失不小。
这个案例让我认识到:传统的固定阈值报警,对于有周期性、季节性的业务时序数据,根本不够用。
时序数据的 AI 分析和文本 RAG 完全不同
做过 RAG 的同学转过来做时序异常检测,常犯一个错误:把时序数据当文本来处理,或者把"异常检测"和"RAG 查询"混为一谈。
两者的本质差异:
| 维度 | 文本 RAG | 时序异常检测 |
|---|---|---|
| 数据形态 | 离散的文本片段 | 连续的时间序列 |
| 查询方式 | 语义相似检索 | 统计偏差检测 |
| LLM 的角色 | 生成答案 | 解释异常原因 |
| 准确率要求 | 答案合理即可 | 漏报/误报率要量化 |
| 实时性要求 | 秒级响应 | 分钟级到秒级 |
时序异常检测的技术核心是统计方法,LLM 在这里的角色不是检测者,而是解释者:把统计算法检测到的异常点,用自然语言解释给业务人员。
这是一个分工模式:统计算法负责"发现了什么",LLM 负责"这意味着什么,应该怎么办"。
统计基线 + LLM 解释的技术架构
整体思路分三层:
第一层:基线构建 基于历史数据,构建每个指标的正常波动范围。要考虑:
- 趋势(整体上升/下降)
- 季节性(每周、每月、每年的周期性变化)
- 残差(去掉趋势和季节性后的随机波动)
第二层:异常检测 实时数据到来时,与基线比较,超出正常波动范围的打上"异常"标签。
第三层:LLM 解释 把异常数据点 + 相关的业务上下文 + 历史故障知识,交给 LLM 生成业务语言的异常解释和处理建议。
核心数据结构
@Data
public class TimeSeriesPoint {
private String metricName; // 指标名称(如"wh001_出货量")
private LocalDateTime timestamp;
private double value;
private Map<String, String> dimensions; // 维度标签(仓库ID、品类等)
}
@Data
public class AnomalyDetectionResult {
private String metricName;
private LocalDateTime detectedAt;
private TimeSeriesPoint anomalyPoint;
private double expectedValue; // 基线预期值
private double actualValue; // 实际值
private double deviationRatio; // 偏差比例
private AnomalyType anomalyType;
private AnomalySeverity severity;
// 统计检测的上下文(用于LLM解释)
private List<TimeSeriesPoint> recentHistory; // 最近N个数据点
private double baseline; // 当前时间点的基线
private double upperBound; // 正常范围上界
private double lowerBound; // 正常范围下界
public enum AnomalyType {
SPIKE, // 突刺(短时间急剧升高)
DROP, // 急跌
TREND_CHANGE, // 趋势变化(持续上升/下降)
LEVEL_SHIFT, // 水平位移(长期偏离基线)
MISSING_DATA // 数据缺失
}
public enum AnomalySeverity {
WARNING, // 偏差 10-30%
CRITICAL, // 偏差 30-50%
SEVERE // 偏差 >50%
}
}
@Data
public class AnomalyExplanation {
private String anomalyId;
private String businessSummary; // 业务语言的异常摘要(一句话)
private String detailedAnalysis; // 详细分析
private List<String> possibleCauses; // 可能的原因
private List<String> recommendedActions; // 建议操作
private String urgencyLevel; // 紧急程度
private List<String> historicalSimilarCases; // 历史相似案例
}统计异常检测器实现
基于 STL 分解的基线构建
STL(Seasonal and Trend decomposition using Loess)是时序数据分解的标准方法,把时序数据分解成趋势、季节性、残差三部分。
Java 生态里没有现成的 STL 库,我用了一个简化版:
@Component
public class StatisticalBaselineBuilder {
/**
* 构建指标的动态基线
* 使用移动平均 + 同期对比的简化方法
*/
public MetricBaseline buildBaseline(String metricName,
List<TimeSeriesPoint> historicalData,
int seasonalPeriod) {
MetricBaseline baseline = new MetricBaseline();
baseline.setMetricName(metricName);
if (historicalData.size() < seasonalPeriod * 2) {
// 历史数据不足,使用简单统计
return buildSimpleBaseline(metricName, historicalData);
}
// 1. 计算整体趋势(简单线性回归)
double trend = calculateLinearTrend(historicalData);
baseline.setTrendSlope(trend);
// 2. 计算季节性分量(同期平均)
Map<Integer, Double> seasonalFactors = calculateSeasonalFactors(
historicalData, seasonalPeriod
);
baseline.setSeasonalFactors(seasonalFactors);
// 3. 计算残差的标准差(用于确定异常阈值)
List<Double> residuals = calculateResiduals(historicalData, trend, seasonalFactors, seasonalPeriod);
double residualStd = calculateStd(residuals);
baseline.setResidualStd(residualStd);
// 4. 设置异常检测阈值(基于标准差的倍数)
baseline.setWarningThreshold(2.0 * residualStd); // 2σ
baseline.setCriticalThreshold(3.0 * residualStd); // 3σ
return baseline;
}
/**
* 预测某个时间点的期望值
*/
public BaselinePrediction predict(MetricBaseline baseline,
LocalDateTime targetTime,
int periodIndex) {
double trend = baseline.getTrendSlope() * periodIndex;
double seasonal = baseline.getSeasonalFactors().getOrDefault(
getPeriodPosition(targetTime, baseline.getSeasonalPeriod()), 0.0
);
double expectedValue = baseline.getMeanValue() + trend + seasonal;
double upperBound = expectedValue + baseline.getCriticalThreshold();
double lowerBound = expectedValue - baseline.getCriticalThreshold();
return new BaselinePrediction(expectedValue, upperBound, lowerBound);
}
private double calculateLinearTrend(List<TimeSeriesPoint> data) {
// 最小二乘法线性回归
int n = data.size();
double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
for (int i = 0; i < n; i++) {
sumX += i;
sumY += data.get(i).getValue();
sumXY += i * data.get(i).getValue();
sumX2 += i * i;
}
return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
}
private Map<Integer, Double> calculateSeasonalFactors(
List<TimeSeriesPoint> data, int period) {
// 计算每个季节位置的平均偏差
Map<Integer, List<Double>> seasonalData = new HashMap<>();
for (int i = 0; i < data.size(); i++) {
int position = i % period;
seasonalData.computeIfAbsent(position, k -> new ArrayList<>())
.add(data.get(i).getValue());
}
Map<Integer, Double> factors = new HashMap<>();
double overallMean = data.stream().mapToDouble(TimeSeriesPoint::getValue).average().orElse(0);
for (Map.Entry<Integer, List<Double>> entry : seasonalData.entrySet()) {
double periodMean = entry.getValue().stream().mapToDouble(Double::doubleValue).average().orElse(0);
factors.put(entry.getKey(), periodMean - overallMean);
}
return factors;
}
private List<Double> calculateResiduals(List<TimeSeriesPoint> data, double trend,
Map<Integer, Double> seasonalFactors, int period) {
double mean = data.stream().mapToDouble(TimeSeriesPoint::getValue).average().orElse(0);
List<Double> residuals = new ArrayList<>();
for (int i = 0; i < data.size(); i++) {
double expected = mean + trend * i + seasonalFactors.getOrDefault(i % period, 0.0);
residuals.add(data.get(i).getValue() - expected);
}
return residuals;
}
private double calculateStd(List<Double> values) {
double mean = values.stream().mapToDouble(Double::doubleValue).average().orElse(0);
double variance = values.stream()
.mapToDouble(v -> (v - mean) * (v - mean))
.average()
.orElse(0);
return Math.sqrt(variance);
}
private int getPeriodPosition(LocalDateTime time, int period) {
// 简化:按小时取模
return (int)(time.getHour()) % period;
}
private MetricBaseline buildSimpleBaseline(String metricName, List<TimeSeriesPoint> data) {
MetricBaseline baseline = new MetricBaseline();
baseline.setMetricName(metricName);
double mean = data.stream().mapToDouble(TimeSeriesPoint::getValue).average().orElse(0);
double std = calculateStd(data.stream().map(TimeSeriesPoint::getValue).collect(Collectors.toList()));
baseline.setMeanValue(mean);
baseline.setResidualStd(std);
baseline.setWarningThreshold(2.0 * std);
baseline.setCriticalThreshold(3.0 * std);
baseline.setSeasonalFactors(new HashMap<>());
return baseline;
}
}实时异常检测器
@Service
@Slf4j
public class AnomalyDetector {
private final StatisticalBaselineBuilder baselineBuilder;
private final Map<String, MetricBaseline> baselineCache = new ConcurrentHashMap<>();
/**
* 检测单个数据点是否异常
*/
public Optional<AnomalyDetectionResult> detect(TimeSeriesPoint currentPoint,
List<TimeSeriesPoint> recentHistory) {
String metricName = currentPoint.getMetricName();
// 获取或构建基线
MetricBaseline baseline = baselineCache.computeIfAbsent(metricName,
k -> baselineBuilder.buildBaseline(k, recentHistory, 24) // 假设24小时为一个周期
);
// 预测当前时间点的期望值
BaselinePrediction prediction = baselineBuilder.predict(
baseline, currentPoint.getTimestamp(), recentHistory.size()
);
double actual = currentPoint.getValue();
double expected = prediction.getExpectedValue();
double deviation = actual - expected;
double deviationRatio = expected > 0 ? Math.abs(deviation / expected) : 0;
// 判断是否超出阈值
boolean isAnomaly = Math.abs(deviation) > baseline.getWarningThreshold();
if (!isAnomaly) {
return Optional.empty();
}
AnomalyDetectionResult result = new AnomalyDetectionResult();
result.setMetricName(metricName);
result.setDetectedAt(LocalDateTime.now());
result.setAnomalyPoint(currentPoint);
result.setExpectedValue(expected);
result.setActualValue(actual);
result.setDeviationRatio(deviationRatio);
result.setRecentHistory(recentHistory.subList(
Math.max(0, recentHistory.size() - 48), recentHistory.size() // 最近48个点
));
result.setBaseline(expected);
result.setUpperBound(prediction.getUpperBound());
result.setLowerBound(prediction.getLowerBound());
// 判断异常类型
result.setAnomalyType(classifyAnomalyType(recentHistory, actual, expected));
// 判断严重程度
result.setSeverity(classifySeverity(deviationRatio, baseline));
return Optional.of(result);
}
private AnomalyDetectionResult.AnomalyType classifyAnomalyType(
List<TimeSeriesPoint> history, double actual, double expected) {
// 检查是否是持续性偏差(最近10个点都偏高/偏低)
if (history.size() >= 10) {
List<TimeSeriesPoint> recent10 = history.subList(history.size() - 10, history.size());
double recentMean = recent10.stream().mapToDouble(TimeSeriesPoint::getValue).average().orElse(0);
if (Math.abs(recentMean - expected) > Math.abs(actual - expected) * 0.5) {
return AnomalyDetectionResult.AnomalyType.LEVEL_SHIFT;
}
}
if (actual > expected) {
return AnomalyDetectionResult.AnomalyType.SPIKE;
} else {
return AnomalyDetectionResult.AnomalyType.DROP;
}
}
private AnomalyDetectionResult.AnomalySeverity classifySeverity(
double deviationRatio, MetricBaseline baseline) {
if (Math.abs(deviationRatio) > 0.5) return AnomalyDetectionResult.AnomalySeverity.SEVERE;
if (Math.abs(deviationRatio) > 0.3) return AnomalyDetectionResult.AnomalySeverity.CRITICAL;
return AnomalyDetectionResult.AnomalySeverity.WARNING;
}
}LLM 解释异常原因
这是 LLM 真正发挥价值的地方。
@Service
@Slf4j
public class AnomalyExplainer {
private final ChatClient chatClient;
private final VectorStore knowledgeBase; // 历史故障知识库
private static final String EXPLANATION_PROMPT = """
你是一个供应链运营分析专家。以下是一个业务指标的异常检测结果,请分析并解释这个异常。
【异常基本信息】
指标名称:{metricName}
检测时间:{detectedAt}
实际值:{actualValue}
预期值(基线):{expectedValue}
偏差:{deviation}(偏差{deviationRatio}%)
异常类型:{anomalyType}
严重程度:{severity}
【近期数据趋势】
{recentTrend}
【相关维度信息】
{dimensionInfo}
【历史相似案例】
{historicalCases}
请提供:
1. 一句话的业务摘要(给管理层看的)
2. 详细分析(技术层面,给运营人员看的)
3. 3-5个可能的原因(从最可能到最不可能排序)
4. 建议的排查步骤和处理方式
5. 紧急程度(立即处理/今日内处理/例行关注)
以JSON格式返回:
{
"businessSummary": "一句话摘要",
"detailedAnalysis": "详细分析",
"possibleCauses": ["原因1", "原因2", ...],
"recommendedActions": ["步骤1", "步骤2", ...],
"urgencyLevel": "立即处理/今日内处理/例行关注"
}
""";
public AnomalyExplanation explain(AnomalyDetectionResult anomaly,
Map<String, String> businessContext) {
// 1. 从故障知识库检索历史相似案例
String searchQuery = buildSearchQuery(anomaly);
List<Document> historicalCases = knowledgeBase.similaritySearch(
SearchRequest.query(searchQuery).withTopK(3)
);
// 2. 构建 Prompt
String prompt = buildExplanationPrompt(anomaly, businessContext, historicalCases);
// 3. 调用 LLM
String response = chatClient.prompt()
.user(prompt)
.call()
.content();
// 4. 解析结果
return parseExplanationResult(anomaly, response, historicalCases);
}
private String buildExplanationPrompt(AnomalyDetectionResult anomaly,
Map<String, String> businessContext,
List<Document> historicalCases) {
// 格式化近期趋势
String recentTrend = formatRecentTrend(anomaly.getRecentHistory());
// 格式化历史案例
String historicalCasesText = historicalCases.stream()
.map(doc -> "- " + doc.getContent().substring(0, Math.min(200, doc.getContent().length())))
.collect(Collectors.joining("\n"));
// 格式化维度信息
String dimensionInfo = anomaly.getAnomalyPoint().getDimensions() != null
? anomaly.getAnomalyPoint().getDimensions().entrySet().stream()
.map(e -> e.getKey() + ":" + e.getValue())
.collect(Collectors.joining(","))
: "无";
double deviationRatio = (anomaly.getActualValue() - anomaly.getExpectedValue())
/ anomaly.getExpectedValue() * 100;
return EXPLANATION_PROMPT
.replace("{metricName}", anomaly.getMetricName())
.replace("{detectedAt}", anomaly.getDetectedAt().toString())
.replace("{actualValue}", String.format("%.2f", anomaly.getActualValue()))
.replace("{expectedValue}", String.format("%.2f", anomaly.getExpectedValue()))
.replace("{deviation}", String.format("%.2f", anomaly.getActualValue() - anomaly.getExpectedValue()))
.replace("{deviationRatio}", String.format("%.1f", Math.abs(deviationRatio)))
.replace("{anomalyType}", anomaly.getAnomalyType().name())
.replace("{severity}", anomaly.getSeverity().name())
.replace("{recentTrend}", recentTrend)
.replace("{dimensionInfo}", dimensionInfo)
.replace("{historicalCases}", historicalCasesText.isEmpty() ? "暂无历史相似案例" : historicalCasesText);
}
private String formatRecentTrend(List<TimeSeriesPoint> history) {
if (history == null || history.isEmpty()) return "无历史数据";
StringBuilder sb = new StringBuilder();
int showCount = Math.min(12, history.size());
List<TimeSeriesPoint> recent = history.subList(history.size() - showCount, history.size());
for (TimeSeriesPoint point : recent) {
sb.append(point.getTimestamp().format(DateTimeFormatter.ofPattern("MM-dd HH:mm")))
.append(": ").append(String.format("%.2f", point.getValue())).append("\n");
}
return sb.toString();
}
private String buildSearchQuery(AnomalyDetectionResult anomaly) {
return String.format("%s %s %s 异常",
anomaly.getMetricName(),
anomaly.getAnomalyType().name(),
anomaly.getAnomalyPoint().getDimensions() != null
? String.join(" ", anomaly.getAnomalyPoint().getDimensions().values())
: ""
);
}
}人工确认闭环
检测到异常、生成解释之后,还需要有人工确认的机制,形成闭环。
@Service
public class AnomalyFeedbackService {
private final AnomalyRepository anomalyRepository;
private final VectorStore knowledgeBase;
/**
* 人工确认异常真实性(是真实问题还是误报)
*/
public void confirmAnomaly(String anomalyId, AnomalyConfirmation confirmation) {
AnomalyRecord record = anomalyRepository.findById(anomalyId)
.orElseThrow(() -> new RuntimeException("异常记录不存在: " + anomalyId));
record.setConfirmedAt(LocalDateTime.now());
record.setConfirmedBy(confirmation.getConfirmedBy());
record.setIsRealAnomaly(confirmation.isRealAnomaly());
record.setActualRootCause(confirmation.getActualRootCause());
record.setResolutionActions(confirmation.getResolutionActions());
anomalyRepository.save(record);
// 如果是真实异常,加入故障知识库(用于未来类似异常的解释参考)
if (confirmation.isRealAnomaly() && confirmation.getActualRootCause() != null) {
addToKnowledgeBase(record, confirmation);
}
// 更新基线(如果异常是由正常的业务变化引起的,需要更新基线)
if (confirmation.isNeedBaselineUpdate()) {
triggerBaselineUpdate(record.getMetricName());
}
}
private void addToKnowledgeBase(AnomalyRecord record, AnomalyConfirmation confirmation) {
String content = String.format("""
故障案例:%s 指标异常
发生时间:%s
异常类型:%s
根本原因:%s
处理方式:%s
恢复时间:%s
""",
record.getMetricName(),
record.getDetectedAt(),
record.getAnomalyType(),
confirmation.getActualRootCause(),
String.join(";", confirmation.getResolutionActions()),
confirmation.getResolutionTime() != null ? confirmation.getResolutionTime().toString() : "未记录"
);
Map<String, Object> metadata = new HashMap<>();
metadata.put("metricName", record.getMetricName());
metadata.put("anomalyType", record.getAnomalyType());
metadata.put("rootCause", confirmation.getActualRootCause());
metadata.put("recordType", "HISTORICAL_ANOMALY");
knowledgeBase.add(List.of(new Document(content, metadata)));
log.info("已将异常案例添加到知识库: anomalyId={}", record.getAnomalyId());
}
}异常检测和处理流程图
实际效果
这套系统上线后,那个客户的异常检测效果:
- 误报率:从原来固定阈值方案的 42%(即将近一半的报警是误报)降到了 8%
- 漏报率:从无法量化(压根没检测季节性异常)到 < 5%
- 响应时间:从平均 2.3 天发现问题(靠人工巡检)到平均 23 分钟自动报警
- 运营效率:值班人员每天处理报警的时间从 4 小时降到 45 分钟(因为误报少了,每条报警的质量更高)
总结
时序数据的 AI 智能监控,关键是搞清楚 AI 在哪个环节发挥作用:
- 异常检测:这是统计方法的主场,AI 不是最优选择
- 异常解释:这是 LLM 的主场,把数字语言翻译成业务语言
- 知识积累:把每次处理过的异常案例加入知识库,让下次解释更准确
不要让 LLM 做它不擅长的事(统计计算),也不要把统计算法放在它不擅长的地方(自然语言理解)。合理分工,效果才好。
