Agent 的中间状态持久化——长任务不怕中途挂掉
Agent 的中间状态持久化——长任务不怕中途挂掉
那天下午两点,我收到了一条告警。
我们有一个 Agent 任务,专门负责处理大客户的月度数据分析报告。任务流程是:拉取原始数据 → 清洗 → 多维度统计 → 生成图表 → 撰写分析文案 → 导出 PDF。正常跑完需要 12 分钟左右。
这次,任务在第 10 分钟挂掉了。原因是部署的那台机器内存不够,OOM 了。
重启之后,任务从头开始重跑。又跑了 10 分钟,又挂了——因为重跑时数据量更大了(原始数据还没清理)。
最终这个任务反复重试,折腾了一个下午,大客户那边等报告等到傻眼,下午 6 点才出来。
这件事让我深刻理解了一个道理:长任务不做状态持久化,就等于在沙滩上建城堡。
一、问题的本质:无状态重启 vs 有状态恢复
传统的无状态 HTTP 服务,每次请求独立处理,失败了重试就行,没有"中间状态"的概念。
但 Agent 执行长任务完全不一样。它是一个有状态的过程:
- 第 1 步:拉取了 5000 条原始数据(已完成)
- 第 2 步:清洗了 4800 条(已完成)
- 第 3 步:统计计算完成了 3 个维度的数据(已完成)
- 第 4 步:正在生成第 4 个维度的统计——崩了
如果没有状态持久化,重启后只能从第 1 步开始。前面做的工作全部白费。
Checkpoint 机制解决的就是这个问题:把每个已完成步骤的状态保存下来,崩溃恢复时从最近的 Checkpoint 继续,而不是从头开始。
二、需要持久化哪些状态
在设计之前,先想清楚需要保存什么。
任务级别的状态
任务ID:task_2024011501_monthly_report
任务状态:RUNNING / PAUSED / COMPLETED / FAILED
当前步骤:step_3_statistics
已完成步骤列表:[step_1_fetch, step_2_clean]
开始时间:2024-01-15 14:00:00
最后更新时间:2024-01-15 14:08:32
重试次数:0步骤级别的状态
步骤ID:step_2_clean
步骤名称:数据清洗
步骤状态:COMPLETED
开始时间:2024-01-15 14:01:30
完成时间:2024-01-15 14:03:45
输出数据引用:/tmp/agent/task_xxx/step_2_output.json
输出数据摘要:{count: 4800, checksum: "a3b4c5"}LLM 对话上下文
这是容易忘记的一部分。Agent 在多步执行过程中,LLM 的对话历史是有状态的——它"记得"之前分析了什么,当前在做什么。崩溃恢复时,必须把这个上下文也恢复出来,否则 LLM 会"失忆",产生不一致的输出。
三、基于 Redis 的状态持久化实现
选 Redis 的原因很直接:速度快、支持复杂数据结构、TTL 管理方便。
/**
* Agent 任务的状态模型
*/
@Data
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class AgentTaskState {
private String taskId;
private String taskType;
private TaskStatus status;
/** 所有步骤的执行状态 */
private List<StepState> steps;
/** 当前正在执行的步骤索引 */
private int currentStepIndex;
/** LLM 对话历史(序列化存储) */
private List<ChatMessage> conversationHistory;
/** 任务的输入参数 */
private Map<String, Object> inputParams;
/** 各步骤的中间输出(key: stepId, value: 输出数据的存储引用) */
private Map<String, String> stepOutputRefs;
/** 元数据 */
private long createdAt;
private long lastUpdatedAt;
private int retryCount;
private String failureReason;
public enum TaskStatus {
PENDING, RUNNING, PAUSED, COMPLETED, FAILED, RECOVERING
}
}
@Data
@Builder
public class StepState {
private String stepId;
private String stepName;
private StepStatus status;
private long startedAt;
private long completedAt;
private String outputRef;
private String errorMessage;
public enum StepStatus {
PENDING, RUNNING, COMPLETED, FAILED, SKIPPED
}
}/**
* Agent 状态持久化服务
* 核心职责:把 AgentTaskState 读写到 Redis
*/
@Service
@Slf4j
public class AgentStateStore {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ObjectMapper objectMapper;
// 状态 TTL:7天,超过7天未活跃的任务自动清理
private static final Duration STATE_TTL = Duration.ofDays(7);
private static final String STATE_KEY_PREFIX = "agent:state:";
private static final String STEP_OUTPUT_PREFIX = "agent:output:";
/**
* 保存或更新任务状态
*/
public void saveState(AgentTaskState state) {
String key = STATE_KEY_PREFIX + state.getTaskId();
state.setLastUpdatedAt(System.currentTimeMillis());
try {
String json = objectMapper.writeValueAsString(state);
redisTemplate.opsForValue().set(key, json, STATE_TTL);
log.debug("状态已保存,taskId={}, step={}, status={}",
state.getTaskId(), state.getCurrentStepIndex(), state.getStatus());
} catch (JsonProcessingException e) {
log.error("状态序列化失败,taskId={}", state.getTaskId(), e);
throw new StateStoreException("状态保存失败", e);
}
}
/**
* 加载任务状态(用于恢复)
*/
public Optional<AgentTaskState> loadState(String taskId) {
String key = STATE_KEY_PREFIX + taskId;
String json = redisTemplate.opsForValue().get(key);
if (json == null) {
return Optional.empty();
}
try {
AgentTaskState state = objectMapper.readValue(json, AgentTaskState.class);
log.info("状态加载成功,taskId={}, 从步骤{}恢复", taskId, state.getCurrentStepIndex());
return Optional.of(state);
} catch (JsonProcessingException e) {
log.error("状态反序列化失败,taskId={}", taskId, e);
return Optional.empty();
}
}
/**
* 标记步骤完成,同时保存步骤输出
*/
public void markStepCompleted(String taskId, String stepId, Object stepOutput) {
Optional<AgentTaskState> stateOpt = loadState(taskId);
if (stateOpt.isEmpty()) {
throw new StateNotFoundException("任务状态不存在: " + taskId);
}
AgentTaskState state = stateOpt.get();
// 找到对应步骤并更新状态
state.getSteps().stream()
.filter(s -> s.getStepId().equals(stepId))
.findFirst()
.ifPresent(step -> {
step.setStatus(StepState.StepStatus.COMPLETED);
step.setCompletedAt(System.currentTimeMillis());
// 保存步骤输出(大数据单独存,避免主状态膨胀)
if (stepOutput != null) {
String outputRef = saveStepOutput(taskId, stepId, stepOutput);
step.setOutputRef(outputRef);
state.getStepOutputRefs().put(stepId, outputRef);
}
});
// 推进步骤指针
state.setCurrentStepIndex(state.getCurrentStepIndex() + 1);
// 持久化更新后的状态
saveState(state);
}
/**
* 保存步骤输出(独立存储,避免主状态 JSON 过大)
*/
private String saveStepOutput(String taskId, String stepId, Object output) {
String outputKey = STEP_OUTPUT_PREFIX + taskId + ":" + stepId;
try {
String json = objectMapper.writeValueAsString(output);
redisTemplate.opsForValue().set(outputKey, json, STATE_TTL);
return outputKey;
} catch (JsonProcessingException e) {
throw new StateStoreException("步骤输出保存失败", e);
}
}
/**
* 读取步骤输出
*/
public <T> Optional<T> loadStepOutput(String outputRef, Class<T> type) {
String json = redisTemplate.opsForValue().get(outputRef);
if (json == null) return Optional.empty();
try {
return Optional.of(objectMapper.readValue(json, type));
} catch (JsonProcessingException e) {
log.error("步骤输出反序列化失败,ref={}", outputRef, e);
return Optional.empty();
}
}
/**
* 清理任务状态(任务完成后可选清理)
*/
public void cleanupState(String taskId) {
String pattern = "agent:*:" + taskId + "*";
// 扫描并删除所有相关 key
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
log.info("任务状态已清理,taskId={}, 删除{}个key", taskId, keys.size());
}
}
}四、Agent 执行器——带 Checkpoint 的核心逻辑
/**
* 支持 Checkpoint 的 Agent 任务执行器
*/
@Component
@Slf4j
public class CheckpointableAgentExecutor {
@Autowired
private AgentStateStore stateStore;
@Autowired
private ChatLanguageModel llm;
/**
* 执行 Agent 任务(支持断点恢复)
*
* @param taskId 任务ID,如果是恢复执行,传入之前的任务ID
* @param taskDef 任务定义
* @param forceRestart 是否强制从头开始(忽略已有状态)
*/
public AgentTaskResult execute(
String taskId,
AgentTaskDefinition taskDef,
boolean forceRestart) {
// 1. 尝试加载已有状态(断点恢复)
AgentTaskState state;
boolean isRecovering = false;
Optional<AgentTaskState> existingState = stateStore.loadState(taskId);
if (existingState.isPresent() && !forceRestart) {
state = existingState.get();
isRecovering = true;
log.info("检测到已有状态,从第{}步恢复执行,taskId={}",
state.getCurrentStepIndex(), taskId);
state.setStatus(AgentTaskState.TaskStatus.RECOVERING);
stateStore.saveState(state);
} else {
// 全新任务,初始化状态
state = initializeState(taskId, taskDef);
log.info("全新任务开始执行,taskId={}", taskId);
}
// 2. 恢复 LLM 对话上下文
List<ChatMessage> chatHistory = isRecovering
? state.getConversationHistory()
: new ArrayList<>();
// 3. 逐步执行(从当前步骤开始)
List<AgentStep> allSteps = taskDef.getSteps();
int startIndex = state.getCurrentStepIndex();
for (int i = startIndex; i < allSteps.size(); i++) {
AgentStep step = allSteps.get(i);
log.info("执行步骤 {}/{}: {}", i + 1, allSteps.size(), step.getName());
try {
// 更新当前步骤状态为 RUNNING
updateStepStatus(state, step.getId(), StepState.StepStatus.RUNNING);
stateStore.saveState(state);
// 执行步骤
StepResult result = executeStep(step, state, chatHistory);
// 更新对话历史(包含本步骤的 LLM 交互)
chatHistory.addAll(result.getNewMessages());
state.setConversationHistory(chatHistory);
// 标记步骤完成,保存中间输出
stateStore.markStepCompleted(taskId, step.getId(), result.getOutput());
log.info("步骤完成: {},输出已持久化", step.getName());
} catch (Exception e) {
log.error("步骤执行失败: {}", step.getName(), e);
// 更新步骤状态为 FAILED
updateStepStatus(state, step.getId(), StepState.StepStatus.FAILED);
state.setStatus(AgentTaskState.TaskStatus.FAILED);
state.setFailureReason(e.getMessage());
stateStore.saveState(state);
throw new AgentTaskException(
String.format("任务在步骤[%s]失败,可通过任务ID[%s]恢复执行",
step.getName(), taskId), e);
}
}
// 4. 所有步骤完成
state.setStatus(AgentTaskState.TaskStatus.COMPLETED);
stateStore.saveState(state);
log.info("任务全部完成,taskId={}", taskId);
return buildFinalResult(state, taskDef);
}
/**
* 执行单个步骤
* 这里是实际调用 LLM 和工具的地方
*/
private StepResult executeStep(
AgentStep step,
AgentTaskState state,
List<ChatMessage> chatHistory) {
// 构建本步骤的 Prompt,包含:
// 1. 步骤目标描述
// 2. 前序步骤的输出摘要
String stepPrompt = buildStepPrompt(step, state);
// 把历史对话 + 本步骤 Prompt 发给 LLM
List<ChatMessage> messages = new ArrayList<>(chatHistory);
messages.add(new UserMessage(stepPrompt));
// 调用 LLM(这里是简化版,实际需要处理工具调用)
AiMessage response = llm.generate(messages).content();
messages.add(response);
return StepResult.builder()
.output(response.text())
.newMessages(List.of(new UserMessage(stepPrompt), response))
.build();
}
private AgentTaskState initializeState(String taskId, AgentTaskDefinition taskDef) {
List<StepState> stepStates = taskDef.getSteps().stream()
.map(step -> StepState.builder()
.stepId(step.getId())
.stepName(step.getName())
.status(StepState.StepStatus.PENDING)
.build())
.collect(Collectors.toList());
AgentTaskState state = AgentTaskState.builder()
.taskId(taskId)
.taskType(taskDef.getTaskType())
.status(AgentTaskState.TaskStatus.RUNNING)
.steps(stepStates)
.currentStepIndex(0)
.conversationHistory(new ArrayList<>())
.inputParams(taskDef.getInputParams())
.stepOutputRefs(new HashMap<>())
.createdAt(System.currentTimeMillis())
.retryCount(0)
.build();
stateStore.saveState(state);
return state;
}
private void updateStepStatus(AgentTaskState state, String stepId, StepState.StepStatus newStatus) {
state.getSteps().stream()
.filter(s -> s.getStepId().equals(stepId))
.findFirst()
.ifPresent(s -> {
s.setStatus(newStatus);
if (newStatus == StepState.StepStatus.RUNNING) {
s.setStartedAt(System.currentTimeMillis());
}
});
}
private String buildStepPrompt(AgentStep step, AgentTaskState state) {
StringBuilder sb = new StringBuilder();
sb.append("当前任务步骤:").append(step.getName()).append("\n");
sb.append("步骤目标:").append(step.getDescription()).append("\n\n");
// 附上前序步骤的关键输出
if (state.getCurrentStepIndex() > 0) {
sb.append("前序步骤已完成:\n");
state.getSteps().stream()
.filter(s -> s.getStatus() == StepState.StepStatus.COMPLETED)
.forEach(s -> sb.append("- ").append(s.getStepName())
.append("(已完成)\n"));
}
return sb.toString();
}
private AgentTaskResult buildFinalResult(AgentTaskState state, AgentTaskDefinition taskDef) {
return AgentTaskResult.builder()
.taskId(state.getTaskId())
.status("COMPLETED")
.completedSteps(state.getSteps().size())
.outputRefs(state.getStepOutputRefs())
.build();
}
}五、带 Checkpoint 的任务执行流程
六、任务恢复的 API 设计
给调用方一个清晰的恢复接口:
@RestController
@RequestMapping("/api/agent/tasks")
@Slf4j
public class AgentTaskController {
@Autowired
private CheckpointableAgentExecutor executor;
@Autowired
private AgentStateStore stateStore;
/**
* 提交新任务
*/
@PostMapping
public ResponseEntity<TaskSubmitResponse> submitTask(
@RequestBody AgentTaskRequest request) {
String taskId = "task_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);
// 异步执行,立即返回 taskId
CompletableFuture.runAsync(() -> {
try {
executor.execute(taskId, request.toTaskDefinition(), false);
} catch (Exception e) {
log.error("任务执行失败,taskId={}", taskId, e);
}
});
return ResponseEntity.accepted().body(
TaskSubmitResponse.builder()
.taskId(taskId)
.message("任务已提交,可使用 taskId 查询进度或恢复执行")
.build()
);
}
/**
* 查询任务状态
*/
@GetMapping("/{taskId}/status")
public ResponseEntity<TaskStatusResponse> getStatus(@PathVariable String taskId) {
return stateStore.loadState(taskId)
.map(state -> ResponseEntity.ok(TaskStatusResponse.builder()
.taskId(taskId)
.status(state.getStatus().name())
.currentStepIndex(state.getCurrentStepIndex())
.totalSteps(state.getSteps().size())
.completedSteps((int) state.getSteps().stream()
.filter(s -> s.getStatus() == StepState.StepStatus.COMPLETED)
.count())
.lastUpdatedAt(state.getLastUpdatedAt())
.failureReason(state.getFailureReason())
.build()))
.orElse(ResponseEntity.notFound().build());
}
/**
* 恢复执行失败的任务
*/
@PostMapping("/{taskId}/resume")
public ResponseEntity<TaskSubmitResponse> resumeTask(
@PathVariable String taskId,
@RequestBody AgentTaskRequest request) {
Optional<AgentTaskState> existingState = stateStore.loadState(taskId);
if (existingState.isEmpty()) {
return ResponseEntity.notFound().build();
}
AgentTaskState state = existingState.get();
if (state.getStatus() != AgentTaskState.TaskStatus.FAILED) {
return ResponseEntity.badRequest().body(
TaskSubmitResponse.builder()
.message("只有 FAILED 状态的任务可以恢复执行,当前状态: " + state.getStatus())
.build()
);
}
// 重置失败步骤状态,准备重新执行
state.getSteps().stream()
.filter(s -> s.getStatus() == StepState.StepStatus.FAILED)
.forEach(s -> s.setStatus(StepState.StepStatus.PENDING));
state.setStatus(AgentTaskState.TaskStatus.RECOVERING);
state.setRetryCount(state.getRetryCount() + 1);
stateStore.saveState(state);
// 异步恢复执行
CompletableFuture.runAsync(() -> {
try {
executor.execute(taskId, request.toTaskDefinition(), false);
} catch (Exception e) {
log.error("任务恢复执行失败,taskId={}", taskId, e);
}
});
return ResponseEntity.accepted().body(
TaskSubmitResponse.builder()
.taskId(taskId)
.message(String.format("任务已恢复,从第%d步继续执行", state.getCurrentStepIndex()))
.build()
);
}
}七、几个实际经验
经验1:步骤划分要合理
步骤不是越细越好。太细的步骤会导致 Redis 里积累大量中间状态,而且 LLM 上下文恢复的开销也会增大。一般来说,一个步骤的执行时间在 30 秒到 3 分钟之间比较合适。
经验2:大数据输出要特殊处理
当步骤输出是几十 MB 的数据时,不要直接存 Redis(Redis 单个 value 建议不超过 5MB)。可以存到 S3 或本地文件系统,Redis 里只存文件路径(引用),读取时再按需加载。
经验3:LLM 对话历史要压缩
随着任务步骤增多,LLM 对话历史会越来越长。当历史超过一定长度时,需要做压缩(summarize),只保留关键信息,否则后续步骤的 Token 消耗会越来越高。
/**
* 压缩过长的对话历史
*/
private List<ChatMessage> compressHistoryIfNeeded(
List<ChatMessage> history,
int maxMessages) {
if (history.size() <= maxMessages) {
return history;
}
// 保留最近的 N 条,把更早的历史压缩成一条摘要
List<ChatMessage> recent = history.subList(history.size() - maxMessages / 2, history.size());
List<ChatMessage> toSummarize = history.subList(0, history.size() - maxMessages / 2);
String summary = summarizeHistory(toSummarize);
List<ChatMessage> compressed = new ArrayList<>();
compressed.add(new SystemMessage("以下是之前步骤的执行摘要:\n" + summary));
compressed.addAll(recent);
return compressed;
}经验4:幂等键和 Checkpoint 要配合
步骤恢复时,如果步骤内部有外部调用(API 调用、数据库写操作),同样需要幂等保护。否则步骤恢复时会重复执行已经成功的外部操作。这两篇文章的内容要组合起来用。
八、回到那个下午两点的告警
如果当时我们有这套 Checkpoint 机制:
- OOM 崩溃前,前 9 分钟完成的步骤状态都已经持久化到 Redis
- 重启后,任务自动检测到 Redis 中的已有状态
- 从第 10 分钟那个步骤重新开始,而不是从头
- 整个恢复过程可能只需要 2-3 分钟
大客户当天下午三点就能拿到报告,而不是等到傍晚六点。
长任务做状态持久化,不是"有时间再做"的优化项,而是生产系统必须具备的基础能力。 你不知道什么时候会发生 OOM、网络抖动、机器重启。当它发生时,你希望系统能优雅地恢复,而不是让用户从头等起。
