第2058篇:多Agent协作系统——让多个AI分工完成复杂任务
2026/4/30大约 6 分钟
第2058篇:多Agent协作系统——让多个AI分工完成复杂任务
适读人群:需要构建能处理复杂业务流程的AI系统的工程师 | 阅读时长:约20分钟 | 核心价值:掌握多Agent系统的设计模式,实现Agent之间的协作与分工
单个Agent能做的事情有限。当任务足够复杂,一个Agent既要规划又要执行,还要质量检查,就容易出错。
就像一个人同时扮演产品、开发、测试三个角色——理论上可以,实际上效果很差。
多Agent的思路和团队分工一样:专业的Agent做专业的事,通过协调者Agent来调度。
多Agent系统的架构模式
模式一:协调者-执行者模式
/**
* 多Agent系统:协调者-执行者模式
* 协调者负责分配任务,专业Agent负责执行
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MultiAgentOrchestrator {
private final ChatLanguageModel orchestratorModel; // 协调用强模型
private final Map<String, SpecialistAgent> specialists;
/**
* 执行复杂任务
*/
public String execute(String task) {
log.info("多Agent任务开始: {}", task.substring(0, Math.min(100, task.length())));
// 1. 协调者分析任务,决定分配方案
TaskAllocation allocation = analyzeAndAllocate(task);
log.info("任务分配方案: {} 个子任务", allocation.subtasks().size());
// 2. 并发执行各子任务
Map<String, CompletableFuture<String>> futures = new HashMap<>();
for (SubTask subtask : allocation.subtasks()) {
SpecialistAgent agent = specialists.get(subtask.agentType());
if (agent == null) {
log.warn("找不到专业Agent: {}", subtask.agentType());
continue;
}
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
log.info("Agent[{}]开始执行: {}", subtask.agentType(), subtask.description());
return agent.execute(subtask);
});
futures.put(subtask.id(), future);
}
// 3. 等待所有子任务完成
Map<String, String> results = new LinkedHashMap<>();
for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
try {
results.put(entry.getKey(), entry.getValue().get(60, TimeUnit.SECONDS));
} catch (TimeoutException e) {
results.put(entry.getKey(), "执行超时");
log.error("子任务{}执行超时", entry.getKey());
} catch (Exception e) {
results.put(entry.getKey(), "执行失败: " + e.getMessage());
log.error("子任务{}执行失败", entry.getKey(), e);
}
}
// 4. 协调者整合结果
return synthesizeResults(task, allocation, results);
}
private TaskAllocation analyzeAndAllocate(String task) {
String allocationPrompt = String.format("""
你是一个任务协调专家,负责将复杂任务分配给专业团队。
可用专业Agent:
%s
任务:%s
请分析任务并制定分配方案,输出JSON格式:
{
"subtasks": [
{
"id": "task-1",
"agentType": "agent类型",
"description": "任务描述",
"priority": 1
}
]
}
""",
getAgentDescriptions(),
task
);
String allocationJson = orchestratorModel.generate(allocationPrompt);
return parseAllocation(allocationJson);
}
private String synthesizeResults(
String originalTask,
TaskAllocation allocation,
Map<String, String> results) {
String resultSummary = allocation.subtasks().stream()
.map(st -> String.format("【%s】%s\n结果:%s",
st.agentType(), st.description(),
results.getOrDefault(st.id(), "未完成")))
.collect(Collectors.joining("\n\n"));
return orchestratorModel.generate(String.format("""
基于以下专业团队的工作结果,生成最终综合报告:
原始任务:%s
各专业团队工作结果:
%s
综合报告:
""", originalTask, resultSummary));
}
private String getAgentDescriptions() {
return specialists.entrySet().stream()
.map(e -> "- " + e.getKey() + ": " + e.getValue().description())
.collect(Collectors.joining("\n"));
}
// 简化的解析方法
private TaskAllocation parseAllocation(String json) { return new TaskAllocation(List.of()); }
public record SubTask(String id, String agentType, String description, int priority) {}
public record TaskAllocation(List<SubTask> subtasks) {}
public interface SpecialistAgent {
String description();
String execute(SubTask task);
}
}专业Agent实现示例
/**
* 研究Agent:负责信息收集和分析
*/
@Component("research")
@RequiredArgsConstructor
public class ResearchAgent implements MultiAgentOrchestrator.SpecialistAgent {
private final ChatLanguageModel llm;
private final WebSearchTool searchTool;
private final EmbeddingStore<TextSegment> knowledgeBase;
private final EmbeddingModel embeddingModel;
@Override
public String description() {
return "信息研究和数据收集,负责搜索和整理相关信息";
}
@Override
public String execute(MultiAgentOrchestrator.SubTask task) {
String query = task.description();
// 内部知识库检索
float[] queryEmb = embeddingModel.embed(query);
List<String> internalDocs = vectorSearch(queryEmb, 3);
// 外部搜索(如果知识库没有)
List<String> searchResults = internalDocs.isEmpty() ?
searchTool.search(query, 3) : List.of();
// 整合信息
String allInfo = String.join("\n\n",
internalDocs.isEmpty() ? List.of() :
List.of("【内部资料】\n" + String.join("\n", internalDocs)),
searchResults.isEmpty() ? List.of() :
List.of("【搜索结果】\n" + String.join("\n", searchResults))
);
if (allInfo.isBlank()) {
return "未找到相关信息";
}
// 提炼关键信息
return llm.generate(String.format("""
请从以下信息中提炼与"%s"最相关的关键内容:
%s
提炼结果:
""", query, allInfo));
}
private List<String> vectorSearch(float[] embedding, int topK) {
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(embedding))
.maxResults(topK)
.minScore(0.65)
.build();
return knowledgeBase.search(request).matches().stream()
.map(m -> m.embedded().text())
.collect(Collectors.toList());
}
}
/**
* 写作Agent:负责内容生成
*/
@Component("writing")
@RequiredArgsConstructor
public class WritingAgent implements MultiAgentOrchestrator.SpecialistAgent {
private final ChatLanguageModel llm;
@Override
public String description() {
return "专业内容写作,生成报告、文档、邮件等";
}
@Override
public String execute(MultiAgentOrchestrator.SubTask task) {
return llm.generate(String.format("""
你是专业的内容写作助手。
写作任务:%s
要求:
- 结构清晰,逻辑严谨
- 语言专业,通俗易懂
- 包含必要的数据和例子
内容:
""", task.description()));
}
}
/**
* 审校Agent:负责质量检查
*/
@Component("review")
@RequiredArgsConstructor
public class ReviewAgent implements MultiAgentOrchestrator.SpecialistAgent {
private final ChatLanguageModel llm;
@Override
public String description() {
return "内容质量审查,检查准确性、完整性和格式";
}
@Override
public String execute(MultiAgentOrchestrator.SubTask task) {
return llm.generate(String.format("""
请对以下内容进行质量审校:
%s
检查维度:
1. 准确性:信息是否有误
2. 完整性:是否遗漏重要内容
3. 逻辑性:论述是否合理
4. 格式:是否规范
审校意见:
""", task.description()));
}
}模式二:事件驱动的Agent协作
/**
* 基于消息队列的Agent协作
* 适合:异步、长流程的任务处理
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class EventDrivenAgentCoordinator {
private final BlockingQueue<AgentMessage> messageQueue = new LinkedBlockingQueue<>();
private final Map<String, MultiAgentOrchestrator.SpecialistAgent> agents;
private final Map<String, AgentTask> pendingTasks = new ConcurrentHashMap<>();
/**
* 提交任务(异步,立即返回任务ID)
*/
public String submitTask(String taskType, String content) {
String taskId = UUID.randomUUID().toString();
AgentTask task = new AgentTask(taskId, taskType, content, TaskStatus.PENDING);
pendingTasks.put(taskId, task);
messageQueue.offer(new AgentMessage(taskId, "START", content, taskType));
return taskId;
}
/**
* 查询任务状态
*/
public AgentTask getTaskStatus(String taskId) {
return pendingTasks.get(taskId);
}
/**
* 消息处理循环(后台运行)
*/
@Scheduled(fixedDelay = 100)
public void processMessages() {
AgentMessage message = messageQueue.poll();
if (message == null) return;
CompletableFuture.runAsync(() -> {
try {
processMessage(message);
} catch (Exception e) {
log.error("消息处理失败: {}", message, e);
updateTaskStatus(message.taskId(), TaskStatus.FAILED, e.getMessage());
}
});
}
private void processMessage(AgentMessage message) {
updateTaskStatus(message.taskId(), TaskStatus.RUNNING, null);
MultiAgentOrchestrator.SpecialistAgent agent = agents.get(message.agentType());
if (agent == null) {
updateTaskStatus(message.taskId(), TaskStatus.FAILED,
"找不到Agent: " + message.agentType());
return;
}
String result = agent.execute(
new MultiAgentOrchestrator.SubTask(
message.taskId(), message.agentType(), message.content(), 1));
updateTaskStatus(message.taskId(), TaskStatus.COMPLETED, result);
log.info("任务完成: {}", message.taskId());
}
private void updateTaskStatus(String taskId, TaskStatus status, String result) {
AgentTask task = pendingTasks.get(taskId);
if (task != null) {
pendingTasks.put(taskId, new AgentTask(
task.id(), task.type(), task.content(), status, result));
}
}
public record AgentMessage(String taskId, String action, String content, String agentType) {}
public record AgentTask(
String id, String type, String content,
TaskStatus status, String result) {
public AgentTask(String id, String type, String content, TaskStatus status) {
this(id, type, content, status, null);
}
}
public enum TaskStatus { PENDING, RUNNING, COMPLETED, FAILED }
}多Agent系统的实际应用
一个实际的内容生产多Agent系统:
多Agent系统的设计原则:
- 每个Agent职责单一,不要让一个Agent既搜索又写作又审校
- 定义清晰的接口,Agent之间通过规范的消息格式通信
- 设置最大循环次数,防止Agent互相调用陷入死循环
- 超时和降级,某个Agent失败时,整个任务能优雅降级而不是崩溃
