第2009篇:AI Agent监控体系——任务链路追踪与异常检测实践
2026/4/30大约 4 分钟
第2009篇:AI Agent监控体系——任务链路追踪与异常检测实践
适读人群:负责AI Agent系统运维的工程师 | 阅读时长:约18分钟 | 核心价值:建立Agent任务的完整可观测性,让"AI做了什么"变得透明可查
Agent上线两周后,有一天产品经理来找我:"这个Agent好像不对劲,有些用户反映它给出了奇怪的回答,但我们不知道它做了什么。"
这就是Agent系统缺乏可观测性的典型症状:黑盒运行,出了问题不知道从哪查起。
传统应用出问题,看日志找到对应请求,检查输入输出,基本就能定位。但Agent的执行是多步骤的:每次LLM调用、每次工具执行,都可能是问题所在。你需要完整的任务链路。
Agent链路追踪的设计
把Agent的每次执行抽象成一个Trace,包含多个Span:
/**
* Complete trace of a single Agent task: the request, its outcome, timing,
* aggregate counters, and the list of spans recorded while the task ran.
*
* Lombok's {@code @Data} and {@code @Builder} generate the accessors and the builder.
*/
@Data
@Builder
public class AgentTrace {
// Identifier of this trace.
private String traceId;
// Identifier of the task this trace belongs to.
private String taskId;
private String userId;
// The user's original query text.
private String userQuery;
private LocalDateTime startTime;
private LocalDateTime endTime;
// NOTE(review): presumably endTime - startTime in milliseconds; the code that fills it is not shown here - confirm.
private long totalDurationMs;
private boolean succeeded;
private String finalAnswer;
private String failureReason;
private List<AgentSpan> spans; // each LLM call or tool call is one Span
// NOTE(review): the three counters below are presumably derived from the spans when the trace is built - confirm.
private int totalTokensUsed;
private int llmCallCount;
private int toolCallCount;
}
/**
* Span describing a single operation inside an Agent trace.
*
* Lombok's {@code @Data} and {@code @Builder} generate the accessors and the builder.
*/
@Data
@Builder
public class AgentSpan {
private String spanId;
// Identifier of the trace this span belongs to.
private String traceId;
private String parentSpanId;
// NOTE(review): @Enumerated is a JPA mapping annotation, but no @Entity/@Embeddable is visible
// on this class - confirm that it is actually persisted through JPA.
@Enumerated(EnumType.STRING)
private SpanType type; // LLM_CALL, TOOL_CALL, AGENT_START, AGENT_END
private String name; // LLM model name or tool name
private LocalDateTime startTime;
private LocalDateTime endTime;
private long durationMs;
// Extra information for LLM calls
private String prompt; // full input sent to the LLM (optional, recorded on demand)
private String response; // the LLM's output
private Integer inputTokens;
private Integer outputTokens;
// Extra information for tool calls
private String toolName;
private Map<String, Object> toolParams;
private String toolResult;
private boolean succeeded;
private String errorMessage;
private Map<String, String> tags; // custom tags
}Tracer的实现
@Component
@Slf4j
public class AgentTracer {
// 使用ThreadLocal保持当前Trace上下文(对于异步任务需要特殊处理)
private static final ThreadLocal<AgentTraceContext> CURRENT_TRACE = new ThreadLocal<>();
private final AgentTraceRepository traceRepository;
private final MeterRegistry meterRegistry;
/**
 * Opens a new trace for a task and binds it to the calling thread.
 *
 * <p>The trace id is a random UUID and the span list starts out empty. The new context
 * replaces whatever context {@code CURRENT_TRACE} previously held for this thread.
 *
 * @param taskId id of the task being traced
 * @param userId id of the user who issued the query
 * @param query  the user's query text
 * @return the newly created context, which is now the thread's current trace
 */
public AgentTraceContext startTrace(String taskId, String userId, String query) {
    final String newTraceId = UUID.randomUUID().toString();
    final LocalDateTime startedAt = LocalDateTime.now();

    AgentTraceContext context = AgentTraceContext.builder()
            .traceId(newTraceId)
            .taskId(taskId)
            .userId(userId)
            .userQuery(query)
            .startTime(startedAt)
            .spans(new ArrayList<>())
            .build();

    CURRENT_TRACE.set(context);
    return context;
}
/**
 * Starts a child span inside the calling thread's current trace.
 *
 * <p>The span is appended to the trace's span list and becomes the context's current span.
 * For {@code TOOL_CALL} spans the given name is also stored as the span's {@code toolName},
 * so that consumers which read {@code getToolName()} (for example the loop detection in the
 * anomaly detector) see a non-null value.
 *
 * @param type kind of operation the span represents
 * @param name LLM model name or tool name
 * @return the id of the new span, or {@code null} when no trace is active on this thread
 */
public String startSpan(SpanType type, String name) {
    AgentTraceContext ctx = CURRENT_TRACE.get();
    if (ctx == null) {
        return null;
    }

    String spanId = UUID.randomUUID().toString();
    AgentSpan span = AgentSpan.builder()
            .spanId(spanId)
            .traceId(ctx.getTraceId())
            .type(type)
            .name(name)
            // Previously toolName was never populated, leaving it null on every span.
            .toolName(type == SpanType.TOOL_CALL ? name : null)
            .startTime(LocalDateTime.now())
            .tags(new HashMap<>())
            .build();

    ctx.getSpans().add(span);
    ctx.setCurrentSpanId(spanId);
    return spanId;
}
/**
 * Finishes a span of the current trace and records its outcome.
 *
 * <p>Sets the end time and duration, stores the outcome, and reports the span's duration
 * metric. Does nothing when no trace is active on this thread or when no span with the given
 * id exists in the current trace.
 *
 * <p>On success the result is stored in the field that matches the span type:
 * {@code response} for {@code LLM_CALL} spans and {@code toolResult} for every other type.
 * On failure only the error message is stored.
 *
 * @param spanId    id returned by {@code startSpan}
 * @param succeeded whether the operation succeeded
 * @param result    output of the operation; used only when {@code succeeded} is true
 * @param error     error description; used only when {@code succeeded} is false
 */
public void endSpan(String spanId, boolean succeeded, String result, String error) {
    AgentTraceContext ctx = CURRENT_TRACE.get();
    if (ctx == null) {
        return;
    }

    ctx.getSpans().stream()
            .filter(s -> s.getSpanId().equals(spanId))
            .findFirst()
            .ifPresent(span -> {
                span.setEndTime(LocalDateTime.now());
                span.setDurationMs(Duration.between(span.getStartTime(), span.getEndTime()).toMillis());
                span.setSucceeded(succeeded);
                if (!succeeded) {
                    span.setErrorMessage(error);
                } else if (span.getType() == SpanType.LLM_CALL) {
                    // LLM output belongs in `response`; it used to be written to `toolResult`.
                    span.setResponse(result);
                } else {
                    span.setToolResult(result);
                }
                // Report to the metrics registry.
                recordSpanMetrics(span);
            });
}
/**
 * Finishes the calling thread's current trace.
 *
 * <p>Stores the outcome on the context, hands it to {@code persistTrace}, records the
 * task-level metrics, and finally unbinds the context from the thread. Does nothing when no
 * trace is active.
 *
 * <p>The thread-local is cleared in a {@code finally} block so that a failure while persisting
 * or recording metrics cannot leave a finished trace attached to a reused (pooled) thread.
 *
 * @param succeeded     whether the task succeeded
 * @param finalAnswer   the final answer produced by the agent
 * @param failureReason description of the failure, if any
 */
public void endTrace(boolean succeeded, String finalAnswer, String failureReason) {
    AgentTraceContext ctx = CURRENT_TRACE.get();
    if (ctx == null) {
        return;
    }

    try {
        ctx.setEndTime(LocalDateTime.now());
        ctx.setSucceeded(succeeded);
        ctx.setFinalAnswer(finalAnswer);
        ctx.setFailureReason(failureReason);

        // Persist the trace.
        // NOTE(review): persistTrace is annotated @Async but is invoked on `this`; with Spring's
        // proxy-based @Async a self-invocation is not intercepted, so this call most likely runs
        // synchronously on the caller's thread - confirm, and dispatch through a separate bean
        // or an executor if asynchronous persistence is required.
        persistTrace(ctx);

        // Record task-level metrics.
        recordTraceMetrics(ctx);
    } finally {
        CURRENT_TRACE.remove();
    }
}
/**
 * Reports the duration of a finished span to the {@code agent.span.duration} timer,
 * tagged with the span's type, name and success flag.
 *
 * @param span the finished span; its type, name and duration are read
 */
private void recordSpanMetrics(AgentSpan span) {
    final String typeTag = span.getType().name();
    final String nameTag = span.getName();
    final String outcomeTag = String.valueOf(span.isSucceeded());

    Timer spanTimer = Timer.builder("agent.span.duration")
            .tag("type", typeTag)
            .tag("name", nameTag)
            .tag("succeeded", outcomeTag)
            .register(meterRegistry);

    spanTimer.record(span.getDurationMs(), TimeUnit.MILLISECONDS);
}
/**
 * Records task-level metrics for a finished trace.
 *
 * <ul>
 *   <li>{@code agent.task.duration} - timer of the total task duration, tagged by outcome</li>
 *   <li>{@code agent.task.total} - counter of finished tasks, tagged by outcome</li>
 *   <li>{@code agent.task.tool_calls} - distribution summary of tool calls per task</li>
 * </ul>
 *
 * <p>The tool-call count is recorded as a distribution summary rather than a gauge. A gauge
 * registered over a local boxed value is only registered once per meter id, so later tasks
 * would never be reflected, and the registry keeps only a weak reference to the value.
 * A summary captures one sample per task.
 *
 * @param ctx the finished trace context; start and end time must already be set
 */
private void recordTraceMetrics(AgentTraceContext ctx) {
    String succeededTag = String.valueOf(ctx.isSucceeded());
    long totalMs = Duration.between(ctx.getStartTime(), ctx.getEndTime()).toMillis();

    Timer.builder("agent.task.duration")
            .tag("succeeded", succeededTag)
            .register(meterRegistry)
            .record(totalMs, TimeUnit.MILLISECONDS);

    Counter.builder("agent.task.total")
            .tag("succeeded", succeededTag)
            .register(meterRegistry)
            .increment();

    long toolCalls = ctx.getSpans().stream()
            .filter(s -> s.getType() == SpanType.TOOL_CALL)
            .count();
    meterRegistry.summary("agent.task.tool_calls").record(toolCalls);
}
/**
 * Converts the trace context with {@code buildTrace} and saves the result through the
 * trace repository.
 *
 * <p>Persistence is best-effort: any exception is logged together with the trace id and is
 * not propagated to the caller.
 *
 * @param ctx the finished trace context to persist
 */
@Async
protected void persistTrace(AgentTraceContext ctx) {
    try {
        // Convert and store (the repository may be backed by a database or Elasticsearch).
        traceRepository.save(buildTrace(ctx));
    } catch (Exception e) {
        log.error("持久化Agent Trace失败: {}", ctx.getTraceId(), e);
    }
}
}
异常检测:自动发现问题模式
仅仅记录日志不够,还需要自动检测异常:
@Service
@Slf4j
@RequiredArgsConstructor
public class AgentAnomalyDetector {
private final AgentTraceRepository traceRepository;
private final AlertService alertService;
/**
 * Scheduled scan for anomaly patterns.
 *
 * <p>Loads the traces of the last 10 minutes from the repository and runs every check
 * against that same list, in a fixed order.
 */
@Scheduled(fixedDelay = 300_000) // 5 minutes between the end of one run and the start of the next
public void detectAnomalies() {
    List<AgentTrace> windowTraces =
            traceRepository.findSince(LocalDateTime.now().minusMinutes(10));

    checkFailureRate(windowTraces);
    checkResponseTime(windowTraces);
    checkToolCallAnomaly(windowTraces);
    checkLoopingBehavior(windowTraces);
}
/**
 * Sends a HIGH alert when more than 30% of the given traces failed.
 *
 * @param traces traces to inspect; an empty list is ignored
 */
private void checkFailureRate(List<AgentTrace> traces) {
    if (traces.isEmpty()) {
        return;
    }

    long failedCount = 0;
    for (AgentTrace trace : traces) {
        if (!trace.isSucceeded()) {
            failedCount++;
        }
    }

    double failureRate = (double) failedCount / traces.size();
    if (!(failureRate > 0.3)) {
        // At or below the 30% threshold: nothing to report.
        return;
    }

    String message = String.format("Agent失败率异常: %.1f%% (最近10分钟,共%d次)",
            failureRate * 100, traces.size());
    alertService.sendAlert(AlertLevel.HIGH, message);
}
/**
 * Detects traces in which the agent appears stuck in a tool-call loop, i.e. the same tool
 * was called 5 or more times within one trace. Each such tool is logged and reported with a
 * MEDIUM alert.
 *
 * <p>Tool calls are grouped by {@code toolName}, falling back to the span {@code name} when
 * {@code toolName} is not set. Spans with neither are skipped. Without the fallback a single
 * span with a null {@code toolName} makes {@code Collectors.groupingBy} throw a
 * {@code NullPointerException}, aborting the whole scan.
 *
 * @param traces traces to inspect; traces without a span list are skipped
 */
private void checkLoopingBehavior(List<AgentTrace> traces) {
    for (AgentTrace trace : traces) {
        if (trace.getSpans() == null) {
            continue;
        }

        Map<String, Long> toolCallCounts = trace.getSpans().stream()
                .filter(s -> s.getType() == SpanType.TOOL_CALL)
                .map(s -> s.getToolName() != null ? s.getToolName() : s.getName())
                .filter(toolKey -> toolKey != null)
                .collect(Collectors.groupingBy(toolKey -> toolKey, Collectors.counting()));

        toolCallCounts.forEach((toolName, count) -> {
            if (count >= 5) { // same tool called 5 or more times
                log.warn("检测到疑似循环行为: taskId={}, tool={}, count={}",
                        trace.getTaskId(), toolName, count);
                alertService.sendAlert(AlertLevel.MEDIUM,
                        "Agent疑似陷入循环: 任务" + trace.getTaskId() +
                        "中工具" + toolName + "被调用了" + count + "次");
            }
        });
    }
}
/**
 * Sends a MEDIUM alert when the average total duration of the succeeded traces exceeds
 * 60 seconds. Failed traces are excluded; no alert is sent when there is no succeeded trace.
 *
 * @param traces traces to inspect
 */
private void checkResponseTime(List<AgentTrace> traces) {
    OptionalDouble meanDuration = traces.stream()
            .filter(trace -> trace.isSucceeded())
            .mapToLong(trace -> trace.getTotalDurationMs())
            .average();

    if (!meanDuration.isPresent()) {
        return;
    }

    double meanMs = meanDuration.getAsDouble();
    if (meanMs > 60_000) {
        String message = String.format("Agent平均响应时间过长: %.1f秒", meanMs / 1000);
        alertService.sendAlert(AlertLevel.MEDIUM, message);
    }
}
}
有了这套监控,当Agent出现问题时,我能很快找到:在哪次LLM调用时推理出错了,哪个工具执行失败了,整个推理链的完整上下文是什么。调试效率提升了十倍以上。
