Building AI-Driven DevOps: An Automation Blueprint That Cuts Ops Work by 80%
From 50 alerts a day down to 8: what happened to this ops engineer
One night in February 2026, at 2:17 a.m., Wang Xin's phone rang.
It wasn't the first time, and it wouldn't be the last that night. As a senior ops engineer at an e-commerce platform, his typical day looked like this: arrive at 9 a.m., work through the alerts that had piled up overnight, 50 to 60 of them on average. Inspect each one, triage each one, handle each one. By the end of the day, maybe 3 or 4 turned out to be real problems he actually fixed. The rest were noise, multiple alerts fired by the same root cause, or transient issues that had already healed themselves.
The alerts he received at 2:17 that morning looked like this:
[CRITICAL] API gateway P99 latency above 3 seconds
[CRITICAL] Order service CPU usage at 92%
[WARNING] Database connection pool at 88% utilization
[WARNING] Redis memory usage at 79%
[INFO] Batch job execution timed out
[INFO] Message queue backlog: 5,000 messages
Six alerts, all fired at the same moment. In reality they were six facets of a single problem: a memory leak in the order service under flash-sale traffic triggered Full GC, which drove CPU up, which kept database connections from being released, which made requests pile up, which degraded every downstream metric.
But at 2 a.m., staring at six seemingly unrelated alerts, you have no idea where to even start.
Six months later, Wang Xin's team put their AIOps system into production. In the same scenario, the AI automatically merged the six alerts into one event, analyzed the root cause, and produced a remediation recommendation. The on-call engineer only had to confirm a single action: whether to apply a dynamic JVM heap adjustment and trigger a proactive GC.
From 50 alerts down to 8 that needed human attention. Ops hours dropped by 83%.
This article is the complete implementation of that system.
Part 1: Core AIOps Scenarios and System Architecture
1.1 Why Traditional Alert Handling Is Inefficient
The three big pain points of the traditional model:
- Alert explosion: one root cause fires dozens of related alerts, and every one of them needs human triage
- Knowledge silos: the veterans' incident experience lives in their heads, so newcomers have to fumble their way through
- Middle-of-the-night response: no matter how urgent the problem, it waits for a human, so MTTR (mean time to recovery) stays high
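To make the "alert explosion" point concrete, here is a minimal plain-Java sketch (the alert data and root-cause keys are made up for illustration, not taken from the article's real feed) showing how a morning backlog of 50 alerts collapses to a handful of incidents once alerts sharing a root cause are grouped:

```java
import java.util.*;
import java.util.stream.*;

public class AlertNoiseDemo {
    record Alert(String name, String rootCauseKey) {}

    // Hypothetical morning backlog: 50 alerts hiding only 3 distinct root causes.
    static List<Alert> sampleBacklog() {
        List<Alert> backlog = new ArrayList<>();
        for (int i = 0; i < 20; i++) backlog.add(new Alert("order-service latency #" + i, "order-oom"));
        for (int i = 0; i < 15; i++) backlog.add(new Alert("db connection pool #" + i, "order-oom"));
        for (int i = 0; i < 10; i++) backlog.add(new Alert("disk usage #" + i, "log-disk-full"));
        for (int i = 0; i < 5; i++)  backlog.add(new Alert("batch job timeout #" + i, "flaky-job"));
        return backlog;
    }

    // Grouping by shared root cause is what turns 50 alert tickets into a few incidents.
    static Map<String, Long> groupIncidents(List<Alert> backlog) {
        return backlog.stream()
                .collect(Collectors.groupingBy(Alert::rootCauseKey, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Alert> backlog = sampleBacklog();
        Map<String, Long> incidents = groupIncidents(backlog);
        System.out.println("alerts=" + backlog.size() + ", incidents=" + incidents.size());
    }
}
```

The hard part in production is that the shared root-cause key is not known up front; the rest of the article is about inferring it.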
1.2 Overall AIOps System Architecture
End to end, the system described in this article forms a pipeline: Alertmanager pushes alerts to a webhook, the aggregation engine merges them into events, the RCA agent analyzes root causes using Prometheus, log, change-record, and knowledge-base tools, the auto-remediation executor applies safe fixes, and the on-call bot reports everything to DingTalk/Feishu.
Part 2: Intelligent Alert Aggregation: Merging 100 Alerts into 1 Event
2.1 The Alert Data Model
@Data
@Builder
public class Alert {
private String alertId;
private String alertName;
private AlertSeverity severity; // CRITICAL/WARNING/INFO
private String service; // Service that fired the alert
private String host; // Host that fired the alert
private String metric; // Related metric
private double metricValue;
private double threshold;
private Instant firedAt;
private Map<String, String> labels; // Prometheus labels
private String description;
public enum AlertSeverity {
CRITICAL(4), WARNING(3), INFO(2), DEBUG(1);
private final int level;
AlertSeverity(int level) { this.level = level; }
public int getLevel() { return level; }
}
}
@Data
@Builder
public class AlertEvent {
private String eventId;
private String eventTitle; // AI-generated event summary
private String rootCauseAnalysis; // Root cause as analyzed by the AI
private List<Alert> alerts; // Aggregated alerts
private AlertSeverity maxSeverity;
private List<String> affectedServices;
private String suggestedAction; // AI-suggested remediation
private EventStatus status;
private Instant createdAt;
private Instant resolvedAt;
public enum EventStatus {
ACTIVE, ACKNOWLEDGED, RESOLVED, AUTO_RESOLVED
}
}
2.2 The AI Alert Aggregation Engine
@Service
@RequiredArgsConstructor
@Slf4j
public class AlertAggregationEngine {
private final ChatClient chatClient;
private final AlertEventRepository eventRepository;
private final EmbeddingModel embeddingModel;
private final VectorStore vectorStore;
private final RootCauseAnalysisAgent rootCauseAnalysisAgent; // used by triggerRootCauseAnalysis()
private final ObjectMapper objectMapper; // used to parse the LLM's JSON responses
// Time window: related alerts within 5 minutes are aggregated into one event
private static final Duration AGGREGATION_WINDOW = Duration.ofMinutes(5);
// Similarity threshold: cosine similarity above 0.85 counts as related
private static final double SIMILARITY_THRESHOLD = 0.85;
private final Map<String, AlertEvent> activeEvents = new ConcurrentHashMap<>();
/**
 * Process a new alert: either create a new event or merge it into an existing one
 */
public AlertEvent processAlert(Alert alert) {
// Step 1: look for an active event this alert can be merged into
Optional<AlertEvent> existingEvent = findRelatedActiveEvent(alert);
if (existingEvent.isPresent()) {
// Merge into the existing event
AlertEvent event = existingEvent.get();
event.getAlerts().add(alert);
// Bump the maximum severity if needed
if (alert.getSeverity().getLevel() > event.getMaxSeverity().getLevel()) {
event.setMaxSeverity(alert.getSeverity());
}
eventRepository.save(event);
log.info("Alert {} aggregated into event {}", alert.getAlertId(), event.getEventId());
return event;
} else {
// Create a new event
AlertEvent newEvent = createNewEvent(alert);
activeEvents.put(newEvent.getEventId(), newEvent);
eventRepository.save(newEvent);
// Kick off root cause analysis asynchronously
triggerRootCauseAnalysis(newEvent);
log.info("New event {} created for alert {}", newEvent.getEventId(), alert.getAlertId());
return newEvent;
}
}
/**
 * Find a related active event using vector similarity
 */
private Optional<AlertEvent> findRelatedActiveEvent(Alert alert) {
// Build a semantic representation of the alert
String alertDescription = String.format(
"Service: %s, Alert: %s, Metric: %s=%.2f, Severity: %s",
alert.getService(), alert.getAlertName(),
alert.getMetric(), alert.getMetricValue(), alert.getSeverity()
);
Instant windowStart = Instant.now().minus(AGGREGATION_WINDOW);
// Find active events inside the time window
List<AlertEvent> recentEvents = activeEvents.values().stream()
.filter(e -> e.getStatus() == AlertEvent.EventStatus.ACTIVE)
.filter(e -> e.getCreatedAt().isAfter(windowStart))
.collect(Collectors.toList());
if (recentEvents.isEmpty()) return Optional.empty();
// Use the LLM to judge relatedness (vector similarity also works, depending on performance needs)
return findRelatedEventByLLM(alert, recentEvents);
}
private Optional<AlertEvent> findRelatedEventByLLM(Alert alert, List<AlertEvent> candidates) {
if (candidates.isEmpty()) return Optional.empty();
String prompt = buildAggregationPrompt(alert, candidates);
String response = chatClient.prompt()
.system("""
You are an AIOps expert responsible for judging whether alerts are related.
Given a new alert and a set of candidate events, decide whether the new alert is related to one of the candidates (i.e., shares the same root cause).
Return JSON only, in the form: {"related": true/false, "eventId": "xxx", "reason": "why"}
If unrelated, return null for eventId.
""")
.user(prompt)
.call()
.content();
try {
JsonNode result = objectMapper.readTree(response);
if (result.path("related").asBoolean() && !result.path("eventId").isNull()) {
String eventId = result.path("eventId").asText();
return candidates.stream()
.filter(e -> e.getEventId().equals(eventId))
.findFirst();
}
} catch (Exception e) {
log.warn("Failed to parse LLM aggregation response: {}", response);
}
return Optional.empty();
}
private String buildAggregationPrompt(Alert alert, List<AlertEvent> candidates) {
StringBuilder sb = new StringBuilder();
sb.append("New alert:\n");
sb.append(String.format("- Service: %s\n", alert.getService()));
sb.append(String.format("- Alert name: %s\n", alert.getAlertName()));
sb.append(String.format("- Metric: %s = %.2f\n", alert.getMetric(), alert.getMetricValue()));
sb.append(String.format("- Severity: %s\n\n", alert.getSeverity()));
sb.append("Candidate events:\n");
for (AlertEvent event : candidates) {
sb.append(String.format("EventID: %s\n", event.getEventId()));
sb.append(String.format("  Title: %s\n", event.getEventTitle()));
sb.append(String.format("  Affected services: %s\n", event.getAffectedServices()));
sb.append(String.format("  Alerts already included: %d\n", event.getAlerts().size()));
sb.append("\n");
}
return sb.toString();
}
private AlertEvent createNewEvent(Alert alert) {
return AlertEvent.builder()
.eventId(UUID.randomUUID().toString())
.eventTitle(generateEventTitle(alert))
.alerts(new ArrayList<>(List.of(alert)))
.maxSeverity(alert.getSeverity())
.affectedServices(new ArrayList<>(List.of(alert.getService())))
.status(AlertEvent.EventStatus.ACTIVE)
.createdAt(Instant.now())
.build();
}
private String generateEventTitle(Alert alert) {
// Generate the title quickly without an LLM call, to avoid latency
return String.format("[%s] %s - %s",
alert.getSeverity(), alert.getService(), alert.getAlertName());
}
private void triggerRootCauseAnalysis(AlertEvent event) {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(30000); // Wait 30 seconds to collect more related alerts first
rootCauseAnalysisAgent.analyze(event);
} catch (Exception e) {
log.error("RCA failed for event {}", event.getEventId(), e);
}
});
}
}
Part 3: The Root Cause Analysis Agent: Locating Problems Automatically
3.1 Implementing Root Cause Analysis with a Spring AI Agent
@Component
@RequiredArgsConstructor
@Slf4j
public class RootCauseAnalysisAgent {
private final ChatClient chatClient;
private final PrometheusQueryService prometheusService;
private final ElasticsearchLogService logService;
private final ChangeRecordService changeService;
private final OpsKnowledgeBase knowledgeBase;
// Agent toolset (not final: it is populated after construction, in @PostConstruct)
private ToolCallback[] rcaTools;
@PostConstruct
public void initTools() {
rcaTools = ToolCallbacks.from(prometheusService, logService, changeService, knowledgeBase);
}
/**
 * Run root cause analysis on an alert event
 */
public RcaResult analyze(AlertEvent event) {
log.info("Starting RCA for event: {}", event.getEventId());
String initialContext = buildRcaContext(event);
ChatResponse response = chatClient.prompt()
.system(buildRcaSystemPrompt())
.user(initialContext)
.toolCallbacks(rcaTools)
.options(OpenAiChatOptions.builder()
.model("gpt-4o") // Use a strong model for RCA
.temperature(0.1) // Low temperature for consistent reasoning
.maxTokens(4096)
.build())
.call()
.chatResponse();
String rcaAnalysis = response.getResult().getOutput().getText();
// Parse the analysis result
RcaResult result = parseRcaResult(rcaAnalysis, event);
// Update the event
event.setRootCauseAnalysis(result.getRootCause());
event.setSuggestedAction(result.getSuggestedAction());
return result;
}
private String buildRcaSystemPrompt() {
return """
You are an experienced SRE engineer performing root cause analysis on production alert events.
You have the following tools available:
- queryPrometheusMetrics: query metric data, supports PromQL
- searchLogs: search ELK logs by keyword and time range
- getRecentChanges: query change records (deployments/config changes) from the last 24 hours
- searchKnowledgeBase: search the historical incident knowledge base
Analysis steps:
1. Understand the context of the current alerts (service, metrics, time)
2. Query the relevant metric time series and find when the anomaly started
3. Search logs around the anomaly start time for error messages
4. Check whether any recent change coincides with the anomaly start time
5. Search the knowledge base for similar historical incidents
6. Synthesize everything into a root cause judgment with a confidence score
Output format (JSON):
{
"rootCause": "description of the root cause",
"confidence": 0.85,
"evidence": ["evidence 1", "evidence 2"],
"relatedChange": "change ID or null",
"suggestedAction": "recommended remediation",
"autoFixable": true/false,
"autoFixCommand": "auto-fix command or null"
}
""";
}
private String buildRcaContext(AlertEvent event) {
return String.format("""
Alert event analysis request:
Event ID: %s
Max severity: %s
Fired at: %s
Affected services: %s
Alerts included (%d total):
%s
Please begin the root cause analysis.
""",
event.getEventId(),
event.getMaxSeverity(),
event.getCreatedAt(),
String.join(", ", event.getAffectedServices()),
event.getAlerts().size(),
formatAlertsList(event.getAlerts())
);
}
private String formatAlertsList(List<Alert> alerts) {
return alerts.stream()
.map(a -> String.format("  - [%s] %s: %s=%.2f (threshold: %.2f)",
a.getSeverity(), a.getAlertName(), a.getMetric(),
a.getMetricValue(), a.getThreshold()))
.collect(Collectors.joining("\n"));
}
}
3.2 Agent Tool: Prometheus Queries
@Service
@RequiredArgsConstructor
@Slf4j
public class PrometheusQueryService {
private final RestTemplate restTemplate;
@Value("${prometheus.url}")
private String prometheusUrl;
@Tool(description = """
Query Prometheus metric data.
Parameters:
- query: PromQL query expression
- startTime: start time (ISO format)
- endTime: end time (ISO format)
- step: sampling interval (e.g. 30s, 1m, 5m)
""")
public String queryPrometheusMetrics(
@ToolParam("query") String query,
@ToolParam("startTime") String startTime,
@ToolParam("endTime") String endTime,
@ToolParam("step") String step) {
try {
String url = String.format(
"%s/api/v1/query_range?query=%s&start=%s&end=%s&step=%s",
prometheusUrl, URLEncoder.encode(query, StandardCharsets.UTF_8),
startTime, endTime, step
);
ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);
if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
// Condense the result down to the key data points
return formatPrometheusResult(response.getBody(), query);
}
return "Query failed: HTTP " + response.getStatusCode();
} catch (Exception e) {
log.error("Prometheus query failed: {}", query, e);
return "Query error: " + e.getMessage();
}
}
private String formatPrometheusResult(Map<String, Object> result, String query) {
// Extract the time series and compute key statistics
try {
Map<String, Object> data = (Map<String, Object>) result.get("data");
List<Map<String, Object>> results = (List<Map<String, Object>>) data.get("result");
if (results == null || results.isEmpty()) {
return "Query returned no data: " + query;
}
StringBuilder sb = new StringBuilder();
sb.append(String.format("Query: %s\nSeries count: %d\n", query, results.size()));
for (Map<String, Object> series : results.subList(0, Math.min(3, results.size()))) {
Map<String, String> metric = (Map<String, String>) series.get("metric");
List<List<Object>> values = (List<List<Object>>) series.get("values");
if (values != null && !values.isEmpty()) {
DoubleSummaryStatistics stats = values.stream()
.mapToDouble(v -> Double.parseDouble(v.get(1).toString()))
.summaryStatistics();
sb.append(String.format("  Series labels: %s\n", metric));
sb.append(String.format("  Max: %.2f, Min: %.2f, Avg: %.2f\n",
stats.getMax(), stats.getMin(), stats.getAverage()));
// Emit the last 5 data points
sb.append("  Latest data: ");
values.stream().skip(Math.max(0, values.size() - 5)).forEach(v ->
sb.append(String.format("[%s:%.2f] ", v.get(0),
Double.parseDouble(v.get(1).toString()))));
sb.append("\n");
}
}
return sb.toString();
} catch (Exception e) {
return "Failed to parse result: " + e.getMessage();
}
}
}
3.3 Agent Tool: Log Search
@Service
@RequiredArgsConstructor
@Slf4j
public class ElasticsearchLogService {
private final ElasticsearchClient elasticsearchClient;
@Tool(description = """
Search ELK logs.
Parameters:
- keywords: search keywords (supports AND/OR)
- services: service names, comma-separated
- startTime: start time (ISO format)
- endTime: end time (ISO format)
- level: log level (ERROR/WARN/INFO, optional)
- limit: number of results to return (default 20)
""")
public String searchLogs(
@ToolParam("keywords") String keywords,
@ToolParam("services") String services,
@ToolParam("startTime") String startTime,
@ToolParam("endTime") String endTime,
@ToolParam(value = "level", required = false) String level,
@ToolParam(value = "limit", required = false) Integer limit) {
try {
int maxResults = limit != null ? limit : 20;
SearchRequest.Builder builder = new SearchRequest.Builder()
.index("logs-*")
.size(maxResults);
// Build the query conditions
BoolQuery.Builder boolQuery = new BoolQuery.Builder();
// Keyword search
if (StringUtils.hasText(keywords)) {
boolQuery.must(q -> q.queryString(qs -> qs
.query(keywords)
.fields("message", "exception", "stack_trace")
));
}
// Service filter
if (StringUtils.hasText(services)) {
List<String> serviceList = Arrays.asList(services.split(","));
boolQuery.filter(q -> q.terms(t -> t
.field("service.keyword")
.terms(tv -> tv.value(serviceList.stream()
.map(FieldValue::of).collect(Collectors.toList())))
));
}
// Time range
boolQuery.filter(q -> q.range(r -> r
.field("@timestamp")
.from(startTime)
.to(endTime)
));
// Log level
if (StringUtils.hasText(level)) {
boolQuery.filter(q -> q.term(t -> t
.field("level.keyword")
.value(level)
));
}
SearchResponse<Map> response = elasticsearchClient.search(
builder.query(q -> q.bool(boolQuery.build())).build(),
Map.class
);
return formatLogResults(response);
} catch (Exception e) {
log.error("Log search failed", e);
return "Log search failed: " + e.getMessage();
}
}
private String formatLogResults(SearchResponse<Map> response) {
StringBuilder sb = new StringBuilder();
List<Hit<Map>> hits = response.hits().hits();
sb.append(String.format("Found %d log entries:\n", hits.size()));
for (Hit<Map> hit : hits) {
Map<String, Object> source = hit.source();
if (source != null) {
sb.append(String.format("[%s] [%s] [%s] %s\n",
source.getOrDefault("@timestamp", ""),
source.getOrDefault("level", ""),
source.getOrDefault("service", ""),
source.getOrDefault("message", "")
));
Object exception = source.get("exception");
if (exception != null) {
sb.append(String.format("  Exception: %s\n", exception));
}
}
}
return sb.toString();
}
}
Part 4: The Auto-Remediation Executor
4.1 The Remediation Strategy Registry
@Service
@RequiredArgsConstructor
@Slf4j
public class AutoRemediationService {
private final KubernetesClient k8sClient;
private final ShellExecutor shellExecutor;
private final AlertEventRepository eventRepository;
private final AuditLogService auditLog;
// Remediation strategies: root cause type -> remediation action
private final Map<String, RemediationAction> strategyRegistry = new HashMap<>();
@PostConstruct
public void registerStrategies() {
// Out of memory: restart the Pod
strategyRegistry.put("OOM_KILLED", this::restartPod);
// Disk full: clean up logs
strategyRegistry.put("DISK_FULL", this::cleanDiskSpace);
// High CPU (GC thrashing): trigger GC proactively
strategyRegistry.put("HIGH_CPU_GC", this::triggerGcAndAdjustHeap);
// Connection pool exhausted: grow the pool dynamically
strategyRegistry.put("CONNECTION_POOL_EXHAUSTED", this::expandConnectionPool);
// Not enough replicas: scale out the Deployment
strategyRegistry.put("INSUFFICIENT_REPLICAS", this::scaleDeployment);
}
/**
 * Execute auto-remediation.
 * Includes safety checks: only runs automatically when confidence is high and risk is low.
 */
public RemediationResult executeRemediation(AlertEvent event, RcaResult rcaResult) {
if (!rcaResult.isAutoFixable()) {
return RemediationResult.skipped("RCA judged this not auto-fixable");
}
if (rcaResult.getConfidence() < 0.85) {
return RemediationResult.skipped(
String.format("Confidence too low (%.0f%%), manual confirmation required", rcaResult.getConfidence() * 100));
}
// High-risk operations always require manual approval
String rootCauseType = rcaResult.getRootCauseType();
if (isHighRiskOperation(rootCauseType)) {
notifyForManualApproval(event, rcaResult);
return RemediationResult.pendingApproval("High-risk operation, waiting for manual approval");
}
RemediationAction action = strategyRegistry.get(rootCauseType);
if (action == null) {
return RemediationResult.skipped("No remediation strategy found for: " + rootCauseType);
}
// Execute the remediation
try {
log.info("Executing auto-remediation for event {}, rootCause: {}",
event.getEventId(), rootCauseType);
// Write an audit log entry
auditLog.record(AuditEvent.builder()
.action("AUTO_REMEDIATION")
.eventId(event.getEventId())
.rootCause(rootCauseType)
.operator("AIOps-System")
.build());
RemediationResult result = action.execute(event, rcaResult);
// Update the event status
if (result.isSuccess()) {
event.setStatus(AlertEvent.EventStatus.ACKNOWLEDGED);
eventRepository.save(event);
}
return result;
} catch (Exception e) {
log.error("Auto-remediation failed for event {}", event.getEventId(), e);
return RemediationResult.failed("Execution failed: " + e.getMessage());
}
}
private RemediationResult restartPod(AlertEvent event, RcaResult rcaResult) {
String service = event.getAffectedServices().get(0);
String namespace = getNamespace(service);
log.info("Restarting pods for service: {} in namespace: {}", service, namespace);
// Rolling restart (no service interruption)
k8sClient.apps().deployments()
.inNamespace(namespace)
.withName(service)
.rolling()
.restart();
return RemediationResult.success(
String.format("Triggered a rolling restart of %s, expected to finish within 2 minutes", service));
}
private RemediationResult cleanDiskSpace(AlertEvent event, RcaResult rcaResult) {
String host = event.getAlerts().get(0).getHost();
// Only clean up log files; never touch application data
String cleanCommand =
"find /var/log -name '*.log' -mtime +7 -exec rm -f {} \\; && " +
"find /var/log -name '*.log.*' -mtime +3 -exec rm -f {} \\;";
ShellResult result = shellExecutor.execute(host, cleanCommand, Duration.ofMinutes(5));
if (result.isSuccess()) {
return RemediationResult.success(
String.format("Cleaned expired logs on %s, space freed: %s", host, result.getOutput()));
} else {
return RemediationResult.failed("Cleanup failed: " + result.getError());
}
}
private RemediationResult triggerGcAndAdjustHeap(AlertEvent event, RcaResult rcaResult) {
// Trigger GC via JMX
String service = event.getAffectedServices().get(0);
// Simplified here; in practice the JMX command would go through a service mesh or an agent
log.info("Triggering GC for service: {}", service);
return RemediationResult.success(
String.format("Sent a GC request to %s and temporarily raised the heap ceiling by 20%%", service));
}
private RemediationResult scaleDeployment(AlertEvent event, RcaResult rcaResult) {
String service = event.getAffectedServices().get(0);
String namespace = getNamespace(service);
// Read the current replica count
Deployment deployment = k8sClient.apps().deployments()
.inNamespace(namespace)
.withName(service)
.get();
if (deployment == null) {
return RemediationResult.failed("Deployment not found: " + service);
}
int currentReplicas = deployment.getSpec().getReplicas();
int targetReplicas = Math.min(currentReplicas * 2, 20); // Cap at 20 replicas
k8sClient.apps().deployments()
.inNamespace(namespace)
.withName(service)
.scale(targetReplicas);
return RemediationResult.success(
String.format("Scaled %s from %d to %d replicas",
service, currentReplicas, targetReplicas));
}
private boolean isHighRiskOperation(String rootCauseType) {
return Set.of("DATA_CORRUPTION", "NETWORK_PARTITION", "DATABASE_FAILOVER")
.contains(rootCauseType);
}
@FunctionalInterface
interface RemediationAction {
RemediationResult execute(AlertEvent event, RcaResult rcaResult);
}
@Data
@Builder
public static class RemediationResult {
private boolean success;
private String message;
private RemediationStatus status;
public enum RemediationStatus {
SUCCESS, FAILED, SKIPPED, PENDING_APPROVAL
}
public static RemediationResult success(String msg) {
return RemediationResult.builder()
.success(true).message(msg)
.status(RemediationStatus.SUCCESS).build();
}
public static RemediationResult failed(String msg) {
return RemediationResult.builder()
.success(false).message(msg)
.status(RemediationStatus.FAILED).build();
}
public static RemediationResult skipped(String msg) {
return RemediationResult.builder()
.success(false).message(msg)
.status(RemediationStatus.SKIPPED).build();
}
public static RemediationResult pendingApproval(String msg) {
return RemediationResult.builder()
.success(false).message(msg)
.status(RemediationStatus.PENDING_APPROVAL).build();
}
}
}
Part 5: The Ops Knowledge Base: Reusing Historical Experience with RAG
5.1 Building the Incident Knowledge Base
@Service
@RequiredArgsConstructor
@Slf4j
public class OpsKnowledgeBase {
private final VectorStore vectorStore;
private final EmbeddingModel embeddingModel;
private final IncidentRepository incidentRepository;
/**
 * Index a historical incident record (run after each incident postmortem)
 */
public void indexIncident(IncidentRecord incident) {
// Build the knowledge entry
String content = String.format("""
Occurred at: %s
Affected services: %s
Symptoms: %s
Root cause: %s
Remediation steps: %s
Prevention measures: %s
Duration: %d minutes
Tags: %s
""",
incident.getOccurredAt(),
incident.getAffectedServices(),
incident.getSymptoms(),
incident.getRootCause(),
incident.getRemediationSteps(),
incident.getPreventionMeasures(),
incident.getDurationMinutes(),
String.join(",", incident.getTags())
);
Document doc = new Document(content, Map.of(
"incident_id", incident.getId(),
"services", incident.getAffectedServices(),
"root_cause_type", incident.getRootCauseType(),
"occurred_at", incident.getOccurredAt().toString()
));
vectorStore.add(List.of(doc));
log.info("Indexed incident {} into knowledge base", incident.getId());
}
@Tool(description = """
Search the historical incident knowledge base for similar cases.
Parameters:
- symptoms: description of the symptoms
- services: related service names, comma-separated
- limit: number of cases to return (default 5)
""")
public String searchKnowledgeBase(
@ToolParam("symptoms") String symptoms,
@ToolParam(value = "services", required = false) String services,
@ToolParam(value = "limit", required = false) Integer limit) {
int maxResults = limit != null ? limit : 5;
SearchRequest searchRequest = SearchRequest.query(symptoms)
.withTopK(maxResults)
.withSimilarityThreshold(0.7);
// If a service filter was given, add a metadata filter
if (StringUtils.hasText(services)) {
// Note: the filter syntax depends on the concrete VectorStore implementation
searchRequest = searchRequest.withFilterExpression(
"services IN ['" + services.replace(",", "','") + "']"
);
}
List<Document> results = vectorStore.similaritySearch(searchRequest);
if (results.isEmpty()) {
return "No similar historical incidents found";
}
StringBuilder sb = new StringBuilder();
sb.append(String.format("Found %d similar historical incidents:\n\n", results.size()));
for (int i = 0; i < results.size(); i++) {
sb.append(String.format("Case %d:\n%s\n---\n", i + 1, results.get(i).getText()));
}
return sb.toString();
}
/**
 * Nightly sync: index recently resolved incidents that have not been indexed yet
 */
@Scheduled(cron = "0 0 2 * * ?") // Runs daily at 2 a.m.
public void syncIncidentsToKnowledgeBase() {
Instant lastWeek = Instant.now().minus(Duration.ofDays(7));
List<IncidentRecord> recentIncidents = incidentRepository
.findByResolvedAtAfterAndIndexedFalse(lastWeek);
log.info("Syncing {} incidents to knowledge base", recentIncidents.size());
for (IncidentRecord incident : recentIncidents) {
try {
indexIncident(incident);
incident.setIndexed(true);
incidentRepository.save(incident);
} catch (Exception e) {
log.error("Failed to index incident {}", incident.getId(), e);
}
}
}
}
Part 6: The On-Call Bot: AI Takes Over First-Line Alerts
6.1 DingTalk/Feishu Webhook Integration
@RestController
@RequestMapping("/webhook/aiops")
@RequiredArgsConstructor
@Slf4j
public class AiOpsWebhookController {
private final AlertAggregationEngine aggregationEngine;
private final RootCauseAnalysisAgent rcaAgent;
private final AutoRemediationService remediationService;
private final DingTalkNotificationService dingTalkService;
/**
 * Receive alert pushes from Alertmanager
 */
@PostMapping("/alerts")
public ResponseEntity<Void> receiveAlerts(
@RequestBody AlertmanagerPayload payload) {
for (AlertmanagerAlert amAlert : payload.getAlerts()) {
Alert alert = convertToAlert(amAlert);
// Aggregate the alert
AlertEvent event = aggregationEngine.processAlert(alert);
// If this created a new event, notify immediately
if (event.getAlerts().size() == 1) {
sendInitialNotification(event);
}
}
return ResponseEntity.ok().build();
}
private void sendInitialNotification(AlertEvent event) {
String message = String.format("""
🚨 **New alert event**
**Event ID:** %s
**Severity:** %s
**Affected services:** %s
**Fired at:** %s
The AI is analyzing the root cause, please stand by...
""",
event.getEventId(),
event.getMaxSeverity(),
String.join(", ", event.getAffectedServices()),
event.getCreatedAt().atZone(ZoneId.of("Asia/Shanghai"))
.format(DateTimeFormatter.ofPattern("HH:mm:ss"))
);
dingTalkService.sendMarkdown("Alert notification", message, getOnCallGroup());
}
/**
 * Send the analysis result once RCA completes
 */
public void sendRcaResult(AlertEvent event, RcaResult rcaResult,
RemediationResult remediationResult) {
String statusEmoji = remediationResult.getStatus() ==
AutoRemediationService.RemediationResult.RemediationStatus.SUCCESS ? "✅" : "⚠️";
String message = String.format("""
%s **Root cause analysis complete**
**Event ID:** %s
**Root cause:** %s
**Confidence:** %.0f%%
**Evidence:**
%s
**AI remediation result:** %s
**Suggested next step:** %s
""",
statusEmoji,
event.getEventId(),
rcaResult.getRootCause(),
rcaResult.getConfidence() * 100,
String.join("\n", rcaResult.getEvidence().stream()
.map(e -> "• " + e).collect(Collectors.toList())),
remediationResult.getMessage(),
rcaResult.getSuggestedAction()
);
dingTalkService.sendMarkdown("RCA report", message, getOnCallGroup());
}
}
Part 7: Capacity Forecasting: Predicting Resource Needs from Historical Data
7.1 The Time-Series Forecasting Service
@Service
@RequiredArgsConstructor
@Slf4j
public class CapacityPredictionService {
private final PrometheusQueryService prometheusService;
private final ChatClient chatClient;
/**
 * Forecast resource demand for the next N days.
 * Combines AI-assisted trend analysis with simple linear extrapolation.
 */
public CapacityForecast predictCapacity(String service, String metric, int forecastDays) {
// Fetch the last 90 days of history
String query = String.format(
"avg_over_time(%s{service=\"%s\"}[1h])", metric, service);
Instant now = Instant.now();
String historicalData = prometheusService.queryPrometheusMetrics(
query,
now.minus(Duration.ofDays(90)).toString(),
now.toString(),
"1h"
);
// Have the AI analyze the trend and produce the forecast
String forecastResult = chatClient.prompt()
.system("""
You are a capacity planning expert. Based on the historical metric data provided,
analyze the trend (growth/stable/seasonal fluctuation) and forecast future resource demand.
Output JSON in this format:
{
"trend": "GROWING/STABLE/DECLINING/SEASONAL",
"growthRatePerDay": 0.02,
"currentBaseline": 100.0,
"predictions": [
{"day": 7, "predicted": 114.0, "confidence": "HIGH"},
{"day": 14, "predicted": 128.0, "confidence": "MEDIUM"},
{"day": 30, "predicted": 160.0, "confidence": "LOW"}
],
"alertThreshold": 150.0,
"recommendation": "Scale out within 7 days; the alert threshold will be reached around day 12"
}
""")
.user(String.format("""
Service: %s
Metric: %s
Forecast horizon (days): %d
Historical data:
%s
""", service, metric, forecastDays, historicalData))
.call()
.content();
return parseCapacityForecast(forecastResult, service, metric);
}
}
Part 8: Change Impact Analysis
8.1 Pre-Deployment AI Risk Assessment
@Service
@RequiredArgsConstructor
@Slf4j
public class ChangeImpactAnalysisService {
private final ChatClient chatClient;
private final GitService gitService;
private final IncidentHistoryService incidentHistory;
private final DependencyGraphService dependencyGraph;
/**
 * Analyze the potential impact of a code change
 */
public ChangeImpactReport analyzeChange(ChangeRequest changeRequest) {
// 1. Get the code diff
String codeDiff = gitService.getDiff(
changeRequest.getRepo(),
changeRequest.getBaseCommit(),
changeRequest.getHeadCommit()
);
// 2. Get the dependency graph
List<String> dependentServices = dependencyGraph
.getDependents(changeRequest.getService());
// 3. Check history: have past changes to this service caused incidents?
List<IncidentRecord> historicalIncidents = incidentHistory
.findByServiceAndCause(changeRequest.getService(), "DEPLOYMENT");
// 4. AI analysis
String analysisPrompt = buildImpactAnalysisPrompt(
changeRequest, codeDiff, dependentServices, historicalIncidents);
String analysis = chatClient.prompt()
.system("""
You are a code review and risk assessment expert.
Analyze the given code change and assess:
1. The core logic touched (API changes / database schema changes / config changes, etc.)
2. Potential risk areas (performance impact / compatibility / data integrity)
3. Blast radius (directly affected services + indirectly affected services)
4. Recommended release strategy (canary / blue-green / direct release)
5. Metrics that need extra monitoring
Output JSON. Risk levels: LOW/MEDIUM/HIGH/CRITICAL
""")
.user(analysisPrompt)
.call()
.content();
return parseImpactReport(analysis, changeRequest);
}
}
Part 9: Measuring Impact: Before and After AIOps
A summary of the quantitative results:
| Metric | Before | After | Improvement |
|---|---|---|---|
| Alerts handled manually per day | 50 | 8 | -84% |
| Mean time to recovery (MTTR) | 45 min | 12 min | -73% |
| Overnight alerts (23:00-8:00) | 4/week | 0.5/week | -88% |
| Alert false-positive rate | 65% | 8% | -88% |
| Share of ops engineers' time spent on productive work | 20% | 75% | +275% |
| Incidents per month | 12 | 7 | -42% |
Wang Xin's verdict: "I finally have time for architecture work instead of firefighting all day. With the hours I used to spend on alerts, I refactored 3 core services and improved their performance by 40%."
FAQ
Q1: Could AIOps misfire, say, delete a service it shouldn't?
A: The safety mechanisms are the most critical part. The system strictly limits the scope of automatic remediation: only reversible operations like restarts, scale-outs, and log cleanup are allowed; every operation is audit-logged; any analysis below 85% confidence requires manual confirmation; and database operations and configuration changes always require human approval.
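That policy can be sketched as a small decision function. The operation names, threshold, and return values below mirror the rules just described but are otherwise hypothetical, not the article's actual code:

```java
import java.util.Set;

public class RemediationGuard {
    // Reversible operations the system may run unattended (per the policy above).
    static final Set<String> AUTO_ALLOWED = Set.of("RESTART", "SCALE_OUT", "CLEAN_LOGS");
    // Operations that always require a human, regardless of confidence.
    static final Set<String> ALWAYS_MANUAL = Set.of("DB_CHANGE", "CONFIG_CHANGE");

    static String decide(String op, double confidence) {
        if (ALWAYS_MANUAL.contains(op)) return "MANUAL_APPROVAL";
        if (confidence < 0.85) return "MANUAL_CONFIRM";
        return AUTO_ALLOWED.contains(op) ? "AUTO_EXECUTE" : "SKIP";
    }

    public static void main(String[] args) {
        System.out.println(decide("RESTART", 0.92));   // AUTO_EXECUTE
        System.out.println(decide("RESTART", 0.70));   // MANUAL_CONFIRM
        System.out.println(decide("DB_CHANGE", 0.99)); // MANUAL_APPROVAL
    }
}
```

The ordering matters: the always-manual check comes first, so a high confidence score can never bypass the category ban.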
Q2: How accurate is the root cause analysis?
A: Over 6 months of historical data, accuracy is about 78% on previously seen failure types and about 45% on novel ones, and it keeps improving as the knowledge base grows. More importantly, even when the analysis is not fully accurate, the AI still collapses 50 alerts into 1-3 events, which cuts the noise dramatically.
Q3: How do you keep the AI from incorrectly merging unrelated alerts?
A: Aggregation uses double validation: a time window (within 5 minutes) plus an LLM semantic judgment. The LLM's judgment is about 92% accurate; the remaining 8% of bad merges surface during incident postmortems, can be split manually, and feed back as negative examples to improve the aggregation logic.
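The double validation can be sketched in a few lines of plain Java: a 5-minute window check combined with a cosine-similarity check at the 0.85 threshold from Part 2. The embedding vectors below are made up for illustration:

```java
public class AggregationCheck {
    // Cosine similarity between two embedding vectors.
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // An alert joins an event only if it is both recent AND semantically close.
    static boolean shouldAggregate(long alertTs, long eventTs, double[] alertVec, double[] eventVec) {
        boolean inWindow = Math.abs(alertTs - eventTs) <= 5 * 60_000; // 5-minute window (ms)
        return inWindow && cosine(alertVec, eventVec) >= 0.85;
    }

    public static void main(String[] args) {
        double[] v1 = {0.9, 0.1, 0.3};
        double[] v2 = {0.88, 0.12, 0.28}; // near-duplicate alert
        double[] v3 = {0.1, 0.9, 0.0};    // unrelated alert
        System.out.println(shouldAggregate(0, 60_000, v1, v2));  // true
        System.out.println(shouldAggregate(0, 60_000, v1, v3));  // false: not similar
        System.out.println(shouldAggregate(0, 600_000, v1, v2)); // false: outside window
    }
}
```

Requiring both checks is what keeps the false-merge rate low: similarity alone would merge recurring but independent symptoms, and the window alone would merge everything that fires during a busy incident.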
Q4: How does the knowledge base stay up to date?
A: After every incident postmortem, an SRE fills in a structured incident report, which the system automatically vectorizes and indexes. In addition, every successful AI auto-remediation is added to the knowledge base automatically, creating a positive feedback loop.
Q5: How heavy is the LLM usage, and what does it cost?
A: Alert aggregation averages 0.5 LLM calls per alert; each RCA takes roughly 3-8 calls (including tool calls). At 50 alerts/day that works out to about 150 LLM calls per day. Using GPT-4o-mini for aggregation judgments and GPT-4o for RCA, the monthly cost is roughly $200-300. Against the ops hours saved, the ROI is well over 10x.
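As a rough back-of-the-envelope check of those numbers: the per-alert and per-RCA call counts are the article's figures, but the 15 events/day compression factor below is an assumption, since the article does not state it:

```java
public class LlmCostEstimate {
    // Returns {low, high}: the estimated range of LLM calls per day.
    static double[] callsPerDay(int alertsPerDay, double aggCallsPerAlert,
                                int eventsPerDay, int rcaCallsMin, int rcaCallsMax) {
        double aggCalls = alertsPerDay * aggCallsPerAlert;
        return new double[] {
            aggCalls + (double) eventsPerDay * rcaCallsMin,
            aggCalls + (double) eventsPerDay * rcaCallsMax
        };
    }

    public static void main(String[] args) {
        // 50 alerts/day, 0.5 aggregation calls/alert, 3-8 RCA calls/event: article figures.
        // ~15 events/day is an assumed compression of 50 alerts into events.
        double[] range = callsPerDay(50, 0.5, 15, 3, 8);
        System.out.printf("Estimated LLM calls/day: %.0f-%.0f%n", range[0], range[1]);
    }
}
```

With those inputs the estimate comes out at 70-145 calls/day, which is in the same ballpark as the article's ~150/day figure.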
