第2007篇:AI Agent的状态持久化——跨会话任务续接的设计方案
2026/4/30大约 5 分钟
第2007篇:AI Agent的状态持久化——跨会话任务续接的设计方案
适读人群:构建长时间运行AI Agent的工程师 | 阅读时长:约19分钟 | 核心价值:设计Agent状态的完整持久化方案,让用户可以随时暂停和恢复任务
我们的Agent有个功能:帮用户起草一份完整的项目计划书,需要经过多轮对话、多次工具调用,整个过程可能持续15-30分钟。
有一次,一个用户做到一半,浏览器突然崩溃了。刷新页面后,任务从头开始。他在那个任务上花了12分钟,全没了。
这个问题让我开始认真思考:Agent的状态应该如何持久化?如何让任务可以中断后从断点继续?
Agent状态的组成
一个运行中的Agent,其状态包含以下部分:
/**
 * JPA entity that stores the persisted state of one agent task so that the task
 * can be paused and later continued from the last recorded step.
 *
 * <p>List- and map-shaped values (conversation history, steps, context) are kept
 * as JSON strings in TEXT columns.
 */
@Entity
@Table(name = "agent_task_state")
@Data
public class AgentTaskState {
    /** Primary key of the task. */
    @Id
    private String taskId;

    private String userId;
    private String sessionId;

    /** Lifecycle status, stored by enum name. */
    @Enumerated(EnumType.STRING)
    private TaskStatus status; // PENDING, RUNNING, PAUSED, COMPLETED, FAILED

    // Free-form user text: mapped as TEXT because the default VARCHAR(255) is too
    // small for a realistic request. columnDefinition only influences generated
    // DDL, so an existing table needs a matching migration.
    @Column(columnDefinition = "TEXT")
    private String originalRequest; // the user's original request

    @Column(columnDefinition = "TEXT")
    private String conversationHistory; // JSON-serialized conversation history

    @Column(columnDefinition = "TEXT")
    private String completedSteps; // JSON-serialized list of completed steps

    @Column(columnDefinition = "TEXT")
    private String pendingSteps; // JSON-serialized steps still to run (when a plan exists)

    @Column(columnDefinition = "TEXT")
    private String accumulatedOutput; // output produced so far

    @Column(columnDefinition = "TEXT")
    private String taskContext; // task-related context data (JSON)

    private LocalDateTime createdAt;
    private LocalDateTime lastUpdatedAt;
    private LocalDateTime pausedAt;
    private LocalDateTime completedAt;

    private int totalIterations; // number of iterations executed so far
    private int totalTokensUsed; // number of tokens consumed so far

    // Exception messages routinely exceed 255 characters, hence TEXT here as well.
    @Column(columnDefinition = "TEXT")
    private String failureReason; // why the task failed
}
状态持久化服务
每一步执行完都要及时持久化:
/**
 * Persists and restores {@link AgentTaskState} so that an agent task can be
 * continued after an interruption.
 *
 * <p>Conversation history and completed steps are stored as JSON. A JSON
 * (de)serialization failure is reported as an {@link IllegalStateException}
 * instead of being replaced by an empty value, because an empty value would be
 * written back over the previously persisted history on the next save.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class AgentStateService {
    private final AgentTaskStateRepository repository;
    private final ObjectMapper objectMapper;

    /**
     * Creates and stores a new task in status PENDING with empty history, empty
     * step list and empty output.
     *
     * @param userId    owner of the task
     * @param sessionId session the task was started from
     * @param request   the user's original request
     * @return the saved task state
     */
    public AgentTaskState createTask(String userId, String sessionId, String request) {
        LocalDateTime now = LocalDateTime.now();
        AgentTaskState state = new AgentTaskState();
        state.setTaskId(UUID.randomUUID().toString());
        state.setUserId(userId);
        state.setSessionId(sessionId);
        state.setOriginalRequest(request);
        state.setStatus(TaskStatus.PENDING);
        state.setCreatedAt(now);
        state.setLastUpdatedAt(now);
        state.setCompletedSteps(serializeList(new ArrayList<>()));
        state.setConversationHistory(serializeList(new ArrayList<>()));
        state.setAccumulatedOutput("");
        return repository.save(state);
    }

    /**
     * Returns the persisted state of a task.
     *
     * @param taskId id of the task
     * @return the stored state
     * @throws IllegalArgumentException if no task with this id exists
     */
    public AgentTaskState getState(String taskId) {
        return findOrThrow(taskId);
    }

    /**
     * Records the result of one step; meant to be called right after each tool or
     * LLM call.
     *
     * @param taskId          id of the task
     * @param step            the step that was just executed
     * @param currentMessages the full conversation so far; replaces the stored history
     * @throws IllegalArgumentException if the task does not exist
     * @throws IllegalStateException    if the stored or new state cannot be (de)serialized
     */
    @Transactional
    public void recordStep(String taskId, AgentStep step, List<ChatMessage> currentMessages) {
        AgentTaskState state = findOrThrow(taskId);

        state.setConversationHistory(serializeList(currentMessages));

        List<AgentStep> steps = deserializeSteps(state.getCompletedSteps());
        steps.add(step);
        state.setCompletedSteps(serializeList(steps));

        state.setTotalIterations(state.getTotalIterations() + 1);
        state.setLastUpdatedAt(LocalDateTime.now());
        // A pause requested while the step was running must survive this save;
        // unconditionally writing RUNNING would silently cancel it.
        if (state.getStatus() != TaskStatus.PAUSED) {
            state.setStatus(TaskStatus.RUNNING);
        }
        repository.save(state);
        log.debug("任务 {} 步骤 {} 已持久化", taskId, steps.size());
    }

    /**
     * Stores the final answer and marks the task COMPLETED.
     *
     * @param taskId      id of the task
     * @param finalAnswer the final output of the task
     * @param tokensUsed  total tokens consumed
     * @throws IllegalArgumentException if the task does not exist
     */
    @Transactional
    public void markCompleted(String taskId, String finalAnswer, int tokensUsed) {
        AgentTaskState state = findOrThrow(taskId);
        LocalDateTime now = LocalDateTime.now();
        state.setAccumulatedOutput(finalAnswer);
        state.setStatus(TaskStatus.COMPLETED);
        state.setCompletedAt(now);
        state.setLastUpdatedAt(now);
        state.setTotalTokensUsed(tokensUsed);
        repository.save(state);
    }

    /**
     * Marks the task FAILED with the given reason. A task that is already
     * COMPLETED or FAILED is left untouched, so a late failure (for example a
     * rejected resume of a finished task) cannot overwrite its result or its
     * original failure reason.
     *
     * @param taskId id of the task
     * @param reason failure description
     * @throws IllegalArgumentException if the task does not exist
     */
    @Transactional
    public void markFailed(String taskId, String reason) {
        AgentTaskState state = findOrThrow(taskId);
        if (isTerminal(state.getStatus())) {
            log.warn("任务 {} 已处于终态 {},忽略失败标记: {}", taskId, state.getStatus(), reason);
            return;
        }
        state.setStatus(TaskStatus.FAILED);
        state.setFailureReason(reason);
        state.setLastUpdatedAt(LocalDateTime.now());
        repository.save(state);
    }

    /**
     * Marks the task PAUSED (user request or system maintenance). A task that is
     * already COMPLETED or FAILED is left untouched.
     *
     * @param taskId id of the task
     * @throws IllegalArgumentException if the task does not exist
     */
    @Transactional
    public void pauseTask(String taskId) {
        AgentTaskState state = findOrThrow(taskId);
        if (isTerminal(state.getStatus())) {
            log.warn("任务 {} 已处于终态 {},忽略暂停请求", taskId, state.getStatus());
            return;
        }
        LocalDateTime now = LocalDateTime.now();
        state.setStatus(TaskStatus.PAUSED);
        state.setPausedAt(now);
        state.setLastUpdatedAt(now);
        repository.save(state);
        // totalIterations is incremented once per recorded step, so it equals the
        // number of completed steps without parsing the stored JSON.
        log.info("任务 {} 已暂停,当前步骤 {}", taskId, state.getTotalIterations());
    }

    /**
     * Switches the task to RUNNING and returns the context needed to continue it.
     * PENDING is accepted as well, because a freshly created task starts through
     * this same path.
     *
     * @param taskId id of the task
     * @return original request, conversation history, completed steps and task context
     * @throws IllegalArgumentException if the task does not exist
     * @throws IllegalStateException    if the task is COMPLETED or FAILED, or its
     *                                  stored state cannot be deserialized
     */
    @Transactional
    public ResumableAgentContext resumeTask(String taskId) {
        AgentTaskState state = findOrThrow(taskId);
        TaskStatus status = state.getStatus();
        if (status != TaskStatus.PENDING
                && status != TaskStatus.PAUSED
                && status != TaskStatus.RUNNING) {
            throw new IllegalStateException("任务状态 " + status + " 不可恢复");
        }

        List<ChatMessage> messages = deserializeMessages(state.getConversationHistory());
        List<AgentStep> completedSteps = deserializeSteps(state.getCompletedSteps());
        Map<String, Object> taskContext = deserializeMap(state.getTaskContext());

        state.setStatus(TaskStatus.RUNNING);
        state.setLastUpdatedAt(LocalDateTime.now());
        repository.save(state);
        log.info("任务 {} 已恢复,从第 {} 步继续", taskId, completedSteps.size() + 1);

        return ResumableAgentContext.builder()
                .taskId(taskId)
                .originalRequest(state.getOriginalRequest())
                .conversationHistory(messages)
                .completedSteps(completedSteps)
                .taskContext(taskContext)
                .build();
    }

    /** Loads a task or fails with IllegalArgumentException when the id is unknown. */
    private AgentTaskState findOrThrow(String taskId) {
        return repository.findById(taskId)
                .orElseThrow(() -> new IllegalArgumentException("任务不存在: " + taskId));
    }

    /** True for the two statuses no further transition should leave. */
    private boolean isTerminal(TaskStatus status) {
        return status == TaskStatus.COMPLETED || status == TaskStatus.FAILED;
    }

    /** Serializes a list to JSON; a failure is propagated rather than stored as "[]". */
    private String serializeList(List<?> list) {
        try {
            return objectMapper.writeValueAsString(list);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("任务状态序列化失败", e);
        }
    }

    /** Reads the completed-step list; a missing or blank value yields a new mutable empty list. */
    private List<AgentStep> deserializeSteps(String json) {
        if (json == null || json.isBlank()) {
            return new ArrayList<>();
        }
        return readJson(json, new TypeReference<List<AgentStep>>() {}, "completedSteps");
    }

    /** Reads the conversation history; a missing or blank value yields a new mutable empty list. */
    private List<ChatMessage> deserializeMessages(String json) {
        if (json == null || json.isBlank()) {
            return new ArrayList<>();
        }
        return readJson(json, new TypeReference<List<ChatMessage>>() {}, "conversationHistory");
    }

    /** Reads the task context; a missing or blank value yields a new mutable empty map. */
    private Map<String, Object> deserializeMap(String json) {
        if (json == null || json.isBlank()) {
            return new HashMap<>();
        }
        return readJson(json, new TypeReference<Map<String, Object>>() {}, "taskContext");
    }

    /** Parses non-blank JSON and reports corrupt content with the field name and the cause. */
    private <T> T readJson(String json, TypeReference<T> type, String fieldName) {
        try {
            return objectMapper.readValue(json, type);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("任务状态字段 " + fieldName + " 反序列化失败", e);
        }
    }
}
可恢复的Agent执行器
把状态持久化集成进Agent主循环:
/**
 * Agent executor that persists its progress after every step through
 * {@link AgentStateService}, so an interrupted task can continue from the last
 * recorded step.
 *
 * <p>{@code buildSystemPrompt}, {@code parseAgentStep} and {@code estimateTokens}
 * are used below but are not part of this excerpt.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class PersistentReActAgent {
    private static final int MAX_ITERATIONS = 15;

    private final ChatClient chatClient;
    private final AgentStateService stateService;
    private final ToolRegistry toolRegistry;

    /**
     * Reference to this bean as seen through the Spring proxy. Calling
     * {@code executeAsync} on {@code this} bypasses the proxy, so {@code @Async}
     * would be ignored and the whole agent loop would run on the caller's thread.
     * Non-final on purpose: it is field-injected and therefore not part of the
     * Lombok-generated constructor.
     */
    @org.springframework.context.annotation.Lazy
    @org.springframework.beans.factory.annotation.Autowired
    private PersistentReActAgent self;

    /**
     * Creates a new task and starts executing it in the background.
     *
     * @param userId    owner of the task
     * @param sessionId session the task belongs to
     * @param request   the user's request
     * @param tools     tools offered to the agent
     * @return id of the created task
     */
    public String startTask(String userId, String sessionId,
                            String request, List<AgentTool> tools) {
        AgentTaskState state = stateService.createTask(userId, sessionId, request);
        dispatchAsync(state.getTaskId(), tools);
        return state.getTaskId();
    }

    /**
     * Continues a previously persisted task in the background.
     *
     * @param taskId id of the task to continue
     * @param tools  tools offered to the agent
     */
    public void resumeTask(String taskId, List<AgentTool> tools) {
        dispatchAsync(taskId, tools);
    }

    /**
     * Loads the persisted context and runs the agent loop. Any exception is logged
     * and recorded on the task as its failure reason.
     *
     * @param taskId id of the task to run
     * @param tools  tools offered to the agent
     */
    @Async("agentExecutorPool")
    public void executeAsync(String taskId, List<AgentTool> tools) {
        try {
            ResumableAgentContext ctx = stateService.resumeTask(taskId);
            runAgentLoop(taskId, ctx, tools);
        } catch (Exception e) {
            log.error("Agent任务 {} 执行失败", taskId, e);
            // Some exceptions carry no message; fall back to the exception's
            // string form so the stored failure reason is never null.
            String reason = (e.getMessage() != null) ? e.getMessage() : e.toString();
            stateService.markFailed(taskId, reason);
        }
    }

    /** Routes the call through the proxy when available so that @Async takes effect. */
    private void dispatchAsync(String taskId, List<AgentTool> tools) {
        // self is null only when the object was constructed outside the container.
        PersistentReActAgent target = (self != null) ? self : this;
        target.executeAsync(taskId, tools);
    }

    /**
     * Runs the think/act loop starting after the already completed steps, and
     * persists the state after every step.
     *
     * NOTE(review): the loop never re-reads the task status, so a pause request
     * does not interrupt a run that is already in progress.
     */
    private void runAgentLoop(String taskId, ResumableAgentContext ctx,
                              List<AgentTool> tools) {
        List<ChatMessage> messages = new ArrayList<>(ctx.getConversationHistory());

        // A brand-new task has no history yet: seed it with the system prompt and
        // the user's request.
        if (messages.isEmpty()) {
            String systemPrompt = buildSystemPrompt(tools);
            messages.add(new SystemMessage(systemPrompt));
            messages.add(new UserMessage(ctx.getOriginalRequest()));
        }

        int startIteration = ctx.getCompletedSteps().size();
        for (int i = startIteration; i < MAX_ITERATIONS; i++) {
            log.debug("任务 {} 执行第 {} 步", taskId, i + 1);
            String llmResponse = chatClient.prompt().messages(messages).call().content();
            AgentStep step = parseAgentStep(llmResponse);

            if (step.getFinalAnswer() != null) {
                // Add the final assistant turn before persisting, otherwise the
                // stored conversation ends without the answer.
                messages.add(new AssistantMessage(llmResponse));
                stateService.recordStep(taskId, step, messages);
                stateService.markCompleted(taskId, step.getFinalAnswer(), estimateTokens(messages));
                log.info("任务 {} 已完成", taskId);
                return;
            }

            String observation = toolRegistry.executeTool(
                    step.getActionName(), step.getActionInput());
            step.setObservation(observation);

            messages.add(new AssistantMessage(llmResponse));
            messages.add(new UserMessage("Observation: " + observation));

            // Key point: persist immediately after every step.
            stateService.recordStep(taskId, step, messages);
        }
        stateService.markFailed(taskId, "超过最大迭代次数 " + MAX_ITERATIONS);
    }
}
断点续接在用户界面的体现
前端轮询任务状态,支持暂停/恢复:
/**
 * REST endpoints for starting an agent task, polling its status, and pausing or
 * resuming it.
 *
 * NOTE(review): the status, pause and resume endpoints do not check that the
 * task belongs to the authenticated caller; any caller who knows a task id can
 * read or control that task. Confirm whether ownership is enforced elsewhere.
 */
@RestController
@RequestMapping("/api/agent/tasks")
@RequiredArgsConstructor
public class AgentTaskController {
    private final PersistentReActAgent agent;
    private final AgentStateService stateService;
    // Used by startTask and resume; it was referenced without being declared.
    private final ToolRegistry toolRegistry;

    /**
     * Starts a new task for the authenticated user.
     *
     * @return 200 with {@code {"task_id": ...}}
     */
    @PostMapping
    public ResponseEntity<Map<String, String>> startTask(
            @RequestBody StartTaskRequest request,
            @AuthenticationPrincipal UserDetails user) {
        String taskId = agent.startTask(
                user.getUsername(), request.getSessionId(),
                request.getMessage(), toolRegistry.getAllTools());
        return ResponseEntity.ok(Map.of("task_id", taskId));
    }

    /**
     * Returns the current status of a task.
     *
     * @return 200 with the status, or 404 when the task id is unknown
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<TaskStatusResponse> getStatus(@PathVariable String taskId) {
        AgentTaskState state;
        try {
            state = stateService.getState(taskId);
        } catch (IllegalArgumentException e) {
            // NOTE(review): assumes getState reports an unknown id with
            // IllegalArgumentException, as the service's other lookups do.
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(TaskStatusResponse.builder()
                .taskId(taskId)
                .status(state.getStatus().name())
                // The service increments totalIterations once per recorded step,
                // so it is the number of completed steps; the JSON step list can
                // only be parsed inside the service.
                .currentStep(state.getTotalIterations())
                .output(state.getAccumulatedOutput())
                .failureReason(state.getFailureReason())
                .build());
    }

    /**
     * Requests a pause of the task.
     *
     * @return 200 on success, or 404 when the task id is unknown
     */
    @PostMapping("/{taskId}/pause")
    public ResponseEntity<Void> pause(@PathVariable String taskId) {
        try {
            stateService.pauseTask(taskId);
        } catch (IllegalArgumentException e) {
            // pauseTask throws IllegalArgumentException when the task does not exist.
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok().build();
    }

    /**
     * Asks the agent to continue the task.
     *
     * @return 200 once the request has been handed to the agent
     */
    @PostMapping("/{taskId}/resume")
    public ResponseEntity<Void> resume(@PathVariable String taskId) {
        agent.resumeTask(taskId, toolRegistry.getAllTools());
        return ResponseEntity.ok().build();
    }
}
状态持久化上线后,那个浏览器崩溃的问题彻底消失了。用户刷新页面,任务从断点继续,没有任何损失。
