第2305篇:Plan-and-Execute Agent——复杂任务规划与执行分离的工程实现
第2305篇:Plan-and-Execute Agent——复杂任务规划与执行分离的工程实现
适读人群:AI Agent开发工程师、后端架构师 | 阅读时长:约18分钟 | 核心价值:掌握Plan-and-Execute模式的核心思想与工程落地,解决ReAct Agent在复杂任务上的先天缺陷
去年年底,我们接了一个需求:做一个AI助手,能帮销售团队自动生成完整的客户分析报告。任务链路很长——先查CRM拿客户基本信息,再查订单系统看历史购买记录,然后搜索网上的新闻看最近有没有重大事件,最后综合生成一份有结构的分析报告。
我们最初用的是ReAct模式——让LLM边想边做,一步一步往前走。结果发现一个严重问题:当步骤多到七八步时,LLM在第五步会"忘记"自己最初要做什么,开始东拉西扯,有时候甚至把自己绕进去了,最后生成的报告结构乱得一塌糊涂。
问题的本质是:ReAct没有把"想清楚要做什么"和"真正去做"分开。
这个问题催生了Plan-and-Execute架构。
ReAct的根本局限
ReAct(Reason + Act)让LLM在每一步既要观察当前状态、也要决定下一个动作。对于简单的三四步任务,这没问题。但对于需要十几步的复杂任务,有三个硬伤:
上下文污染:随着工具调用的结果不断追加到上下文中,LLM的注意力开始被大量中间结果分散,原始目标的权重在上下文里越来越低。
局部最优陷阱:每次只看"下一步做什么",容易陷入局部最优。比如发现某条搜索结果特别有趣,就一直沿着这条线挖,而忘了全局任务。
无法并行:因为每一步都依赖上一步的结果,天然是串行的,哪怕有些步骤完全可以并行执行。
Plan-and-Execute把这个过程劈成两刀:
Planner只负责规划,不关心执行细节。Executor只负责执行单个步骤,不考虑全局策略。职责分离带来的好处是每个角色都能做到专注。
核心数据结构设计
在动手写代码之前,要把数据结构想清楚。计划是整个系统的核心契约:
/**
* 执行计划——Planner的输出,Executor的输入
*/
public record ExecutionPlan(
String planId,
String originalGoal,
List<PlanStep> steps,
PlanStatus status,
Instant createdAt,
Instant updatedAt
) {
public enum PlanStatus {
CREATED, EXECUTING, REPLANNING, COMPLETED, FAILED
}
}
/**
* 计划中的单个步骤
*/
public record PlanStep(
int stepIndex,
String stepId,
String description, // 人类可读的步骤描述
String toolName, // 要使用的工具名称
Map<String, Object> toolParams, // 工具参数(可含引用前序步骤结果的占位符)
List<String> dependsOn, // 依赖的步骤ID(用于并行调度)
StepStatus status,
Object result,
String errorMessage
) {
public enum StepStatus {
PENDING, RUNNING, COMPLETED, FAILED, SKIPPED
}
public boolean canExecute(Set<String> completedStepIds) {
return dependsOn.isEmpty() || completedStepIds.containsAll(dependsOn);
}
}这个数据结构有几个关键设计:dependsOn字段支持DAG依赖,让并行执行成为可能;toolParams里支持占位符,如"${step_1.result.customerId}",后续执行时动态替换。
Planner的实现
Planner的职责是:给定用户目标,输出一个结构化的执行计划。
@Service
public class TaskPlanner {
private final ChatClient chatClient;
private final ObjectMapper objectMapper;
private final ToolRegistry toolRegistry;
private static final String PLANNER_SYSTEM_PROMPT = """
你是一个任务规划专家。你的工作是将复杂任务分解为一系列具体的、可执行的步骤。
规划原则:
1. 每个步骤必须对应一个可用工具的调用
2. 步骤之间的依赖关系要明确(哪些步骤必须在另一个步骤之后执行)
3. 没有依赖关系的步骤可以并行执行
4. 步骤粒度要适中:不能太粗(一个步骤不能做多件事),也不能太细
5. 参数中可以用 ${stepId.result.fieldName} 引用前序步骤的结果
可用工具列表:
%s
输出JSON格式的执行计划。
""";
public ExecutionPlan createPlan(String userGoal) {
String availableTools = toolRegistry.describeAllTools();
String planJson = chatClient.prompt()
.system(PLANNER_SYSTEM_PROMPT.formatted(availableTools))
.user("请为以下任务创建执行计划:\n" + userGoal)
.call()
.content();
return parsePlan(planJson, userGoal);
}
/**
* Re-planning:基于执行中间结果更新计划
* 当发现原计划中的假设不成立时调用
*/
public ExecutionPlan replan(ExecutionPlan originalPlan,
List<PlanStep> completedSteps,
String replanReason) {
String completedSummary = summarizeCompletedSteps(completedSteps);
String remainingSteps = describeRemainingSteps(originalPlan);
String updatedPlanJson = chatClient.prompt()
.system(PLANNER_SYSTEM_PROMPT.formatted(toolRegistry.describeAllTools()))
.user("""
原始目标:%s
已完成的步骤和结果:
%s
需要重新规划的原因:%s
原本剩余的步骤:
%s
请基于已完成的结果,重新规划剩余步骤。
""".formatted(
originalPlan.originalGoal(),
completedSummary,
replanReason,
remainingSteps
))
.call()
.content();
ExecutionPlan updatedPlan = parsePlan(updatedPlanJson, originalPlan.originalGoal());
// 合并已完成的步骤
List<PlanStep> allSteps = new ArrayList<>(completedSteps);
allSteps.addAll(updatedPlan.steps());
return new ExecutionPlan(
originalPlan.planId(),
originalPlan.originalGoal(),
allSteps,
PlanStatus.EXECUTING,
originalPlan.createdAt(),
Instant.now()
);
}
private ExecutionPlan parsePlan(String planJson, String originalGoal) {
try {
JsonNode root = objectMapper.readTree(extractJson(planJson));
List<PlanStep> steps = new ArrayList<>();
JsonNode stepsNode = root.get("steps");
for (int i = 0; i < stepsNode.size(); i++) {
JsonNode stepNode = stepsNode.get(i);
List<String> deps = new ArrayList<>();
if (stepNode.has("dependsOn")) {
stepNode.get("dependsOn").forEach(d -> deps.add(d.asText()));
}
steps.add(new PlanStep(
i,
stepNode.get("stepId").asText(),
stepNode.get("description").asText(),
stepNode.get("toolName").asText(),
objectMapper.convertValue(stepNode.get("params"), Map.class),
deps,
PlanStep.StepStatus.PENDING,
null,
null
));
}
return new ExecutionPlan(
UUID.randomUUID().toString(),
originalGoal,
steps,
ExecutionPlan.PlanStatus.CREATED,
Instant.now(),
Instant.now()
);
} catch (JsonProcessingException e) {
throw new PlanParseException("无法解析LLM返回的执行计划", e);
}
}
}Executor的实现:支持并行执行
Executor要做两件事:解析占位符(把${step_1.result.xxx}替换成真实值),以及并行调度没有依赖关系的步骤。
@Service
public class TaskExecutor {
private final ToolRegistry toolRegistry;
private final ExecutorService parallelExecutor;
private final PlaceholderResolver placeholderResolver;
public TaskExecutor(ToolRegistry toolRegistry) {
this.toolRegistry = toolRegistry;
this.parallelExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2
);
this.placeholderResolver = new PlaceholderResolver();
}
/**
* 执行整个计划,返回完成后的计划(含所有步骤结果)
*/
public ExecutionPlan executePlan(ExecutionPlan plan) {
Map<String, PlanStep> stepMap = new ConcurrentHashMap<>();
plan.steps().forEach(s -> stepMap.put(s.stepId(), s));
Set<String> completedStepIds = ConcurrentHashMap.newKeySet();
Set<String> failedStepIds = ConcurrentHashMap.newKeySet();
while (hasRemainingSteps(stepMap, completedStepIds, failedStepIds)) {
// 找出当前可以执行的步骤(依赖已全部完成)
List<PlanStep> executableSteps = stepMap.values().stream()
.filter(s -> s.status() == PlanStep.StepStatus.PENDING)
.filter(s -> s.canExecute(completedStepIds))
.toList();
if (executableSteps.isEmpty()) {
// 没有可执行步骤但还有PENDING步骤,说明依赖有问题
break;
}
// 并行执行所有可执行的步骤
List<CompletableFuture<PlanStep>> futures = executableSteps.stream()
.map(step -> CompletableFuture.supplyAsync(
() -> executeStep(step, stepMap),
parallelExecutor
))
.toList();
// 等待所有并行步骤完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.forEach(f -> {
PlanStep completedStep = f.join();
stepMap.put(completedStep.stepId(), completedStep);
if (completedStep.status() == PlanStep.StepStatus.COMPLETED) {
completedStepIds.add(completedStep.stepId());
} else {
failedStepIds.add(completedStep.stepId());
}
});
}
PlanStatus finalStatus = failedStepIds.isEmpty()
? PlanStatus.COMPLETED
: PlanStatus.FAILED;
return new ExecutionPlan(
plan.planId(),
plan.originalGoal(),
new ArrayList<>(stepMap.values()),
finalStatus,
plan.createdAt(),
Instant.now()
);
}
private PlanStep executeStep(PlanStep step, Map<String, PlanStep> allSteps) {
try {
// 解析参数中的占位符
Map<String, Object> resolvedParams = placeholderResolver.resolve(
step.toolParams(), allSteps
);
// 调用工具
Tool tool = toolRegistry.getTool(step.toolName());
Object result = tool.execute(resolvedParams);
return new PlanStep(
step.stepIndex(), step.stepId(), step.description(),
step.toolName(), step.toolParams(), step.dependsOn(),
PlanStep.StepStatus.COMPLETED, result, null
);
} catch (Exception e) {
log.error("步骤执行失败: stepId={}, tool={}", step.stepId(), step.toolName(), e);
return new PlanStep(
step.stepIndex(), step.stepId(), step.description(),
step.toolName(), step.toolParams(), step.dependsOn(),
PlanStep.StepStatus.FAILED, null, e.getMessage()
);
}
}
}
/**
* 占位符解析器:把 ${stepId.result.field} 替换为实际值
*/
public class PlaceholderResolver {
private static final Pattern PLACEHOLDER_PATTERN =
Pattern.compile("\\$\\{([^}]+)\\}");
@SuppressWarnings("unchecked")
public Map<String, Object> resolve(Map<String, Object> params,
Map<String, PlanStep> completedSteps) {
Map<String, Object> resolved = new HashMap<>();
for (Map.Entry<String, Object> entry : params.entrySet()) {
resolved.put(entry.getKey(), resolveValue(entry.getValue(), completedSteps));
}
return resolved;
}
private Object resolveValue(Object value, Map<String, PlanStep> completedSteps) {
if (value instanceof String strValue) {
Matcher matcher = PLACEHOLDER_PATTERN.matcher(strValue);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
String placeholder = matcher.group(1); // e.g. "step_1.result.customerId"
String resolved = resolvePlaceholder(placeholder, completedSteps);
matcher.appendReplacement(sb, Matcher.quoteReplacement(resolved));
}
matcher.appendTail(sb);
return sb.toString();
}
return value;
}
private String resolvePlaceholder(String placeholder, Map<String, PlanStep> steps) {
String[] parts = placeholder.split("\\.", 3);
if (parts.length < 2) return "${" + placeholder + "}";
String stepId = parts[0];
PlanStep step = steps.get(stepId);
if (step == null || step.result() == null) {
throw new PlaceholderResolutionException(
"引用了未完成或不存在的步骤: " + stepId
);
}
// 简单点:把result转成Map然后按路径取值
Object result = step.result();
if (parts.length == 2 && parts[1].equals("result")) {
return String.valueOf(result);
}
// 深路径取值
return extractNestedValue(result, Arrays.copyOfRange(parts, 2, parts.length));
}
}完整的Plan-and-Execute协调器
把Planner和Executor串起来,加上Re-planning逻辑:
@Service
public class PlanAndExecuteAgent {
private final TaskPlanner planner;
private final TaskExecutor executor;
private final ResultSynthesizer synthesizer;
private final ReplanningDecider replanningDecider;
private static final int MAX_REPLAN_ATTEMPTS = 3;
public AgentResult execute(String userGoal) {
log.info("开始Plan-and-Execute任务: {}", userGoal);
// 第一步:规划
ExecutionPlan plan = planner.createPlan(userGoal);
log.info("初始计划生成完毕,共{}个步骤", plan.steps().size());
int replanCount = 0;
while (replanCount <= MAX_REPLAN_ATTEMPTS) {
// 第二步:执行
ExecutionPlan executedPlan = executor.executePlan(plan);
// 检查是否需要Re-planning
ReplanningDecision decision = replanningDecider.evaluate(executedPlan);
if (!decision.needsReplanning()) {
// 执行完成,合成最终结果
String finalResult = synthesizer.synthesize(userGoal, executedPlan);
return AgentResult.success(finalResult, executedPlan);
}
if (replanCount >= MAX_REPLAN_ATTEMPTS) {
// 达到最大重新规划次数
String partialResult = synthesizer.synthesizePartial(userGoal, executedPlan);
return AgentResult.partial(partialResult, executedPlan, decision.reason());
}
// Re-planning
log.warn("触发Re-planning: {}", decision.reason());
List<PlanStep> completedSteps = executedPlan.steps().stream()
.filter(s -> s.status() == PlanStep.StepStatus.COMPLETED)
.toList();
plan = planner.replan(executedPlan, completedSteps, decision.reason());
replanCount++;
}
return AgentResult.failed("超过最大重新规划次数", plan);
}
}
/**
* 判断是否需要Re-planning的决策器
*/
@Component
public class ReplanningDecider {
private final ChatClient chatClient;
public ReplanningDecision evaluate(ExecutionPlan executedPlan) {
long failedSteps = executedPlan.steps().stream()
.filter(s -> s.status() == PlanStep.StepStatus.FAILED)
.count();
// 有步骤失败,需要重新规划
if (failedSteps > 0) {
String failedSummary = executedPlan.steps().stream()
.filter(s -> s.status() == PlanStep.StepStatus.FAILED)
.map(s -> s.description() + ": " + s.errorMessage())
.collect(Collectors.joining("\n"));
return new ReplanningDecision(true,
"以下步骤执行失败,需要替代方案:\n" + failedSummary);
}
// 询问LLM:基于执行结果,是否需要调整后续计划?
// 这里可以做一个轻量判断,避免每次都调用LLM
boolean hasPendingSteps = executedPlan.steps().stream()
.anyMatch(s -> s.status() == PlanStep.StepStatus.PENDING);
if (hasPendingSteps) {
// 还有待执行步骤,继续执行(不是Re-planning的触发条件)
return new ReplanningDecision(false, null);
}
// 全部完成,不需要Re-planning
return new ReplanningDecision(false, null);
}
}与ReAct的性能对比
在我们的客户分析报告场景中,两种方式的对比数据很说明问题:
| 指标 | ReAct | Plan-and-Execute |
|---|---|---|
| 平均完成时间(12步任务) | 145秒 | 67秒 |
| 任务完成率 | 71% | 94% |
| LLM调用次数 | ~15次 | ~4次(1次规划+N次执行+1次合成) |
| 可解释性 | 低(过程隐含) | 高(计划可审查) |
时间缩短主要来自并行执行:原本串行的12步,有6步可以并行,直接省掉了一半时间。
适用场景与局限
Plan-and-Execute特别适合:目标明确、步骤可预见、需要并行的复杂任务。
但它有个天然局限:计划是在信息不完整时制定的。当执行过程中发现了计划制定时未知的信息,必须依赖Re-planning。而Re-planning本身也有成本。
对于高度探索性的任务("帮我研究一下这个问题"——不知道要走多远),ReAct仍然更合适。实际上很多复杂系统会把两者结合:用Plan-and-Execute处理结构化子任务,用ReAct处理需要动态探索的部分。
