第2091篇:多Agent协作模式——从单一LLM到智能工作流的架构演进
2026/4/30大约 10 分钟
第2091篇:多Agent协作模式——从单一LLM到智能工作流的架构演进
适读人群:正在构建复杂AI自动化工作流的工程师 | 阅读时长:约20分钟 | 核心价值:掌握多Agent协作的核心模式(顺序、并行、监督者、辩论),用Java实现生产级的Agent编排框架
有一类需求,单个LLM搞不定:需要做调研,写代码,测试,审查,最后生成报告——这中间涉及多个步骤,每步都需要不同的能力,而且后一步依赖前一步的输出。
这就是多Agent系统存在的意义。这篇文章把几个核心的Agent协作模式用Java实现出来。
为什么单Agent不够
/**
* 单Agent的限制
*
* 限制1:上下文长度
* 一个需要处理大量文档的任务,所有内容塞进一个prompt会超出限制
* 多个Agent分工处理,每个只需要看相关的部分
*
* 限制2:能力专一化
* "分析代码然后生成测试再做代码审查"这种复合任务
* 让不同Agent专注于各自最擅长的步骤,比一个大而全的Agent更好
*
* 限制3:并行处理
* 独立的子任务可以并行执行,比顺序执行快
*
* 限制4:质量检验
* 一个Agent的输出可以由另一个来评审和校正
*/基础Agent抽象
/**
* Agent基类
* 每个Agent有自己的角色和工具
*/
public interface Agent {
/**
* Agent名称(唯一标识)
*/
String getName();
/**
* Agent的角色描述(用于构建System Prompt)
*/
String getRole();
/**
* 执行任务
*
* @param task 任务描述
* @param context 上下文(前序Agent的输出、共享状态等)
* @return 执行结果
*/
AgentResult execute(String task, AgentContext context);
}
/**
* Agent上下文:在多个Agent之间传递的状态
*/
@Data
public class AgentContext {
// 整体任务目标
private String overallGoal;
// 各Agent的执行结果历史
private List<AgentResult> executionHistory = new ArrayList<>();
// 共享变量(Agent之间可以读写)
private Map<String, Object> sharedVariables = new HashMap<>();
// 对话历史(用于有记忆的Agent)
private List<ChatMessage> conversationHistory = new ArrayList<>();
/**
* 获取指定Agent的最新输出
*/
public Optional<AgentResult> getLastResultOf(String agentName) {
return executionHistory.stream()
.filter(r -> agentName.equals(r.getAgentName()))
.reduce((first, second) -> second);
}
/**
* 获取所有成功执行的结果摘要
*/
public String getSummary() {
return executionHistory.stream()
.filter(r -> r.getStatus() == AgentResult.Status.SUCCESS)
.map(r -> "【" + r.getAgentName() + "】" + r.getSummary())
.collect(Collectors.joining("\n"));
}
}
/**
* Agent执行结果
*/
@Data @Builder
public class AgentResult {
private String agentName;
private Status status;
private String output; // 完整输出
private String summary; // 摘要(给下一个Agent看的)
private Map<String, Object> metadata;
private String errorMessage;
private long elapsedMs;
public enum Status { SUCCESS, FAILED, SKIPPED }
public static AgentResult success(String agentName, String output, String summary) {
return AgentResult.builder()
.agentName(agentName)
.status(Status.SUCCESS)
.output(output)
.summary(summary)
.build();
}
public static AgentResult failure(String agentName, String error) {
return AgentResult.builder()
.agentName(agentName)
.status(Status.FAILED)
.errorMessage(error)
.build();
}
}模式一:顺序链(Pipeline)
/**
* 顺序执行链
* 每个Agent的输出作为下一个Agent的输入
*
* 适用场景:有明确先后依赖的任务流程
* 例:研究 → 大纲 → 写作 → 编辑
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SequentialAgentPipeline {
private final List<Agent> agents;
/**
* 顺序执行所有Agent
* 任意一个失败时,根据策略决定是否继续
*/
public AgentContext execute(String goal, FailureStrategy strategy) {
AgentContext context = new AgentContext();
context.setOverallGoal(goal);
log.info("开始顺序Agent执行: goal='{}', agents={}",
goal.substring(0, Math.min(50, goal.length())),
agents.stream().map(Agent::getName).toList());
String currentTask = goal;
for (Agent agent : agents) {
log.info("执行Agent: {}", agent.getName());
long start = System.currentTimeMillis();
try {
AgentResult result = agent.execute(currentTask, context);
result.setElapsedMs(System.currentTimeMillis() - start);
context.getExecutionHistory().add(result);
if (result.getStatus() == AgentResult.Status.SUCCESS) {
// 下一个Agent的任务 = 当前Agent的输出(可以自定义传递逻辑)
currentTask = result.getOutput();
log.info("Agent {} 完成: summary={}", agent.getName(),
result.getSummary());
} else {
log.error("Agent {} 失败: {}", agent.getName(), result.getErrorMessage());
switch (strategy) {
case STOP_ON_FAILURE -> {
log.info("失败后停止策略,终止执行");
return context;
}
case SKIP_AND_CONTINUE -> {
log.info("跳过失败Agent,继续执行");
// currentTask 保持不变,下个Agent使用上个成功的输出
}
case RETRY_ONCE -> {
log.info("重试一次");
AgentResult retry = agent.execute(currentTask, context);
retry.setElapsedMs(System.currentTimeMillis() - start);
context.getExecutionHistory().add(retry);
if (retry.getStatus() == AgentResult.Status.SUCCESS) {
currentTask = retry.getOutput();
}
}
}
}
} catch (Exception e) {
log.error("Agent {} 异常: {}", agent.getName(), e.getMessage());
context.getExecutionHistory().add(
AgentResult.failure(agent.getName(), e.getMessage()));
if (strategy == FailureStrategy.STOP_ON_FAILURE) {
return context;
}
}
}
return context;
}
public enum FailureStrategy { STOP_ON_FAILURE, SKIP_AND_CONTINUE, RETRY_ONCE }
}模式二:并行执行
/**
* 并行执行多个Agent
* 独立的子任务同时处理,最后合并结果
*
* 适用场景:需要同时收集多方面信息
* 例:同时分析代码、查文档、搜示例,最后合并回答
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ParallelAgentExecutor {
private final ExecutorService agentExecutor =
Executors.newFixedThreadPool(8);
private final Agent mergeAgent; // 负责合并各Agent结果的Agent
/**
* 并行执行多组Agent,每组一个独立子任务
*/
public AgentResult executeParallel(
Map<String, String> agentSubTasks, // agentName → 子任务
Map<String, Agent> agentRegistry,
AgentContext context,
Duration timeout) {
log.info("并行执行 {} 个Agent", agentSubTasks.size());
// 提交所有任务
Map<String, CompletableFuture<AgentResult>> futures = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : agentSubTasks.entrySet()) {
String agentName = entry.getKey();
String subTask = entry.getValue();
Agent agent = agentRegistry.get(agentName);
if (agent == null) {
log.warn("Agent未找到: {}", agentName);
continue;
}
CompletableFuture<AgentResult> future = CompletableFuture
.supplyAsync(() -> agent.execute(subTask, context), agentExecutor)
.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.exceptionally(e -> {
log.error("Agent {} 执行超时或失败: {}", agentName, e.getMessage());
return AgentResult.failure(agentName, e.getMessage());
});
futures.put(agentName, future);
}
// 收集所有结果
List<AgentResult> results = new ArrayList<>();
for (Map.Entry<String, CompletableFuture<AgentResult>> entry : futures.entrySet()) {
try {
AgentResult result = entry.getValue().get();
results.add(result);
context.getExecutionHistory().add(result);
} catch (Exception e) {
log.error("获取Agent结果失败: {}", entry.getKey());
}
}
// 用mergeAgent合并结果
String mergeTask = buildMergeTask(results, context.getOverallGoal());
return mergeAgent.execute(mergeTask, context);
}
private String buildMergeTask(List<AgentResult> results, String goal) {
StringBuilder sb = new StringBuilder();
sb.append("请综合以下各方面的分析结果,形成对原始目标的完整回答:\n\n");
sb.append("原始目标:").append(goal).append("\n\n");
for (AgentResult result : results) {
if (result.getStatus() == AgentResult.Status.SUCCESS) {
sb.append("=== ").append(result.getAgentName()).append(" ===\n");
sb.append(result.getOutput()).append("\n\n");
}
}
return sb.toString();
}
}模式三:监督者-工作者(Supervisor-Worker)
/**
* 监督者-工作者模式
*
* Supervisor Agent负责:
* 1. 分解任务给不同的Worker
* 2. 评估Worker的输出质量
* 3. 决定是否需要重做
*
* 这是最灵活的协作模式,但也最消耗token
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SupervisorWorkerOrchestrator {
private final ChatLanguageModel llm;
private final Map<String, Agent> workers;
private final ObjectMapper objectMapper;
// 最大重试次数(防止无限循环)
private static final int MAX_ITERATIONS = 5;
/**
* 监督者驱动的任务执行
*/
public AgentContext execute(String goal) {
AgentContext context = new AgentContext();
context.setOverallGoal(goal);
for (int iteration = 0; iteration < MAX_ITERATIONS; iteration++) {
log.info("监督者迭代 {}/{}", iteration + 1, MAX_ITERATIONS);
// 1. 监督者分析当前状态,决定下一步
SupervisorDecision decision = supervisorDecide(goal, context);
log.info("监督者决策: action={}, worker={}, reason={}",
decision.action(), decision.targetWorker(), decision.reason());
if (decision.action() == SupervisorAction.COMPLETE) {
// 任务完成
log.info("监督者判定任务完成");
break;
}
if (decision.action() == SupervisorAction.ASSIGN_TASK) {
// 分配任务给指定Worker
String workerName = decision.targetWorker();
Agent worker = workers.get(workerName);
if (worker == null) {
log.warn("Worker不存在: {}", workerName);
continue;
}
AgentResult result = worker.execute(decision.taskForWorker(), context);
context.getExecutionHistory().add(result);
if (result.getStatus() == AgentResult.Status.FAILED) {
// 通知监督者,让它决定如何处理
context.getSharedVariables().put(
"lastError", "Worker " + workerName + " 失败: " + result.getErrorMessage());
}
}
if (decision.action() == SupervisorAction.REVISE) {
// 要求指定Worker修改输出
String workerName = decision.targetWorker();
Agent worker = workers.get(workerName);
if (worker != null) {
String revisionTask = "请修改你之前的输出:" + decision.revisionInstructions() +
"\n\n之前的输出:" + context.getLastResultOf(workerName)
.map(AgentResult::getOutput).orElse("(无)");
AgentResult revised = worker.execute(revisionTask, context);
context.getExecutionHistory().add(revised);
}
}
}
return context;
}
/**
* 监督者决策逻辑
* 使用LLM分析当前状态,决定下一步行动
*/
private SupervisorDecision supervisorDecide(String goal, AgentContext context) {
String availableWorkers = workers.entrySet().stream()
.map(e -> "- " + e.getKey() + ": " + e.getValue().getRole())
.collect(Collectors.joining("\n"));
String executionSummary = context.getExecutionHistory().isEmpty()
? "(尚未开始执行)"
: context.getSummary();
String prompt = String.format("""
你是一个任务协调员,负责分配任务给专业Agent并监督执行质量。
总体目标:%s
可用的工作Agent:
%s
已完成的执行摘要:
%s
请决定下一步行动:
1. ASSIGN_TASK: 分配新任务给某个Worker
2. REVISE: 要求某个Worker修改之前的输出
3. COMPLETE: 所有任务已完成,可以结束
输出JSON:
{
"action": "ASSIGN_TASK|REVISE|COMPLETE",
"targetWorker": "worker名称(COMPLETE时可为null)",
"taskForWorker": "分配的具体任务描述",
"revisionInstructions": "修改说明(REVISE时必填)",
"reason": "决策原因"
}
只输出JSON:
""", goal, availableWorkers, executionSummary);
try {
String response = llm.generate(prompt).trim();
String json = extractJson(response);
Map<String, Object> decision = objectMapper.readValue(json,
new TypeReference<>() {});
return new SupervisorDecision(
SupervisorAction.valueOf((String) decision.get("action")),
(String) decision.get("targetWorker"),
(String) decision.getOrDefault("taskForWorker", ""),
(String) decision.getOrDefault("revisionInstructions", ""),
(String) decision.getOrDefault("reason", "")
);
} catch (Exception e) {
log.error("监督者决策解析失败: {}", e.getMessage());
return new SupervisorDecision(SupervisorAction.COMPLETE, null, "", "", "解析失败,默认完成");
}
}
private String extractJson(String text) {
int start = text.indexOf('{');
int end = text.lastIndexOf('}');
return start >= 0 && end > start ? text.substring(start, end + 1) : "{}";
}
public enum SupervisorAction { ASSIGN_TASK, REVISE, COMPLETE }
public record SupervisorDecision(
SupervisorAction action, String targetWorker,
String taskForWorker, String revisionInstructions, String reason
) {}
}模式四:辩论模式(多Agent交叉验证)
/**
* 多Agent辩论模式
*
* 多个Agent对同一问题给出不同答案
* 让它们互相审查和批评,最终达成共识
*
* 适用场景:重要决策、需要高置信度的分析
* 代价:Token消耗较大,适合关键场景
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DebateOrchestrator {
private final List<Agent> debaters; // 参与辩论的Agent
private final Agent judgeAgent; // 裁判Agent,做最终裁定
private static final int MAX_ROUNDS = 2; // 最多辩论轮数
/**
* 组织多Agent辩论
*/
public AgentResult debate(String question) {
AgentContext context = new AgentContext();
context.setOverallGoal(question);
// 第一轮:各Agent独立给出答案
Map<String, String> initialAnswers = new LinkedHashMap<>();
log.info("辩论第一轮:收集各Agent初始答案");
for (Agent debater : debaters) {
AgentResult result = debater.execute(question, context);
if (result.getStatus() == AgentResult.Status.SUCCESS) {
initialAnswers.put(debater.getName(), result.getOutput());
context.getExecutionHistory().add(result);
}
}
String currentAnswers = formatAnswers(initialAnswers);
// 后续轮:让每个Agent评审其他Agent的答案
for (int round = 1; round <= MAX_ROUNDS; round++) {
log.info("辩论第{}轮:交叉评审", round + 1);
Map<String, String> revisedAnswers = new LinkedHashMap<>();
Map<String, String> answersToReview = currentAnswers.isEmpty()
? initialAnswers : new LinkedHashMap<>();
for (Agent debater : debaters) {
String reviewTask = String.format("""
以下是对同一问题的不同回答,请审查这些回答,指出其中的错误或遗漏,
然后给出你修订后的最终答案(可以坚持原有观点,但需要说明理由)。
问题:%s
各方回答:
%s
你之前的答案:
%s
""",
question,
currentAnswers,
initialAnswers.getOrDefault(debater.getName(), "(未提供)")
);
AgentResult revised = debater.execute(reviewTask, context);
if (revised.getStatus() == AgentResult.Status.SUCCESS) {
revisedAnswers.put(debater.getName(), revised.getOutput());
context.getExecutionHistory().add(revised);
}
}
if (!revisedAnswers.isEmpty()) {
initialAnswers = revisedAnswers;
currentAnswers = formatAnswers(revisedAnswers);
}
}
// 最终裁定
log.info("辩论结束,裁判做最终裁定");
String judgeTask = String.format("""
以下是多个专家对同一问题的分析,请综合各方观点,给出最准确、最全面的答案。
问题:%s
各方的最终观点:
%s
请给出综合判断:
""", question, currentAnswers);
return judgeAgent.execute(judgeTask, context);
}
private String formatAnswers(Map<String, String> answers) {
return answers.entrySet().stream()
.map(e -> "=== " + e.getKey() + " ===\n" + e.getValue())
.collect(Collectors.joining("\n\n"));
}
}具体Agent实现示例
/**
* 研究员Agent:负责信息收集
*/
@Component("researcher")
@RequiredArgsConstructor
@Slf4j
public class ResearcherAgent implements Agent {
private final ChatLanguageModel llm;
private final EmbeddingStoreContentRetriever contentRetriever;
@Override
public String getName() { return "researcher"; }
@Override
public String getRole() { return "信息研究员,负责从知识库和网络收集相关资料"; }
@Override
public AgentResult execute(String task, AgentContext context) {
try {
// 从知识库检索相关内容
List<Content> retrievedContent = contentRetriever.retrieve(Query.from(task));
String knowledgeContext = retrievedContent.stream()
.map(c -> c.textSegment().text())
.collect(Collectors.joining("\n\n"));
String prompt = String.format("""
你是一个专业研究员,请基于以下资料对给定任务进行深度研究分析。
任务:%s
参考资料:
%s
如果参考资料不足,请基于你的知识补充,并明确标注哪些是你自己的推断。
""", task, knowledgeContext.isEmpty() ? "(无可用资料,请基于知识回答)" : knowledgeContext);
String output = llm.generate(SystemMessage.from(getRole()),
UserMessage.from(prompt))
.content().text();
// 生成摘要
String summary = output.substring(0, Math.min(150, output.length())) + "...";
return AgentResult.success(getName(), output, summary);
} catch (Exception e) {
return AgentResult.failure(getName(), e.getMessage());
}
}
}
/**
* 编写者Agent:负责内容生成
*/
@Component("writer")
@RequiredArgsConstructor
public class WriterAgent implements Agent {
private final ChatLanguageModel llm;
@Override
public String getName() { return "writer"; }
@Override
public String getRole() { return "内容编写者,将研究资料转化为结构化的可读内容"; }
@Override
public AgentResult execute(String task, AgentContext context) {
// 获取研究员的成果
String research = context.getLastResultOf("researcher")
.map(AgentResult::getOutput)
.orElse(task);
String output = llm.generate(String.format("""
基于以下研究内容,撰写一份清晰、结构化的报告。
研究内容:%s
要求:
1. 结构清晰(有标题、子标题)
2. 语言简洁易懂
3. 关键信息突出
""", research)).trim();
return AgentResult.success(getName(), output, "已完成内容撰写");
}
}多Agent系统的最大陷阱是复杂度失控。把一个可以用单Agent完成的任务拆成五个Agent,只会增加延迟和成本,不会提升质量。
使用多Agent的判断标准:
- 任务确实无法在单次调用中完成(上下文太长、需要并行等)
- 不同步骤需要真正不同的能力或工具
- 需要自我验证和纠错机制
否则,一个精心设计的单Agent + 好的prompt,往往比复杂的多Agent系统更可靠、更便宜。
