第1679篇:Agent工作流的可视化监控——实时追踪任务执行图
第1679篇:Agent工作流的可视化监控——实时追踪任务执行图
有一天下班前,产品经理找我说:我们的Agent在跑一个长任务,跑了20分钟了不知道到哪一步,用户很焦虑,能不能给个进度条?
我当时的第一反应是:这简单,加个百分比不就行了。但仔细想了想,Agent任务和普通任务不一样——它的步骤是动态生成的,执行路径不固定,"20%完成"这个数字根本没有意义。
用户真正需要的是:知道Agent在干什么,为什么要干这个,还有多久。这就引出了今天的话题:Agent工作流的可视化监控。
为什么Agent监控比普通任务监控难
普通异步任务:步骤固定,进度可以按步骤计算。
Agent任务:步骤由LLM动态决定,执行中可能新增步骤,也可能跳过某些步骤。你没法预先知道"总共多少步"。
更复杂的是:
- Agent可能并发执行多个子任务,形成树状结构
- 某个工具调用失败后可能走不同的分支
- Agent可能自己发现计划不对,重新规划
这意味着我们需要的不是一个线性进度条,而是一个实时生长的执行图。
核心数据模型:执行追踪树
// Agent执行节点
@Data
@Builder
public class ExecutionNode {
private String nodeId;
private String parentNodeId; // 父节点(null表示根节点)
private String taskId; // 所属任务
private NodeType type; // PLAN/TOOL_CALL/LLM_CALL/AGENT/DECISION
private String name; // 节点名称(工具名、Agent名等)
private NodeStatus status; // PENDING/RUNNING/SUCCESS/FAILED/SKIPPED
private String input; // 输入内容(JSON)
private String output; // 输出内容(JSON)
private String error; // 错误信息(失败时)
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private long durationMs;
// 监控相关
private int tokenUsed; // 消耗的Token数
private double costUsd; // 消耗的美元成本
private int retryCount; // 重试次数
// 元数据
private Map<String, Object> metadata;
// 计算进度
public double getProgress() {
return switch (status) {
case SUCCESS -> 1.0;
case FAILED, SKIPPED -> 1.0;
case RUNNING -> 0.5; // 运行中估算50%
case PENDING -> 0.0;
};
}
}
public enum NodeType {
ROOT, // 根节点(整个任务)
PLAN, // 规划阶段
LLM_CALL, // LLM推理调用
TOOL_CALL, // 工具调用
SUB_AGENT, // 子Agent
DECISION, // 条件判断点
HUMAN_INPUT // 人工干预节点
}
public enum NodeStatus {
PENDING, RUNNING, SUCCESS, FAILED, SKIPPED, WAITING_HUMAN
}执行追踪器:在Agent执行中嵌入追踪逻辑
@Service
public class AgentExecutionTracer {
private final ExecutionNodeRepository nodeRepository;
private final WebSocketBroadcaster broadcaster;
/**
* 开始一个新任务的追踪
*/
public ExecutionContext startTask(String taskId, String description) {
ExecutionNode root = ExecutionNode.builder()
.nodeId(UUID.randomUUID().toString())
.taskId(taskId)
.type(NodeType.ROOT)
.name("任务: " + description)
.status(NodeStatus.RUNNING)
.startedAt(LocalDateTime.now())
.build();
nodeRepository.save(root);
broadcaster.broadcast(taskId, NodeEvent.started(root));
return new ExecutionContext(taskId, root.getNodeId(), this);
}
/**
* 开始一个子节点
*/
public String startNode(String taskId, String parentNodeId,
NodeType type, String name,
Object input) {
ExecutionNode node = ExecutionNode.builder()
.nodeId(UUID.randomUUID().toString())
.parentNodeId(parentNodeId)
.taskId(taskId)
.type(type)
.name(name)
.status(NodeStatus.RUNNING)
.input(JSON.toJSONString(input))
.startedAt(LocalDateTime.now())
.build();
nodeRepository.save(node);
// 实时推送到前端
broadcaster.broadcast(taskId, NodeEvent.started(node));
log.debug("执行节点开始: nodeId={}, type={}, name={}",
node.getNodeId(), type, name);
return node.getNodeId();
}
/**
* 完成一个节点
*/
public void completeNode(String nodeId, Object output) {
ExecutionNode node = nodeRepository.findById(nodeId)
.orElseThrow(() -> new RuntimeException("节点不存在: " + nodeId));
node.setStatus(NodeStatus.SUCCESS);
node.setOutput(JSON.toJSONString(output));
node.setCompletedAt(LocalDateTime.now());
node.setDurationMs(Duration.between(node.getStartedAt(), node.getCompletedAt())
.toMillis());
nodeRepository.save(node);
broadcaster.broadcast(node.getTaskId(), NodeEvent.completed(node));
}
/**
* 节点失败
*/
public void failNode(String nodeId, String error) {
ExecutionNode node = nodeRepository.findById(nodeId).orElseThrow();
node.setStatus(NodeStatus.FAILED);
node.setError(error);
node.setCompletedAt(LocalDateTime.now());
nodeRepository.save(node);
broadcaster.broadcast(node.getTaskId(), NodeEvent.failed(node));
}
/**
* 更新节点的Token消耗(在LLM调用完成后调用)
*/
public void updateTokenUsage(String nodeId, int tokens, double costUsd) {
nodeRepository.updateTokenUsage(nodeId, tokens, costUsd);
broadcaster.broadcast(nodeRepository.getTaskId(nodeId),
NodeEvent.tokenUsageUpdated(nodeId, tokens, costUsd));
}
/**
* 获取任务的完整执行树
*/
public ExecutionTree getExecutionTree(String taskId) {
List<ExecutionNode> allNodes = nodeRepository.findByTaskId(taskId);
return ExecutionTree.buildFrom(allNodes);
}
}在Agent各处嵌入追踪点
追踪点需要嵌入到Agent执行的每个关键位置:
@Service
public class TracedAgentExecutor {
private final AgentExecutionTracer tracer;
private final LLMClient llmClient;
private final ToolRouter toolRouter;
public AgentResult execute(AgentTask task) {
// 开始任务追踪
ExecutionContext ctx = tracer.startTask(task.getTaskId(),
task.getDescription());
try {
AgentResult result = executeWithTracing(task, ctx);
ctx.completeRoot(result);
return result;
} catch (Exception e) {
ctx.failRoot(e.getMessage());
throw e;
}
}
private AgentResult executeWithTracing(AgentTask task, ExecutionContext ctx) {
List<Message> messages = new ArrayList<>();
messages.add(Message.system(buildSystemPrompt()));
messages.add(Message.user(task.getDescription()));
for (int step = 0; step < task.getMaxSteps(); step++) {
// 追踪LLM调用
String llmNodeId = tracer.startNode(
task.getTaskId(), ctx.getCurrentNodeId(),
NodeType.LLM_CALL,
"LLM推理 (第" + (step + 1) + "步)",
Map.of("messages_count", messages.size())
);
LLMResponse response;
try {
long startMs = System.currentTimeMillis();
response = llmClient.chat(ChatRequest.builder()
.messages(messages)
.tools(toolRegistry.getTools())
.build()
);
tracer.completeNode(llmNodeId, Map.of(
"reasoning", response.getContent(),
"has_tool_calls", response.getToolCalls() != null
));
tracer.updateTokenUsage(llmNodeId,
response.getUsage().getTotalTokens(),
response.getUsage().estimateCost()
);
} catch (Exception e) {
tracer.failNode(llmNodeId, e.getMessage());
throw e;
}
messages.add(Message.assistant(response.getContent()));
// 没有工具调用,任务完成
if (response.getToolCalls() == null || response.getToolCalls().isEmpty()) {
return AgentResult.success(task.getTaskId(), response.getContent());
}
// 并发执行多个工具调用
List<ToolCall> toolCalls = response.getToolCalls();
List<ToolResult> toolResults = executeToolCallsWithTracing(
toolCalls, task.getTaskId(), ctx.getCurrentNodeId()
);
// 把工具结果添加到消息列表
for (int i = 0; i < toolCalls.size(); i++) {
messages.add(Message.tool(toolCalls.get(i).getId(),
toolResults.get(i).toString()));
}
}
return AgentResult.failed("超过最大步骤数");
}
private List<ToolResult> executeToolCallsWithTracing(List<ToolCall> toolCalls,
String taskId,
String parentNodeId) {
// 并发执行(用CompletableFuture)
List<CompletableFuture<ToolResult>> futures = toolCalls.stream()
.map(call -> {
// 每个工具调用创建一个追踪节点
String toolNodeId = tracer.startNode(
taskId, parentNodeId,
NodeType.TOOL_CALL,
"工具: " + call.getName(),
call.getArguments()
);
return CompletableFuture.supplyAsync(() -> {
try {
ToolResult result = toolRouter.invoke(
call.getName(), call.getArguments(), null
);
tracer.completeNode(toolNodeId, result);
return result;
} catch (Exception e) {
tracer.failNode(toolNodeId, e.getMessage());
return ToolResult.failed(e.getMessage());
}
});
})
.collect(Collectors.toList());
// 等待所有工具执行完成
return futures.stream()
.map(f -> {
try {
return f.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
return ToolResult.failed("超时: " + e.getMessage());
}
})
.collect(Collectors.toList());
}
}WebSocket实时推送
前端需要实时看到执行状态更新,用WebSocket推送:
@Service
public class WebSocketBroadcaster {
private final SimpMessagingTemplate messagingTemplate;
/**
* 广播节点事件到订阅该任务的所有客户端
*/
public void broadcast(String taskId, NodeEvent event) {
String destination = "/topic/agent-tasks/" + taskId;
messagingTemplate.convertAndSend(destination, event);
}
/**
* 广播任务整体进度更新
*/
public void broadcastProgress(String taskId, TaskProgress progress) {
messagingTemplate.convertAndSend(
"/topic/agent-tasks/" + taskId + "/progress",
progress
);
}
}
// WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/agent-monitoring")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
// REST API:查询执行树
@RestController
@RequestMapping("/api/agent-tasks")
public class AgentMonitoringController {
@Autowired
private AgentExecutionTracer tracer;
@GetMapping("/{taskId}/execution-tree")
public ExecutionTree getExecutionTree(@PathVariable String taskId) {
return tracer.getExecutionTree(taskId);
}
@GetMapping("/{taskId}/summary")
public TaskSummary getTaskSummary(@PathVariable String taskId) {
ExecutionTree tree = tracer.getExecutionTree(taskId);
return TaskSummary.builder()
.taskId(taskId)
.status(tree.getRootStatus())
.totalNodes(tree.getTotalNodes())
.completedNodes(tree.getCompletedNodes())
.failedNodes(tree.getFailedNodes())
.totalTokens(tree.getTotalTokens())
.totalCostUsd(tree.getTotalCost())
.durationMs(tree.getTotalDurationMs())
.currentStep(tree.getCurrentRunningNode())
.build();
}
@GetMapping("/{taskId}/timeline")
public List<TimelineEvent> getTimeline(@PathVariable String taskId) {
return tracer.getExecutionTree(taskId)
.getAllNodes()
.stream()
.sorted(Comparator.comparing(ExecutionNode::getStartedAt))
.map(node -> TimelineEvent.builder()
.time(node.getStartedAt())
.type(node.getType().name())
.name(node.getName())
.status(node.getStatus().name())
.durationMs(node.getDurationMs())
.build())
.collect(Collectors.toList());
}
}执行树的可视化数据接口
前端(React/Vue)需要渲染执行树,后端要返回适合可视化的数据格式:
@Service
public class ExecutionTreeSerializer {
/**
* 将执行树序列化为Mermaid图表格式
* 可以嵌入到内部管理界面或日志报告里
*/
public String toMermaidDiagram(ExecutionTree tree) {
StringBuilder sb = new StringBuilder();
sb.append("flowchart TD\n");
// 添加节点
for (ExecutionNode node : tree.getAllNodes()) {
String shape = getNodeShape(node.getType());
String style = getNodeStyle(node.getStatus());
String label = escapeLabel(node.getName() +
(node.getDurationMs() > 0 ? "\n" + node.getDurationMs() + "ms" : ""));
sb.append(String.format(" %s%s[\"%s\"]%s\n",
node.getNodeId().replace("-", ""), shape, label, ""));
sb.append(String.format(" style %s %s\n",
node.getNodeId().replace("-", ""), style));
}
// 添加边
for (ExecutionNode node : tree.getAllNodes()) {
if (node.getParentNodeId() != null) {
sb.append(String.format(" %s --> %s\n",
node.getParentNodeId().replace("-", ""),
node.getNodeId().replace("-", "")
));
}
}
return sb.toString();
}
/**
* 将执行树序列化为D3.js树图格式
*/
public Map<String, Object> toD3TreeData(ExecutionTree tree) {
ExecutionNode root = tree.getRootNode();
return buildD3Node(root, tree);
}
private Map<String, Object> buildD3Node(ExecutionNode node, ExecutionTree tree) {
Map<String, Object> d3Node = new LinkedHashMap<>();
d3Node.put("id", node.getNodeId());
d3Node.put("name", node.getName());
d3Node.put("type", node.getType().name());
d3Node.put("status", node.getStatus().name());
d3Node.put("duration", node.getDurationMs());
d3Node.put("tokens", node.getTokenUsed());
d3Node.put("cost", node.getCostUsd());
d3Node.put("startTime", node.getStartedAt());
d3Node.put("endTime", node.getCompletedAt());
// 递归构建子节点
List<ExecutionNode> children = tree.getChildren(node.getNodeId());
if (!children.isEmpty()) {
d3Node.put("children", children.stream()
.sorted(Comparator.comparing(ExecutionNode::getStartedAt))
.map(child -> buildD3Node(child, tree))
.collect(Collectors.toList()));
}
return d3Node;
}
private String getNodeShape(NodeType type) {
return switch (type) {
case LLM_CALL -> "[";
case TOOL_CALL -> "(";
case SUB_AGENT -> "{";
case DECISION -> "{{";
case HUMAN_INPUT -> "[\\/";
default -> "[";
};
}
private String getNodeStyle(NodeStatus status) {
return switch (status) {
case SUCCESS -> "fill:#4caf50,color:#fff";
case FAILED -> "fill:#f44336,color:#fff";
case RUNNING -> "fill:#2196f3,color:#fff";
case WAITING_HUMAN -> "fill:#ff9800,color:#fff";
default -> "fill:#9e9e9e,color:#fff";
};
}
}智能进度预估
虽然Agent步骤不固定,但可以用历史数据来预估剩余时间:
@Service
public class ProgressEstimator {
private final ExecutionHistoryRepository historyRepo;
/**
* 基于历史数据预估任务完成时间
*/
public ProgressEstimation estimate(String taskId, String taskType) {
ExecutionTree currentTree = tracer.getExecutionTree(taskId);
// 查找相似任务的历史执行数据
List<ExecutionHistory> similarTasks = historyRepo.findSimilar(taskType, 20);
if (similarTasks.isEmpty()) {
// 没有历史数据,给出保守估计
return ProgressEstimation.unknown();
}
// 统计历史任务的步骤数分布
DoubleSummaryStatistics stepStats = similarTasks.stream()
.mapToDouble(ExecutionHistory::getTotalNodes)
.summaryStatistics();
// 统计历史任务的耗时分布
DoubleSummaryStatistics durationStats = similarTasks.stream()
.mapToDouble(h -> h.getTotalDurationMs())
.summaryStatistics();
int currentNodes = currentTree.getTotalNodes();
int estimatedTotalNodes = (int) stepStats.getAverage();
double progressPercent = currentNodes * 100.0 / estimatedTotalNodes;
progressPercent = Math.min(95, progressPercent); // 不超过95%,避免误导
long elapsedMs = currentTree.getElapsedMs();
long estimatedTotalMs = (long) durationStats.getAverage();
long remainingMs = Math.max(0, estimatedTotalMs - elapsedMs);
return ProgressEstimation.builder()
.progressPercent(progressPercent)
.estimatedRemainingMs(remainingMs)
.confidence(similarTasks.size() >= 10 ? "high" : "low")
.currentNodeCount(currentNodes)
.estimatedTotalNodes(estimatedTotalNodes)
.build();
}
/**
* 根据当前执行状态计算"真实感知进度"
* 即使不确定总步骤数,也要给用户有意义的进度反馈
*/
public String generateProgressDescription(ExecutionTree tree) {
ExecutionNode currentNode = tree.getCurrentRunningNode();
if (currentNode == null) {
return "任务已完成";
}
// 根据节点类型生成人性化描述
return switch (currentNode.getType()) {
case LLM_CALL -> "正在思考下一步...";
case TOOL_CALL -> "正在执行:" + currentNode.getName();
case SUB_AGENT -> "子任务进行中:" + currentNode.getName();
case HUMAN_INPUT -> "等待人工确认...";
default -> "处理中...";
};
}
}异常检测与自动告警
可视化监控不只是展示,还要主动发现问题:
@Service
public class ExecutionAnomalyDetector {
@Scheduled(fixedDelay = 30000) // 每30秒检查一次
public void detectAnomalies() {
// 检查长时间没有进展的任务
List<String> stuckTasks = findStuckTasks();
for (String taskId : stuckTasks) {
alertService.sendAlert(AlertType.TASK_STUCK, taskId,
"任务已超过5分钟无进展");
}
// 检查Token消耗异常的任务
List<String> expensiveTasks = findExpensiveTasks();
for (String taskId : expensiveTasks) {
alertService.sendAlert(AlertType.HIGH_COST, taskId,
"任务Token消耗超过预算的150%");
}
// 检查失败率异常高的工具
detectToolFailureSpike();
}
private List<String> findStuckTasks() {
LocalDateTime threshold = LocalDateTime.now().minusMinutes(5);
return nodeRepository.findRunningTasksWithNoProgressSince(threshold)
.stream()
.map(ExecutionNode::getTaskId)
.distinct()
.collect(Collectors.toList());
}
private void detectToolFailureSpike() {
LocalDateTime since = LocalDateTime.now().minusMinutes(15);
// 按工具统计失败率
Map<String, ToolFailureStats> stats = nodeRepository
.getToolFailureStats(since);
for (Map.Entry<String, ToolFailureStats> entry : stats.entrySet()) {
ToolFailureStats s = entry.getValue();
if (s.getTotalCalls() >= 5 && s.getFailureRate() > 0.5) {
alertService.sendAlert(AlertType.TOOL_FAILURE_SPIKE,
null,
String.format("工具'%s'在过去15分钟内失败率达到%.0f%%",
entry.getKey(), s.getFailureRate() * 100)
);
}
}
}
}执行日志的结构化输出
除了实时可视化,还需要输出结构化日志供后续分析:
@Service
public class ExecutionLogExporter {
/**
* 导出任务执行报告(适合保存到文件或发邮件)
*/
public String exportReport(String taskId) {
ExecutionTree tree = tracer.getExecutionTree(taskId);
StringBuilder report = new StringBuilder();
report.append("# Agent任务执行报告\n\n");
report.append(String.format("**任务ID**: %s\n", taskId));
report.append(String.format("**执行时间**: %s\n",
tree.getRootNode().getStartedAt()));
report.append(String.format("**总耗时**: %d 毫秒\n", tree.getTotalDurationMs()));
report.append(String.format("**最终状态**: %s\n", tree.getRootStatus()));
report.append(String.format("**Token消耗**: %d (约 $%.4f)\n\n",
tree.getTotalTokens(), tree.getTotalCost()));
report.append("## 执行步骤\n\n");
// 按时间顺序输出所有节点
tree.getAllNodes().stream()
.sorted(Comparator.comparing(ExecutionNode::getStartedAt))
.forEach(node -> {
String statusIcon = getStatusIcon(node.getStatus());
report.append(String.format("- %s **%s** [%s] - %dms\n",
statusIcon, node.getName(), node.getType(), node.getDurationMs()));
if (node.getStatus() == NodeStatus.FAILED && node.getError() != null) {
report.append(String.format(" - 错误: %s\n", node.getError()));
}
});
report.append("\n## 执行图\n\n");
report.append("```mermaid\n");
report.append(treeSerializer.toMermaidDiagram(tree));
report.append("```\n");
return report.toString();
}
private String getStatusIcon(NodeStatus status) {
return switch (status) {
case SUCCESS -> "✅";
case FAILED -> "❌";
case RUNNING -> "⏳";
case SKIPPED -> "⏭️";
case WAITING_HUMAN -> "👤";
default -> "⭕";
};
}
}几个实际体会
体会1:追踪粒度要适中。
追踪太细(每一行代码都记录)会严重影响性能,追踪太粗(只记录开始和结束)又没法定位问题。我们最后的方案是:LLM调用和工具调用必须追踪,内部逻辑按需追踪,重要的业务决策点单独标注。
体会2:实时推送要做流量控制。
某些任务执行很快,1秒内可能产生几十个节点事件。如果每个事件都立刻推送,WebSocket消息量会很大,前端渲染卡顿。改成批量推送(50ms合一次)后,效果好很多。
体会3:历史数据的进度预估比想象难。
同类任务的执行时间差异可能很大(5秒到500秒都有),简单用平均值预估误差很大。后来改成用中位数,并且根据当前任务的Token消耗速度动态调整,预估准确性提升了很多。
体会4:执行图对调试有奇效。
加了可视化监控之后,以前需要2小时排查的问题,现在10分钟就找到了——直接看执行图,哪个节点红了,看它的输入输出,问题一目了然。这是我觉得可视化监控最大的实用价值。
可视化监控不是锦上添花,它是Agent系统可运维性的基础。一个没有可观测性的Agent,等于一个黑盒。出了问题你根本不知道发生了什么,改起来也是瞎改。把追踪和可视化做好,才能让Agent系统真正进入生产可用的状态。
