第1812篇:复杂事件处理(CEP)在AI异常检测中的应用
第1812篇:复杂事件处理(CEP)在AI异常检测中的应用
我一直觉得CEP是被严重低估的技术。
在各大技术会议上,你总能听到Transformer、RAG、向量数据库,但很少有人聊CEP。可实际上,在金融风控、工业监控、网络安全这些领域,CEP+AI才是真正在生产上跑着赚钱的方案。
这篇文章聊的不是抽象概念。我在一个工厂设备监控项目里把Flink CEP和LLM做了深度整合,系统上线后把误报率从37%降到了8%以下。我把这段经历完整写出来,有代码有教训。
CEP到底解决什么问题
先澄清一个误解:很多人以为CEP就是"复杂的条件判断",其实差远了。
CEP的核心是在时间维度上识别事件的模式。举个工厂设备的例子:
- 单纯的阈值告警:温度 > 90°C 就报警。问题是温度偶发性超过90°C可能是正常工况波动,这样会产生大量误报。
- CEP模式检测:在5分钟内,温度先升高超过85°C,然后振动频率同时异常,紧接着电流波动超过20%——这三个事件按这个顺序发生,才报警。
后者描述的是设备真实的故障前兆,精准得多。LLM在这里的作用是什么?它负责解读CEP检测到的异常模式,给运维人员一个人话说的诊断结论,并推荐处理建议。
这就是CEP+AI的组合价值:CEP负责发现,LLM负责解读。
系统整体架构
这里有个关键设计:CEP检测到的复杂异常,先查询历史诊断库(RAG),看有没有相似的历史故障案例,然后结合当前的传感器数据一起扔给LLM,生成诊断报告。这样LLM的输出质量会高出很多。
数据模型
// 传感器原始数据
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SensorEvent {
private String deviceId;
private String sensorType; // TEMPERATURE / VIBRATION / CURRENT / PRESSURE
private double value;
private String unit;
private long timestamp;
private Map<String, Object> metadata;
// 判断是否超阈值
public boolean exceedsThreshold(double threshold) {
return this.value > threshold;
}
}
// CEP检测到的复杂事件
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ComplexAnomalyEvent {
private String anomalyId;
private String deviceId;
private AnomalyPattern pattern; // 触发的模式类型
private List<SensorEvent> triggerEvents; // 触发这次告警的所有事件
private long startTime;
private long endTime;
private AnomalySeverity severity;
private Map<String, Double> aggregatedMetrics; // 聚合统计指标
public enum AnomalyPattern {
THERMAL_OVERLOAD, // 热过载
BEARING_DEGRADATION, // 轴承退化
ELECTRICAL_FAULT, // 电气故障
MECHANICAL_RESONANCE, // 机械共振
MULTI_SENSOR_CORRELATION // 多传感器关联异常
}
public enum AnomalySeverity {
LOW, MEDIUM, HIGH, CRITICAL
}
}
// LLM诊断报告
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DiagnosisReport {
private String anomalyId;
private String deviceId;
private String rootCauseAnalysis; // 根因分析
private List<String> recommendations; // 处理建议
private double confidenceScore;
private List<String> similarCases; // 相似历史案例
private String urgencyLevel; // 紧急程度说明
private long generatedAt;
}Flink CEP模式定义
这是整个方案的核心。Flink CEP的模式定义语言非常强大,但文档比较晦涩,我来把实际用到的几种模式拆开讲:
@Component
@Slf4j
public class AnomalyPatternFactory {
/**
* 热过载模式:
* 温度先升到85°C以上,10分钟内振动异常,再5分钟内电流波动
* 这是我们定义的轴承过热前兆
*/
public Pattern<SensorEvent, ?> thermalOverloadPattern() {
return Pattern.<SensorEvent>begin("temp-rise")
.where(new SimpleCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event) {
return "TEMPERATURE".equals(event.getSensorType())
&& event.getValue() > 85.0;
}
})
.next("vibration-anomaly") // 紧接着发生(中间不允许有其他事件)
.where(new SimpleCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event) {
return "VIBRATION".equals(event.getSensorType())
&& event.getValue() > 15.0; // 振动超过15mm/s
}
})
.within(Duration.ofMinutes(10)) // 整个模式在10分钟内完成
.followedBy("current-spike") // 接着发生(中间允许有其他事件)
.where(new SimpleCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event) {
return "CURRENT".equals(event.getSensorType())
&& event.getValue() > 120.0; // 电流超120%额定值
}
})
.within(Duration.ofMinutes(15));
}
/**
* 轴承退化模式:
* 振动幅值持续在高位(连续出现3次以上超过阈值)
* 这是一个"量变引起质变"的模式
*/
public Pattern<SensorEvent, ?> bearingDegradationPattern() {
return Pattern.<SensorEvent>begin("vibration-high", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event) {
return "VIBRATION".equals(event.getSensorType())
&& event.getValue() > 12.0;
}
})
.timesOrMore(3) // 至少出现3次
.greedy() // 尽可能多匹配
.within(Duration.ofMinutes(30));
}
/**
* 多传感器关联异常:
* 任意两种传感器在短时间内都出现异常
* 使用OR条件和时间窗口实现
*/
public Pattern<SensorEvent, ?> multiSensorCorrelationPattern() {
return Pattern.<SensorEvent>begin("first-sensor-anomaly")
.where(new IterativeCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event, Context<SensorEvent> ctx) {
return isAnomalous(event);
}
})
.followedByAny("second-sensor-anomaly") // followedByAny允许交叉事件
.where(new IterativeCondition<SensorEvent>() {
@Override
public boolean filter(SensorEvent event, Context<SensorEvent> ctx) {
// 第二个异常必须来自不同的传感器类型
SensorEvent first = ctx.getEventsForPattern("first-sensor-anomaly")
.iterator().next();
return isAnomalous(event)
&& !event.getSensorType().equals(first.getSensorType());
}
})
.within(Duration.ofMinutes(5));
}
private boolean isAnomalous(SensorEvent event) {
Map<String, Double> thresholds = Map.of(
"TEMPERATURE", 85.0,
"VIBRATION", 12.0,
"CURRENT", 115.0,
"PRESSURE", 8.0
);
Double threshold = thresholds.get(event.getSensorType());
return threshold != null && event.getValue() > threshold;
}
}Flink Job主体
@Component
@Slf4j
public class AnomalyDetectionJob {
private final AnomalyPatternFactory patternFactory;
private final LLMDiagnosisService diagnosisService;
public void buildAndRunJob(StreamExecutionEnvironment env) throws Exception {
// 数据源:从Kafka读取传感器数据
KafkaSource<SensorEvent> source = KafkaSource.<SensorEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("raw-sensor-data")
.setGroupId("cep-anomaly-detector")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SensorEventDeserializer())
.build();
DataStream<SensorEvent> sensorStream = env
.fromSource(source, WatermarkStrategy
.<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp()),
"Sensor Stream")
.keyBy(SensorEvent::getDeviceId); // 按设备ID分组
// 应用热过载模式
PatternStream<SensorEvent> thermalPatternStream = CEP.pattern(
sensorStream,
patternFactory.thermalOverloadPattern()
);
// 应用轴承退化模式
PatternStream<SensorEvent> bearingPatternStream = CEP.pattern(
sensorStream,
patternFactory.bearingDegradationPattern()
);
// 提取匹配的复杂事件
DataStream<ComplexAnomalyEvent> thermalAnomalies = thermalPatternStream
.select(new ThermalOverloadPatternSelectFunction());
DataStream<ComplexAnomalyEvent> bearingAnomalies = bearingPatternStream
.select(new BearingDegradationSelectFunction());
// 合并所有异常流
DataStream<ComplexAnomalyEvent> allAnomalies = thermalAnomalies
.union(bearingAnomalies);
// 异步调用LLM进行诊断(异步算子是关键!)
DataStream<DiagnosisReport> diagnosisStream = AsyncDataStream
.unorderedWait(
allAnomalies,
new LLMDiagnosisAsyncFunction(diagnosisService),
30, // 超时30秒
TimeUnit.SECONDS,
100 // 最多100个并发异步请求
);
// 输出到Kafka
diagnosisStream.addSink(buildKafkaSink());
env.execute("Anomaly Detection with CEP");
}
// CEP模式匹配函数
static class ThermalOverloadPatternSelectFunction
implements PatternSelectFunction<SensorEvent, ComplexAnomalyEvent> {
@Override
public ComplexAnomalyEvent select(Map<String, List<SensorEvent>> pattern) {
List<SensorEvent> tempEvents = pattern.get("temp-rise");
List<SensorEvent> vibEvents = pattern.get("vibration-anomaly");
List<SensorEvent> currentEvents = pattern.get("current-spike");
List<SensorEvent> allEvents = new ArrayList<>();
allEvents.addAll(tempEvents);
allEvents.addAll(vibEvents);
allEvents.addAll(currentEvents);
// 计算聚合指标
Map<String, Double> metrics = new HashMap<>();
tempEvents.forEach(e -> metrics.merge("max_temperature", e.getValue(), Math::max));
vibEvents.forEach(e -> metrics.merge("max_vibration", e.getValue(), Math::max));
currentEvents.forEach(e -> metrics.merge("max_current", e.getValue(), Math::max));
String deviceId = allEvents.get(0).getDeviceId();
long startTime = allEvents.stream().mapToLong(SensorEvent::getTimestamp).min().orElse(0);
long endTime = allEvents.stream().mapToLong(SensorEvent::getTimestamp).max().orElse(0);
return ComplexAnomalyEvent.builder()
.anomalyId(UUID.randomUUID().toString())
.deviceId(deviceId)
.pattern(ComplexAnomalyEvent.AnomalyPattern.THERMAL_OVERLOAD)
.triggerEvents(allEvents)
.startTime(startTime)
.endTime(endTime)
.severity(ComplexAnomalyEvent.AnomalySeverity.HIGH)
.aggregatedMetrics(metrics)
.build();
}
}
}LLM异步诊断算子
Flink的AsyncFunction是处理LLM这类高延迟调用的正确方式:
@Slf4j
public class LLMDiagnosisAsyncFunction
extends RichAsyncFunction<ComplexAnomalyEvent, DiagnosisReport> {
private final LLMDiagnosisService diagnosisService;
private transient ExecutorService executorService;
public LLMDiagnosisAsyncFunction(LLMDiagnosisService diagnosisService) {
this.diagnosisService = diagnosisService;
}
@Override
public void open(Configuration parameters) {
// 用虚拟线程池处理异步LLM调用
executorService = Executors.newVirtualThreadPerTaskExecutor();
}
@Override
public void asyncInvoke(ComplexAnomalyEvent anomaly,
ResultFuture<DiagnosisReport> resultFuture) {
CompletableFuture.supplyAsync(() -> {
try {
return diagnosisService.diagnose(anomaly);
} catch (Exception e) {
log.error("LLM diagnosis failed for anomaly: {}", anomaly.getAnomalyId(), e);
// 降级:返回基于规则的基础诊断
return createFallbackReport(anomaly);
}
}, executorService).thenAccept(report ->
resultFuture.complete(Collections.singletonList(report))
);
}
@Override
public void timeout(ComplexAnomalyEvent input, ResultFuture<DiagnosisReport> resultFuture) {
log.warn("LLM diagnosis timeout for anomaly: {}", input.getAnomalyId());
resultFuture.complete(Collections.singletonList(createFallbackReport(input)));
}
private DiagnosisReport createFallbackReport(ComplexAnomalyEvent anomaly) {
// 基于规则的降级报告
String analysis = switch (anomaly.getPattern()) {
case THERMAL_OVERLOAD -> "检测到热过载模式,建议立即降低设备负载并检查冷却系统";
case BEARING_DEGRADATION -> "检测到轴承退化迹象,建议安排预防性维护";
case ELECTRICAL_FAULT -> "检测到电气异常,建议检查电源和接线";
default -> "检测到多传感器关联异常,建议人工排查";
};
return DiagnosisReport.builder()
.anomalyId(anomaly.getAnomalyId())
.deviceId(anomaly.getDeviceId())
.rootCauseAnalysis(analysis)
.recommendations(List.of("立即检查设备状态", "联系维修人员"))
.confidenceScore(0.6)
.urgencyLevel("需要人工确认")
.generatedAt(System.currentTimeMillis())
.build();
}
@Override
public void close() {
if (executorService != null) {
executorService.shutdown();
}
}
}LLM诊断服务核心逻辑
@Service
@Slf4j
public class LLMDiagnosisService {
private final ChatLanguageModel chatModel;
private final VectorStore vectorStore; // 历史案例向量数据库
private final EmbeddingModel embeddingModel;
public DiagnosisReport diagnose(ComplexAnomalyEvent anomaly) {
// Step 1: 检索相似历史案例(RAG)
String queryText = buildQueryText(anomaly);
List<TextSegment> similarCases = vectorStore.search(
EmbeddingSearchRequest.builder()
.queryEmbedding(embeddingModel.embed(queryText).content())
.maxResults(3)
.minScore(0.75)
.build()
).matches().stream()
.map(EmbeddingMatch::embedded)
.collect(Collectors.toList());
// Step 2: 构造诊断Prompt
String prompt = buildDiagnosisPrompt(anomaly, similarCases);
// Step 3: 调用LLM
String response = chatModel.generate(prompt);
// Step 4: 解析结果
return parseDiagnosisResponse(anomaly, response, similarCases);
}
private String buildDiagnosisPrompt(ComplexAnomalyEvent anomaly,
List<TextSegment> similarCases) {
StringBuilder sb = new StringBuilder();
sb.append("你是一位资深的工业设备故障诊断专家。\n\n");
sb.append("## 当前异常事件\n");
sb.append("设备ID: ").append(anomaly.getDeviceId()).append("\n");
sb.append("异常模式: ").append(anomaly.getPattern().name()).append("\n");
sb.append("严重程度: ").append(anomaly.getSeverity().name()).append("\n");
sb.append("时间范围: ").append(formatTimeRange(anomaly.getStartTime(), anomaly.getEndTime())).append("\n\n");
sb.append("## 传感器数据\n");
anomaly.getAggregatedMetrics().forEach((key, value) ->
sb.append(key).append(": ").append(String.format("%.2f", value)).append("\n"));
sb.append("\n## 触发事件序列\n");
anomaly.getTriggerEvents().forEach(event ->
sb.append(String.format("[%s] %s: %.2f %s\n",
formatTimestamp(event.getTimestamp()),
event.getSensorType(),
event.getValue(),
event.getUnit())));
if (!similarCases.isEmpty()) {
sb.append("\n## 历史相似案例\n");
similarCases.forEach(case_ ->
sb.append("- ").append(case_.text()).append("\n"));
}
sb.append("\n请提供:\n");
sb.append("1. 根因分析(100字以内)\n");
sb.append("2. 处理建议(3-5条具体操作)\n");
sb.append("3. 紧急程度说明\n");
sb.append("4. 置信度(0.0-1.0)\n\n");
sb.append("以JSON格式返回,字段:rootCause, recommendations(数组), urgency, confidence");
return sb.toString();
}
private String buildQueryText(ComplexAnomalyEvent anomaly) {
return String.format("设备故障 %s 模式 温度%s 振动%s",
anomaly.getPattern().name(),
anomaly.getAggregatedMetrics().getOrDefault("max_temperature", 0.0),
anomaly.getAggregatedMetrics().getOrDefault("max_vibration", 0.0));
}
private DiagnosisReport parseDiagnosisResponse(ComplexAnomalyEvent anomaly,
String response,
List<TextSegment> similarCases) {
try {
ObjectMapper mapper = new ObjectMapper();
String jsonStr = extractJson(response);
JsonNode node = mapper.readTree(jsonStr);
List<String> recommendations = new ArrayList<>();
node.get("recommendations").forEach(r -> recommendations.add(r.asText()));
List<String> caseTexts = similarCases.stream()
.map(TextSegment::text)
.collect(Collectors.toList());
return DiagnosisReport.builder()
.anomalyId(anomaly.getAnomalyId())
.deviceId(anomaly.getDeviceId())
.rootCauseAnalysis(node.get("rootCause").asText())
.recommendations(recommendations)
.confidenceScore(node.get("confidence").asDouble(0.7))
.urgencyLevel(node.get("urgency").asText())
.similarCases(caseTexts)
.generatedAt(System.currentTimeMillis())
.build();
} catch (Exception e) {
log.warn("Failed to parse LLM diagnosis response", e);
// fallback处理
return DiagnosisReport.builder()
.anomalyId(anomaly.getAnomalyId())
.deviceId(anomaly.getDeviceId())
.rootCauseAnalysis("解析失败,需要人工检查原始LLM输出")
.recommendations(List.of("人工审核"))
.confidenceScore(0.0)
.urgencyLevel("人工确认")
.generatedAt(System.currentTimeMillis())
.build();
}
}
private String extractJson(String text) {
int start = text.indexOf('{');
int end = text.lastIndexOf('}');
if (start >= 0 && end > start) {
return text.substring(start, end + 1);
}
throw new IllegalArgumentException("No valid JSON in LLM response");
}
private String formatTimeRange(long start, long end) {
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss");
return LocalDateTime.ofEpochSecond(start/1000, 0, ZoneOffset.UTC).format(fmt)
+ " - "
+ LocalDateTime.ofEpochSecond(end/1000, 0, ZoneOffset.UTC).format(fmt);
}
private String formatTimestamp(long ts) {
return LocalDateTime.ofEpochSecond(ts/1000, 0, ZoneOffset.UTC)
.format(DateTimeFormatter.ofPattern("HH:mm:ss"));
}
}CEP参数调优经验
这几个参数我在生产上反复调过,记录下来供参考:
时间窗口(within):这是最关键的参数,直接影响误报率和漏报率。窗口太小——正常的工况变化也会被识别成"模式";窗口太大——真正的故障早期就该处理了,等窗口满了才触发太晚。我的做法是先从历史故障数据中统计真实故障发展的时间分布,把P50作为初始窗口值,再逐步调整。
afterMatchSkipStrategy:控制一个匹配之后,下次匹配从哪里开始。skipPastLastEvent会跳过已匹配的事件,防止一段数据触发大量重叠告警(告警风暴)。我们之前没配这个,一个设备故障能触发几十条告警,运维人员直接麻了。
超时处理(TimedOutPartialMatchHandler):模式没有在时间窗口内完全匹配,这些"半匹配"事件怎么处理?默认是丢弃,但在某些场景下,半匹配本身也是一个值得关注的信号。我们对"热过载"模式,如果前两个事件匹配了但第三个没来,也会生成一条低级别告警。
状态清理:CEP内部维护了大量状态(等待中的部分匹配)。如果传感器发送频率很高,而且定义了很多模式,状态会膨胀很快。要定期监控Flink的状态大小,并合理配置TTL。
踩坑:时间对齐问题
CEP有一个让我折腾了两天的问题:乱序事件。
IoT设备的网络传输不稳定,传感器数据经常乱序到达。比如温度上升事件时间戳是10:00:01,但由于网络延迟,它在10:00:03才到Flink。如果振动异常事件时间戳是10:00:02,并且它先到了,CEP会先处理振动事件,然后温度事件到达时,时序就反了,模式匹配失败。
解决方案是配合Flink的Watermark机制:允许10秒钟的乱序容忍,用forBoundedOutOfOrderness(Duration.ofSeconds(10))。但这会带来额外的10秒延迟。在我们的场景里,10秒是可以接受的,毕竟设备故障不需要毫秒级响应。
如果你的场景对延迟要求极高,可以考虑在IoT网关层做时间对齐,减少乱序到达Flink的概率。
这套系统上线3个月后,我们复盘了一次数据。CEP检测到的复杂异常中,LLM给出的诊断置信度在0.8以上的占71%,有9%是明确的误报(LLM自己在报告里说"数据异常,建议复查传感器"),剩下20%需要人工进一步确认。
比起之前纯规则告警37%误报率的状态,这个结果让运维团队很满意。
