第2004篇:Multi-Agent协作系统——任务分解与Agent编排的工程实践
2026/4/30大约 6 分钟
第2004篇:Multi-Agent协作系统——任务分解与Agent编排的工程实践
适读人群:构建复杂AI工作流的工程师 | 阅读时长:约22分钟 | 核心价值:理解多Agent编排的设计模式,掌握任务分解与结果汇聚的工程实现
有一段时间我们在做一个自动化研究报告生成系统。
需求是这样的:输入一个行业名称(比如"新能源汽车"),自动生成一份包含市场规模分析、竞争格局、技术趋势、投资机会的行业研究报告。
最初我用单个Agent来做:一个Agent拿到任务,依次查市场数据、搜竞争对手信息、分析技术趋势……
做了才发现问题:单个Agent面对这么复杂的任务,上下文越来越长,后面的推理质量越来越差。而且所有任务串行执行,一份报告要跑20分钟。
解决方案是拆分成多个专业Agent并行工作,由一个Orchestrator协调。这就是多Agent系统。
多Agent的核心架构模式
关键设计决策:
- Orchestrator负责任务分解,不直接执行业务逻辑
- 专业Agent各自聚焦一个子任务,上下文短、质量高
- 并行执行:无依赖关系的子任务同时运行
- 结果聚合:把各个Agent的输出整合成完整结果
Agent之间的通信协议
多Agent系统里,Agent之间需要有标准化的消息格式:
/**
* Agent间消息的标准格式
*/
@Data
@Builder
public class AgentMessage {
private String messageId; // 消息唯一ID
private String fromAgent; // 发送方Agent名称
private String toAgent; // 接收方Agent名称(null=广播)
private String conversationId; // 对话/任务ID(同一任务的消息共享)
private MessageType type; // REQUEST/RESPONSE/ERROR/STATUS
private String taskDescription; // 任务描述(type=REQUEST时)
private Object payload; // 负载内容
private String errorMessage; // 错误信息(type=ERROR时)
private Map<String, Object> context; // 共享上下文(上游传递给下游的信息)
private LocalDateTime createdAt;
private Integer priority; // 0=普通, 1=高优先级
}
public enum MessageType {
REQUEST, // 请求某个Agent执行任务
RESPONSE, // 任务执行结果
ERROR, // 执行出错
STATUS // 状态更新(长任务时的中间通知)
}Orchestrator:任务分解引擎
@Service
@Slf4j
@RequiredArgsConstructor
public class OrchestratorAgent {
private final ChatClient plannerClient; // 专门用于规划的LLM
private final AgentRegistry agentRegistry;
private final AgentExecutor agentExecutor;
private final ObjectMapper objectMapper;
/**
* 接收高层任务,分解并编排执行
*/
public CompletableFuture<OrchestratorResult> orchestrate(
String taskDescription,
Map<String, Object> context) {
String taskId = UUID.randomUUID().toString();
log.info("开始编排任务: {}", taskDescription);
return CompletableFuture.supplyAsync(() -> {
// 1. 任务分解:让LLM把任务拆成子任务
List<SubTask> subTasks = decomposeTask(taskDescription, context);
log.info("任务分解为 {} 个子任务", subTasks.size());
// 2. 构建执行计划(解析依赖关系,确定并行组)
ExecutionPlan plan = buildExecutionPlan(subTasks);
// 3. 按阶段执行(同一阶段内并行,跨阶段串行)
Map<String, SubTaskResult> allResults = new LinkedHashMap<>();
for (List<SubTask> stage : plan.getStages()) {
Map<String, SubTaskResult> stageResults = executeStage(
stage, allResults, context, taskId
);
allResults.putAll(stageResults);
}
return OrchestratorResult.builder()
.taskId(taskId)
.subTaskResults(allResults)
.succeeded(allResults.values().stream().allMatch(SubTaskResult::isSucceeded))
.build();
});
}
private List<SubTask> decomposeTask(String task, Map<String, Object> context) {
// 获取所有可用Agent的描述
String agentDescriptions = agentRegistry.getAllAgents().stream()
.map(a -> "- " + a.getName() + ": " + a.getDescription())
.collect(Collectors.joining("\n"));
String decompositionPrompt = """
你是一个任务规划器。请将以下任务分解成可以并行执行的子任务。
任务:%s
可用的专业Agent:
%s
请返回JSON格式的子任务列表:
[
{
"task_id": "唯一ID",
"agent_name": "执行此任务的Agent名称",
"task_description": "具体任务描述",
"depends_on": ["前置任务的task_id列表,可为空"],
"output_key": "结果的键名"
}
]
规则:
1. 无依赖关系的任务可以并行执行
2. 每个子任务只能分配给一个Agent
3. task_id不能重复
""".formatted(task, agentDescriptions);
String response = plannerClient.prompt().user(decompositionPrompt).call().content();
return parseSubTasks(response);
}
private ExecutionPlan buildExecutionPlan(List<SubTask> subTasks) {
// 拓扑排序,确定执行阶段
// 没有依赖的任务放在第一阶段,依赖第一阶段的放第二阶段,以此类推
Map<String, SubTask> taskMap = subTasks.stream()
.collect(Collectors.toMap(SubTask::getTaskId, t -> t));
Map<String, Integer> stageMap = new HashMap<>();
for (SubTask task : subTasks) {
computeStage(task, taskMap, stageMap);
}
// 按阶段分组
int maxStage = stageMap.values().stream().mapToInt(i -> i).max().orElse(0);
List<List<SubTask>> stages = new ArrayList<>();
for (int i = 0; i <= maxStage; i++) {
final int stage = i;
List<SubTask> stageTaskList = subTasks.stream()
.filter(t -> stageMap.getOrDefault(t.getTaskId(), 0) == stage)
.collect(Collectors.toList());
stages.add(stageTaskList);
}
return new ExecutionPlan(stages);
}
private int computeStage(SubTask task, Map<String, SubTask> taskMap,
Map<String, Integer> stageMap) {
if (stageMap.containsKey(task.getTaskId())) {
return stageMap.get(task.getTaskId());
}
int stage = 0;
for (String depId : task.getDependsOn()) {
SubTask dep = taskMap.get(depId);
if (dep != null) {
stage = Math.max(stage, computeStage(dep, taskMap, stageMap) + 1);
}
}
stageMap.put(task.getTaskId(), stage);
return stage;
}
private Map<String, SubTaskResult> executeStage(
List<SubTask> stage,
Map<String, SubTaskResult> previousResults,
Map<String, Object> context,
String taskId) {
// 为每个子任务注入前置任务的结果作为上下文
List<CompletableFuture<SubTaskResult>> futures = stage.stream()
.map(subTask -> {
// 构建该子任务的上下文(包含其依赖的前置结果)
Map<String, Object> subContext = new HashMap<>(context);
for (String depId : subTask.getDependsOn()) {
SubTaskResult depResult = previousResults.get(depId);
if (depResult != null) {
subContext.put(depId + "_result", depResult.getOutput());
}
}
return agentExecutor.executeAsync(subTask, subContext, taskId);
})
.collect(Collectors.toList());
// 等待本阶段所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
Map<String, SubTaskResult> results = new LinkedHashMap<>();
for (int i = 0; i < stage.size(); i++) {
SubTaskResult result = futures.get(i).join();
results.put(stage.get(i).getTaskId(), result);
}
return results;
}
}专业Agent的实现
每个专业Agent只负责一个领域:
@Component("marketAgent")
@Slf4j
public class MarketAnalysisAgent implements SpecialistAgent {
private final ChatClient analysisClient;
private final WebSearchTool webSearch;
private final DataApiTool dataApi;
@Override
public String getName() { return "marketAgent"; }
@Override
public String getDescription() {
return "分析特定行业的市场规模、增长率、主要细分市场";
}
@Override
public SubTaskResult execute(SubTask task, Map<String, Object> context) {
String industry = (String) context.getOrDefault("industry",
extractIndustry(task.getTaskDescription()));
log.info("MarketAgent开始分析: {}", industry);
try {
// 1. 搜索市场数据
String searchResult = webSearch.search(industry + " 市场规模 2024");
String apiData = dataApi.getMarketData(industry);
// 2. 分析数据,生成结构化报告
String analysisPrompt = """
请根据以下数据,分析%s行业的市场情况:
搜索结果:
%s
数据源:
%s
请输出包含以下内容的分析报告(Markdown格式):
1. 市场规模(附数据来源)
2. 近三年增长趋势
3. 主要细分市场
4. 驱动增长的关键因素
""".formatted(industry, searchResult, apiData);
String analysis = analysisClient.prompt().user(analysisPrompt).call().content();
return SubTaskResult.builder()
.taskId(task.getTaskId())
.outputKey(task.getOutputKey())
.output(analysis)
.succeeded(true)
.build();
} catch (Exception e) {
log.error("MarketAgent执行失败", e);
return SubTaskResult.builder()
.taskId(task.getTaskId())
.succeeded(false)
.errorMessage("市场分析失败: " + e.getMessage())
.build();
}
}
}结果汇聚与报告生成
@Component
@RequiredArgsConstructor
public class ReportSynthesizer {
private final ChatClient writerClient;
public String synthesize(String reportTitle,
Map<String, SubTaskResult> results) {
// 收集所有子Agent的输出
StringBuilder partsBuilder = new StringBuilder();
results.forEach((taskId, result) -> {
if (result.isSucceeded()) {
partsBuilder.append("## ").append(getPartTitle(taskId)).append("\n\n");
partsBuilder.append(result.getOutput()).append("\n\n");
}
});
// 用写作Agent整合成完整报告
String synthesisPrompt = """
请将以下各部分内容整合成一份连贯、专业的研究报告。
报告标题:%s
各章节内容:
%s
要求:
1. 保持各章节核心信息不变
2. 添加执行摘要(500字以内)
3. 章节间要有逻辑衔接
4. 结尾加上结论与建议
""".formatted(reportTitle, partsBuilder.toString());
return writerClient.prompt().user(synthesisPrompt).call().content();
}
}原来单Agent需要20分钟的报告,多Agent并行执行后缩短到了6分钟。更重要的是,每个专业Agent的上下文保持在可控范围,输出质量明显更好。
