第2140篇:多Agent协作系统设计——工程师视角的Agent编排与通信
第2140篇:多Agent协作系统设计——工程师视角的Agent编排与通信
适读人群:构建复杂AI工作流的工程师 | 阅读时长:约20分钟 | 核心价值:理解多Agent系统的架构模式,掌握Agent间通信、任务分发和错误处理的工程实现
去年参与了一个AI自动化项目:给一家电商公司做"智能运营助手",要能自动分析销售数据、生成报告、识别异常、发送告警、更新数据库。
最初我们把这些功能全塞进一个Agent,写了一个巨大的System Prompt,列了几十条指令。结果是:任务复杂了之后,这个Agent经常漏掉步骤、搞混状态、在不该做决策的地方做决策。
重构方案:拆成多个专职Agent,每个Agent只做一件事,用编排层(Orchestrator)协调它们。拆完之后,每个Agent的行为变得可预测,整个系统的可靠性大幅提升。
这篇文章讲多Agent系统的工程设计——不是AI理论,是真实落地的实现思路。
为什么需要多Agent
/**
* 单Agent vs 多Agent的权衡
*
* ===== 单Agent的局限 =====
*
* 1. 上下文窗口有限
* 一个复杂任务可能需要很多信息,
* 全塞进单个Agent的上下文,容易超出限制
*
* 2. 注意力分散
* Agent要同时处理十几个不同的任务类型,
* 对每个任务的"专注度"都下降了
* (研究表明:指令超过20条后,遗忘率显著上升)
*
* 3. 技能不匹配
* 数据分析和文案撰写是两种完全不同的能力,
* 用同一个Prompt要同时做好两件事很难
*
* 4. 故障影响范围大
* 一个步骤失败可能导致整个流程失败
*
* ===== 多Agent的优势 =====
*
* 1. 专职专能
* 每个Agent专注一个领域,Prompt简单精准
*
* 2. 并行执行
* 无依赖的任务可以由多个Agent并行处理
*
* 3. 独立错误处理
* 一个Agent失败,其他Agent可继续工作
*
* 4. 可单独优化
* 可以独立调优每个Agent,不影响整体
*
* ===== 架构模式 =====
*
* 模式一:主从模式(Orchestrator-Worker)
* - 一个编排Agent,多个执行Agent
* - 编排Agent分解任务、分配工作、汇总结果
* - 适用于:有明确主控逻辑的任务
*
* 模式二:管道模式(Pipeline)
* - Agent串行处理,前一个的输出是后一个的输入
* - 适用于:有明确处理顺序的数据流
*
* 模式三:协商模式(Debate/Consensus)
* - 多个Agent独立分析同一个问题
* - 用投票或协商得出最终答案
* - 适用于:需要高可靠性的关键决策
*/Agent接口设计
/**
* Agent的基础接口
*
* 每个Agent都实现这个接口,保证可组合性
*/
public interface Agent {
/**
* Agent的唯一标识
*/
String getAgentId();
/**
* Agent的能力描述(供编排层理解)
*/
AgentCapability getCapability();
/**
* 执行任务
*
* @param task 输入任务
* @return 执行结果
*/
AgentResult execute(AgentTask task);
/**
* 检查Agent是否可以处理给定类型的任务
*/
default boolean canHandle(String taskType) {
return getCapability().supportedTaskTypes().contains(taskType);
}
}
/**
* Agent基础实现
*
* 提供通用的日志、错误处理、超时控制等
*/
@Slf4j
public abstract class BaseAgent implements Agent {
protected final ChatLanguageModel llm;
// 任务执行超时(子类可覆盖)
protected Duration executionTimeout = Duration.ofSeconds(60);
protected BaseAgent(ChatLanguageModel llm) {
this.llm = llm;
}
@Override
public AgentResult execute(AgentTask task) {
long startMs = System.currentTimeMillis();
log.info("Agent开始执行: agentId={}, taskId={}, taskType={}",
getAgentId(), task.taskId(), task.taskType());
try {
// 超时控制
AgentResult result = executeWithTimeout(task);
long latencyMs = System.currentTimeMillis() - startMs;
log.info("Agent执行完成: agentId={}, taskId={}, latencyMs={}",
getAgentId(), task.taskId(), latencyMs);
return result;
} catch (TimeoutException e) {
log.error("Agent执行超时: agentId={}, taskId={}", getAgentId(), task.taskId());
return AgentResult.failure(task.taskId(), "执行超时");
} catch (Exception e) {
log.error("Agent执行异常: agentId={}, taskId={}",
getAgentId(), task.taskId(), e);
return AgentResult.failure(task.taskId(), e.getMessage());
}
}
/**
* 子类实现具体执行逻辑
*/
protected abstract AgentResult executeInternal(AgentTask task);
private AgentResult executeWithTimeout(AgentTask task) throws TimeoutException {
CompletableFuture<AgentResult> future =
CompletableFuture.supplyAsync(() -> executeInternal(task));
try {
return future.get(executionTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (java.util.concurrent.TimeoutException e) {
future.cancel(true);
throw new TimeoutException("超过超时限制: " + executionTimeout.toSeconds() + "秒");
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("执行被中断");
}
}
}
/**
* Agent任务
*/
public record AgentTask(
String taskId,
String taskType,
String instruction, // 给Agent的指令
Map<String, Object> inputs, // 结构化输入
String context, // 上下文信息
Map<String, Object> metadata // 附加元数据
) {
public static AgentTask create(String type, String instruction, Map<String, Object> inputs) {
return new AgentTask(
UUID.randomUUID().toString(), type, instruction, inputs, null, Map.of()
);
}
}
/**
* Agent执行结果
*/
public record AgentResult(
String taskId,
boolean success,
String output, // 文本输出
Map<String, Object> data, // 结构化数据输出
String errorMessage,
long processingMs
) {
public static AgentResult success(String taskId, String output, Map<String, Object> data) {
return new AgentResult(taskId, true, output, data, null, 0);
}
public static AgentResult failure(String taskId, String error) {
return new AgentResult(taskId, false, null, Map.of(), error, 0);
}
}
public record AgentCapability(
String description,
List<String> supportedTaskTypes,
List<String> requiredInputs,
List<String> outputFields
) {}专职Agent实现示例
/**
* 数据分析Agent
*
* 只负责一件事:分析数据,给出结论和洞察
*/
@Component
@Slf4j
public class DataAnalysisAgent extends BaseAgent {
public DataAnalysisAgent(ChatLanguageModel llm) {
super(llm);
this.executionTimeout = Duration.ofSeconds(90); // 分析任务可能慢一些
}
@Override
public String getAgentId() { return "data-analysis-agent"; }
@Override
public AgentCapability getCapability() {
return new AgentCapability(
"分析数值数据,识别趋势、异常和业务洞察",
List.of("ANALYZE_SALES", "ANALYZE_METRICS", "DETECT_ANOMALY"),
List.of("data", "analysisType"),
List.of("summary", "insights", "anomalies", "recommendations")
);
}
@Override
protected AgentResult executeInternal(AgentTask task) {
String dataJson = task.inputs().getOrDefault("data", "{}").toString();
String analysisType = task.inputs().getOrDefault("analysisType", "general").toString();
String prompt = buildAnalysisPrompt(dataJson, analysisType, task.context());
String rawResponse = llm.generate(prompt);
// 解析结构化输出
try {
Map<String, Object> analysisResult = parseAnalysisResult(rawResponse);
return AgentResult.success(task.taskId(), rawResponse, analysisResult);
} catch (Exception e) {
log.warn("结果解析失败,返回原始文本: {}", e.getMessage());
return AgentResult.success(task.taskId(), rawResponse, Map.of());
}
}
private String buildAnalysisPrompt(String data, String analysisType, String context) {
return """
你是一个数据分析专家,专注于%s分析。
%s
数据:
%s
请分析这些数据,提供:
1. 核心摘要(2-3句话)
2. 关键洞察(列表,3-5条)
3. 异常情况(如果有)
4. 建议行动(1-3条)
返回JSON:
{
"summary": "摘要",
"insights": ["洞察1", "洞察2"],
"anomalies": ["异常1"],
"recommendations": ["建议1"]
}
只返回JSON。
""".formatted(analysisType, context != null ? "背景:" + context : "", data);
}
private Map<String, Object> parseAnalysisResult(String response) throws Exception {
int start = response.indexOf('{'); int end = response.lastIndexOf('}');
String json = (start >= 0 && end > start) ? response.substring(start, end + 1) : response;
return new ObjectMapper().readValue(json, new TypeReference<Map<String, Object>>() {});
}
}
/**
* 报告生成Agent
*
* 只负责:把分析结果和数据组织成可读的报告
*/
@Component
@Slf4j
public class ReportGenerationAgent extends BaseAgent {
public ReportGenerationAgent(ChatLanguageModel llm) { super(llm); }
@Override
public String getAgentId() { return "report-generation-agent"; }
@Override
public AgentCapability getCapability() {
return new AgentCapability(
"将分析结果整合为结构化的业务报告",
List.of("GENERATE_REPORT", "GENERATE_SUMMARY"),
List.of("analysisData", "reportType", "audience"),
List.of("reportTitle", "reportContent", "keyPoints")
);
}
@Override
protected AgentResult executeInternal(AgentTask task) {
String analysisData = task.inputs().getOrDefault("analysisData", "").toString();
String reportType = task.inputs().getOrDefault("reportType", "weekly").toString();
String audience = task.inputs().getOrDefault("audience", "management").toString();
String prompt = """
你是一个专业的商业报告撰写助手。
报告类型:%s
目标读者:%s
基于以下分析结果,生成一份专业报告:
%s
报告要求:
- 语言专业、简洁,适合%s阅读
- 重点突出,避免冗余
- 包含执行摘要、详细发现、建议措施
直接输出报告内容,使用Markdown格式。
""".formatted(reportType, audience, analysisData, audience);
String report = llm.generate(prompt);
// 提取报告标题(第一行H1)
String title = Arrays.stream(report.split("\n"))
.filter(line -> line.startsWith("# "))
.map(line -> line.substring(2).trim())
.findFirst()
.orElse("运营报告");
return AgentResult.success(task.taskId(), report,
Map.of("reportTitle", title, "reportContent", report));
}
}编排层实现
/**
* Agent编排器
*
* 负责:
* 1. 任务分解(把用户意图拆成多个子任务)
* 2. Agent路由(选择合适的Agent执行每个子任务)
* 3. 并行执行(无依赖的任务并行跑)
* 4. 结果聚合(把多个Agent的结果合并)
* 5. 错误处理(某个Agent失败时的降级策略)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AgentOrchestrator {
private final List<Agent> agents;
private final ChatLanguageModel llm;
// 最大并行Agent数
private static final int MAX_PARALLEL_AGENTS = 5;
private final ExecutorService parallelExecutor =
Executors.newFixedThreadPool(MAX_PARALLEL_AGENTS);
/**
* 执行复杂任务
*
* 自动分解任务、并行执行、聚合结果
*/
public OrchestratorResult execute(String userIntent, Map<String, Object> context) {
log.info("编排器开始: intent={}", userIntent.substring(0, Math.min(100, userIntent.length())));
// 1. 分解任务
List<PlannedTask> plan = planTasks(userIntent, context);
log.info("任务规划完成: tasks={}", plan.size());
// 2. 按依赖关系分组,同组的任务可以并行
List<List<PlannedTask>> taskGroups = groupByDependency(plan);
// 3. 按组顺序执行(组内并行,组间串行)
Map<String, AgentResult> allResults = new LinkedHashMap<>();
for (List<PlannedTask> group : taskGroups) {
Map<String, AgentResult> groupResults = executeTaskGroup(group, allResults);
allResults.putAll(groupResults);
}
// 4. 聚合最终结果
return aggregateResults(userIntent, allResults);
}
/**
* 任务规划
*
* 用LLM理解用户意图,生成任务执行计划
*/
private List<PlannedTask> planTasks(String userIntent, Map<String, Object> context) {
// 构建可用Agent能力的描述
String agentCapabilities = agents.stream()
.map(a -> "- " + a.getAgentId() + ": " + a.getCapability().description() +
"\n 支持任务类型: " + a.getCapability().supportedTaskTypes())
.collect(Collectors.joining("\n"));
String planPrompt = """
你是任务规划助手。根据用户意图和可用Agent,制定执行计划。
可用Agent:
%s
上下文信息:
%s
用户意图:%s
请拆分成子任务,返回JSON数组:
[
{
"taskId": "t1",
"agentId": "对应的agentId",
"taskType": "对应的任务类型",
"instruction": "具体指令",
"dependsOn": [],
"inputs": {}
}
]
注意:
- dependsOn填写前置任务的taskId
- 没有依赖的任务可以并行执行
- 只使用上面列出的agentId和taskType
只返回JSON数组。
""".formatted(agentCapabilities, context.toString(), userIntent);
try {
String response = llm.generate(planPrompt);
String json = extractJsonArray(response);
return new ObjectMapper().readValue(json, new TypeReference<List<PlannedTask>>() {});
} catch (Exception e) {
log.error("任务规划失败: {}", e.getMessage(), e);
throw new RuntimeException("任务规划失败: " + e.getMessage());
}
}
/**
* 按依赖关系分组
*
* 没有互相依赖的任务可以放在同一组并行执行
*/
private List<List<PlannedTask>> groupByDependency(List<PlannedTask> tasks) {
List<List<PlannedTask>> groups = new ArrayList<>();
Set<String> completedTaskIds = new HashSet<>();
List<PlannedTask> remaining = new ArrayList<>(tasks);
while (!remaining.isEmpty()) {
// 找出所有前置任务已完成的任务
List<PlannedTask> ready = remaining.stream()
.filter(t -> completedTaskIds.containsAll(t.dependsOn()))
.toList();
if (ready.isEmpty()) {
log.error("任务规划存在循环依赖或无法满足的依赖");
// 强制执行剩余任务
groups.add(new ArrayList<>(remaining));
break;
}
groups.add(ready);
ready.forEach(t -> completedTaskIds.add(t.taskId()));
remaining.removeAll(ready);
}
return groups;
}
/**
* 并行执行一组任务
*/
private Map<String, AgentResult> executeTaskGroup(
List<PlannedTask> group, Map<String, AgentResult> previousResults) {
// 构建每个任务的上下文(注入前置任务的结果)
List<CompletableFuture<Map.Entry<String, AgentResult>>> futures = group.stream()
.map(planned -> CompletableFuture.supplyAsync(() -> {
Agent agent = findAgent(planned.agentId());
if (agent == null) {
log.error("找不到Agent: {}", planned.agentId());
return Map.entry(planned.taskId(),
AgentResult.failure(planned.taskId(), "Agent不存在: " + planned.agentId()));
}
// 把依赖任务的结果注入到当前任务的上下文
String dependencyContext = buildDependencyContext(planned.dependsOn(), previousResults);
AgentTask task = new AgentTask(
planned.taskId(), planned.taskType(), planned.instruction(),
planned.inputs(), dependencyContext, Map.of()
);
AgentResult result = agent.execute(task);
return Map.entry(planned.taskId(), result);
}, parallelExecutor))
.toList();
// 等待所有任务完成
return futures.stream()
.map(f -> {
try {
return f.get(120, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("任务执行失败: {}", e.getMessage());
return Map.entry("unknown", AgentResult.failure("unknown", e.getMessage()));
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
* 聚合所有Agent的结果,生成最终输出
*/
private OrchestratorResult aggregateResults(
String userIntent, Map<String, AgentResult> allResults) {
long successCount = allResults.values().stream().filter(AgentResult::success).count();
long failCount = allResults.size() - successCount;
// 收集所有成功的文本输出
String combinedOutput = allResults.values().stream()
.filter(AgentResult::success)
.map(AgentResult::output)
.filter(o -> o != null && !o.isBlank())
.collect(Collectors.joining("\n\n---\n\n"));
// 如果有多个输出,用LLM整合成统一的回答
String finalOutput;
if (allResults.size() > 1 && !combinedOutput.isBlank()) {
finalOutput = synthesizeOutput(userIntent, combinedOutput);
} else {
finalOutput = combinedOutput;
}
return new OrchestratorResult(
finalOutput, allResults, successCount, failCount
);
}
private String synthesizeOutput(String userIntent, String combinedOutput) {
String prompt = """
用户的原始需求:%s
以下是多个专职Agent的处理结果:
%s
请整合以上结果,给出统一、连贯的最终回答,避免重复,突出重点。
""".formatted(userIntent, truncate(combinedOutput, 3000));
try {
return llm.generate(prompt);
} catch (Exception e) {
return combinedOutput; // 整合失败时返回原始结果
}
}
private Agent findAgent(String agentId) {
return agents.stream()
.filter(a -> a.getAgentId().equals(agentId))
.findFirst()
.orElse(null);
}
private String buildDependencyContext(List<String> dependsOn,
Map<String, AgentResult> results) {
if (dependsOn.isEmpty()) return null;
return dependsOn.stream()
.filter(results::containsKey)
.map(id -> {
AgentResult r = results.get(id);
return r.success() ? "任务[" + id + "]结果:\n" + r.output() : null;
})
.filter(s -> s != null)
.collect(Collectors.joining("\n\n"));
}
private String extractJsonArray(String s) {
int start = s.indexOf('['); int end = s.lastIndexOf(']');
return (start >= 0 && end > start) ? s.substring(start, end + 1) : "[]";
}
private String truncate(String s, int maxLen) {
return s.length() > maxLen ? s.substring(0, maxLen) + "..." : s;
}
public record PlannedTask(String taskId, String agentId, String taskType,
String instruction, List<String> dependsOn,
Map<String, Object> inputs) {}
public record OrchestratorResult(String finalOutput, Map<String, AgentResult> taskResults,
long successCount, long failCount) {
public boolean isFullSuccess() { return failCount == 0; }
}
}实践建议
Agent粒度要恰到好处,不要过细也不要过粗
过细:每个LLM调用都拆成一个独立Agent,导致编排层极其复杂,系统整体变慢(每次Agent调用都有通信开销)。过粗:一个Agent承担太多职责,退化回单Agent的问题。经验法则:当一个Agent的System Prompt超过600 tokens,或者要处理超过5种不同类型的任务,就该考虑拆分了。好的Agent粒度的标志:你能用一句话准确描述它做什么("分析销售数据找异常"而不是"处理所有数据相关的事")。
任务规划本身也是LLM调用,要做好错误处理
用LLM做任务规划(把用户意图转换为任务DAG)这一步经常出问题:返回的JSON格式错误、引用了不存在的agentId、产生了循环依赖。这些都要有对应的fallback:JSON解析失败时尝试修复;引用了无效agentId时用最接近的Agent替代;循环依赖时打破循环强制串行。任务规划层的健壮性,直接决定了整个多Agent系统的可靠性。
并行执行要配置合理的超时和最大并发
我们在生产环境踩过一个坑:某次任务规划出了20个并行任务,同时发起了20个LLM调用,触发了速率限制,整个流程失败。修复:限制同时运行的最大Agent数(我们设了5),超出的任务等待前面的完成后再执行。同时每个Agent设置独立超时,防止一个慢Agent阻塞整个流程。并行不是越多越好,要和你的LLM配额上限匹配。
