多Agent协作框架:Java实现分布式AI任务处理系统
2026/5/22大约 18 分钟AI工程实践多Agent协作Spring AI分布式JavaAgent框架
多Agent协作框架:Java实现分布式AI任务处理系统
一个令人崩溃的项目分析需求
2025年12月初,老王盯着需求文档,感觉自己要崩了。
他所在的团队刚刚做完了公司的AI代码审查系统第一版——一个单个Agent,接收代码仓库路径,输出审查报告。刚上线那几天一切正常,后来业务方越来越激进,要求:
"把整个微服务项目(23个模块、280个Java类)全部审查一遍,同时要给出架构合理性分析、安全漏洞报告、性能热点分析,三个维度缺一不可,要在10分钟内完成。"
老王的单Agent方案直接被打垮了。
问题出在哪?三个致命弱点:
- Token上限:把280个Java文件全塞进一个上下文,直接爆了
- 专业深度不足:同一个Agent既要懂架构又要懂安全还要懂性能,结果三样都做得不精
- 串行慢:一个Agent一件件做,10分钟内根本跑不完
老王在周会上讲了这个问题,旁边一个搞分布式的同事说了一句话:"这不就是微服务的问题吗?单体服务扛不住,你就拆成微服务。你的AI Agent也该这么搞。"
那一刻老王顿悟了。
三周后,他们上线了多Agent版本:一个Orchestrator负责拆任务,三个Specialist分别负责架构/安全/性能,一个Aggregator负责汇总报告。280个类审查完整,时间从25分钟降到了6分钟,质量还提升了。
先说结论(TL;DR)
- 单Agent的瓶颈:Token上限、专业深度、串行效率,三个维度都可能成为天花板
- 多Agent三种架构模式:主从(Orchestrator-Worker)、对等(Peer-to-Peer)、层次(Hierarchical)
- 核心实现:Orchestrator分解任务 → Specialist并行执行 → Aggregator聚合结果
- 通信机制:基于消息队列的异步通信(推荐),或直接方法调用(简单场景)
- 冲突解决:投票机制、置信度加权、人工仲裁三层策略
- 关键指标:各Agent耗时、并行度、结果质量分
多Agent系统的架构模式
三种主要模式对比
选型指南
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 任务可明确拆解、各部分独立 | 主从模式 | 简单、高效,易于实现 |
| 多个Agent需要相互讨论 | 对等模式 | 支持辩论和协商 |
| 超大规模任务、多级拆解 | 层次模式 | 更好的可扩展性 |
依赖与配置
pom.xml
<dependencies>
<!-- Spring AI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot异步支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- 消息队列支持 (可选,简单场景用内存队列) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Resilience4j 容错 -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Micrometer 指标 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>application.yml
spring:
ai:
openai:
api-key: ${OPENAI_API_KEY}
chat:
options:
model: gpt-4o
temperature: 0.1
multi-agent:
orchestrator:
max-subtasks: 10 # 最大子任务数
task-timeout-seconds: 120 # 单任务超时
parallel-execution: true # 是否并行执行
specialist:
architecture:
model: gpt-4o # 架构分析用高级模型
temperature: 0.1
security:
model: gpt-4o
temperature: 0.0 # 安全审查要最确定
performance:
model: gpt-4o-mini # 性能分析可以用便宜模型
temperature: 0.1
aggregator:
conflict-resolution: vote # vote | confidence | human
min-confidence: 0.7 # 置信度阈值
logging:
level:
com.laozhang.multiagent: DEBUGAgent间通信设计:消息协议定义
// AgentMessage.java - 通用消息协议
package com.laozhang.multiagent.message;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
@Data
@Builder
public class AgentMessage {
public enum MessageType {
TASK_ASSIGN, // Orchestrator分配任务
TASK_RESULT, // Specialist返回结果
TASK_ERROR, // Specialist报告错误
STATUS_UPDATE, // 进度更新
COORDINATION, // Agent间协调消息
CONFLICT, // 冲突通知
RESOLUTION // 冲突解决结果
}
@Builder.Default
private String messageId = UUID.randomUUID().toString();
private MessageType type;
private String fromAgentId; // 发送方Agent ID
private String toAgentId; // 接收方Agent ID(null=广播)
private String taskId; // 关联的任务ID
private String content; // 消息主要内容
private Map<String, Object> metadata; // 附加元数据
private double confidence; // 结果置信度(0-1)
private boolean success;
@Builder.Default
private LocalDateTime timestamp = LocalDateTime.now();
// 工厂方法
public static AgentMessage taskAssign(String fromId, String toId,
String taskId, String taskDescription) {
return AgentMessage.builder()
.type(MessageType.TASK_ASSIGN)
.fromAgentId(fromId)
.toAgentId(toId)
.taskId(taskId)
.content(taskDescription)
.build();
}
public static AgentMessage taskResult(String fromId, String toId,
String taskId, String result, double confidence) {
return AgentMessage.builder()
.type(MessageType.TASK_RESULT)
.fromAgentId(fromId)
.toAgentId(toId)
.taskId(taskId)
.content(result)
.confidence(confidence)
.success(true)
.build();
}
public static AgentMessage taskError(String fromId, String taskId, String errorMsg) {
return AgentMessage.builder()
.type(MessageType.TASK_ERROR)
.fromAgentId(fromId)
.taskId(taskId)
.content(errorMsg)
.success(false)
.build();
}
}// AgentMessageBus.java - 内存消息总线(简单场景)
package com.laozhang.multiagent.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
@Slf4j
@Component
public class AgentMessageBus {
// 每个Agent有自己的消息队列
private final Map<String, BlockingQueue<AgentMessage>> agentQueues = new ConcurrentHashMap<>();
// 消息订阅者
private final Map<AgentMessage.MessageType, List<Consumer<AgentMessage>>> subscribers
= new ConcurrentHashMap<>();
/**
* 注册Agent到消息总线
*/
public void registerAgent(String agentId) {
agentQueues.putIfAbsent(agentId, new LinkedBlockingQueue<>());
log.info("Agent {} 已注册到消息总线", agentId);
}
/**
* 发送消息
*/
public void send(AgentMessage message) {
log.debug("消息总线: {} -> {} [{}]",
message.getFromAgentId(), message.getToAgentId(), message.getType());
if (message.getToAgentId() != null) {
// 点对点发送
BlockingQueue<AgentMessage> queue = agentQueues.get(message.getToAgentId());
if (queue != null) {
queue.offer(message);
} else {
log.error("目标Agent {} 未注册", message.getToAgentId());
}
} else {
// 广播发送
agentQueues.values().forEach(q -> q.offer(message));
}
// 触发订阅者
List<Consumer<AgentMessage>> handlers = subscribers.get(message.getType());
if (handlers != null) {
handlers.forEach(h -> h.accept(message));
}
}
/**
* 接收消息(阻塞,带超时)
*/
public Optional<AgentMessage> receive(String agentId, long timeoutMs) throws InterruptedException {
BlockingQueue<AgentMessage> queue = agentQueues.get(agentId);
if (queue == null) return Optional.empty();
AgentMessage msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
return Optional.ofNullable(msg);
}
/**
* 订阅特定类型的消息
*/
public void subscribe(AgentMessage.MessageType type, Consumer<AgentMessage> handler) {
subscribers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(handler);
}
}Orchestrator Agent:任务分解与分配
// OrchestratorAgent.java
package com.laozhang.multiagent.agent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.laozhang.multiagent.message.AgentMessage;
import com.laozhang.multiagent.message.AgentMessageBus;
import com.laozhang.multiagent.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrchestratorAgent {
private static final String AGENT_ID = "orchestrator";
@Qualifier("orchestratorChatClient")
private final ChatClient chatClient;
private final AgentMessageBus messageBus;
private final ObjectMapper objectMapper;
@Value("${multi-agent.orchestrator.task-timeout-seconds:120}")
private int taskTimeoutSeconds;
@Value("${multi-agent.orchestrator.parallel-execution:true}")
private boolean parallelExecution;
/**
* 主入口:接收复杂任务,分解并协调执行
*/
public MultiAgentResult orchestrate(String complexTask, List<String> availableAgentIds) {
log.info("Orchestrator开始处理: {}", complexTask);
long startTime = System.currentTimeMillis();
// 1. 任务分解
List<SubTask> subTasks = decomposeTask(complexTask, availableAgentIds);
log.info("任务已分解为 {} 个子任务", subTasks.size());
// 2. 任务分配与执行
Map<String, SubTaskResult> results;
if (parallelExecution) {
results = executeParallel(subTasks);
} else {
results = executeSequential(subTasks);
}
// 3. 收集结果
return MultiAgentResult.builder()
.originalTask(complexTask)
.subTasks(subTasks)
.subTaskResults(new ArrayList<>(results.values()))
.totalDurationMs(System.currentTimeMillis() - startTime)
.build();
}
/**
* 使用LLM分解任务
*/
private List<SubTask> decomposeTask(String task, List<String> availableAgents) {
String prompt = String.format("""
你是一个任务编排器。请将以下复杂任务分解为可以并行执行的子任务。
复杂任务:%s
可用的Specialist Agent:
%s
请返回JSON格式的子任务列表:
[
{
"taskId": "task_001",
"assignedAgent": "agent_id",
"description": "子任务描述",
"priority": 1,
"dependsOn": []
}
]
注意:
1. 尽量让子任务可以并行执行(dependsOn为空)
2. 每个子任务必须分配给最合适的Agent
3. 子任务描述要具体,让Agent知道该做什么
""",
task,
availableAgents.stream()
.map(id -> "- " + id)
.collect(Collectors.joining("\n"))
);
String response = chatClient.prompt()
.user(prompt)
.call()
.content();
try {
// 提取JSON部分
String json = extractJson(response);
List<Map<String, Object>> taskMaps = objectMapper.readValue(json, List.class);
return taskMaps.stream().map(this::mapToSubTask).collect(Collectors.toList());
} catch (Exception e) {
log.error("任务分解解析失败", e);
// 降级:把整个任务分配给第一个可用Agent
return List.of(SubTask.builder()
.taskId("task_001")
.assignedAgentId(availableAgents.get(0))
.description(task)
.priority(1)
.build());
}
}
/**
* 并行执行子任务
*/
private Map<String, SubTaskResult> executeParallel(List<SubTask> subTasks) {
ExecutorService executor = Executors.newFixedThreadPool(
Math.min(subTasks.size(), 5)
);
Map<String, Future<SubTaskResult>> futures = new HashMap<>();
for (SubTask task : subTasks) {
// 检查依赖关系(先跳过有依赖的任务)
if (task.getDependsOn() == null || task.getDependsOn().isEmpty()) {
Future<SubTaskResult> future = executor.submit(() -> {
AgentMessage msg = AgentMessage.taskAssign(
AGENT_ID, task.getAssignedAgentId(),
task.getTaskId(), task.getDescription()
);
messageBus.send(msg);
return waitForResult(task.getTaskId(), task.getAssignedAgentId());
});
futures.put(task.getTaskId(), future);
}
}
Map<String, SubTaskResult> results = new HashMap<>();
for (Map.Entry<String, Future<SubTaskResult>> entry : futures.entrySet()) {
try {
SubTaskResult result = entry.getValue().get(taskTimeoutSeconds, TimeUnit.SECONDS);
results.put(entry.getKey(), result);
} catch (TimeoutException e) {
log.error("任务 {} 超时", entry.getKey());
results.put(entry.getKey(), SubTaskResult.timeout(entry.getKey()));
} catch (Exception e) {
log.error("任务 {} 执行异常", entry.getKey(), e);
results.put(entry.getKey(), SubTaskResult.error(entry.getKey(), e.getMessage()));
}
}
executor.shutdown();
return results;
}
/**
* 顺序执行(有依赖关系时)
*/
private Map<String, SubTaskResult> executeSequential(List<SubTask> subTasks) {
Map<String, SubTaskResult> results = new LinkedHashMap<>();
for (SubTask task : subTasks) {
try {
AgentMessage msg = AgentMessage.taskAssign(
AGENT_ID, task.getAssignedAgentId(),
task.getTaskId(), task.getDescription()
);
messageBus.send(msg);
SubTaskResult result = waitForResult(task.getTaskId(), task.getAssignedAgentId());
results.put(task.getTaskId(), result);
} catch (Exception e) {
results.put(task.getTaskId(),
SubTaskResult.error(task.getTaskId(), e.getMessage()));
}
}
return results;
}
/**
* 等待Agent返回结果
*/
private SubTaskResult waitForResult(String taskId, String agentId) {
long deadline = System.currentTimeMillis() + taskTimeoutSeconds * 1000L;
while (System.currentTimeMillis() < deadline) {
try {
Optional<AgentMessage> msg = messageBus.receive(AGENT_ID, 1000);
if (msg.isPresent() && msg.get().getTaskId().equals(taskId)) {
AgentMessage message = msg.get();
if (message.getType() == AgentMessage.MessageType.TASK_RESULT) {
return SubTaskResult.success(taskId, message.getContent(),
message.getConfidence());
} else if (message.getType() == AgentMessage.MessageType.TASK_ERROR) {
return SubTaskResult.error(taskId, message.getContent());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return SubTaskResult.timeout(taskId);
}
private SubTask mapToSubTask(Map<String, Object> map) {
return SubTask.builder()
.taskId((String) map.get("taskId"))
.assignedAgentId((String) map.get("assignedAgent"))
.description((String) map.get("description"))
.priority((Integer) map.getOrDefault("priority", 1))
.dependsOn((List<String>) map.getOrDefault("dependsOn", new ArrayList<>()))
.build();
}
private String extractJson(String text) {
int start = text.indexOf('[');
int end = text.lastIndexOf(']');
if (start >= 0 && end > start) {
return text.substring(start, end + 1);
}
return "[]";
}
}Specialist Agent:领域专家Agent实现
// SpecialistAgent.java - 抽象基类
package com.laozhang.multiagent.agent;
import com.laozhang.multiagent.message.AgentMessage;
import com.laozhang.multiagent.message.AgentMessageBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public abstract class SpecialistAgent {
protected final ChatClient chatClient;
protected final AgentMessageBus messageBus;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private volatile boolean running = false;
protected SpecialistAgent(ChatClient chatClient, AgentMessageBus messageBus) {
this.chatClient = chatClient;
this.messageBus = messageBus;
}
@PostConstruct
public void init() {
messageBus.registerAgent(getAgentId());
startListening();
log.info("Specialist Agent {} 已启动", getAgentId());
}
/**
* 启动消息监听循环
*/
private void startListening() {
running = true;
executor.submit(() -> {
while (running) {
try {
messageBus.receive(getAgentId(), 500).ifPresent(msg -> {
if (msg.getType() == AgentMessage.MessageType.TASK_ASSIGN) {
handleTask(msg);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
/**
* 处理分配的任务
*/
private void handleTask(AgentMessage taskMsg) {
log.info("[{}] 收到任务 {}: {}", getAgentId(), taskMsg.getTaskId(),
taskMsg.getContent().substring(0, Math.min(100, taskMsg.getContent().length())));
try {
long startTime = System.currentTimeMillis();
SpecialistResult result = processTask(taskMsg.getContent());
long duration = System.currentTimeMillis() - startTime;
log.info("[{}] 任务 {} 完成,耗时 {}ms,置信度 {}",
getAgentId(), taskMsg.getTaskId(), duration, result.getConfidence());
// 返回结果给Orchestrator
AgentMessage resultMsg = AgentMessage.taskResult(
getAgentId(), taskMsg.getFromAgentId(),
taskMsg.getTaskId(), result.getContent(), result.getConfidence()
);
messageBus.send(resultMsg);
} catch (Exception e) {
log.error("[{}] 任务 {} 执行失败", getAgentId(), taskMsg.getTaskId(), e);
messageBus.send(AgentMessage.taskError(
getAgentId(), taskMsg.getTaskId(), e.getMessage()
));
}
}
public abstract String getAgentId();
public abstract String getSpecialty();
protected abstract SpecialistResult processTask(String taskDescription);
}// ArchitectureReviewAgent.java - 架构审查专家
package com.laozhang.multiagent.agent.impl;
import com.laozhang.multiagent.agent.SpecialistAgent;
import com.laozhang.multiagent.agent.SpecialistResult;
import com.laozhang.multiagent.message.AgentMessageBus;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Component
public class ArchitectureReviewAgent extends SpecialistAgent {
public ArchitectureReviewAgent(
@Qualifier("architectureChatClient") ChatClient chatClient,
AgentMessageBus messageBus) {
super(chatClient, messageBus);
}
@Override
public String getAgentId() {
return "architecture-agent";
}
@Override
public String getSpecialty() {
return "软件架构分析:分层设计、模块耦合、接口设计、设计模式使用";
}
@Override
protected SpecialistResult processTask(String taskDescription) {
String systemPrompt = """
你是一位资深Java软件架构师,专注于:
1. 分层架构(Controller-Service-Repository)合理性
2. 模块间耦合度分析(高内聚低耦合原则)
3. 设计模式的正确使用
4. 接口设计的SOLID原则遵循情况
5. 依赖关系是否存在循环依赖
请给出:
- 架构问题清单(严重程度:HIGH/MEDIUM/LOW)
- 改进建议(具体到代码层面)
- 整体架构评分(0-100)
输出格式要结构化,便于后续聚合处理。
最后给出置信度(0.0-1.0),反映你对分析结果的确定程度。
""";
String response = chatClient.prompt()
.system(systemPrompt)
.user(taskDescription)
.call()
.content();
// 解析置信度(从输出末尾提取)
double confidence = extractConfidence(response);
return SpecialistResult.builder()
.agentId(getAgentId())
.content(response)
.confidence(confidence)
.build();
}
private double extractConfidence(String response) {
// 从响应中提取置信度数值
// 实际实现中可以让LLM按特定格式输出
return 0.85; // 默认置信度
}
}// SecurityReviewAgent.java - 安全审查专家
@Component
public class SecurityReviewAgent extends SpecialistAgent {
public SecurityReviewAgent(
@Qualifier("securityChatClient") ChatClient chatClient,
AgentMessageBus messageBus) {
super(chatClient, messageBus);
}
@Override
public String getAgentId() {
return "security-agent";
}
@Override
public String getSpecialty() {
return "安全漏洞分析:OWASP Top 10、注入攻击、认证授权";
}
@Override
protected SpecialistResult processTask(String taskDescription) {
String systemPrompt = """
你是一位Java安全专家,专注于OWASP Top 10安全漏洞:
1. SQL注入漏洞(未使用参数化查询)
2. XSS跨站脚本(输入未验证/输出未转义)
3. 不安全的直接对象引用
4. 敏感信息泄露(日志、响应体中包含密码/密钥)
5. 缺少认证/授权检查
6. 使用有已知漏洞的组件版本
7. 硬编码凭证
对每个发现的漏洞,请提供:
- 漏洞类型和CVSS评分
- 代码位置(类名+行号)
- 修复建议(附代码示例)
- 修复优先级(P0/P1/P2)
最后给出置信度(0.0-1.0)。
""";
String response = chatClient.prompt()
.system(systemPrompt)
.user(taskDescription)
.call()
.content();
return SpecialistResult.builder()
.agentId(getAgentId())
.content(response)
.confidence(0.90)
.build();
}
}// PerformanceReviewAgent.java - 性能审查专家
@Component
public class PerformanceReviewAgent extends SpecialistAgent {
public PerformanceReviewAgent(
@Qualifier("performanceChatClient") ChatClient chatClient,
AgentMessageBus messageBus) {
super(chatClient, messageBus);
}
@Override
public String getAgentId() {
return "performance-agent";
}
@Override
public String getSpecialty() {
return "性能分析:N+1查询、内存泄漏、并发问题、缓存策略";
}
@Override
protected SpecialistResult processTask(String taskDescription) {
String systemPrompt = """
你是一位Java性能优化专家,专注于:
1. N+1查询问题(循环中查数据库)
2. 缺少缓存导致重复计算
3. 不合理的集合操作(大循环嵌套)
4. 线程安全问题(共享状态未同步)
5. 内存泄漏风险(大对象不释放、监听器未注销)
6. 数据库连接池配置问题
7. 缺少分页查询
请提供:
- 性能问题清单(影响级别:CRITICAL/HIGH/MEDIUM)
- 预估性能提升(如修复后QPS可提升X倍)
- 具体优化代码示例
最后给出置信度(0.0-1.0)。
""";
String response = chatClient.prompt()
.system(systemPrompt)
.user(taskDescription)
.call()
.content();
return SpecialistResult.builder()
.agentId(getAgentId())
.content(response)
.confidence(0.80)
.build();
}
}结果聚合:多Agent结果的融合策略
// ResultAggregator.java
package com.laozhang.multiagent.aggregator;
import com.laozhang.multiagent.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class ResultAggregator {
@Qualifier("aggregatorChatClient")
private final ChatClient chatClient;
/**
* 聚合多个Specialist的结果
*/
public AggregatedReport aggregate(String originalTask, List<SubTaskResult> results) {
log.info("开始聚合 {} 个子任务结果", results.size());
// 过滤失败的结果
List<SubTaskResult> successResults = results.stream()
.filter(SubTaskResult::isSuccess)
.collect(Collectors.toList());
if (successResults.isEmpty()) {
return AggregatedReport.failed("所有子任务均失败");
}
// 检测冲突
List<Conflict> conflicts = detectConflicts(successResults);
// 解决冲突
if (!conflicts.isEmpty()) {
log.info("检测到 {} 处冲突,开始解决", conflicts.size());
conflicts = resolveConflicts(conflicts, successResults);
}
// 综合生成最终报告
String finalReport = synthesize(originalTask, successResults, conflicts);
// 计算整体置信度(加权平均)
double overallConfidence = successResults.stream()
.mapToDouble(r -> r.getConfidence() * r.getConfidence()) // 置信度平方加权
.average()
.orElse(0.0);
return AggregatedReport.builder()
.originalTask(originalTask)
.finalReport(finalReport)
.subTaskResults(successResults)
.conflicts(conflicts)
.overallConfidence(Math.sqrt(overallConfidence)) // 取平方根还原
.failedTaskCount(results.size() - successResults.size())
.build();
}
/**
* 检测矛盾结论
*/
private List<Conflict> detectConflicts(List<SubTaskResult> results) {
List<Conflict> conflicts = new ArrayList<>();
String conflictCheckPrompt = String.format("""
以下是多个专家Agent对同一代码的分析结果。
请识别其中相互矛盾的结论(例如:一个Agent说某处性能良好,
另一个Agent说同一处存在严重性能问题)。
专家分析结果:
%s
请以JSON格式返回冲突列表:
[{"conflictId": "c1", "description": "冲突描述", "agents": ["agent1", "agent2"], "severity": "HIGH"}]
如果没有冲突,返回空数组[]。
""",
results.stream()
.map(r -> "=== " + r.getTaskId() + " ===\n" + r.getContent())
.collect(Collectors.joining("\n\n"))
);
try {
String response = chatClient.prompt()
.user(conflictCheckPrompt)
.call()
.content();
// 解析冲突(简化实现)
if (!response.contains("[]") && response.contains("[")) {
log.debug("检测到潜在冲突: {}", response);
// 实际应该反序列化JSON
conflicts.add(Conflict.builder()
.conflictId("detected")
.description("自动检测到潜在矛盾,已标记供人工审查")
.build());
}
} catch (Exception e) {
log.error("冲突检测失败", e);
}
return conflicts;
}
/**
* 解决冲突(投票机制)
*/
private List<Conflict> resolveConflicts(List<Conflict> conflicts,
List<SubTaskResult> results) {
// 对每个冲突,让LLM基于置信度和证据做裁决
for (Conflict conflict : conflicts) {
String resolutionPrompt = String.format("""
以下专家Agent对某个问题给出了矛盾的结论:
%s
请基于以下原则做出裁决:
1. 置信度更高的Agent的结论优先
2. 有具体证据(代码行号、具体示例)的优先
3. 安全问题默认选择更严格的结论
请给出最终裁决并解释原因。
""",
conflict.getDescription()
);
String resolution = chatClient.prompt()
.user(resolutionPrompt)
.call()
.content();
conflict.setResolution(resolution);
conflict.setResolved(true);
}
return conflicts;
}
/**
* 综合合成最终报告
*/
private String synthesize(String task, List<SubTaskResult> results, List<Conflict> conflicts) {
StringBuilder context = new StringBuilder();
context.append("原始任务:").append(task).append("\n\n");
context.append("各专家Agent分析结果:\n");
for (SubTaskResult result : results) {
context.append("### 来自 ").append(result.getTaskId())
.append("(置信度: ").append(String.format("%.0f%%", result.getConfidence() * 100)).append(")\n");
context.append(result.getContent()).append("\n\n");
}
if (!conflicts.isEmpty()) {
context.append("### 冲突与裁决\n");
for (Conflict conflict : conflicts) {
context.append("- ").append(conflict.getDescription()).append("\n");
if (conflict.getResolution() != null) {
context.append(" 裁决:").append(conflict.getResolution()).append("\n");
}
}
}
String synthesisPrompt = """
请基于以上多个专家的分析,生成一份综合报告。要求:
1. 按严重程度排序问题(Critical > High > Medium > Low)
2. 合并重复的发现,避免信息冗余
3. 每个问题要有明确的修复行动项
4. 给出优先修复建议(Top 5最重要的改进点)
5. 总结整体代码质量评分(0-100)
报告要专业、精炼、可操作。
""" + context;
return chatClient.prompt()
.user(synthesisPrompt)
.call()
.content();
}
}实战:代码审查多Agent系统完整装配
// MultiAgentCodeReviewService.java
package com.laozhang.multiagent.service;
import com.laozhang.multiagent.agent.OrchestratorAgent;
import com.laozhang.multiagent.aggregator.ResultAggregator;
import com.laozhang.multiagent.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class MultiAgentCodeReviewService {
private final OrchestratorAgent orchestrator;
private final ResultAggregator aggregator;
private static final List<String> REVIEW_AGENTS = List.of(
"architecture-agent",
"security-agent",
"performance-agent"
);
/**
* 对代码进行多维度审查
*/
public CodeReviewReport review(String codeContent, String reviewContext) {
log.info("开始多Agent代码审查,代码长度: {} 字符", codeContent.length());
long startTime = System.currentTimeMillis();
// 构建任务描述
String task = String.format("""
请对以下Java代码进行全面审查。
审查背景:%s
代码内容:
```java
%s
```
请从架构、安全、性能三个维度分别进行专业分析。
""", reviewContext, codeContent);
// Orchestrator协调执行
MultiAgentResult multiAgentResult = orchestrator.orchestrate(task, REVIEW_AGENTS);
// 聚合结果
AggregatedReport aggregatedReport = aggregator.aggregate(
task, multiAgentResult.getSubTaskResults()
);
long duration = System.currentTimeMillis() - startTime;
log.info("代码审查完成,耗时 {}ms,置信度 {:.0f}%",
duration, aggregatedReport.getOverallConfidence() * 100);
return CodeReviewReport.builder()
.codeContext(reviewContext)
.finalReport(aggregatedReport.getFinalReport())
.overallConfidence(aggregatedReport.getOverallConfidence())
.subTaskResults(aggregatedReport.getSubTaskResults())
.conflictsDetected(aggregatedReport.getConflicts().size())
.totalDurationMs(duration)
.build();
}
}// CodeReviewController.java
@RestController
@RequestMapping("/api/review")
@RequiredArgsConstructor
public class CodeReviewController {
private final MultiAgentCodeReviewService reviewService;
@PostMapping("/code")
public ResponseEntity<CodeReviewReport> reviewCode(
@RequestBody CodeReviewRequest request) {
CodeReviewReport report = reviewService.review(
request.getCode(), request.getContext()
);
return ResponseEntity.ok(report);
}
record CodeReviewRequest(String code, String context) {}
}容错设计:某个Agent失败时的处理
// AgentFallbackHandler.java
@Component
@RequiredArgsConstructor
@Slf4j
public class AgentFallbackHandler {
private final ChatClient fallbackChatClient;
/**
* Agent失败时的降级处理
*/
public SubTaskResult handleFailure(SubTask failedTask, String errorMessage) {
log.warn("Agent {} 失败,尝试降级处理: {}",
failedTask.getAssignedAgentId(), errorMessage);
try {
// 用通用LLM兜底
String fallbackResponse = fallbackChatClient.prompt()
.system("你是一个通用代码审查助手,请尽力完成以下审查任务,但注意你不是该领域的专家。")
.user(failedTask.getDescription())
.call()
.content();
return SubTaskResult.builder()
.taskId(failedTask.getTaskId())
.content("[降级结果] " + fallbackResponse)
.confidence(0.5) // 降级结果置信度较低
.success(true)
.fallback(true)
.build();
} catch (Exception e) {
log.error("降级处理也失败了", e);
return SubTaskResult.error(failedTask.getTaskId(),
"该维度分析不可用(原因: " + errorMessage + ")");
}
}
}性能监控:多Agent系统的可观测性
// MultiAgentMonitor.java
@Component
@RequiredArgsConstructor
@Slf4j
public class MultiAgentMonitor {
private final MeterRegistry meterRegistry;
public void recordAgentExecution(String agentId, long durationMs,
boolean success, double confidence) {
// 各Agent执行时间
meterRegistry.timer("multiagent.agent.duration",
"agent_id", agentId,
"success", String.valueOf(success)
).record(durationMs, TimeUnit.MILLISECONDS);
// 置信度分布
meterRegistry.summary("multiagent.agent.confidence",
"agent_id", agentId
).record(confidence);
// 成功率
meterRegistry.counter("multiagent.agent.executions",
"agent_id", agentId,
"result", success ? "success" : "failure"
).increment();
}
public void recordParallelEfficiency(int totalAgents, int successAgents,
long totalDurationMs, long maxSingleDurationMs) {
// 并行效率 = 最慢单个任务时间 / 总时间
double parallelEfficiency = (double) maxSingleDurationMs / totalDurationMs;
meterRegistry.gauge("multiagent.parallel.efficiency", parallelEfficiency);
log.info("并行效率: {:.1f}%({}个Agent,最慢{}ms,总耗时{}ms)",
parallelEfficiency * 100, totalAgents, maxSingleDurationMs, totalDurationMs);
}
}生产注意事项
并发安全
- 多个Agent同时写入共享状态时,使用
ConcurrentHashMap和原子操作 - Agent的LLM调用是无状态的,天然线程安全
- 消息总线的队列操作要确保线程安全(
LinkedBlockingQueue)
成本控制
- 每个Agent都会调用LLM,总成本是单Agent的N倍
- 对低优先级的子任务可以使用便宜的模型(gpt-4o-mini)
- 设置Token预算上限,超出时截断历史
超时与熔断
- 每个Agent都要设置独立的超时时间
- 用Resilience4j的
CircuitBreaker包装Agent调用 - 一个Agent熔断不影响其他Agent继续执行
常见问题解答
Q1:多Agent系统的延迟比单Agent高很多,如何优化?
A:关键是充分利用并行:
- 识别任务中哪些子任务可以并行(无依赖关系),全部并行执行
- 对于有依赖的任务,只串行关键路径上的任务
- 一般来说,3个Agent并行,总时间接近最慢的那个Agent,不是3倍
Q2:多Agent结果出现矛盾时怎么处理最好?
A:三层策略:
- 自动解决:置信度差距超过0.2时,高置信度方案优先
- 安全优先:安全相关的冲突,一律选更严格的那个结论
- 人工仲裁:无法自动解决的冲突,打标记等待人工审查,不阻塞流程
Q3:如何设计Agent间的消息格式?
A:建议的设计原则:
- 保持消息不可变(使用Builder创建后不修改)
- 消息要包含足够的上下文(taskId、fromAgentId、置信度)
- 使用枚举类型标识消息类型,避免魔法字符串
Q4:一个项目应该设计多少个Specialist Agent合适?
A:建议原则:
- 3-7个是最佳区间,太少发挥不了多Agent的优势,太多管理复杂度急剧上升
- 每个Agent要有明确的专业边界,避免职责重叠超过30%
- 先从3个最重要的维度开始,逐步增加
Q5:多Agent系统如何测试?
A:分层测试策略:
- 单元测试:每个Specialist Agent独立测试(mock ChatClient)
- 集成测试:测试Orchestrator+单个Specialist的交互
- 端到端测试:固定输入(一段代码),验证最终报告包含预期内容
- 混沌测试:随机让某个Agent返回错误,验证降级逻辑正常
Q6:消息总线用内存队列还是RabbitMQ?
A:根据规模选择:
- 单体部署、低并发:内存队列(本文方案),零依赖,简单可靠
- 多实例部署、高并发:RabbitMQ/Kafka,支持水平扩展和持久化
- 建议:先用内存队列跑通业务逻辑,规模扩大后再换消息队列,接口兼容
总结
多Agent协作不是技术炫技,它解决的是单Agent的三个真实痛点:Token上限、专业深度、执行效率。
可操作行动清单:
记住:一个专业的团队,永远比一个全能的个人更可靠。
