A2A Protocol in Practice: A Complete Implementation of the Agent-to-Agent Communication Spec
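Audience: Java engineers with Spring AI experience who want to build multi-agent systems · Reading time: about 18 minutes · Takeaway: implement the A2A protocol end to end and learn the core pattern for multi-agent communication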
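Lao Wang's multi-agent nightmare
Lao Wang is a senior Java engineer at an e-commerce platform. Last year his company decided to build an intelligent customer-service system with three functional modules: order lookup, refund review, and product recommendation. He naturally split it into three agents, each talking to the LLM on its own.
A month after launch, operations was at his desk every day. The three agents had no idea what the others were doing, so the same user question was picked up by all three and came back with three different answers. Users complained that "your customer service contradicts itself".
After staring at the code for a long while, Lao Wang asked me: "Is there a unified standard for multi-agent communication?"
I told him there is: Google introduced the A2A (Agent-to-Agent) protocol in 2025.
This article walks through A2A from concept to a working Spring AI implementation.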
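What the A2A protocol is
A2A is an open standard published by Google. Its goal is to let agents built on different frameworks and by different vendors communicate, delegate tasks, and synchronize state in a standardized way.
The core idea is simple: every agent publishes an "Agent Card" that states what it can do, and other agents call it through a standard HTTP/JSON-RPC interface, much like calling a microservice.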
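To make "HTTP/JSON-RPC" concrete, here is a minimal plain-Java sketch of a JSON-RPC 2.0 request envelope. The class `JsonRpcEnvelope` and its helpers are hypothetical names for illustration, and `tasks/get` is used only as an example method name; check the A2A specification version you target for the exact method names and parameter shapes.

```java
/** Builds a JSON-RPC 2.0 request envelope by hand (illustrative sketch only). */
class JsonRpcEnvelope {

    private JsonRpcEnvelope() {}

    /**
     * Returns a JSON-RPC 2.0 request for the given method.
     * paramsJson must already be valid JSON, for example an object literal.
     */
    static String request(String requestId, String method, String paramsJson) {
        return "{\"jsonrpc\":\"2.0\",\"id\":\"" + escape(requestId)
                + "\",\"method\":\"" + escape(method)
                + "\",\"params\":" + paramsJson + "}";
    }

    /** Escapes backslashes and double quotes; enough for simple ids and method names. */
    static String escape(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

A call such as `JsonRpcEnvelope.request("1", "tasks/get", "{\"id\":\"task-123\"}")` produces a single JSON object with the `jsonrpc`, `id`, `method` and `params` members. In a real project a JSON library such as Jackson would build this instead of string concatenation.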
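The A2A protocol defines three core components:
| Component | Description | Analogy |
|---|---|---|
| Agent Card | The agent's self-description: capabilities, endpoint URL, authentication scheme | Service registration entry |
| Task | The unit of work passed between agents, governed by a state machine | Work ticket |
| Message | A message inside a Task; supports text, files, and structured data | Ticket comment |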
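Task state transitions: a task starts as SUBMITTED, moves to WORKING, and ends in COMPLETED, FAILED, or CANCELED. INPUT_REQUIRED is the intermediate state in which the agent is waiting for more input from the caller. These are the values of the TaskStatus enum defined later in this article.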
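The lifecycle can be encoded as an explicit transition table so that illegal jumps (for example, reopening a completed task) are rejected. This is a minimal sketch: the state names match the article's `TaskStatus` enum, but the set of allowed transitions is an assumption made for illustration, not a quotation from the specification, and `TaskLifecycle` is a hypothetical class name.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Task lifecycle sketch; the allowed transitions below are assumed, not taken from the spec. */
class TaskLifecycle {

    enum Status { SUBMITTED, WORKING, INPUT_REQUIRED, COMPLETED, FAILED, CANCELED }

    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        ALLOWED.put(Status.SUBMITTED, EnumSet.of(Status.WORKING, Status.FAILED, Status.CANCELED));
        ALLOWED.put(Status.WORKING,
                EnumSet.of(Status.INPUT_REQUIRED, Status.COMPLETED, Status.FAILED, Status.CANCELED));
        ALLOWED.put(Status.INPUT_REQUIRED, EnumSet.of(Status.WORKING, Status.FAILED, Status.CANCELED));
        // Terminal states allow no further transitions.
        ALLOWED.put(Status.COMPLETED, EnumSet.noneOf(Status.class));
        ALLOWED.put(Status.FAILED, EnumSet.noneOf(Status.class));
        ALLOWED.put(Status.CANCELED, EnumSet.noneOf(Status.class));
    }

    static boolean canTransition(Status from, Status to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(Status status) {
        return ALLOWED.get(status).isEmpty();
    }

    /** Returns the new status, or throws if the move is not in the table. */
    static Status transition(Status from, Status to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal transition: " + from + " -> " + to);
        }
        return to;
    }
}
```

A store that updates task status could call `transition(current, next)` before saving, so a late WORKING update can never overwrite a terminal state.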
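Environment setup

```xml
<!-- Versions are omitted; they are assumed to come from the Spring Boot and Spring AI BOMs. -->
<!-- Spring AI starter names have changed between releases; use the one matching your version. -->
<dependencies>
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Lombok provides the @Data, @Builder and @Slf4j annotations used in the code samples. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
```

Agent Card implementation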
The Agent Card is an agent's "business card" in A2A. Every agent must expose it at /.well-known/agent.json.
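On the caller's side, the card location can be derived from an agent's base URL. The sketch below uses `java.net.URI.resolve` rather than string concatenation, so a trailing slash on the base URL does not produce a double slash. `AgentCardLocation` is a hypothetical helper, and resolving against the host root (dropping any path on the base URL) is a design assumption that matches how well-known paths are normally served.

```java
import java.net.URI;

/** Derives the Agent Card URL from an agent's base URL (illustrative helper). */
class AgentCardLocation {

    static final String WELL_KNOWN_PATH = "/.well-known/agent.json";

    private AgentCardLocation() {}

    /** Resolves the well-known path against the scheme, host and port of the base URL. */
    static URI of(String agentBaseUrl) {
        return URI.create(agentBaseUrl).resolve(WELL_KNOWN_PATH);
    }
}
```

Both `http://localhost:8081` and `http://localhost:8081/` resolve to the same card URL with this helper.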
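First, define the data structure:

```java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

// Simplified Agent Card model for this article; field names in the official A2A schema may differ.
// The no-args/all-args constructors let Jackson deserialize the card on the client side.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AgentCard {
    private String name;
    private String description;
    private String url;
    private String version;
    @JsonProperty("defaultInputModes")
    private List<String> defaultInputModes;
    @JsonProperty("defaultOutputModes")
    private List<String> defaultOutputModes;
    private List<AgentCapability> capabilities;
    private AgentAuthentication authentication;

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AgentCapability {
        private String name;
        private String description;
        private Map<String, Object> inputSchema;
        private Map<String, Object> outputSchema;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AgentAuthentication {
        private List<String> schemes;
    }
}
```

Card configuration for the order agent: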
```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Map;

@Configuration
public class OrderAgentCardConfig {

    @Bean
    public AgentCard orderAgentCard(@Value("${server.port:8081}") int port) {
        return AgentCard.builder()
                .name("Order Query Agent")
                .description("Looks up order status, logistics information, and order details")
                .url("http://localhost:" + port)
                .version("1.0.0")
                .defaultInputModes(List.of("text/plain", "application/json"))
                .defaultOutputModes(List.of("text/plain", "application/json"))
                .capabilities(List.of(
                        AgentCard.AgentCapability.builder()
                                .name("queryOrder")
                                .description("Look up order status and details by order ID")
                                .inputSchema(Map.of(
                                        "type", "object",
                                        "properties", Map.of(
                                                "orderId", Map.of("type", "string", "description", "Order ID")
                                        ),
                                        "required", List.of("orderId")
                                ))
                                .build(),
                        AgentCard.AgentCapability.builder()
                                .name("queryLogistics")
                                .description("Look up the logistics tracking history of an order")
                                .inputSchema(Map.of(
                                        "type", "object",
                                        "properties", Map.of(
                                                "orderId", Map.of("type", "string")
                                        )
                                ))
                                .build()
                ))
                .authentication(AgentCard.AgentAuthentication.builder()
                        .schemes(List.of("Bearer"))
                        .build())
                .build();
    }
}
```

Expose the well-known endpoint:
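```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/.well-known")
@Slf4j
public class WellKnownController {
    private final AgentCard agentCard;

    public WellKnownController(AgentCard agentCard) {
        this.agentCard = agentCard;
    }

    @GetMapping("/agent.json")
    public AgentCard getAgentCard() {
        log.info("Agent Card requested");
        return agentCard;
    }
}
```

Task model and state management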
A Task in A2A has a full lifecycle, so a plain request-response model is not enough.
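```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

// The no-args constructor is needed so Jackson can deserialize tasks (Redis values, HTTP responses).
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class A2ATask {
    private String id;
    private String sessionId;
    private TaskStatus status;
    private List<A2AMessage> messages;
    private Map<String, Object> metadata;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;

    public enum TaskStatus {
        SUBMITTED, WORKING, INPUT_REQUIRED, COMPLETED, FAILED, CANCELED
    }
}
```

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;
import java.util.List;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class A2AMessage {
    private String role; // "user" or "agent"
    private List<A2APart> parts;
    private LocalDateTime timestamp;
}
```

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.Map;

// The base class carries no @Builder: a Lombok builder() on the parent would clash
// with the builder() generated for each subclass and fail to compile.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
        @JsonSubTypes.Type(value = A2APart.TextPart.class, name = "text"),
        @JsonSubTypes.Type(value = A2APart.DataPart.class, name = "data")
})
public abstract class A2APart {

    @Data
    @EqualsAndHashCode(callSuper = false)
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TextPart extends A2APart {
        private String text;
    }

    @Data
    @EqualsAndHashCode(callSuper = false)
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DataPart extends A2APart {
        private Map<String, Object> data;
        private String mimeType;
    }
}
```

Task storage service (persisted in Redis):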
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;

// Assumes a RedisTemplate<String, A2ATask> bean with a JSON value serializer is configured,
// and that TaskNotFoundException is a custom unchecked exception (neither is shown in this article).
@Service
@Slf4j
public class TaskStoreService {
    private static final String KEY_PREFIX = "a2a:task:";
    private static final Duration TASK_TTL = Duration.ofHours(24);

    private final RedisTemplate<String, A2ATask> redisTemplate;

    public TaskStoreService(RedisTemplate<String, A2ATask> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public A2ATask createTask(String sessionId) {
        A2ATask task = A2ATask.builder()
                .id(UUID.randomUUID().toString())
                .sessionId(sessionId)
                .status(A2ATask.TaskStatus.SUBMITTED)
                .messages(new ArrayList<>())
                .metadata(new HashMap<>())
                .createdAt(LocalDateTime.now())
                .updatedAt(LocalDateTime.now())
                .build();
        save(task);
        log.info("Task created: taskId={}, sessionId={}", task.getId(), sessionId);
        return task;
    }

    // Note: updateStatus and addMessage are read-modify-write and not atomic;
    // that is fine while a single worker owns each task, as in this article.
    public void updateStatus(String taskId, A2ATask.TaskStatus status) {
        A2ATask task = getTask(taskId);
        task.setStatus(status);
        task.setUpdatedAt(LocalDateTime.now());
        save(task);
        log.info("Task status updated: taskId={}, status={}", taskId, status);
    }

    public void addMessage(String taskId, A2AMessage message) {
        A2ATask task = getTask(taskId);
        task.getMessages().add(message);
        task.setUpdatedAt(LocalDateTime.now());
        save(task);
    }

    public A2ATask getTask(String taskId) {
        A2ATask task = redisTemplate.opsForValue().get(KEY_PREFIX + taskId);
        if (task == null) {
            throw new TaskNotFoundException("Task not found: " + taskId);
        }
        return task;
    }

    private void save(A2ATask task) {
        // Every save refreshes the 24-hour TTL
        redisTemplate.opsForValue().set(KEY_PREFIX + task.getId(), task, TASK_TTL);
    }
}
```

Order agent core implementation
Now for the agent that does the actual work. It relies on Spring AI's function calling support.
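The order agent's ChatClient refers to two functions by name, queryOrderFunction and queryLogisticsFunction, which the article does not show. As a rough idea of what could sit behind such a name, here is a plain `java.util.function.Function` sketch. The `Request`/`Response` shapes, the in-memory map, and the status values are all assumptions made for illustration; how the function is registered with Spring AI (typically as a bean whose name matches the string passed to the client) depends on the Spring AI version you use.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical lookup behind the name "queryOrderFunction"; data and types are made up. */
class QueryOrderFunction
        implements Function<QueryOrderFunction.Request, QueryOrderFunction.Response> {

    /** Input the LLM is expected to fill in from the user's message. */
    record Request(String orderId) {}

    /** Result handed back to the LLM to phrase the final answer. */
    record Response(String orderId, String status) {}

    // Stand-in for a real order repository or downstream service call.
    private final Map<String, String> statusByOrderId;

    QueryOrderFunction(Map<String, String> statusByOrderId) {
        this.statusByOrderId = Map.copyOf(statusByOrderId);
    }

    @Override
    public Response apply(Request request) {
        String status = statusByOrderId.getOrDefault(request.orderId(), "NOT_FOUND");
        return new Response(request.orderId(), status);
    }
}
```

Returning an explicit "NOT_FOUND" value instead of throwing gives the model something concrete to report back to the user when an order ID is unknown.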
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

@Service
@Slf4j
public class OrderAgentService {
    private final ChatClient chatClient;
    private final TaskStoreService taskStoreService;

    public OrderAgentService(ChatClient.Builder builder,
                             TaskStoreService taskStoreService) {
        this.chatClient = builder
                .defaultSystem("""
                        You are a professional order query agent.
                        You only handle order-related questions: order status, logistics tracking, and order details.
                        If a question is outside your scope, say so clearly and do not attempt to handle it.
                        Keep answers concise and professional, and give accurate order information.
                        """)
                // Names of function beans (not shown in this article). The method name
                // differs across Spring AI versions, so check the one you are using.
                .defaultFunctions("queryOrderFunction", "queryLogisticsFunction")
                .build();
        this.taskStoreService = taskStoreService;
    }

    public A2ATask processTask(String taskId, String userMessage) {
        // Move the task to WORKING
        taskStoreService.updateStatus(taskId, A2ATask.TaskStatus.WORKING);
        // Record the user message
        taskStoreService.addMessage(taskId, A2AMessage.builder()
                .role("user")
                .parts(List.of(A2APart.TextPart.builder().text(userMessage).build()))
                .timestamp(LocalDateTime.now())
                .build());
        try {
            // Let the LLM handle the request
            String response = chatClient.prompt()
                    .user(userMessage)
                    .call()
                    .content();
            // Record the agent's reply
            taskStoreService.addMessage(taskId, A2AMessage.builder()
                    .role("agent")
                    .parts(List.of(A2APart.TextPart.builder().text(response).build()))
                    .timestamp(LocalDateTime.now())
                    .build());
            // Mark the task as completed
            taskStoreService.updateStatus(taskId, A2ATask.TaskStatus.COMPLETED);
        } catch (Exception e) {
            log.error("Task processing failed: taskId={}", taskId, e);
            taskStoreService.updateStatus(taskId, A2ATask.TaskStatus.FAILED);
        }
        return taskStoreService.getTask(taskId);
    }
}
```

A2A server (the callee)
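Next, implement the A2A HTTP interface on the server side. For readability, the endpoints here are plain REST-style paths rather than a JSON-RPC envelope: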
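```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

// TaskSendRequest (not shown in this article) is a DTO with sessionId and message fields.
@RestController
@RequestMapping("/a2a")
@Slf4j
public class A2AServerController {
    private final TaskStoreService taskStoreService;
    private final OrderAgentService orderAgentService;

    public A2AServerController(TaskStoreService taskStoreService,
                               OrderAgentService orderAgentService) {
        this.taskStoreService = taskStoreService;
        this.orderAgentService = orderAgentService;
    }

    /**
     * Submit a task. Processing runs asynchronously; the response carries the newly created task.
     */
    @PostMapping("/tasks/send")
    public ResponseEntity<A2ATask> sendTask(@RequestBody TaskSendRequest request) {
        log.info("Task request received: sessionId={}", request.getSessionId());
        // Create the task
        A2ATask task = taskStoreService.createTask(request.getSessionId());
        // Process in the background and return 202 with the task ID right away.
        // runAsync uses the common ForkJoinPool; a dedicated executor suits blocking LLM calls better.
        String userMessage = extractUserMessage(request.getMessage());
        CompletableFuture.runAsync(() ->
                orderAgentService.processTask(task.getId(), userMessage)
        );
        return ResponseEntity.accepted().body(task);
    }

    /**
     * Query task status.
     */
    @GetMapping("/tasks/{taskId}")
    public ResponseEntity<A2ATask> getTask(@PathVariable String taskId) {
        try {
            return ResponseEntity.ok(taskStoreService.getTask(taskId));
        } catch (TaskNotFoundException e) {
            return ResponseEntity.notFound().build();
        }
    }

    /**
     * Subscribe to task updates (SSE stream).
     */
    @GetMapping(value = "/tasks/{taskId}/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<A2ATask>> subscribeTask(@PathVariable String taskId) {
        // Re-read the task every 500 ms; an unknown task ID ends the stream with an error.
        return Flux.interval(Duration.ofMillis(500))
                .map(i -> {
                    A2ATask task = taskStoreService.getTask(taskId);
                    return ServerSentEvent.<A2ATask>builder()
                            .id(String.valueOf(i))
                            .event("task-update")
                            .data(task)
                            .build();
                })
                // takeUntil emits the terminal event itself and then completes the stream
                .takeUntil(event -> {
                    A2ATask.TaskStatus status = event.data().getStatus();
                    return status == A2ATask.TaskStatus.COMPLETED
                            || status == A2ATask.TaskStatus.FAILED
                            || status == A2ATask.TaskStatus.CANCELED;
                });
    }

    private String extractUserMessage(A2AMessage message) {
        // Use the first text part; an empty string if the message carries none
        return message.getParts().stream()
                .filter(p -> p instanceof A2APart.TextPart)
                .map(p -> ((A2APart.TextPart) p).getText())
                .findFirst()
                .orElse("");
    }
}
```

A2A client (the caller)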
The orchestrator agent has to call other agents, so it needs a client.
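The client in this section polls the task endpoint at a fixed one-second interval for a fixed number of attempts. One alternative, sketched here in plain Java under the assumption that an overall time budget matters more than an attempt count, is to poll with a doubling delay until a deadline. `Poller` and its parameter names are hypothetical and independent of Spring.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Generic polling helper with exponential backoff and an overall deadline (sketch). */
class Poller {

    private Poller() {}

    /**
     * Calls fetch until done accepts the result or the time budget is used up.
     * The delay doubles after each attempt, capped at maxDelay.
     * Returns empty on timeout or if the thread is interrupted while waiting.
     */
    static <T> Optional<T> pollUntil(Supplier<T> fetch, Predicate<T> done,
                                     Duration initialDelay, Duration maxDelay, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Duration delay = initialDelay;
        while (true) {
            T current = fetch.get();
            if (done.test(current)) {
                return Optional.of(current);
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
            if (remainingMillis <= 0) {
                return Optional.empty(); // budget exhausted; the caller decides how to report it
            }
            try {
                // Never sleep past the deadline
                Thread.sleep(Math.min(delay.toMillis(), remainingMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                return Optional.empty();
            }
            Duration doubled = delay.multipliedBy(2);
            delay = doubled.compareTo(maxDelay) > 0 ? maxDelay : doubled;
        }
    }
}
```

In the client, `fetch` would wrap the GET on `/a2a/tasks/{taskId}` and `done` would test for COMPLETED, FAILED, or CANCELED, so fast tasks return quickly and slow ones do not hammer the agent.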
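```java
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;

@Service
@Slf4j
public class A2AClientService {
    private final WebClient webClient;
    private final ObjectMapper objectMapper;

    public A2AClientService(WebClient.Builder builder, ObjectMapper objectMapper) {
        this.webClient = builder.build();
        this.objectMapper = objectMapper;
    }

    /**
     * Fetch an agent's capability description.
     */
    public AgentCard fetchAgentCard(String agentBaseUrl) {
        return webClient.get()
                .uri(agentBaseUrl + "/.well-known/agent.json")
                .retrieve()
                .bodyToMono(AgentCard.class)
                .block();
    }

    /**
     * Send a task and wait for it to finish (polling mode).
     */
    public A2ATask sendTaskAndWait(String agentBaseUrl, String sessionId, String message) {
        // 1. Submit the task
        A2ATask task = Objects.requireNonNull(
                submitTask(agentBaseUrl, sessionId, message).block(),
                "Agent returned an empty response");
        log.info("Task submitted: taskId={}", task.getId());
        // 2. Poll until it reaches a terminal state
        return pollUntilDone(agentBaseUrl, task.getId());
    }

    /**
     * Send a task and receive the result through an SSE subscription.
     */
    public Mono<A2ATask> sendTaskWithSubscription(String agentBaseUrl,
                                                  String sessionId,
                                                  String message) {
        // Submit first, then subscribe to the stream of the task that was actually created
        return submitTask(agentBaseUrl, sessionId, message)
                .flatMap(task -> webClient.get()
                        .uri(agentBaseUrl + "/a2a/tasks/{taskId}/subscribe", task.getId())
                        .accept(MediaType.TEXT_EVENT_STREAM)
                        .retrieve()
                        .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<A2ATask>>() {})
                        .map(ServerSentEvent::data)
                        // Emit on any terminal state so failures reach the caller too
                        .filter(t -> isTerminal(t.getStatus()))
                        .next());
    }

    private Mono<A2ATask> submitTask(String agentBaseUrl, String sessionId, String message) {
        TaskSendRequest request = TaskSendRequest.builder()
                .sessionId(sessionId)
                .message(A2AMessage.builder()
                        .role("user")
                        .parts(List.of(A2APart.TextPart.builder().text(message).build()))
                        .timestamp(LocalDateTime.now())
                        .build())
                .build();
        return webClient.post()
                .uri(agentBaseUrl + "/a2a/tasks/send")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(A2ATask.class);
    }

    private A2ATask pollUntilDone(String baseUrl, String taskId) {
        int maxRetries = 30;
        for (int retryCount = 0; retryCount < maxRetries; retryCount++) {
            A2ATask task = webClient.get()
                    .uri(baseUrl + "/a2a/tasks/" + taskId)
                    .retrieve()
                    .bodyToMono(A2ATask.class)
                    .block();
            if (task != null && isTerminal(task.getStatus())) {
                return task;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for task: " + taskId, e);
            }
        }
        throw new RuntimeException("Task timed out: " + taskId);
    }

    private static boolean isTerminal(A2ATask.TaskStatus status) {
        return status == A2ATask.TaskStatus.COMPLETED
                || status == A2ATask.TaskStatus.FAILED
                || status == A2ATask.TaskStatus.CANCELED;
    }
}
```

Orchestrator agent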
Finally, the orchestration layer ties all the sub-agents together.
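The orchestrator in this section picks a route by checking whether the LLM reply contains a keyword such as ORDER, which can misfire when the model adds explanatory text. A stricter option, sketched below, is to parse the `ROUTE:[AGENT_NAME]` marker explicitly and treat anything else as "no decision". `RouteParser` and its `Route` enum are hypothetical; the accepted formats (with or without square brackets, any letter case) are an assumption about what a model might return.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts a routing decision such as "ROUTE:[ORDER]" from an LLM reply (sketch). */
class RouteParser {

    enum Route { ORDER, REFUND, RECOMMEND }

    // Accepts "ROUTE:ORDER" and "ROUTE:[ORDER]", in any letter case, anywhere in the reply.
    private static final Pattern ROUTE_PATTERN =
            Pattern.compile("ROUTE:\\s*\\[?\\s*([A-Za-z_]+)\\s*\\]?", Pattern.CASE_INSENSITIVE);

    private RouteParser() {}

    /** Returns the parsed route, or empty if the reply has no marker or names an unknown agent. */
    static Optional<Route> parse(String llmReply) {
        if (llmReply == null) {
            return Optional.empty();
        }
        Matcher matcher = ROUTE_PATTERN.matcher(llmReply);
        if (!matcher.find()) {
            return Optional.empty();
        }
        try {
            return Optional.of(Route.valueOf(matcher.group(1).toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException unknownAgent) {
            return Optional.empty();
        }
    }
}
```

With an `Optional` result the orchestrator can choose an explicit fallback (ask the user to rephrase, or retry the routing prompt) instead of silently defaulting to one agent.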
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class OrchestratorService {
    private final ChatClient chatClient;
    private final A2AClientService a2aClient;

    // Agent address configuration
    @Value("${agents.order.url:http://localhost:8081}")
    private String orderAgentUrl;
    @Value("${agents.refund.url:http://localhost:8082}")
    private String refundAgentUrl;
    @Value("${agents.recommend.url:http://localhost:8083}")
    private String recommendAgentUrl;

    public OrchestratorService(ChatClient.Builder builder,
                               A2AClientService a2aClient) {
        // The reply is used only for routing, so the prompt asks for the decision and nothing else.
        this.chatClient = builder
                .defaultSystem("""
                        You are an intelligent customer-service orchestrator agent.
                        Analyze the user's question and decide which specialist agent should handle it:
                        - Order lookup, logistics lookup -> order agent (ORDER)
                        - Refund request, refund review -> refund agent (REFUND)
                        - Product recommendation, related products -> recommendation agent (RECOMMEND)
                        Reply with the routing decision only, in the format: ROUTE:[AGENT_NAME]
                        Do not answer the user's question yourself.
                        """)
                .build();
        this.a2aClient = a2aClient;
    }

    public String handleUserRequest(String userId, String userMessage) {
        String sessionId = userId + "_" + System.currentTimeMillis();
        log.info("Orchestrator received request: userId={}, message={}", userId, userMessage);
        // 1. Routing decision
        String routingDecision = chatClient.prompt()
                .user("Analyze this user request and decide which agent to route it to:\n" + userMessage)
                .call()
                .content();
        log.info("Routing decision: {}", routingDecision);
        // 2. Pick the agent based on the decision (anything unrecognized falls through to RECOMMEND)
        String agentUrl;
        if (routingDecision.contains("ORDER")) {
            agentUrl = orderAgentUrl;
        } else if (routingDecision.contains("REFUND")) {
            agentUrl = refundAgentUrl;
        } else {
            agentUrl = recommendAgentUrl;
        }
        // 3. Call the sub-agent
        A2ATask result = a2aClient.sendTaskAndWait(agentUrl, sessionId, userMessage);
        // 4. Extract the final answer
        return extractFinalAnswer(result);
    }

    private String extractFinalAnswer(A2ATask task) {
        if (task.getStatus() != A2ATask.TaskStatus.COMPLETED) {
            return "Sorry, something went wrong while handling your request. Please try again later.";
        }
        return task.getMessages().stream()
                .filter(m -> "agent".equals(m.getRole()))
                .reduce((first, second) -> second) // keep the last agent message
                .map(m -> m.getParts().stream()
                        .filter(p -> p instanceof A2APart.TextPart)
                        .map(p -> ((A2APart.TextPart) p).getText())
                        .findFirst()
                        .orElse(""))
                .orElse("Processing finished, but no response content was available.");
    }
}
```

End-to-end verification
Write an integration test to verify the whole flow:
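```java
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import static org.assertj.core.api.Assertions.assertThat;

// Requires Redis, the LLM credentials, and the sub-agents to be up at their configured URLs.
@SpringBootTest
@Slf4j
class A2AIntegrationTest {

    @Autowired
    private OrchestratorService orchestratorService;

    @Test
    void testOrderQuery() {
        String response = orchestratorService.handleUserRequest(
                "user001",
                "Where is my order 2024032100001 right now?"
        );
        log.info("Final response: {}", response);
        assertThat(response).isNotBlank();
    }

    @Test
    void testRefundRequest() {
        String response = orchestratorService.handleUserRequest(
                "user002",
                "The product in order 2024032100002 has a quality problem, I want a refund"
        );
        log.info("Final response: {}", response);
        assertThat(response).isNotBlank();
    }
}
```

Pitfalls and notes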
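A few pitfalls to avoid in a real rollout:
| Pitfall | Problem | Fix |
|---|---|---|
| Task timeout | A sub-agent is slow and the orchestrator times out | Subscribe via SSE instead of polling, with a sensible timeout |
| Message idempotency | Network retries cause duplicate processing | Use the Task ID as the idempotency key and check it in Redis |
| Error propagation | A sub-agent fails and the orchestrator never notices | Propagate the FAILED task status upward; never swallow it |
| Agent discovery | Hard-coded agent addresses are inflexible | Plug into a service registry (Nacos/Consul) and fetch the agent list dynamically |
| Auth propagation | Calls between agents need authentication | Pass the JWT in a header and establish a chain of trust between agents |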
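For the idempotency row, the essential operation is "create the task only if this key has not been seen". The sketch below shows that check with an in-memory map; it assumes the caller sends a stable key with every retry (the hypothetical `requestKey`), since a server-generated task ID alone cannot identify a retried submission. With Redis, the equivalent check would be a set-if-absent write on the same key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** In-memory idempotency guard; a shared store such as Redis would replace the map in production. */
class IdempotentTaskRegistry {

    private final Map<String, String> taskIdByRequestKey = new ConcurrentHashMap<>();

    /**
     * Returns the task id already registered for requestKey, or creates one exactly once.
     * computeIfAbsent runs createTask atomically per key, so concurrent retries share one id.
     */
    String submitOnce(String requestKey, Supplier<String> createTask) {
        return taskIdByRequestKey.computeIfAbsent(requestKey, key -> createTask.get());
    }
}
```

A retried submission with the same key gets the original task ID back, so the caller can simply resume polling or subscribing instead of triggering a second run.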
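Summary
The core value of A2A is standardization. Every team used to invent its own multi-agent message format; with a shared spec, agents can call each other across frameworks and services.
For Java engineers, the implementation approach looks a lot like microservices:
- Agent Card = service registration
- Task = stateful work ticket
- HTTP/SSE = synchronous/asynchronous communication
Lao Wang rebuilt his customer-service system along these lines: three agents, each with its own job, all dispatched through the orchestration layer. Two weeks after launch, user complaints were down 80%. His verdict: "Isn't this just the microservices playbook with a new coat of paint?"
Exactly. The complexity of AI engineering ultimately comes down to the complexity of distributed systems, and the distributed-systems experience you already have as a Java engineer applies here in full.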
