Article #2115: Multi-Agent System Design: Engineering Practices for Coordinating Multiple AIs on Complex Tasks
2026/4/30 · about 9 min read
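Audience: engineers building complex AI workflows | Reading time: about 21 minutes | Key takeaway: learn the architecture patterns of multi-agent systems and understand the key issues in coordination, communication, and state management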
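A single agent can only do so much. When a task is complex enough to need several kinds of specialized capability working together, you need a multi-agent system.
Take a complete code review workflow: one agent analyzes the code structure, another looks for security vulnerabilities, a third checks the business logic, and a final agent combines their outputs into the review report. Each agent focuses on what it does best, and the combined result is usually much better than what a single general-purpose agent can produce.
But multi-agent systems also introduce new problems. How do agents communicate with each other? If one agent fails, how should the overall workflow respond? How do you keep a poor-quality output from one agent from contaminating the agents downstream?
This article works through these questions systematically.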
Architecture Patterns for Multi-Agent Systems
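/**
 * Three main architecture patterns for multi-agent systems
 *
 * ===== Pattern 1: Pipeline =====
 *
 *   Agent A → Agent B → Agent C → result
 *
 * Linear flow: each agent processes the previous agent's output
 * Best for: tasks with clearly defined processing steps (document processing, content-generation pipelines)
 * Pros: simple, predictable, easy to debug
 * Cons: serial execution; end-to-end latency is the sum of all stages, and throughput is capped by the slowest agent
 *
 * ===== Pattern 2: Orchestrator-Executor =====
 *
 *             Orchestrator agent
 *            /        |        \
 *       Agent A    Agent B    Agent C
 *
 * The orchestrator assigns subtasks, the executors complete them in parallel, and the orchestrator aggregates the results
 * Best for: parallelizing complex tasks (research, analysis)
 * Pros: parallel execution; the orchestrator can adjust its strategy dynamically
 * Cons: the orchestrator is a single point of failure, and its prompt design is more complex
 *
 * ===== Pattern 3: Peer-to-Peer =====
 *
 *   Agent A ⇄ Agent B ⇄ Agent C
 *
 * Agents communicate directly, with no central coordinator
 * Best for: tasks that require negotiation and iteration (debates, code review)
 * Pros: more flexible; can mimic human collaboration
 * Cons: high complexity; prone to endless loops or deadlocks
 */
Specialized Agent Definitions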
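The three architecture patterns above differ mainly in how control passes between agents. The sketch below makes that difference concrete under a deliberate simplification: each agent is a hypothetical `UnaryOperator<String>` rather than an LLM-backed agent, and `PatternSketch` with its `pipeline`, `fanOut`, and `negotiate` methods are illustrative names, not a library API.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch: agents are modeled as plain String -> String functions,
// not as SpecializedAgent instances, so only the control flow of each pattern remains.
final class PatternSketch {

    // Pattern 1 (pipeline): each agent consumes the previous agent's output.
    static String pipeline(List<UnaryOperator<String>> agents, String input) {
        String current = input;
        for (UnaryOperator<String> agent : agents) {
            current = agent.apply(current);
        }
        return current;
    }

    // Pattern 2 (orchestrator-executor): the same input fans out to every executor,
    // and the orchestrator's aggregate function combines their results.
    // Executors run one after another here for brevity; a real orchestrator would run them in parallel.
    static String fanOut(List<UnaryOperator<String>> executors, String input,
                         Function<List<String>, String> aggregate) {
        List<String> results = executors.stream().map(e -> e.apply(input)).toList();
        return aggregate.apply(results);
    }

    // Pattern 3 (peer-to-peer): two agents revise a draft in turns until neither changes it.
    // The hard round limit is what prevents an endless back-and-forth.
    static String negotiate(UnaryOperator<String> a, UnaryOperator<String> b,
                            String draft, int maxRounds) {
        String current = draft;
        for (int round = 0; round < maxRounds; round++) {
            String next = b.apply(a.apply(current));
            if (next.equals(current)) {
                return next; // converged: this round changed nothing
            }
            current = next;
        }
        return current; // round limit reached; return the latest draft
    }
}
```

The `maxRounds` cap in `negotiate` is one simple guard against the loops the peer-to-peer pattern is prone to. Stopping on exact string equality is an assumption made for the sketch; real agents usually need a looser stopping rule, such as an explicit "agree" verdict.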
import lombok.Builder;
import lombok.Data;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Base interface for agents
 *
 * Each agent has a clearly defined responsibility and area of expertise.
 * Agents communicate by passing messages and never call each other directly.
 */
public interface SpecializedAgent<I, O> {

    /**
     * Unique agent identifier
     */
    String getAgentId();

    /**
     * Agent description (read by the orchestrator to help it decide which agent to use)
     */
    String getDescription();

    /**
     * Capability tags
     */
    Set<String> getCapabilities();

    /**
     * Executes the task
     */
    AgentResult<O> execute(AgentContext context, I input);

    /**
     * Whether the task supports asynchronous execution
     */
    default boolean supportsAsync() { return false; }

    // Classes nested in an interface are implicitly public and static
    @Data
    @Builder
    class AgentContext {
        private String taskId;
        private String sessionId;
        private String requestingAgentId;         // which agent issued the request
        private Map<String, Object> sharedState;  // shared state
        private int maxExecutionTimeMs;
        private boolean dryRun;
    }

    @Data
    @Builder
    class AgentResult<T> {
        private boolean success;
        private T output;
        private String error;
        private Map<String, Object> metadata;     // execution metadata (token usage, latency, etc.)
        private List<String> warnings;

        public static <T> AgentResult<T> success(T output) {
            return AgentResult.<T>builder().success(true).output(output).build();
        }

        public static <T> AgentResult<T> failure(String error) {
            return AgentResult.<T>builder().success(false).error(error).build();
        }
    }
}
Concrete Agent Implementation Examples
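Implementing this contract does not require an LLM. A dependency-free sketch follows; because it avoids Lombok, `MiniAgent` and `Result` are hypothetical simplified stand-ins for `SpecializedAgent` and `AgentResult`, and `LongLineAgent` is an invented example agent. A deterministic stub like this can be handy for testing orchestration logic without making LLM calls.

```java
import java.util.Set;

// Hypothetical, simplified stand-in for SpecializedAgent (no Lombok, no LLM, no AgentContext).
interface MiniAgent<I, O> {
    String getAgentId();
    Set<String> getCapabilities();
    Result<O> execute(I input);
}

// Mirrors AgentResult: either a successful output or an error message.
record Result<T>(boolean success, T output, String error) {
    static <T> Result<T> success(T output) { return new Result<>(true, output, null); }
    static <T> Result<T> failure(String error) { return new Result<>(false, null, error); }
}

// A deterministic stub agent: counts lines longer than a configured limit.
final class LongLineAgent implements MiniAgent<String, Integer> {
    private final int maxLineLength;

    LongLineAgent(int maxLineLength) { this.maxLineLength = maxLineLength; }

    @Override public String getAgentId() { return "long_line_check"; }

    @Override public Set<String> getCapabilities() { return Set.of("code_analysis"); }

    @Override
    public Result<Integer> execute(String code) {
        if (code == null || code.isBlank()) {
            // Report failure as a value instead of throwing
            return Result.failure("empty input");
        }
        int count = (int) code.lines().filter(line -> line.length() > maxLineLength).count();
        return Result.success(count);
    }
}
```

Failures come back as values (`Result.failure`) rather than exceptions, which is the same convention the LLM-backed agents follow with `AgentResult.failure`.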
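import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.model.chat.ChatLanguageModel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;

/**
 * Code analysis agent
 *
 * Focuses on analyzing code structure, quality, and potential problems
 */
@Component
@Slf4j
// Nested types must be qualified in the implements clause: their simple names are only in scope inside the class body
public class CodeAnalysisAgent implements SpecializedAgent<CodeAnalysisAgent.CodeAnalysisInput, CodeAnalysisAgent.CodeAnalysisOutput> {

    // ObjectMapper is thread-safe once configured, so reuse one instance instead of creating one per call.
    // Unknown fields are ignored so that one extra key in the model's JSON does not fail the whole parse.
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private final ChatLanguageModel llm;

    @Autowired
    public CodeAnalysisAgent(ChatLanguageModel llm) {
        this.llm = llm;
    }

    @Override
    public String getAgentId() { return "code_analysis"; }

    @Override
    public String getDescription() {
        return "Analyzes code structure, complexity, and readability; identifies code smells and refactoring opportunities";
    }

    @Override
    public Set<String> getCapabilities() {
        return Set.of("code_analysis", "quality_assessment", "refactoring_suggestion");
    }

    @Override
    public AgentResult<CodeAnalysisOutput> execute(AgentContext context, CodeAnalysisInput input) {
        String prompt = """
                Analyze the following code and assess it along these dimensions:
                1. Structure (is the module decomposition sensible?)
                2. Complexity (are methods too long or nested too deeply?)
                3. Naming (are variable, method, and class names clear?)
                4. Potential readability problems
                5. Suggested refactorings
                Language: %s
                Business context: %s
                Code:
                ```
                %s
                ```
                Return JSON in this format:
                {
                  "overallScore": 0-10,
                  "structureScore": 0-10,
                  "complexityScore": 0-10,
                  "readabilityScore": 0-10,
                  "issues": [
                    {
                      "type": "STRUCTURE/COMPLEXITY/NAMING/READABILITY",
                      "severity": "LOW/MEDIUM/HIGH",
                      "description": "description of the issue",
                      "location": "approximate location (line number or method name)",
                      "suggestion": "suggested improvement"
                    }
                  ],
                  "summary": "overall assessment (2-3 sentences)"
                }
                Return only the JSON.
                """.formatted(input.getLanguage(),
                        input.getContext() != null ? input.getContext() : "none",
                        input.getCode());
        try {
            String response = llm.generate(prompt);
            CodeAnalysisOutput output = parseOutput(response);
            return AgentResult.success(output);
        } catch (Exception e) {
            log.error("Code analysis failed", e);
            return AgentResult.failure("Code analysis failed: " + e.getMessage());
        }
    }

    private CodeAnalysisOutput parseOutput(String response) {
        try {
            return MAPPER.readValue(extractJson(response), CodeAnalysisOutput.class);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to parse model output", e);
        }
    }

    // Keeps everything from the first '{' to the last '}', dropping Markdown fences or surrounding prose
    private String extractJson(String s) {
        int start = s.indexOf('{');
        int end = s.lastIndexOf('}');
        return (start >= 0 && end > start) ? s.substring(start, end + 1) : s;
    }

    @Data
    public static class CodeAnalysisInput {
        private String language;
        private String code;
        private String context;  // optional: business context for the code
    }

    @Data
    public static class CodeAnalysisOutput {
        private double overallScore;
        private double structureScore;
        private double complexityScore;
        private double readabilityScore;
        private List<CodeIssue> issues;
        private String summary;

        @Data
        public static class CodeIssue {
            private String type;
            private String severity;
            private String description;
            private String location;
            private String suggestion;
        }
    }
}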
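`extractJson` takes everything from the first `{` to the last `}`. That copes with a Markdown fence around the JSON, but it breaks when the model appends prose that itself contains a brace. One alternative, sketched below under the assumption that the response holds a single top-level JSON object, is to return the first balanced object while skipping braces inside string literals. `JsonExtractor.firstObject` is a hypothetical helper, not part of Jackson.

```java
import java.util.Optional;

// Hypothetical helper: returns the first balanced top-level {...} block in a model response.
// Braces inside JSON string literals (including escaped quotes) are ignored.
final class JsonExtractor {
    static Optional<String> firstObject(String text) {
        int start = text.indexOf('{');
        if (start < 0) return Optional.empty();
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        for (int i = start; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                if (escaped) escaped = false;          // this char was escaped; skip it
                else if (c == '\\') escaped = true;    // next char is escaped
                else if (c == '"') inString = false;   // closing quote
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) return Optional.of(text.substring(start, i + 1));
            }
        }
        return Optional.empty(); // unbalanced, e.g. the response was truncated
    }
}
```

Returning `Optional.empty()` for a truncated response lets the caller report a parse failure explicitly instead of handing Jackson a half-object.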
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.model.chat.ChatLanguageModel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;

/**
 * Security vulnerability detection agent
 */
@Component
@Slf4j
public class SecurityAnalysisAgent implements SpecializedAgent<SecurityAnalysisAgent.SecurityInput, SecurityAnalysisAgent.SecurityOutput> {

    // Shared, thread-safe mapper; ignores fields the model adds beyond the schema
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private final ChatLanguageModel llm;

    @Autowired
    public SecurityAnalysisAgent(ChatLanguageModel llm) { this.llm = llm; }

    @Override
    public String getAgentId() { return "security_analysis"; }

    @Override
    public String getDescription() {
        return "Detects security vulnerabilities in code: SQL injection, XSS, CSRF, insecure dependencies, etc.";
    }

    @Override
    public Set<String> getCapabilities() {
        return Set.of("security_analysis", "vulnerability_detection", "owasp_check");
    }

    @Override
    public AgentResult<SecurityOutput> execute(AgentContext context, SecurityInput input) {
        String prompt = """
                Perform a security review of the following code, focusing on the OWASP Top 10 and other common vulnerabilities.
                Code:
                ```%s
                %s
                ```
                Return JSON:
                {
                  "riskLevel": "LOW/MEDIUM/HIGH/CRITICAL",
                  "vulnerabilities": [
                    {
                      "type": "vulnerability type (e.g. SQL_INJECTION)",
                      "severity": "LOW/MEDIUM/HIGH/CRITICAL",
                      "description": "description of the vulnerability",
                      "cweId": "CWE ID (e.g. CWE-89)",
                      "affectedCode": "affected code snippet",
                      "recommendation": "how to fix it"
                    }
                  ],
                  "securityScore": 0-10
                }
                Return only the JSON.
                """.formatted(input.getLanguage(), input.getCode());
        try {
            String response = llm.generate(prompt);
            SecurityOutput output = MAPPER.readValue(extractJson(response), SecurityOutput.class);
            return AgentResult.success(output);
        } catch (Exception e) {
            log.error("Security analysis failed", e);
            return AgentResult.failure("Security analysis failed: " + e.getMessage());
        }
    }

    // Same extraction as CodeAnalysisAgent: from the first '{' to the last '}'
    private String extractJson(String s) {
        int start = s.indexOf('{');
        int end = s.lastIndexOf('}');
        return (start >= 0 && end > start) ? s.substring(start, end + 1) : s;
    }

    @Data
    public static class SecurityInput {
        private String language;
        private String code;
    }

    @Data
    public static class SecurityOutput {
        private String riskLevel;
        private List<Vulnerability> vulnerabilities;
        private double securityScore;

        @Data
        public static class Vulnerability {
            private String type;
            private String severity;
            private String description;
            private String cweId;
            private String affectedCode;
            private String recommendation;
        }
    }
}
The Orchestrator Agent
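One of the orchestrator's listed responsibilities is deciding which specialized agents to run based on the code's characteristics, while the implementation that follows always runs both agents. A minimal sketch of capability-based selection matches required tags against the tags each agent advertises through `getCapabilities()`. `AgentInfo`, `AgentSelector`, and the keyword heuristics in `requiredCapabilities` are hypothetical and deliberately crude; a production orchestrator might ask an LLM to choose, using each agent's `getDescription()`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical registry entry: an agent id plus the capability tags it advertises.
record AgentInfo(String agentId, Set<String> capabilities) {}

final class AgentSelector {

    // Illustrative heuristics only: map simple code features to required capabilities.
    static Set<String> requiredCapabilities(String code) {
        Set<String> required = new LinkedHashSet<>();
        required.add("code_analysis"); // quality review always runs
        String lower = code.toLowerCase();
        if (lower.contains("select ") || lower.contains("password") || lower.contains("http")) {
            required.add("security_analysis");
        }
        return required;
    }

    // Picks every agent that covers at least one required capability, in registry order.
    static List<String> select(List<AgentInfo> registry, Set<String> required) {
        return registry.stream()
                .filter(a -> a.capabilities().stream().anyMatch(required::contains))
                .map(AgentInfo::agentId)
                .toList();
    }
}
```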
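import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.model.chat.ChatLanguageModel;
import jakarta.annotation.PreDestroy; // javax.annotation.PreDestroy on Spring Boot 2
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Code review orchestrator agent
 *
 * Responsibilities:
 * 1. Decide which specialized agents to use based on the code's characteristics
 *    (this example always runs both of them)
 * 2. Dispatch the specialized agents in parallel
 * 3. Combine the agents' outputs into the final report
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class CodeReviewOrchestrator {

    // Per-agent timeout. The agents run in parallel, so both finish (or time out) within about 25 s.
    private static final long AGENT_TIMEOUT_SECONDS = 25;
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final CodeAnalysisAgent codeAnalysisAgent;
    private final SecurityAnalysisAgent securityAnalysisAgent;
    private final ChatLanguageModel llm;
    // Initialized inline, so @RequiredArgsConstructor leaves it out of the constructor
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /**
     * Full code review
     */
    public CodeReviewReport reviewCode(String code, String language, String context) {
        String taskId = UUID.randomUUID().toString();
        log.info("Starting code review: taskId={}, language={}", taskId, language);

        SpecializedAgent.AgentContext agentCtx = SpecializedAgent.AgentContext.builder()
                .taskId(taskId)
                .sessionId(taskId)
                .requestingAgentId("orchestrator")
                .sharedState(new ConcurrentHashMap<>())
                .maxExecutionTimeMs(30000)
                .build();

        CodeAnalysisAgent.CodeAnalysisInput codeInput = new CodeAnalysisAgent.CodeAnalysisInput();
        codeInput.setLanguage(language);
        codeInput.setCode(code);
        codeInput.setContext(context);

        SecurityAnalysisAgent.SecurityInput secInput = new SecurityAnalysisAgent.SecurityInput();
        secInput.setLanguage(language);
        secInput.setCode(code);

        // Run both specialized agents in parallel. Each future degrades to a failure result on
        // timeout or exception, so one slow or broken agent does not sink the whole review.
        // Note: completeOnTimeout does not interrupt the underlying task; its thread keeps running.
        CompletableFuture<SpecializedAgent.AgentResult<CodeAnalysisAgent.CodeAnalysisOutput>> codeAnalysisFuture =
                CompletableFuture.supplyAsync(() -> codeAnalysisAgent.execute(agentCtx, codeInput), executor)
                        .completeOnTimeout(SpecializedAgent.AgentResult.failure("Code analysis timed out"),
                                AGENT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                        .exceptionally(e -> SpecializedAgent.AgentResult.failure("Code analysis error: " + e.getMessage()));
        CompletableFuture<SpecializedAgent.AgentResult<SecurityAnalysisAgent.SecurityOutput>> securityFuture =
                CompletableFuture.supplyAsync(() -> securityAnalysisAgent.execute(agentCtx, secInput), executor)
                        .completeOnTimeout(SpecializedAgent.AgentResult.failure("Security analysis timed out"),
                                AGENT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                        .exceptionally(e -> SpecializedAgent.AgentResult.failure("Security analysis error: " + e.getMessage()));

        // Both timers started at the same time, so these joins wait at most ~25 s in total
        // (two sequential get(25, SECONDS) calls could wait up to 50 s)
        SpecializedAgent.AgentResult<CodeAnalysisAgent.CodeAnalysisOutput> codeResult = codeAnalysisFuture.join();
        SpecializedAgent.AgentResult<SecurityAnalysisAgent.SecurityOutput> secResult = securityFuture.join();

        // Use the LLM to combine both agents' outputs into the final report
        return synthesizeReport(codeResult, secResult, context);
    }

    /**
     * Combines the outputs of several agents into the final report.
     *
     * This step is also done by the LLM: it turns the agents' structured output into a human-friendly review report.
     */
    private CodeReviewReport synthesizeReport(
            SpecializedAgent.AgentResult<CodeAnalysisAgent.CodeAnalysisOutput> codeResult,
            SpecializedAgent.AgentResult<SecurityAnalysisAgent.SecurityOutput> secResult,
            String context) {

        // Pass the full structured output, not just scores and counts: the prompt asks for the
        // highest-priority issues, which the LLM cannot rank without seeing them
        String codeAnalysisSummary = codeResult.isSuccess() && codeResult.getOutput() != null
                ? toJson(codeResult.getOutput())
                : "Code analysis failed: " + codeResult.getError();
        String securitySummary = secResult.isSuccess() && secResult.getOutput() != null
                ? toJson(secResult.getOutput())
                : "Security analysis failed: " + secResult.getError();

        // Score and merge recommendation come from deterministic rules; the LLM explains the
        // recommendation instead of making a second, possibly conflicting, decision
        double overallScore = calculateOverallScore(codeResult, secResult);
        String recommendation = determineRecommendation(codeResult, secResult);

        String synthesisPrompt = """
                Combine the following code review results into a complete review report.
                Business context: %s
                Code quality analysis results:
                %s
                Security analysis results:
                %s
                Please include:
                1. An overall assessment covering both dimensions (computed overall score: %.1f/10)
                2. The 3-5 highest-priority issues, ordered by severity
                3. Security issues that must be fixed immediately (if any)
                4. A summary of improvement suggestions
                5. The reasoning behind the merge recommendation: %s (APPROVE/REQUEST_CHANGES/NEEDS_DISCUSSION)
                Use a professional, friendly tone, like a senior engineer giving a colleague code review feedback.
                """.formatted(context != null ? context : "none", codeAnalysisSummary, securitySummary,
                overallScore, recommendation);

        String synthesizedReport;
        try {
            synthesizedReport = llm.generate(synthesisPrompt);
        } catch (Exception e) {
            // Degrade: keep the rule-based score and recommendation even if the narrative fails
            log.error("Report synthesis failed", e);
            synthesizedReport = "Report synthesis failed: " + e.getMessage();
        }

        return CodeReviewReport.builder()
                .overallScore(overallScore)
                .recommendation(recommendation)
                .synthesizedReport(synthesizedReport)
                .codeAnalysisResult(codeResult.getOutput())
                .securityAnalysisResult(secResult.getOutput())
                .build();
    }

    private static String toJson(Object value) {
        try {
            return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(value);
        } catch (JsonProcessingException e) {
            return String.valueOf(value);
        }
    }

    private double calculateOverallScore(
            SpecializedAgent.AgentResult<CodeAnalysisAgent.CodeAnalysisOutput> codeResult,
            SpecializedAgent.AgentResult<SecurityAnalysisAgent.SecurityOutput> secResult) {
        // A failed agent contributes a neutral 5.0; the failure itself is visible in the report
        double codeScore = (codeResult.isSuccess() && codeResult.getOutput() != null)
                ? codeResult.getOutput().getOverallScore() : 5.0;
        double secScore = (secResult.isSuccess() && secResult.getOutput() != null)
                ? secResult.getOutput().getSecurityScore() : 5.0;
        // Security carries more weight, e.g. code 8.0 and security 5.0 → 8.0*0.4 + 5.0*0.6 = 6.2
        return codeScore * 0.4 + secScore * 0.6;
    }

    private String determineRecommendation(
            SpecializedAgent.AgentResult<CodeAnalysisAgent.CodeAnalysisOutput> codeResult,
            SpecializedAgent.AgentResult<SecurityAnalysisAgent.SecurityOutput> secResult) {
        // Security analysis unavailable → never auto-approve
        if (!secResult.isSuccess() || secResult.getOutput() == null) {
            return "NEEDS_DISCUSSION";
        }
        SecurityAnalysisAgent.SecurityOutput sec = secResult.getOutput();
        // CRITICAL risk level or any CRITICAL vulnerability → changes required
        boolean hasCritical = "CRITICAL".equals(sec.getRiskLevel())
                || (sec.getVulnerabilities() != null && sec.getVulnerabilities().stream()
                        .anyMatch(v -> "CRITICAL".equals(v.getSeverity())));
        if (hasCritical) return "REQUEST_CHANGES";
        // Code quality below 6 → request changes
        if (codeResult.isSuccess() && codeResult.getOutput() != null
                && codeResult.getOutput().getOverallScore() < 6.0) {
            return "REQUEST_CHANGES";
        }
        // HIGH security risk, or no code quality result → needs discussion
        if ("HIGH".equals(sec.getRiskLevel()) || !codeResult.isSuccess() || codeResult.getOutput() == null) {
            return "NEEDS_DISCUSSION";
        }
        return "APPROVE";
    }

    /**
     * Releases the worker threads when the Spring context shuts down
     */
    @PreDestroy
    public void shutdown() {
        executor.shutdown();
    }

    @Data
    @Builder
    public static class CodeReviewReport {
        private double overallScore;
        private String recommendation;  // APPROVE / REQUEST_CHANGES / NEEDS_DISCUSSION
        private String synthesizedReport;
        private CodeAnalysisAgent.CodeAnalysisOutput codeAnalysisResult;
        private SecurityAnalysisAgent.SecurityOutput securityAnalysisResult;
    }
}
Inter-Agent Communication and State Management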
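The blackboard idea itself does not depend on Redis. A minimal in-memory sketch, assuming all agents run in one JVM and that TTLs and persistence are not needed; `InMemoryBlackboard` is a hypothetical class that namespaces state per task, much as the Redis-backed `AgentStateManager` does with its `agent:state:{taskId}:` key prefix.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-JVM blackboard: each task gets its own namespace,
// agents write results under a key, and the orchestrator reads them all at the end.
final class InMemoryBlackboard {
    private final Map<String, Map<String, Object>> tasks = new ConcurrentHashMap<>();

    void write(String taskId, String key, Object value) {
        // ConcurrentHashMap rejects null values, so callers must not write null
        tasks.computeIfAbsent(taskId, id -> new ConcurrentHashMap<>()).put(key, value);
    }

    <T> Optional<T> read(String taskId, String key, Class<T> type) {
        Object value = tasks.getOrDefault(taskId, Map.of()).get(key);
        // Empty if the key is missing or holds a value of a different type
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    Map<String, Object> readAll(String taskId) {
        return Map.copyOf(tasks.getOrDefault(taskId, Map.of()));
    }

    void cleanup(String taskId) {
        tasks.remove(taskId);
    }
}
```

Unlike the Redis version, nothing here expires on its own, so `cleanup` must be called when a task finishes; the Redis TTL exists precisely to cover tasks that never reach that point.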
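import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Shared state management for multiple agents
 *
 * Problem: agents need to share context without being coupled to each other
 * Solution: a shared state store (the blackboard pattern)
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class AgentStateManager {

    private static final String STATE_PREFIX = "agent:state:";
    private static final Duration DEFAULT_TTL = Duration.ofHours(2);

    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;

    /**
     * An agent writes to the shared state (the blackboard).
     * Every key gets a TTL, so state left behind by abandoned tasks expires on its own.
     * Failures are logged and swallowed, so the caller cannot tell that a write failed.
     */
    public <T> void writeState(String taskId, String key, T value) {
        try {
            String stateKey = STATE_PREFIX + taskId + ":" + key;
            redisTemplate.opsForValue().set(
                    stateKey,
                    objectMapper.writeValueAsString(value),
                    DEFAULT_TTL
            );
        } catch (Exception e) {
            log.error("Failed to write state: taskId={}, key={}", taskId, key, e);
        }
    }

    /**
     * An agent reads from the shared state
     */
    public <T> Optional<T> readState(String taskId, String key, Class<T> type) {
        try {
            String stateKey = STATE_PREFIX + taskId + ":" + key;
            String json = redisTemplate.opsForValue().get(stateKey);
            if (json == null) return Optional.empty();
            return Optional.of(objectMapper.readValue(json, type));
        } catch (Exception e) {
            log.warn("Failed to read state: taskId={}, key={}", taskId, key, e);
            return Optional.empty();
        }
    }

    /**
     * Returns all state for a task as raw JSON strings (used by the orchestrator for aggregation)
     *
     * Caution: KEYS walks the entire keyspace and blocks Redis while it runs; on a large
     * production instance, prefer SCAN or keep each task's state in a single hash.
     */
    public Map<String, String> getAllTaskState(String taskId) {
        String pattern = STATE_PREFIX + taskId + ":*";
        Set<String> keys = redisTemplate.keys(pattern);
        if (keys == null || keys.isEmpty()) return Map.of();

        Map<String, String> state = new LinkedHashMap<>();
        keys.forEach(key -> {
            String shortKey = key.substring((STATE_PREFIX + taskId + ":").length());
            String value = redisTemplate.opsForValue().get(key);
            if (value != null) state.put(shortKey, value);
        });
        return state;
    }

    /**
     * Cleans up task state (call after the task completes); the same KEYS caveat applies
     */
    public void cleanupTaskState(String taskId) {
        String pattern = STATE_PREFIX + taskId + ":*";
        Set<String> keys = redisTemplate.keys(pattern);
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
        }
    }
}
Practical Advice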
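More agents is not always better
Many engineers get excited by the multi-agent idea and want to turn every feature into its own agent. But the more agents you have, the higher the coordination overhead, the more places things can fail, and the harder the system is to debug. My rule of thumb: start with the fewest agents that meet the requirements, and split an agent only when it starts losing focus, that is, when one prompt is trying to do too many things. In general, a system of 3-5 specialized agents can already handle quite complex tasks.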
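A rough estimate shows why each extra agent costs reliability. Assume, purely for illustration, that agents fail independently, that agent $i$ succeeds with probability $p_i$, and that the task needs every agent to succeed (a pipeline with no fallback):

$$
P(\text{all } n \text{ agents succeed}) = \prod_{i=1}^{n} p_i = p^{n} \quad \text{when every } p_i = p
$$

$$
p = 0.95:\qquad n = 3 \Rightarrow 0.95^{3} \approx 0.857, \qquad n = 5 \Rightarrow 0.95^{5} \approx 0.774
$$

With an assumed 95% per-agent success rate, going from three agents to five drops end-to-end success from about 86% to about 77%. The 0.95 is a hypothetical figure, not a measurement; continuing with degraded results instead of aborting is what breaks this multiplication.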
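Make agent output structured
Data passed between agents should be structured JSON rather than free text. Free text fed into a downstream agent complicates that agent's prompt and makes parsing unreliable. When each agent explicitly defines its output schema, upstream and downstream agents can exchange data reliably.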
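A defined schema pays off most when it is enforced: validate each agent's parsed output before passing it on, and treat violations like any other agent failure. A minimal hand-rolled sketch for the `CodeAnalysisOutput` shape, with the allowed `type` and `severity` values taken from the code analysis prompt; `Issue` and `OutputValidator` are hypothetical types, and a JSON Schema validation library would be the heavier-duty alternative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical mirror of CodeAnalysisOutput.CodeIssue (only the fields being checked).
record Issue(String type, String severity, String description) {}

final class OutputValidator {
    private static final Set<String> TYPES = Set.of("STRUCTURE", "COMPLEXITY", "NAMING", "READABILITY");
    private static final Set<String> SEVERITIES = Set.of("LOW", "MEDIUM", "HIGH");

    // Returns all violations found; an empty list means the output honors its schema.
    static List<String> validate(double overallScore, List<Issue> issues) {
        List<String> errors = new ArrayList<>();
        if (Double.isNaN(overallScore) || overallScore < 0 || overallScore > 10) {
            errors.add("overallScore out of range [0, 10]: " + overallScore);
        }
        if (issues == null) {
            errors.add("issues is missing");
            return errors;
        }
        for (int i = 0; i < issues.size(); i++) {
            Issue issue = issues.get(i);
            String at = "issues[" + i + "]";
            if (issue == null) {
                errors.add(at + " is null");
                continue;
            }
            // Null checks first: Set.of(...).contains(null) throws NullPointerException
            if (issue.type() == null || !TYPES.contains(issue.type())) {
                errors.add(at + ".type invalid: " + issue.type());
            }
            if (issue.severity() == null || !SEVERITIES.contains(issue.severity())) {
                errors.add(at + ".severity invalid: " + issue.severity());
            }
            if (issue.description() == null || issue.description().isBlank()) {
                errors.add(at + ".description is empty");
            }
        }
        return errors;
    }
}
```

Collecting every violation, rather than stopping at the first, gives a retry prompt something concrete to feed back to the model.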
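Design failure handling from the start
Partial failures are common in multi-agent systems: network timeouts, LLM output that fails to parse, or a task that falls outside an agent's expertise. At design time, decide for each agent: if it fails, does the whole task abort, or does it continue with a degraded result? The orchestrator should have an explicit degradation strategy instead of letting unhandled exceptions propagate.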
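One way to make that per-agent decision explicit is to attach a failure policy to every agent call. In the minimal sketch below, a REQUIRED agent's failure aborts the task with a clear error, and an OPTIONAL agent's failure is replaced by a fallback value. `Criticality`, `AgentStep`, and `DegradingRunner` are hypothetical names, and which agents count as required is a design assumption, not something the framework decides.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-agent failure policy.
enum Criticality { REQUIRED, OPTIONAL }

// One agent call: how to run it, how critical it is, and what to use if an optional call fails.
record AgentStep(String agentId, Criticality criticality, Supplier<String> run, String fallback) {}

final class DegradingRunner {
    // Runs steps in order and returns agentId -> output (or fallback) for every step.
    static Map<String, String> runAll(List<AgentStep> steps) {
        Map<String, String> outputs = new LinkedHashMap<>();
        for (AgentStep step : steps) {
            try {
                outputs.put(step.agentId(), step.run().get());
            } catch (RuntimeException e) {
                if (step.criticality() == Criticality.REQUIRED) {
                    // Abort deliberately, naming the agent, instead of letting the raw exception escape
                    throw new IllegalStateException("Required agent failed: " + step.agentId(), e);
                }
                outputs.put(step.agentId(), step.fallback()); // degraded result
            }
        }
        return outputs;
    }
}
```

Keeping the policy as data next to each step makes the degradation strategy reviewable in one place instead of scattered across `catch` blocks.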
