AI Agent 工程化实战——从玩具 Demo 到生产可用的 Agent 系统
AI Agent 工程化实战——从玩具 Demo 到生产可用的 Agent 系统
适读人群:AI 工程师、有 Agent Demo 但无法上线的开发者 | 阅读时长:约20分钟 | 核心价值:系统性解决 Agent 生产化的关键问题
几乎每个做过 Agent Demo 的工程师都遇到过同一个问题:本地测试的时候效果挺好,一到生产环境就各种出问题。
不是代码写得不好,是 Agent 的生产化有一套专门的工程问题需要解决。
这篇文章不讲 Agent 的基础概念(前面有一堆文章讲过),专门讲从 Demo 到生产这段路上要解决什么问题。
Agent Demo 和生产 Agent 的核心差距
| 维度 | Demo | 生产 |
|---|---|---|
| 容错性 | 工具失败直接崩溃 | 重试、降级、兜底 |
| 状态管理 | 内存里,重启就丢 | 持久化,支持恢复 |
| 并发 | 单线程 | 多用户并发,隔离不干扰 |
| 可观测性 | print 调试 | 完整的链路追踪 |
| 安全性 | 无限制 | 权限控制、内容过滤 |
| 成本控制 | 不管 | Token 预算,防止超额 |
| 中断/恢复 | 不支持 | 长任务支持暂停、恢复 |
Agent 状态管理——持久化是关键
@Entity
@Table(name = "agent_sessions")
public class AgentSession {
@Id
private String sessionId;
private String userId;
private String agentType;
private String status; // RUNNING / PAUSED / COMPLETED / FAILED
@Column(columnDefinition = "TEXT")
private String stateJson; // 序列化的 Agent 状态
@Column(columnDefinition = "TEXT")
private String conversationHistory; // 对话历史
private int stepCount; // 已执行步骤数
private int maxSteps; // 最大步骤数限制
private int tokenUsed; // 已消耗 token
private int tokenBudget; // token 预算
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
private LocalDateTime completedAt;
}@Service
@Slf4j
public class AgentSessionService {
private final AgentSessionRepository repository;
private final ObjectMapper objectMapper;
/**
* 创建新的 Agent 会话
*/
public AgentSession createSession(String userId, String agentType, int tokenBudget) {
AgentSession session = new AgentSession();
session.setSessionId(UUID.randomUUID().toString());
session.setUserId(userId);
session.setAgentType(agentType);
session.setStatus("RUNNING");
session.setMaxSteps(50); // 最多 50 步,防止无限循环
session.setTokenBudget(tokenBudget);
session.setCreatedAt(LocalDateTime.now());
return repository.save(session);
}
/**
* 保存 Agent 当前状态(每步之后调用)
*/
@Transactional
public void saveState(String sessionId, AgentState state, int tokensUsed) {
AgentSession session = repository.findById(sessionId)
.orElseThrow(() -> new SessionNotFoundException(sessionId));
session.setStateJson(objectMapper.writeValueAsString(state));
session.setStepCount(session.getStepCount() + 1);
session.setTokenUsed(session.getTokenUsed() + tokensUsed);
session.setUpdatedAt(LocalDateTime.now());
// 检查是否超出限制
if (session.getStepCount() >= session.getMaxSteps()) {
session.setStatus("FAILED");
session.setCompletedAt(LocalDateTime.now());
log.warn("Session {} 超出最大步骤数 {}", sessionId, session.getMaxSteps());
}
if (session.getTokenUsed() >= session.getTokenBudget()) {
session.setStatus("FAILED");
session.setCompletedAt(LocalDateTime.now());
log.warn("Session {} 超出 Token 预算 {}", sessionId, session.getTokenBudget());
}
repository.save(session);
}
/**
* 从已保存状态恢复 Agent(服务重启后使用)
*/
public Optional<AgentState> restoreState(String sessionId) {
return repository.findBySessionIdAndStatus(sessionId, "RUNNING")
.map(session -> {
try {
return objectMapper.readValue(session.getStateJson(), AgentState.class);
} catch (Exception e) {
log.error("状态反序列化失败", e);
return null;
}
});
}
}Agent 主循环——可靠的 ReAct 实现
@Service
@Slf4j
public class ReActAgent {
private final ChatModel chatModel;
private final Map<String, AgentTool> tools;
private final AgentSessionService sessionService;
private final ContentSafetyService safetyService;
public AgentResult execute(String sessionId, String task) {
AgentSession session = sessionService.getSession(sessionId);
List<Message> history = loadHistory(session);
while (true) {
// 检查限制
if (session.getStepCount() >= session.getMaxSteps()) {
return AgentResult.exceeded("已达到最大步骤数限制");
}
if (session.getTokenUsed() >= session.getTokenBudget()) {
return AgentResult.exceeded("已达到 Token 预算限制");
}
// 构建 Prompt
String systemPrompt = buildSystemPrompt(tools);
// 调用模型(含重试)
AgentStep step = callModelWithRetry(systemPrompt, history, task);
int tokensUsed = step.getTokensUsed();
log.info("Session={}, Step={}, Action={}",
sessionId, session.getStepCount() + 1, step.getAction());
// 安全检查
if (!safetyService.isSafe(step)) {
sessionService.markFailed(sessionId, "安全检查未通过: " + step.getAction());
return AgentResult.blocked("该操作被安全策略阻止");
}
// 执行动作
if ("FINAL_ANSWER".equals(step.getAction())) {
sessionService.markCompleted(sessionId);
return AgentResult.success(step.getActionInput());
}
String observation = executeToolSafely(step.getAction(), step.getActionInput());
// 更新历史
history.add(new AgentMessage("thought", step.getThought()));
history.add(new AgentMessage("action", step.getAction() + ": " + step.getActionInput()));
history.add(new AgentMessage("observation", observation));
// 持久化状态
AgentState state = new AgentState(history, step);
sessionService.saveState(sessionId, state, tokensUsed);
}
}
private String executeToolSafely(String toolName, String toolInput) {
AgentTool tool = tools.get(toolName);
if (tool == null) {
return "错误:工具 " + toolName + " 不存在";
}
try {
return tool.execute(toolInput);
} catch (ToolExecutionException e) {
log.warn("工具执行失败,tool={}, input={}, error={}", toolName, toolInput, e.getMessage());
return "工具执行失败:" + e.getMessage() + "。请尝试其他方式。";
} catch (Exception e) {
log.error("工具执行异常", e);
return "工具暂时不可用,请稍后重试或换其他方式解决。";
}
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2))
private AgentStep callModelWithRetry(String systemPrompt, List<Message> history, String task) {
// 调用模型,包含超时配置
String response = chatModel.generate(buildMessages(systemPrompt, history, task));
return parseAgentStep(response);
}
}可观测性——链路追踪
Agent 调试最痛苦的地方是:出了问题不知道在哪一步出的,每一步做了什么判断。
@Component
@Slf4j
public class AgentTracer {
private final TraceRepository traceRepository;
public void traceStep(String sessionId, int stepIndex, AgentStepTrace trace) {
// 记录每一步的详细信息
AgentStepRecord record = AgentStepRecord.builder()
.sessionId(sessionId)
.stepIndex(stepIndex)
.thought(trace.thought())
.action(trace.action())
.actionInput(trace.actionInput())
.observation(trace.observation())
.tokenInput(trace.tokenInput())
.tokenOutput(trace.tokenOutput())
.latencyMs(trace.latencyMs())
.timestamp(Instant.now())
.build();
traceRepository.save(record);
log.debug("Agent Step [Session={}, Step={}]: Thought={}, Action={}, Observation={}",
sessionId, stepIndex,
truncate(trace.thought(), 100),
trace.action(),
truncate(trace.observation(), 100));
}
public List<AgentStepRecord> getTrace(String sessionId) {
return traceRepository.findBySessionIdOrderByStepIndex(sessionId);
}
}配合 Langfuse 或自建的追踪平台,可以在 UI 上看到每一步的完整链路。
踩坑实录
坑一:Agent 陷入死循环
现象:某些任务下 Agent 一直在重复调用同一个工具,直到超出步骤限制。
原因:工具返回的错误信息没有给 Agent 足够的信息去换策略,Agent 以为"再试一次就好了"。
解法:工具失败时返回更详细的原因和可能的解决建议:
// 不好的工具返回
return "查询失败";
// 好的工具返回
return "查询失败:数据库连接超时(已重试3次)。建议:1. 稍后再试 2. 使用缓存数据 3. 告知用户暂时无法获取实时数据";同时检测循环:
// 检测最近 5 步是否在重复相同动作
private boolean isLooping(List<AgentMessage> history) {
if (history.size() < 10) return false;
List<String> recentActions = history.stream()
.filter(m -> "action".equals(m.role()))
.map(AgentMessage::content)
.collect(toList());
int last5 = recentActions.size();
Set<String> uniqueActions = new HashSet<>(recentActions.subList(Math.max(0, last5-5), last5));
return uniqueActions.size() <= 2; // 5步里只有2种不同动作,认为在循环
}坑二:并发会话状态串扰
现象:高并发下偶尔出现 A 用户的 Agent 回答了 B 用户的问题。
原因:Session 隔离没做好,对话历史用了共享的 ChatMemory Bean 而不是按 sessionId 隔离。
解法:ChatMemory 必须按 sessionId 创建独立实例,永远不要用共享的 ChatMemory:
// 错误:共享的 ChatMemory,高并发下会串
@Bean
public ChatMemory chatMemory() {
return new InMemoryChatMemory();
}
// 正确:每个 sessionId 独立的 ChatMemory
@Bean
public ChatMemoryProvider chatMemoryProvider() {
return sessionId -> MessageWindowChatMemory.withMaxMessages(50);
}坑三:Token 成本失控
现象:上线第一天 Agent 系统就超了预算,一个用户的对话消耗了 50,000 tokens。
原因:没有设 Token 预算上限,用户把 Agent 当搜索引擎用,追问了几十次。
解法:强制 Token 预算:
if (session.getTokenUsed() + estimatedTokens > session.getTokenBudget()) {
return AgentResult.budgetExceeded(
String.format("本次任务已消耗 %d tokens,超出预算 %d tokens。" +
"如需继续,请开始新的会话。", session.getTokenUsed(), session.getTokenBudget())
);
}生产 Agent 系统架构图
用户请求
↓
AgentController (API 入口)
↓
AgentSessionService (会话管理 + Token 预算检查)
↓
ContentSafetyService (输入/输出安全检查)
↓
ReActAgent (主循环)
├── ChatModel (带重试的模型调用)
├── ToolRegistry (工具注册和执行)
└── AgentTracer (每步追踪记录)
↓
AgentSessionService (持久化状态)
↓
返回结果Multi-Agent 协作:让多个 Agent 分工合作
单个 Agent 处理复杂任务时有局限:上下文变长后容易混乱,单个 Agent 的工具集过多时选择失误增多。Multi-Agent 是解决这个问题的思路。
基本模型:一个 Orchestrator(编排 Agent)负责拆解任务,把子任务分发给专门的 Worker Agent,最后汇总结果。
@Service
@Slf4j
public class OrchestratorAgent {
private final ChatModel plannerModel;
private final Map<String, WorkerAgent> workers;
/**
* 处理复杂任务,自动拆解并分发给专业 Agent
*/
public String execute(String complexTask) {
// 1. 任务拆解
String planningPrompt = """
将以下任务拆解为可以并行处理的子任务列表。
可用的工作 Agent 及其能力:
- data-analyst:数据查询和分析
- content-writer:内容生成和编辑
- code-reviewer:代码审查和优化
- email-sender:发送邮件通知
任务:%s
以 JSON 返回:[{"agent": "...", "task": "...", "depends_on": []}]
""".formatted(complexTask);
String planJson = plannerModel.generate(planningPrompt);
List<SubTask> subTasks = parseSubTasks(planJson);
// 2. 按依赖关系执行
Map<String, String> results = new LinkedHashMap<>();
for (SubTask subTask : subTasks) {
// 等待依赖完成
String dependencyContext = subTask.dependsOn().stream()
.filter(results::containsKey)
.map(dep -> dep + "的结果:" + results.get(dep))
.collect(joining("
"));
String workerInput = dependencyContext.isEmpty()
? subTask.task()
: subTask.task() + "
参考信息:
" + dependencyContext;
// 分发给对应的 Worker
WorkerAgent worker = workers.get(subTask.agent());
String result = worker.execute(workerInput);
results.put(subTask.agent() + "_" + subTask.taskId(), result);
}
// 3. 汇总结果
String summaryPrompt = "请综合以下各步骤的结果,给出最终回答:
" +
results.entrySet().stream()
.map(e -> e.getKey() + ":" + e.getValue())
.collect(joining("
"));
return plannerModel.generate(summaryPrompt);
}
}Multi-Agent 的关键点:每个 Worker Agent 的职责要单一而清晰,不要尝试用一个全能 Agent 处理所有事情。专业化的 Agent 工具集更小、Prompt 更聚焦,出错率更低。
长时任务与异步 Agent
有些任务需要几分钟甚至几小时才能完成(比如爬取大量网页、处理大量文档),同步等待不现实。这时候需要异步 Agent。
@Service
@Slf4j
public class AsyncAgentService {
private final AgentSessionService sessionService;
private final ReActAgent agent;
@Async("agentExecutor")
public CompletableFuture<String> executeAsync(String sessionId, String task) {
try {
log.info("开始异步任务,sessionId={}", sessionId);
sessionService.updateStatus(sessionId, "RUNNING");
AgentResult result = agent.execute(sessionId, task);
sessionService.updateStatus(sessionId, "COMPLETED");
sessionService.saveResult(sessionId, result.answer());
// 通知用户任务完成(WebSocket 或消息推送)
notificationService.notify(sessionId, "任务已完成,请查看结果");
return CompletableFuture.completedFuture(result.answer());
} catch (Exception e) {
sessionService.updateStatus(sessionId, "FAILED");
sessionService.saveError(sessionId, e.getMessage());
return CompletableFuture.failedFuture(e);
}
}
// 用户提交任务
public String submitTask(String userId, String task) {
String sessionId = sessionService.createSession(userId, "ASYNC_AGENT", 50000);
executeAsync(sessionId, task); // 异步,立刻返回
return sessionId; // 返回 sessionId,用户凭此查询进度
}
// 用户查询进度
public AgentProgress checkProgress(String sessionId) {
AgentSession session = sessionService.getSession(sessionId);
return new AgentProgress(
session.getStatus(),
session.getStepCount(),
session.getResult(),
session.getError()
);
}
}用户端(前端)的流程:
- 提交任务,拿到 sessionId
- 用 sessionId 轮询进度(或通过 WebSocket 实时接收更新)
- 任务完成后展示结果
这种模式对于耗时任务的用户体验比"转圈圈等待"好太多。
