Agent Behavior Monitoring: Building an AI Behavior Audit and Anomaly Detection System
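Opening Story: Burning $300 Every Few Hours
Lao Chen is a senior Java engineer with five years of experience at a fintech company, where he is responsible for the intelligent customer-service system.
Their customer-service Agent integrates more than a dozen tools: bill lookup, balance lookup, repayment-record lookup, credit-report lookup, and so on. For each customer inquiry, the Agent calls several tools and assembles a complete answer.
After launch the system ran well, and customer satisfaction rose from 72% to 89%.
Then one day, Finance came knocking.
"This month's AI bill is $4,200. Why is it $2,800 higher than last month?"
Lao Chen opened the bill and found that between 3 p.m. and 5 p.m. on one particular day, token consumption was 17 times the normal level.
He went through the logs and needed more than half an hour to locate the problem:
One customer's inquiry had triggered a bug in the Agent. The Agent called the "repayment-record lookup" tool, which returned an empty list. It called the same tool again and got another empty list, then called it again, and again... Within 2 hours the same tool was called 847 times, using up 32,000 tokens.
What frustrated Lao Chen even more was the customer's record: as early as the 37th call after the loop began, the Agent had already replied to the user, "No records can be found right now, please try again later." Because of the bug, the backend kept calling the tool anyway.
This is a classic "the Agent has finished its task but keeps executing" infinite loop.
Lao Chen spent half a day writing a throwaway script to analyze historical logs and find every potentially anomalous pattern. Then he spent another day adding a counter to every tool.
But he knew these were only patches. What he really needed was a complete Agent behavior monitoring system.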
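1. Agent Behavior Metrics to Monitor
To monitor Agent behavior, first define what needs to be monitored. This article tracks five per-task indicators, each with a configurable limit: repeated calls to the same tool with the same arguments, total tool calls, LLM calls, token consumption, and execution time.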
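The five indicators and their limits can be sketched as two small value types. This is only an illustration: `MonitoringLimits` and `TaskIndicators` are hypothetical names that do not appear elsewhere in this article. The default values are copied from the `agent.monitoring.*` settings in section 2.2, and the violation names reuse the `AnomalyType` constants from section 4.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Limits mirroring the agent.monitoring.* settings used in this article. */
record MonitoringLimits(int toolDuplicateThreshold, int maxToolCalls,
                        int maxLlmCalls, int maxTokens, Duration maxExecution) {
    static MonitoringLimits defaults() {
        return new MonitoringLimits(3, 50, 20, 100_000, Duration.ofMinutes(10));
    }
}

/** Hypothetical per-task snapshot of the five monitored indicators. */
record TaskIndicators(int maxSameCallCount, int totalToolCalls,
                      int llmCalls, int totalTokens, Duration elapsed) {

    /** Returns the name of every limit that this snapshot exceeds. */
    List<String> violations(MonitoringLimits limits) {
        List<String> result = new ArrayList<>();
        if (maxSameCallCount > limits.toolDuplicateThreshold()) result.add("TOOL_LOOP");
        if (totalToolCalls > limits.maxToolCalls()) result.add("TOOL_OVERFLOW");
        if (llmCalls > limits.maxLlmCalls()) result.add("LLM_LOOP");
        if (totalTokens > limits.maxTokens()) result.add("TOKEN_OVERFLOW");
        if (elapsed.compareTo(limits.maxExecution()) > 0) result.add("EXECUTION_TIMEOUT");
        return result;
    }
}
```

Applied to the incident in the opening story (847 identical tool calls over 2 hours), such a snapshot would exceed the duplicate-call, total-call, and execution-time limits at the same time.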
2. Project Structure and Dependencies
2.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
    </parent>
    <groupId>com.laozhang</groupId>
    <artifactId>agent-monitoring</artifactId>
    <version>1.0.0</version>
    <properties>
        <java.version>21</java.version>
        <spring-ai.version>1.0.0</spring-ai.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Spring AI. The 1.0.0 GA release uses the starter name below;
             pre-GA milestones used spring-ai-openai-spring-boot-starter. -->
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-starter-model-openai</artifactId>
            <version>${spring-ai.version}</version>
        </dependency>
        <!-- Monitoring: Micrometer + Prometheus -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <!-- Alerting: DingTalk / WeCom notifications -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.12.0</version>
        </dependency>
        <!-- Database -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Utilities -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>
</project>
2.2 application.yml
spring:
  application:
    name: agent-monitoring
  datasource:
    url: jdbc:mysql://localhost:3306/agent_audit_db?useUnicode=true&characterEncoding=utf8
    username: root
    password: your_password
    hikari:
      maximum-pool-size: 20
  data:
    redis:
      host: localhost
      port: 6379
      timeout: 3000ms
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4o
          temperature: 0.1
# Agent monitoring configuration
agent:
  monitoring:
    # Threshold for repeated tool calls (same tool + same arguments; up to N calls is normal)
    tool-duplicate-threshold: 3
    # Maximum tool calls per task
    max-tool-calls-per-task: 50
    # Maximum LLM calls per task (guards against reasoning loops)
    max-llm-calls-per-task: 20
    # Maximum token consumption per task (alert when exceeded)
    max-tokens-per-task: 100000
    # Maximum execution time per task (minutes)
    max-execution-minutes: 10
  alert:
    # DingTalk alert webhook
    dingtalk-webhook: ${DINGTALK_WEBHOOK:}
    # Alert cooldown (minutes, prevents alert storms)
    alert-cooldown-minutes: 5
management:
  endpoints:
    web:
      exposure:
        include: health,prometheus,metrics
  # Spring Boot 3.x property name (2.x used management.metrics.export.prometheus.enabled)
  prometheus:
    metrics:
      export:
        enabled: true
3. Agent Execution Trace Recording
3.1 Trace Data Model
package com.laozhang.monitor.model;

import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;

/**
 * Agent execution trace record (persisted to MySQL for auditing).
 * One row is written for every tool call and every LLM call.
 */
@Data
@Entity
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "agent_trace",
        indexes = {
                @Index(name = "idx_session_id", columnList = "session_id"),
                @Index(name = "idx_created_at", columnList = "created_at"),
                @Index(name = "idx_trace_type", columnList = "trace_type")
        })
public class AgentTrace {
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private String id;

    /** Session ID (one complete user interaction) */
    @Column(name = "session_id", nullable = false)
    private String sessionId;

    /** Task ID (one concrete Agent task) */
    @Column(name = "task_id", nullable = false)
    private String taskId;

    /** User ID */
    @Column(name = "user_id")
    private String userId;

    /** Trace type */
    @Enumerated(EnumType.STRING)
    @Column(name = "trace_type", nullable = false)
    private TraceType traceType;

    /** LLM call input (prompt, first 2000 characters) */
    @Column(name = "llm_input", columnDefinition = "TEXT")
    private String llmInput;

    /** LLM call output (first 2000 characters) */
    @Column(name = "llm_output", columnDefinition = "TEXT")
    private String llmOutput;

    /** Tool name */
    @Column(name = "tool_name")
    private String toolName;

    /** Tool call arguments (JSON) */
    @Column(name = "tool_params", columnDefinition = "TEXT")
    private String toolParams;

    /** Tool call result (JSON, first 2000 characters) */
    @Column(name = "tool_result", columnDefinition = "TEXT")
    private String toolResult;

    /** Whether the call succeeded */
    @Column(name = "success")
    private Boolean success;

    /** Error message */
    @Column(name = "error_message", columnDefinition = "TEXT")
    private String errorMessage;

    /** Input token count */
    @Column(name = "input_tokens")
    private Integer inputTokens;

    /** Output token count */
    @Column(name = "output_tokens")
    private Integer outputTokens;

    /** Execution time (milliseconds) */
    @Column(name = "duration_ms")
    private Long durationMs;

    /** Index of the current execution step */
    @Column(name = "step_index")
    private Integer stepIndex;

    /** Creation time */
    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;

    public enum TraceType {
        LLM_CALL,     // LLM inference call
        TOOL_CALL,    // tool call
        TOOL_RESULT,  // tool result
        DECISION,     // Agent decision point
        TASK_START,   // task start
        TASK_END      // task end
    }
}
3.2 Trace Recording Service
package com.laozhang.monitor.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.laozhang.monitor.model.AgentTrace;
import com.laozhang.monitor.model.AgentTrace.TraceType;
import com.laozhang.monitor.repository.AgentTraceRepository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * Agent trace recording service.
 * Records every key action taken while an Agent executes.
 * Writes use @Async so they do not block the Agent's main flow.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentTraceService {
    private final AgentTraceRepository traceRepository;
    private final MeterRegistry meterRegistry;
    private final ObjectMapper objectMapper;

    /**
     * Records an LLM call.
     */
    @Async
    public void recordLlmCall(
            String sessionId, String taskId, String userId,
            String prompt, String response,
            int inputTokens, int outputTokens,
            long durationMs, int stepIndex) {
        try {
            // Truncate long content to keep rows small
            String truncatedPrompt = truncate(prompt, 2000);
            String truncatedResponse = truncate(response, 2000);
            AgentTrace trace = AgentTrace.builder()
                    .sessionId(sessionId)
                    .taskId(taskId)
                    .userId(userId)
                    .traceType(TraceType.LLM_CALL)
                    .llmInput(truncatedPrompt)
                    .llmOutput(truncatedResponse)
                    .inputTokens(inputTokens)
                    .outputTokens(outputTokens)
                    .durationMs(durationMs)
                    .stepIndex(stepIndex)
                    .success(true)
                    .createdAt(LocalDateTime.now())
                    .build();
            traceRepository.save(trace);

            // Update Prometheus metrics. No task-derived tag is attached here:
            // per-task detail lives in the agent_trace table, not in metric labels.
            Counter.builder("agent.llm.calls")
                    .register(meterRegistry)
                    .increment();
            Timer.builder("agent.llm.duration")
                    .register(meterRegistry)
                    .record(durationMs, TimeUnit.MILLISECONDS);
            Counter.builder("agent.tokens.total")
                    .tag("type", "input")
                    .register(meterRegistry)
                    .increment(inputTokens);
            Counter.builder("agent.tokens.total")
                    .tag("type", "output")
                    .register(meterRegistry)
                    .increment(outputTokens);
        } catch (Exception e) {
            log.error("[Trace] Failed to record LLM call: {}", e.getMessage(), e);
        }
    }

    /**
     * Records a tool call.
     */
    @Async
    public void recordToolCall(
            String sessionId, String taskId, String userId,
            String toolName, Object params, Object result,
            boolean success, String errorMessage,
            long durationMs, int stepIndex) {
        try {
            String paramsJson = objectMapper.writeValueAsString(params);
            String resultJson = result != null
                    ? truncate(objectMapper.writeValueAsString(result), 2000)
                    : null;
            AgentTrace trace = AgentTrace.builder()
                    .sessionId(sessionId)
                    .taskId(taskId)
                    .userId(userId)
                    .traceType(TraceType.TOOL_CALL)
                    .toolName(toolName)
                    .toolParams(paramsJson)
                    .toolResult(resultJson)
                    .success(success)
                    .errorMessage(errorMessage)
                    .durationMs(durationMs)
                    .stepIndex(stepIndex)
                    .createdAt(LocalDateTime.now())
                    .build();
            traceRepository.save(trace);

            // Prometheus metrics
            Counter.builder("agent.tool.calls")
                    .tag("tool", toolName)
                    .tag("success", String.valueOf(success))
                    .register(meterRegistry)
                    .increment();
            Timer.builder("agent.tool.duration")
                    .tag("tool", toolName)
                    .register(meterRegistry)
                    .record(durationMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("[Trace] Failed to record tool call: {}", e.getMessage(), e);
        }
    }

    private String truncate(String text, int maxLength) {
        if (text == null) return null;
        if (text.length() <= maxLength) return text;
        return text.substring(0, maxLength) + "...[truncated]";
    }

    /**
     * Shortens a task ID to its first four characters for display.
     * The result is not used as a Prometheus tag: a four-character prefix of a
     * random hex ID such as a UUID can still take up to 65,536 distinct values.
     */
    private String maskTaskId(String taskId) {
        if (taskId == null || taskId.length() < 8) return "unknown";
        return taskId.substring(0, 4) + "****";
    }
}
4. Anomaly Detection: Core Detection Logic
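The detector's `checkAfterToolCall` method takes a `paramsHash`, but the article does not show how that hash is produced. One possible approach, sketched here under the assumption that tool arguments arrive as a `Map`, is to serialize the arguments in sorted key order and take a SHA-256 digest, so that the same arguments always map to the same key regardless of map ordering. `ParamsHasher` is a hypothetical helper, not part of the original project.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that turns tool arguments into a stable hash. */
final class ParamsHasher {
    private ParamsHasher() {}

    /** Returns a hex SHA-256 digest of the arguments, independent of map key order. */
    static String hash(Map<String, ?> params) {
        StringBuilder sb = new StringBuilder();
        appendCanonical(sb, params);
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] bytes = digest.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    /** Writes maps with sorted keys and lists in order; other values via String.valueOf. */
    private static void appendCanonical(StringBuilder sb, Object value) {
        if (value instanceof Map<?, ?> map) {
            TreeMap<String, Object> sorted = new TreeMap<>();
            map.forEach((k, v) -> sorted.put(String.valueOf(k), v));
            sb.append('{');
            sorted.forEach((k, v) -> {
                sb.append(k).append('=');
                appendCanonical(sb, v);
                sb.append(';');
            });
            sb.append('}');
        } else if (value instanceof List<?> list) {
            sb.append('[');
            for (Object item : list) {
                appendCanonical(sb, item);
                sb.append(',');
            }
            sb.append(']');
        } else {
            sb.append(value);
        }
    }
}
```

A caller would pass `ParamsHasher.hash(args)` as the third argument of `checkAfterToolCall`. Hashing keeps the in-memory counter keys short and avoids holding raw customer data in the detector's map.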
package com.laozhang.monitor.detector;

import com.laozhang.monitor.model.AnomalyType;
import com.laozhang.monitor.model.TaskRuntimeStats;
import com.laozhang.monitor.service.AlertService;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Agent anomaly detector.
 * Detects abnormal behavior of running Agents in real time.
 *
 * Anomaly types detected:
 * 1. Repeated tool calls (same tool + same arguments more often than the threshold)
 * 2. Total tool calls over the limit (possibly a reasoning loop)
 * 3. LLM calls over the limit
 * 4. Token consumption over the limit
 * 5. Execution time over the limit
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class AgentAnomalyDetector {
    private final AlertService alertService;
    private final MeterRegistry meterRegistry;

    @Value("${agent.monitoring.tool-duplicate-threshold:3}")
    private int toolDuplicateThreshold;
    @Value("${agent.monitoring.max-tool-calls-per-task:50}")
    private int maxToolCallsPerTask;
    @Value("${agent.monitoring.max-llm-calls-per-task:20}")
    private int maxLlmCallsPerTask;
    @Value("${agent.monitoring.max-tokens-per-task:100000}")
    private int maxTokensPerTask;
    @Value("${agent.monitoring.max-execution-minutes:10}")
    private int maxExecutionMinutes;

    // Runtime statistics (in memory, not persisted, used for real-time detection).
    // Entries are removed only by cleanupTask(), so call it whenever a task ends.
    private final ConcurrentHashMap<String, TaskRuntimeStats> taskStats
            = new ConcurrentHashMap<>();

    /**
     * Call this after every tool call.
     * Returns whether the circuit should open (true means the Agent should stop).
     */
    public boolean checkAfterToolCall(
            String taskId, String toolName, String paramsHash) {
        TaskRuntimeStats stats = taskStats.computeIfAbsent(taskId,
                k -> new TaskRuntimeStats(taskId));

        // 1. Detect repeated tool calls
        String callKey = toolName + ":" + paramsHash;
        int duplicateCount = stats.incrementToolCallCount(callKey);
        if (duplicateCount > toolDuplicateThreshold) {
            String message = String.format(
                    "Repeated tool call: task [%s] tool [%s] called %d times (threshold: %d)",
                    taskId, toolName, duplicateCount, toolDuplicateThreshold);
            triggerAnomaly(taskId, AnomalyType.TOOL_LOOP, message);
            return true; // open the circuit
        }

        // 2. Detect total tool calls over the limit
        int totalCalls = stats.getTotalToolCalls();
        if (totalCalls > maxToolCallsPerTask) {
            String message = String.format(
                    "Tool calls over limit: task [%s] made %d calls (max: %d)",
                    taskId, totalCalls, maxToolCallsPerTask);
            triggerAnomaly(taskId, AnomalyType.TOOL_OVERFLOW, message);
            return true;
        }
        return false;
    }

    /**
     * Call this after every LLM call.
     */
    public boolean checkAfterLlmCall(
            String taskId, int inputTokens, int outputTokens) {
        TaskRuntimeStats stats = taskStats.computeIfAbsent(taskId,
                k -> new TaskRuntimeStats(taskId));
        int llmCalls = stats.incrementLlmCalls();
        // Add tokens before any early return so the running total stays accurate
        int totalTokens = stats.addTokens(inputTokens + outputTokens);

        // 3. Detect LLM calls over the limit
        if (llmCalls > maxLlmCallsPerTask) {
            String message = String.format(
                    "LLM calls over limit: task [%s] called the LLM %d times (max: %d)",
                    taskId, llmCalls, maxLlmCallsPerTask);
            triggerAnomaly(taskId, AnomalyType.LLM_LOOP, message);
            return true;
        }

        // 4. Detect token consumption over the limit
        if (totalTokens > maxTokensPerTask) {
            String message = String.format(
                    "Token consumption over limit: task [%s] used %d tokens (max: %d), " +
                            "estimated cost: $%.2f",
                    taskId, totalTokens, maxTokensPerTask,
                    estimateCost(totalTokens));
            triggerAnomaly(taskId, AnomalyType.TOKEN_OVERFLOW, message);
            return true;
        }
        return false;
    }

    /**
     * Checks execution time periodically (called by a scheduled job).
     */
    public boolean checkExecutionTime(String taskId) {
        TaskRuntimeStats stats = taskStats.get(taskId);
        if (stats == null) return false;
        long elapsedMinutes = stats.getElapsedMinutes();
        if (elapsedMinutes > maxExecutionMinutes) {
            String message = String.format(
                    "Execution timeout: task [%s] has run for %d minutes (max: %d minutes)",
                    taskId, elapsedMinutes, maxExecutionMinutes);
            triggerAnomaly(taskId, AnomalyType.EXECUTION_TIMEOUT, message);
            return true;
        }
        return false;
    }

    private void triggerAnomaly(String taskId, AnomalyType type, String message) {
        log.warn("[Anomaly detection] {}: {}", type, message);
        // Prometheus counter
        Counter.builder("agent.anomaly.detected")
                .tag("type", type.name())
                .register(meterRegistry)
                .increment();
        // Send the alert
        alertService.sendAlert(taskId, type, message);
    }

    /**
     * Cleans up statistics when a task completes.
     */
    public void cleanupTask(String taskId) {
        taskStats.remove(taskId);
    }

    /**
     * Returns a summary of a task's runtime statistics.
     */
    public Map<String, Object> getTaskStats(String taskId) {
        TaskRuntimeStats stats = taskStats.get(taskId);
        if (stats == null) return Map.of("status", "not_found");
        Map<String, Object> summary = new HashMap<>();
        summary.put("taskId", taskId);
        summary.put("totalToolCalls", stats.getTotalToolCalls());
        summary.put("llmCalls", stats.getLlmCalls());
        summary.put("totalTokens", stats.getTotalTokens());
        summary.put("elapsedMinutes", stats.getElapsedMinutes());
        summary.put("estimatedCostUsd", estimateCost(stats.getTotalTokens()));
        return summary;
    }

    /**
     * Estimates token cost using the GPT-4o prices assumed throughout this article.
     */
    private double estimateCost(int totalTokens) {
        // GPT-4o: input $5 / 1M tokens, output $15 / 1M tokens (verify against current pricing)
        // Simplification: use the average, $10 / 1M tokens
        return totalTokens * 10.0 / 1_000_000;
    }
}
4.1 Runtime Statistics Data Structure
package com.laozhang.monitor.model;

import lombok.Getter;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Per-task runtime statistics (in-memory structure used for real-time detection).
 */
@Getter
public class TaskRuntimeStats {
    private final String taskId;
    private final Instant startTime;
    // Call count per tool + arguments combination (used to detect repeated calls)
    private final ConcurrentHashMap<String, AtomicInteger> toolCallCounts
            = new ConcurrentHashMap<>();
    // Total tool calls
    private final AtomicInteger totalToolCalls = new AtomicInteger(0);
    // LLM calls
    private final AtomicInteger llmCalls = new AtomicInteger(0);
    // Total token consumption
    private final AtomicInteger totalTokens = new AtomicInteger(0);

    public TaskRuntimeStats(String taskId) {
        this.taskId = taskId;
        this.startTime = Instant.now();
    }

    /**
     * Records a tool call and returns the current count for this call key.
     */
    public int incrementToolCallCount(String callKey) {
        totalToolCalls.incrementAndGet();
        return toolCallCounts
                .computeIfAbsent(callKey, k -> new AtomicInteger(0))
                .incrementAndGet();
    }

    public int getTotalToolCalls() {
        return totalToolCalls.get();
    }

    public int incrementLlmCalls() {
        return llmCalls.incrementAndGet();
    }

    public int getLlmCalls() {
        return llmCalls.get();
    }

    public int addTokens(int count) {
        return totalTokens.addAndGet(count);
    }

    public int getTotalTokens() {
        return totalTokens.get();
    }

    public long getElapsedMinutes() {
        return (Instant.now().toEpochMilli() - startTime.toEpochMilli()) / 60000;
    }
}
5. Automatic Circuit Breaking: Stopping the Agent When an Anomaly Is Detected
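Before the implementation, it may help to see the intended call pattern: check the breaker before each step, run the tool, then report the call so the duplicate counter can trip. The sketch below is a stripped-down, dependency-free stand-in. `MiniBreaker`, `AgentLoopDemo`, and the tool name `queryRepayments` are hypothetical, and only the duplicate-call rule is modelled. It replays the stuck loop from the opening story with a threshold of 3.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for the breaker + detector pair (duplicate-call rule only). */
final class MiniBreaker {
    private final int duplicateThreshold;
    private final Map<String, AtomicInteger> callCounts = new ConcurrentHashMap<>();
    private final Set<String> openCircuits = ConcurrentHashMap.newKeySet();

    MiniBreaker(int duplicateThreshold) {
        this.duplicateThreshold = duplicateThreshold;
    }

    /** Throws if the task's circuit is already open. */
    void checkBeforeStep(String taskId) {
        if (openCircuits.contains(taskId)) {
            throw new IllegalStateException("circuit open for task " + taskId);
        }
    }

    /** Counts the call and opens the circuit once the same call exceeds the threshold. */
    boolean checkToolCall(String taskId, String toolName, String paramsHash) {
        String key = taskId + "|" + toolName + ":" + paramsHash;
        int count = callCounts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        if (count > duplicateThreshold) {
            openCircuits.add(taskId);
            return true;
        }
        return false;
    }
}

final class AgentLoopDemo {
    /** Simulates an Agent stuck on one tool call; returns how many calls actually ran. */
    static int runStuckAgent(MiniBreaker breaker, String taskId, int maxSteps) {
        int executed = 0;
        for (int step = 0; step < maxSteps; step++) {
            try {
                breaker.checkBeforeStep(taskId);
            } catch (IllegalStateException e) {
                break; // circuit already open
            }
            executed++; // the tool would run here and return an empty list
            if (breaker.checkToolCall(taskId, "queryRepayments", "same-args")) {
                break; // anomaly detected after this call
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        // With a threshold of 3, the fourth identical call trips the breaker.
        System.out.println(runStuckAgent(new MiniBreaker(3), "task-1", 847));
    }
}
```

In this simulation the loop stops after 4 calls instead of 847. Because the check runs after the tool call, the call that crosses the threshold has already executed; the breaker prevents the next one.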
package com.laozhang.monitor.circuit;

import com.laozhang.monitor.detector.AgentAnomalyDetector;
import com.laozhang.monitor.model.AnomalyType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Agent circuit breaker.
 * Stops an Agent from continuing once an anomaly has been detected.
 *
 * Usage: call checkAndThrowIfCircuitOpen() before every Agent step.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class AgentCircuitBreaker {
    private final AgentAnomalyDetector anomalyDetector;
    // IDs of tasks whose circuit has opened
    private final Set<String> openCircuits = ConcurrentHashMap.newKeySet();

    /**
     * Call before every step.
     * Throws to stop execution if the task's circuit is open.
     * (AgentCircuitBreakerException is not shown in this article.)
     */
    public void checkAndThrowIfCircuitOpen(String taskId) {
        if (openCircuits.contains(taskId)) {
            log.warn("[Circuit breaker] Task {} is open-circuited, blocking further execution", taskId);
            throw new AgentCircuitBreakerException(
                    "Agent circuit breaker triggered, task ID: " + taskId);
        }
    }

    /**
     * Checks after a tool call and opens the circuit if necessary.
     * Returns whether the circuit is open.
     */
    public boolean checkToolCall(String taskId, String toolName, String paramsHash) {
        if (openCircuits.contains(taskId)) return true;
        boolean shouldBreak = anomalyDetector.checkAfterToolCall(
                taskId, toolName, paramsHash);
        if (shouldBreak) {
            triggerBreaker(taskId, "tool call anomaly");
        }
        return shouldBreak;
    }

    /**
     * Checks after an LLM call.
     */
    public boolean checkLlmCall(String taskId, int inputTokens, int outputTokens) {
        if (openCircuits.contains(taskId)) return true;
        boolean shouldBreak = anomalyDetector.checkAfterLlmCall(
                taskId, inputTokens, outputTokens);
        if (shouldBreak) {
            triggerBreaker(taskId, "LLM call anomaly");
        }
        return shouldBreak;
    }

    /**
     * Opens the circuit.
     */
    public void triggerBreaker(String taskId, String reason) {
        openCircuits.add(taskId);
        log.error("[Circuit breaker] Task {} circuit opened: {}", taskId, reason);
    }

    /**
     * Resets the circuit (call after manual intervention).
     */
    public void resetBreaker(String taskId) {
        openCircuits.remove(taskId);
        anomalyDetector.cleanupTask(taskId);
        log.info("[Circuit breaker] Task {} circuit reset", taskId);
    }

    public boolean isOpen(String taskId) {
        return openCircuits.contains(taskId);
    }
}
6. Real-Time Alerts: DingTalk / WeCom Notifications
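The alert service calls `type.getDescription()` and `type.getSuggestion()`, and the detector references five `AnomalyType` constants, but the enum itself is not listed in this article. A minimal sketch might look as follows. The constant names come from the detector code; the description and suggestion texts are illustrative assumptions, not the author's wording. In the project it would live in the `com.laozhang.monitor.model` package as a public type.

```java
/** Sketch of the AnomalyType enum; description and suggestion texts are assumed. */
enum AnomalyType {
    TOOL_LOOP("Repeated tool call",
            "Check whether the tool returns empty or identical results and whether the Agent has a stop condition"),
    TOOL_OVERFLOW("Tool calls over limit",
            "Review the task plan for a possible reasoning loop"),
    LLM_LOOP("LLM calls over limit",
            "Inspect the prompts for a reasoning loop"),
    TOKEN_OVERFLOW("Token consumption over limit",
            "Check the context size and trim tool results passed to the model"),
    EXECUTION_TIMEOUT("Execution timeout",
            "Check for slow tools or a stuck task");

    private final String description;
    private final String suggestion;

    AnomalyType(String description, String suggestion) {
        this.description = description;
        this.suggestion = suggestion;
    }

    public String getDescription() {
        return description;
    }

    public String getSuggestion() {
        return suggestion;
    }
}
```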
package com.laozhang.monitor.service;

import com.laozhang.monitor.model.AnomalyType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Alert service.
 * Supports DingTalk / WeCom webhook alerts,
 * with a cooldown to prevent alert storms.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class AlertService {
    // Requires an OkHttpClient bean defined elsewhere (not shown in this article)
    private final OkHttpClient httpClient;

    @Value("${agent.alert.dingtalk-webhook:}")
    private String dingtalkWebhook;
    @Value("${agent.alert.alert-cooldown-minutes:5}")
    private int cooldownMinutes;

    // Cooldown records (taskId + type -> time of last alert)
    private final ConcurrentHashMap<String, LocalDateTime> alertCooldowns
            = new ConcurrentHashMap<>();

    /**
     * Sends an alert.
     */
    public void sendAlert(String taskId, AnomalyType type, String message) {
        String cooldownKey = taskId + ":" + type.name();
        // Check the cooldown
        LocalDateTime lastAlert = alertCooldowns.get(cooldownKey);
        if (lastAlert != null &&
                lastAlert.plusMinutes(cooldownMinutes).isAfter(LocalDateTime.now())) {
            log.debug("[Alert] Cooling down, skipping alert: {}", cooldownKey);
            return;
        }
        alertCooldowns.put(cooldownKey, LocalDateTime.now());

        // Build the alert content
        String alertMessage = buildAlertMessage(taskId, type, message);
        // Send the DingTalk alert
        if (!dingtalkWebhook.isBlank()) {
            sendDingtalkAlert(alertMessage);
        }
        // Local log record (fallback)
        log.error("[Alert sent] {}", alertMessage);
    }

    private String buildAlertMessage(String taskId, AnomalyType type, String detail) {
        return String.format("""
                ### Agent Anomaly Alert
                - **Time**: %s
                - **Task ID**: %s
                - **Anomaly type**: %s
                - **Details**: %s
                - **Suggestion**: %s
                """,
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                taskId,
                type.getDescription(),
                detail,
                type.getSuggestion()
        );
    }

    private void sendDingtalkAlert(String content) {
        try {
            // Escape backslashes first, then quotes and newlines, so the JSON stays valid
            String escaped = content
                    .replace("\\", "\\\\")
                    .replace("\"", "\\\"")
                    .replace("\n", "\\n");
            String jsonBody = """
                    {
                      "msgtype": "markdown",
                      "markdown": {
                        "title": "Agent Anomaly Alert",
                        "text": "%s"
                      }
                    }
                    """.formatted(escaped);
            Request request = new Request.Builder()
                    .url(dingtalkWebhook)
                    .post(RequestBody.create(jsonBody,
                            MediaType.parse("application/json")))
                    .build();
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    log.error("[Alert] DingTalk send failed: {}", response.code());
                }
            }
        } catch (Exception e) {
            log.error("[Alert] DingTalk send error: {}", e.getMessage(), e);
        }
    }
}
7. Behavior Audit: Complete Operation Records for Compliance
package com.laozhang.monitor.audit;

import com.laozhang.monitor.model.AgentTrace;
import com.laozhang.monitor.repository.AgentTraceRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Behavior audit service.
 * Provides complete operation records for compliance scenarios.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentAuditService {
    private final AgentTraceRepository traceRepository;

    /**
     * Returns the complete trace of a task (for compliance audits).
     */
    public List<AgentTrace> getFullTrace(String taskId) {
        return traceRepository.findByTaskIdOrderByCreatedAtAsc(taskId);
    }

    /**
     * Generates an audit report.
     * Output: task summary, per-step operations, basis for the final decision.
     */
    public Map<String, Object> generateAuditReport(String taskId) {
        List<AgentTrace> traces = getFullTrace(taskId);
        if (traces.isEmpty()) {
            return Map.of("error", "Task not found: " + taskId);
        }
        // Count records per trace type
        Map<AgentTrace.TraceType, Long> typeCounts = traces.stream()
                .collect(Collectors.groupingBy(
                        AgentTrace::getTraceType, Collectors.counting()));
        // Total tokens
        int totalInputTokens = traces.stream()
                .filter(t -> t.getInputTokens() != null)
                .mapToInt(AgentTrace::getInputTokens)
                .sum();
        int totalOutputTokens = traces.stream()
                .filter(t -> t.getOutputTokens() != null)
                .mapToInt(AgentTrace::getOutputTokens)
                .sum();
        // The first and last records bound the task's time range
        AgentTrace firstTrace = traces.get(0);
        AgentTrace lastTrace = traces.get(traces.size() - 1);
        // Tool call summary
        List<Map<String, Object>> toolSummary = traces.stream()
                .filter(t -> t.getTraceType() == AgentTrace.TraceType.TOOL_CALL)
                .collect(Collectors.groupingBy(AgentTrace::getToolName))
                .entrySet().stream()
                .map(entry -> Map.<String, Object>of(
                        "toolName", entry.getKey(),
                        "callCount", entry.getValue().size(),
                        "successCount", entry.getValue().stream()
                                .filter(t -> Boolean.TRUE.equals(t.getSuccess())).count(),
                        "avgDurationMs", entry.getValue().stream()
                                .filter(t -> t.getDurationMs() != null)
                                .mapToLong(AgentTrace::getDurationMs)
                                .average().orElse(0)
                ))
                .collect(Collectors.toList());

        // LinkedHashMap instead of Map.of: user_id is nullable and Map.of rejects null values
        Map<String, Object> report = new LinkedHashMap<>();
        report.put("taskId", taskId);
        report.put("userId", firstTrace.getUserId());
        report.put("startTime", firstTrace.getCreatedAt());
        report.put("endTime", lastTrace.getCreatedAt());
        report.put("totalSteps", traces.size());
        report.put("typeCounts", typeCounts);
        report.put("totalInputTokens", totalInputTokens);
        report.put("totalOutputTokens", totalOutputTokens);
        report.put("estimatedCostUsd",
                (totalInputTokens * 5.0 + totalOutputTokens * 15.0) / 1_000_000);
        report.put("toolSummary", toolSummary);
        return report;
    }

    /**
     * Returns all Agent operations for a user in a time range (GDPR / compliance audits).
     */
    public List<AgentTrace> getUserTraces(
            String userId, LocalDateTime from, LocalDateTime to) {
        return traceRepository.findByUserIdAndCreatedAtBetween(userId, from, to);
    }

    /**
     * Post-incident replay: reconstructs the Agent's execution from the audit log.
     * Outputs the step sequence for problem analysis.
     */
    public List<Map<String, Object>> replayTrace(String taskId) {
        List<AgentTrace> traces = getFullTrace(taskId);
        return traces.stream().map(trace -> {
            Map<String, Object> step = new LinkedHashMap<>();
            step.put("step", trace.getStepIndex());
            step.put("time", trace.getCreatedAt());
            step.put("type", trace.getTraceType().name());
            step.put("duration_ms", trace.getDurationMs());
            step.put("success", trace.getSuccess());
            if (trace.getTraceType() == AgentTrace.TraceType.LLM_CALL) {
                step.put("llm_output_preview",
                        trace.getLlmOutput() != null
                                ? trace.getLlmOutput().substring(0, Math.min(200, trace.getLlmOutput().length()))
                                : null);
                // Token columns are nullable, so avoid Map.of here as well
                Map<String, Object> tokens = new LinkedHashMap<>();
                tokens.put("input", trace.getInputTokens());
                tokens.put("output", trace.getOutputTokens());
                step.put("tokens", tokens);
            } else if (trace.getTraceType() == AgentTrace.TraceType.TOOL_CALL) {
                step.put("tool", trace.getToolName());
                step.put("params", trace.getToolParams());
                if (!Boolean.TRUE.equals(trace.getSuccess())) {
                    step.put("error", trace.getErrorMessage());
                }
            }
            return step;
        }).collect(Collectors.toList());
    }
}
8. Cost Tracking: Exact Cost per Task
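The cost formula used in this section is: cost = input tokens × input price / 1000 + output tokens × output price / 1000, with prices in USD per 1K tokens. The tracker prices every task as `gpt-4o`. If the model name were available per call (an assumption, since `AgentTrace` has no model column), the same formula could be applied per model. `ModelCostCalculator` below is a hypothetical helper whose prices are copied from the `MODEL_PRICES` table in this article and should be checked against current pricing.

```java
import java.util.Map;

/** Hypothetical helper applying the article's price table per model. */
final class ModelCostCalculator {
    // USD per 1K tokens: {input, output}
    private static final Map<String, double[]> PRICES = Map.of(
            "gpt-4o", new double[]{0.005, 0.015},
            "gpt-4o-mini", new double[]{0.00015, 0.0006},
            "gpt-4", new double[]{0.03, 0.06},
            "gpt-3.5-turbo", new double[]{0.0005, 0.0015}
    );

    private ModelCostCalculator() {}

    /** Returns the cost in USD; rejects models without a configured price. */
    static double costUsd(String model, long inputTokens, long outputTokens) {
        double[] price = PRICES.get(model);
        if (price == null) {
            throw new IllegalArgumentException("No price configured for model: " + model);
        }
        return inputTokens * price[0] / 1000 + outputTokens * price[1] / 1000;
    }
}
```

For example, 1,000,000 input tokens plus 1,000,000 output tokens on `gpt-4o` come to $5 + $15 = $20 under these prices. Failing loudly on an unknown model avoids silently pricing it at the default rate.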
package com.laozhang.monitor.cost;

import com.laozhang.monitor.repository.AgentTraceRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Map;

/**
 * Agent cost tracking service.
 * Calculates the AI call cost of each task and each user.
 */
@Service
@RequiredArgsConstructor
public class AgentCostTracker {
    private final AgentTraceRepository traceRepository;

    // Model prices (USD per 1K tokens)
    private static final Map<String, double[]> MODEL_PRICES = Map.of(
            "gpt-4o", new double[]{0.005, 0.015},  // input, output
            "gpt-4o-mini", new double[]{0.00015, 0.0006},
            "gpt-4", new double[]{0.03, 0.06},
            "gpt-3.5-turbo", new double[]{0.0005, 0.0015}
    );

    /**
     * Calculates the cost of a single task.
     */
    public Map<String, Object> calculateTaskCost(String taskId) {
        Object[] stats = traceRepository.getTaskTokenStats(taskId);
        if (stats == null || stats[0] == null) {
            return Map.of("error", "Task not found");
        }
        long inputTokens = toLong(stats[0]);
        long outputTokens = toLong(stats[1]);
        long llmCalls = toLong(stats[2]);
        long toolCalls = toLong(stats[3]);

        // Priced as gpt-4o by default
        double[] prices = MODEL_PRICES.get("gpt-4o");
        double inputCost = inputTokens * prices[0] / 1000;
        double outputCost = outputTokens * prices[1] / 1000;
        double totalCost = inputCost + outputCost;
        return Map.of(
                "taskId", taskId,
                "inputTokens", inputTokens,
                "outputTokens", outputTokens,
                "totalTokens", inputTokens + outputTokens,
                "llmCalls", llmCalls,
                "toolCalls", toolCalls,
                "inputCostUsd", String.format("$%.4f", inputCost),
                "outputCostUsd", String.format("$%.4f", outputCost),
                "totalCostUsd", String.format("$%.4f", totalCost),
                "totalCostCny", String.format("¥%.2f", totalCost * 7.2)  // exchange rate 7.2
        );
    }

    /**
     * Calculates the total cost over a time range.
     */
    public Map<String, Object> calculatePeriodCost(LocalDateTime from, LocalDateTime to) {
        Object[] stats = traceRepository.getPeriodTokenStats(from, to);
        // SUM() yields null when the range contains no rows, so treat null as zero
        long totalInputTokens = stats == null ? 0L : toLong(stats[0]);
        long totalOutputTokens = stats == null ? 0L : toLong(stats[1]);
        long taskCount = stats == null ? 0L : toLong(stats[2]);

        double[] prices = MODEL_PRICES.get("gpt-4o");
        double totalCost = totalInputTokens * prices[0] / 1000
                + totalOutputTokens * prices[1] / 1000;
        return Map.of(
                "period", from + " ~ " + to,
                "taskCount", taskCount,
                "totalInputTokens", totalInputTokens,
                "totalOutputTokens", totalOutputTokens,
                "totalCostUsd", String.format("$%.2f", totalCost),
                "totalCostCny", String.format("¥%.2f", totalCost * 7.2),
                "avgCostPerTask", String.format("$%.4f", taskCount > 0 ? totalCost / taskCount : 0)
        );
    }

    private static long toLong(Object value) {
        return value == null ? 0L : ((Number) value).longValue();
    }
}
9. Monitoring Dashboard Data API
package com.laozhang.monitor.controller;

import com.laozhang.monitor.audit.AgentAuditService;
import com.laozhang.monitor.circuit.AgentCircuitBreaker;
import com.laozhang.monitor.cost.AgentCostTracker;
import com.laozhang.monitor.detector.AgentAnomalyDetector;
import lombok.RequiredArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/monitor")
@RequiredArgsConstructor
public class MonitorController {
    private final AgentAuditService auditService;
    private final AgentCostTracker costTracker;
    private final AgentAnomalyDetector anomalyDetector;
    private final AgentCircuitBreaker circuitBreaker;

    /** Audit report */
    @GetMapping("/tasks/{taskId}/audit")
    public ResponseEntity<Map<String, Object>> getAuditReport(@PathVariable String taskId) {
        return ResponseEntity.ok(auditService.generateAuditReport(taskId));
    }

    /** Execution trace replay */
    @GetMapping("/tasks/{taskId}/replay")
    public ResponseEntity<List<Map<String, Object>>> replayTrace(@PathVariable String taskId) {
        return ResponseEntity.ok(auditService.replayTrace(taskId));
    }

    /** Task cost */
    @GetMapping("/tasks/{taskId}/cost")
    public ResponseEntity<Map<String, Object>> getTaskCost(@PathVariable String taskId) {
        return ResponseEntity.ok(costTracker.calculateTaskCost(taskId));
    }

    /** Cost statistics for a time range */
    @GetMapping("/cost/period")
    public ResponseEntity<Map<String, Object>> getPeriodCost(
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to) {
        return ResponseEntity.ok(costTracker.calculatePeriodCost(from, to));
    }

    /** Real-time task statistics */
    @GetMapping("/tasks/{taskId}/stats")
    public ResponseEntity<Map<String, Object>> getTaskStats(@PathVariable String taskId) {
        return ResponseEntity.ok(anomalyDetector.getTaskStats(taskId));
    }

    /** Manually reset the circuit breaker */
    @PostMapping("/tasks/{taskId}/reset-circuit")
    public ResponseEntity<Map<String, Object>> resetCircuit(@PathVariable String taskId) {
        circuitBreaker.resetBreaker(taskId);
        return ResponseEntity.ok(Map.of(
                "message", "Circuit reset, the task can be run again",
                "taskId", taskId
        ));
    }
}
10. Performance Data and Real-World Results
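In Lao Chen's financial customer-service scenario, the results after deploying the monitoring system were:
| Metric | Before deployment | After deployment |
|---|---|---|
| Monthly AI cost | $1,400 ~ $4,200 (large swings) | $1,380 ~ $1,520 (stable) |
| Average tokens per task | Not measured | 1,842 tokens |
| P99 tokens per task | Not measured | 8,340 tokens |
| Loop-call anomalies | 5-8 per month | 0-1 per month (stopped by the circuit breaker) |
| Time to detect an anomaly | 4.2 hours on average | 23 seconds on average (alert triggered) |
| Compliance audit time | 2-3 days of manual log collation | Generated instantly by an API call |
Key numbers:
- With the circuit breaker intercepting anomalies, a single task wastes at most ~$0.12 (vs. $2.8 per incident without monitoring)
- Tool-call anomaly detection accuracy: 94.7% (5.3% false positives, mostly legitimate "retry after an empty query result" cases)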
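FAQ
Q: Tool names are used as Prometheus labels. Will that cause a high-cardinality problem?
A: The number of tool names (toolName) is usually bounded, from a handful to a few dozen, so they do not cause a high-cardinality problem. The task ID (taskId), however, must never be used as a Prometheus label: every task has a different ID, which makes the number of time series explode. Truncating the ID to a short prefix reduces the problem but does not remove it, because a four-character prefix of a random hex ID can still take up to 65,536 distinct values. Keep per-task detail in the agent_trace table and leave task IDs out of metric labels.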
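The effect of truncation on cardinality can be checked with a small experiment. The sketch below assumes task IDs are UUIDs (the article does not say how they are generated) and applies the same rule as `maskTaskId()`: keep the first four characters and append `****`. `LabelCardinalityDemo` is a hypothetical class for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Counts how many distinct label values a 4-character task ID prefix produces. */
final class LabelCardinalityDemo {
    private LabelCardinalityDemo() {}

    /** Same truncation rule as maskTaskId() in this article. */
    static String maskTaskId(String taskId) {
        if (taskId == null || taskId.length() < 8) return "unknown";
        return taskId.substring(0, 4) + "****";
    }

    /** Returns the number of distinct masked values for the given number of UUID task IDs. */
    static int distinctMaskedValues(int taskCount) {
        Set<String> labels = new HashSet<>();
        for (int i = 0; i < taskCount; i++) {
            // Name-based UUIDs keep the run deterministic; real IDs are assumed to be random
            String taskId = UUID.nameUUIDFromBytes(
                    ("task-" + i).getBytes(StandardCharsets.UTF_8)).toString();
            labels.add(maskTaskId(taskId));
        }
        return labels.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct label values for 10,000 tasks: "
                + distinctMaskedValues(10_000));
    }
}
```

With 10,000 tasks, most masked values are still unique, so a masked task ID behaves almost like the raw ID as far as the number of time series is concerned.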
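Q: Will trace recording affect Agent execution performance?
A: All trace writes are @Async, so they do not run on the Agent's main thread. This requires @EnableAsync on a configuration class; without it, @Async methods run synchronously on the caller's thread. In our measurements, trace recording added less than 1 ms of extra latency (the cost of handing the write off). The MySQL write completes in a background thread pool and does not affect the Agent's response time.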
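The hand-off behind an asynchronous write can be illustrated without Spring. The sketch below is a stand-in, not the article's implementation: `AsyncTraceWriter` is a hypothetical class, an in-memory list replaces the `agent_trace` table, and the pool sizes and queue capacity are arbitrary. The caller only enqueues the write, and a background thread performs it.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Dependency-free illustration of handing trace writes to a background pool. */
final class AsyncTraceWriter {
    // Stand-in for the agent_trace table
    private final List<String> store = new CopyOnWriteArrayList<>();

    // Bounded queue; when it is full, the write runs on the caller's thread.
    // That slows the Agent under overload but does not drop audit records.
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 2, 30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Returns as soon as the write is queued. */
    void record(String traceLine) {
        executor.execute(() -> store.add(traceLine));
    }

    /** Stops accepting writes, waits for queued ones, and returns how many were stored. */
    int shutdownAndCount() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return store.size();
    }
}
```

The rejection policy is the main design choice. For an audit trail, falling back to the caller's thread trades latency for completeness, whereas a discard policy would protect latency at the cost of missing records.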
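Q: The state of AgentAnomalyDetector (taskStats) is kept in memory. Is it lost when the service restarts?
A: Yes. This is a design trade-off. Anomaly detection needs very low latency (microsecond-level decisions), and memory is the fastest place for it. Detection is also scoped to the currently running session: after a restart the task starts over, so counting from zero again is reasonable. If you need cumulative statistics that survive restarts, replace the in-memory AtomicInteger with a Redis atomic counter (INCR).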
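Q: How do you avoid alert storms (one anomaly triggering many duplicate alerts)?
A: This article implements an alert cooldown (alertCooldowns): for the same task ID and anomaly type, only one alert is sent within 5 minutes. In addition, once the circuit breaker has opened, detection is no longer run for that task, which also reduces duplicate alerts.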
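The cooldown check in `AlertService` reads the last alert time and then writes the new one in two separate steps, so two threads reporting the same anomaly at the same moment could both pass the check. A possible variant, sketched here with a hypothetical `AlertCooldown` class, performs the check and the update in a single `ConcurrentHashMap.compute` call and takes the current time as a parameter so that it can be tested.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical atomic variant of the alert cooldown check. */
final class AlertCooldown {
    private final Duration cooldown;
    private final ConcurrentHashMap<String, Instant> lastSent = new ConcurrentHashMap<>();

    AlertCooldown(Duration cooldown) {
        this.cooldown = cooldown;
    }

    /** Returns true if an alert for this key may be sent at the given time, recording it atomically. */
    boolean tryAcquire(String key, Instant now) {
        AtomicBoolean allowed = new AtomicBoolean(false);
        lastSent.compute(key, (k, previous) -> {
            if (previous == null || !previous.plus(cooldown).isAfter(now)) {
                allowed.set(true);
                return now;       // record this alert
            }
            return previous;      // still cooling down, keep the old timestamp
        });
        return allowed.get();
    }
}
```

Usage would follow the existing key scheme: `cooldown.tryAcquire(taskId + ":" + type.name(), Instant.now())`, sending the alert only when it returns true.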
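Summary
The three core capabilities of Agent behavior monitoring:
- Observability: every LLM call and tool call is fully recorded, so the Agent's "thinking" can be inspected at any time
- Anomaly protection: repeated calls, loops, and token overruns are detected in real time, and the circuit breaker steps in before losses grow
- Cost control: cost tracking down to each task and each user turns AI spending from a black box into something transparent
After deployment, Lao Chen spent 3 months taking the average token consumption of a customer-service task from "unknown" to 1,842 tokens, and reduced the swing in monthly cost from 3x to within 10%.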
