第2281篇:MCP协议深度解析——Model Context Protocol的架构设计与Java实现
第2281篇:MCP协议深度解析——Model Context Protocol的架构设计与Java实现
适读人群:有一定AI应用开发经验的Java工程师 | 阅读时长:约18分钟 | 核心价值:深入理解MCP协议的设计理念,掌握Java实现的关键技术点
去年年底,我们团队花了三个月时间给公司的智能客服系统接入了七八个外部工具:查询订单、检索知识库、调用内部API、读取数据库……每个工具都是一套独立的集成逻辑,提示词里堆满了工具描述,上下文窗口被吃掉了一大半,最要命的是每次换一个LLM提供商,这些工具集成代码要重写一遍。
那段时间我就有个强烈的感觉:这块应该有个标准化的东西。
后来Anthropic在2024年底发布MCP(Model Context Protocol)的时候,我几乎是一眼就看懂了它要解决什么问题。不是因为我多聪明,而是这个痛点我亲身经历过。
MCP的本质:一个AI工具集成的标准化协议
在理解MCP之前,先想想没有MCP的世界是什么样的。
每个AI应用想给LLM提供工具能力,都得自己设计:工具怎么描述、工具调用结果怎么返回、上下文资源怎么管理。OpenAI有OpenAI的Function Calling格式,Anthropic有Anthropic的Tool Use格式,Gemini又是另一套。同一个工具,要接不同的模型,就要写不同的适配层。
MCP想做的事情就是:在AI模型和外部世界之间定义一个标准的通信协议,让工具的开发者只需要实现一次,就能被所有支持MCP的AI系统使用。
这个设计理念和USB标准很像。在USB出现之前,各种外设都有自己的接口;USB出现之后,厂商按照USB规范造设备,用户不用关心底层细节,插上就用。
MCP的架构分三个角色:
- MCP Host:运行AI模型的宿主应用,比如Claude Desktop,或者你自己开发的AI应用
- MCP Client:内嵌在Host中,负责和MCP Server通信的客户端组件
- MCP Server:实际提供工具能力的服务,可以是本地进程,也可以是远程HTTP服务
协议层的设计决策
MCP基于JSON-RPC 2.0协议,这个选择值得分析一下。
JSON-RPC是个非常简单的协议规范,定义了请求/响应/通知三种消息格式。选它的原因我猜主要有两个:
一是足够简单,任何语言都能实现,不需要复杂的序列化框架;二是它天然支持双向通信的通知模式,MCP Server可以主动推送资源变更通知给Client。
MCP协议目前支持两种传输层:
- stdio传输:适合本地进程,通过标准输入输出通信,最简单
- HTTP+SSE传输:适合远程服务,用HTTP发送请求,用SSE(Server-Sent Events)接收服务端推送
MCP定义了三类核心能力:
Tools(工具):AI可以调用的函数,类似Function Calling。区别是MCP的工具定义完全由Server管理,Client只需要知道有哪些工具、怎么调用。
Resources(资源):AI可以读取的数据,比如文件内容、数据库查询结果。Resources是只读的,不是可执行的动作。
Prompts(提示模板):预定义的提示词模板,可以带参数,用于标准化常见的AI交互场景。
Java实现MCP Client的核心代码
官方有Java SDK,但理解协议本身更重要。我们来看一个手写的简化实现,帮助你理解协议的实际交互过程。
首先定义MCP消息的基础数据结构:
// JSON-RPC 2.0 基础消息格式
public class JsonRpcMessage {
private String jsonrpc = "2.0";
private Object id; // 请求ID,可以是字符串或数字
private String method; // 方法名
private Object params; // 参数
private Object result; // 成功结果
private JsonRpcError error; // 错误信息
// getters/setters...
}
public class JsonRpcError {
private int code;
private String message;
private Object data;
}MCP初始化握手流程:
@Service
public class McpClientService {
private final ObjectMapper objectMapper = new ObjectMapper();
private Process serverProcess;
private BufferedReader serverReader;
private PrintWriter serverWriter;
private final AtomicInteger requestIdCounter = new AtomicInteger(1);
/**
* 通过stdio启动并初始化MCP Server
*/
public void initializeServer(String serverCommand) throws IOException {
// 启动MCP Server进程
ProcessBuilder pb = new ProcessBuilder(serverCommand.split(" "));
pb.redirectErrorStream(false);
serverProcess = pb.start();
serverReader = new BufferedReader(
new InputStreamReader(serverProcess.getInputStream())
);
serverWriter = new PrintWriter(
new OutputStreamWriter(serverProcess.getOutputStream())
);
// 发送initialize请求
Map<String, Object> initParams = new HashMap<>();
initParams.put("protocolVersion", "2024-11-05");
initParams.put("capabilities", buildClientCapabilities());
initParams.put("clientInfo", Map.of(
"name", "my-mcp-client",
"version", "1.0.0"
));
JsonRpcMessage initResponse = sendRequest("initialize", initParams);
if (initResponse.getError() != null) {
throw new RuntimeException("MCP初始化失败: " + initResponse.getError().getMessage());
}
// 发送initialized通知(这是必须的,告知Server初始化完成)
sendNotification("notifications/initialized", null);
log.info("MCP Server初始化成功: {}", initResponse.getResult());
}
/**
* 列举所有可用工具
*/
public List<McpTool> listTools() throws IOException {
JsonRpcMessage response = sendRequest("tools/list", null);
if (response.getError() != null) {
throw new RuntimeException("获取工具列表失败: " + response.getError().getMessage());
}
Map<String, Object> result = (Map<String, Object>) response.getResult();
List<Map<String, Object>> toolsData = (List<Map<String, Object>>) result.get("tools");
return toolsData.stream()
.map(this::parseToolDefinition)
.collect(Collectors.toList());
}
/**
* 调用工具
*/
public McpToolResult callTool(String toolName, Map<String, Object> arguments)
throws IOException {
Map<String, Object> params = new HashMap<>();
params.put("name", toolName);
params.put("arguments", arguments);
JsonRpcMessage response = sendRequest("tools/call", params);
if (response.getError() != null) {
throw new RuntimeException("工具调用失败: " + response.getError().getMessage());
}
Map<String, Object> result = (Map<String, Object>) response.getResult();
List<Map<String, Object>> content = (List<Map<String, Object>>) result.get("content");
boolean isError = (boolean) result.getOrDefault("isError", false);
return new McpToolResult(content, isError);
}
/**
* 发送JSON-RPC请求并等待响应
*/
private JsonRpcMessage sendRequest(String method, Object params) throws IOException {
int requestId = requestIdCounter.getAndIncrement();
Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("id", requestId);
request.put("method", method);
if (params != null) {
request.put("params", params);
}
String requestJson = objectMapper.writeValueAsString(request);
serverWriter.println(requestJson);
serverWriter.flush();
// 读取响应(同步等待)
String responseLine = serverReader.readLine();
if (responseLine == null) {
throw new IOException("MCP Server关闭了连接");
}
return objectMapper.readValue(responseLine, JsonRpcMessage.class);
}
/**
* 发送通知(不需要响应)
*/
private void sendNotification(String method, Object params) throws IOException {
Map<String, Object> notification = new HashMap<>();
notification.put("jsonrpc", "2.0");
notification.put("method", method);
if (params != null) {
notification.put("params", params);
}
serverWriter.println(objectMapper.writeValueAsString(notification));
serverWriter.flush();
}
private Map<String, Object> buildClientCapabilities() {
Map<String, Object> capabilities = new HashMap<>();
capabilities.put("roots", Map.of("listChanged", true));
capabilities.put("sampling", new HashMap<>());
return capabilities;
}
}把MCP工具转换成LLM可用的格式
MCP的工具描述格式和各个LLM厂商的原生格式不一样,需要做转换。以Anthropic的Claude API为例:
@Service
public class McpToClaudeConverter {
/**
* 将MCP工具定义转换为Claude API的tool格式
*/
public List<Tool> convertToClaudeTools(List<McpTool> mcpTools) {
return mcpTools.stream()
.map(this::convertSingleTool)
.collect(Collectors.toList());
}
private Tool convertSingleTool(McpTool mcpTool) {
// MCP的inputSchema就是JSON Schema格式,可以直接用
return Tool.builder()
.name(mcpTool.getName())
.description(mcpTool.getDescription())
.inputSchema(mcpTool.getInputSchema()) // JSON Schema
.build();
}
/**
* 完整的AI对话+工具调用循环
*/
public String chatWithTools(String userMessage, List<McpTool> availableTools)
throws IOException {
AnthropicClient client = new AnthropicClient(System.getenv("ANTHROPIC_API_KEY"));
List<Tool> claudeTools = convertToClaudeTools(availableTools);
List<Message> messages = new ArrayList<>();
messages.add(Message.user(userMessage));
// 对话循环,处理多轮工具调用
while (true) {
MessageResponse response = client.messages()
.create(MessageCreateParams.builder()
.model("claude-opus-4-5")
.maxTokens(4096)
.tools(claudeTools)
.messages(messages)
.build());
// 如果是普通回复,直接返回
if ("end_turn".equals(response.getStopReason())) {
return extractTextContent(response);
}
// 如果需要调用工具
if ("tool_use".equals(response.getStopReason())) {
// 把AI的响应加入对话历史
messages.add(Message.assistant(response.getContent()));
// 处理所有工具调用请求
List<ToolResultContent> toolResults = new ArrayList<>();
for (ContentBlock block : response.getContent()) {
if (block instanceof ToolUseBlock toolUse) {
// 通过MCP调用实际工具
McpToolResult mcpResult = mcpClientService.callTool(
toolUse.getName(),
toolUse.getInput()
);
String resultText = extractTextFromMcpResult(mcpResult);
toolResults.add(ToolResultContent.builder()
.toolUseId(toolUse.getId())
.content(resultText)
.isError(mcpResult.isError())
.build());
}
}
// 把工具结果加入对话历史
messages.add(Message.user(toolResults));
// 继续循环,让AI根据工具结果给出最终回答
}
}
}
}协议细节:能力协商和版本管理
MCP协议设计里有个挺有意思的地方:能力协商(Capability Negotiation)。
在初始化握手时,Client和Server都会声明自己支持的能力集合。比如Server可能声明支持resources但不支持prompts,Client在后续交互中就知道不要发资源相关的请求。
// Server在初始化响应里返回的能力声明示例
{
"capabilities": {
"tools": {
"listChanged": true // 支持工具列表变更通知
},
"resources": {
"subscribe": true, // 支持资源订阅
"listChanged": true
},
"logging": {} // 支持日志功能
// 没有"prompts"字段,说明不支持prompts
}
}这种设计的好处是:协议可以扩展,新能力用新字段声明,老版本的Client/Server看到不认识的能力字段直接忽略,保持向后兼容。
另一个关键的工程细节:MCP的错误处理。MCP定义了一套标准错误码,其中几个最重要的:
public enum McpErrorCode {
PARSE_ERROR(-32700, "JSON解析错误"),
INVALID_REQUEST(-32600, "无效请求"),
METHOD_NOT_FOUND(-32601, "方法不存在"),
INVALID_PARAMS(-32602, "参数无效"),
INTERNAL_ERROR(-32603, "内部错误"),
// MCP自定义错误码范围:-32099 到 -32000
RESOURCE_NOT_FOUND(-32002, "资源不存在"),
TOOL_NOT_FOUND(-32001, "工具不存在");
private final int code;
private final String defaultMessage;
}和现有系统集成的工程考量
在实际项目中,把MCP集成进现有Java应用,有几个工程问题要提前想清楚:
1. 连接池管理
如果你的应用需要同时连接多个MCP Server,每个连接都是一个子进程(stdio模式),进程管理的开销不小。需要设计连接池:
@Component
public class McpServerPool {
private final Map<String, BlockingQueue<McpConnection>> pools = new ConcurrentHashMap<>();
private final McpServerConfig config;
public McpConnection borrowConnection(String serverName) throws InterruptedException {
BlockingQueue<McpConnection> pool = pools.get(serverName);
if (pool == null) {
throw new IllegalArgumentException("未知的MCP Server: " + serverName);
}
// 等待可用连接,最多等5秒
McpConnection conn = pool.poll(5, TimeUnit.SECONDS);
if (conn == null) {
throw new RuntimeException("获取MCP连接超时,Server: " + serverName);
}
// 检查连接是否还活着
if (!conn.isAlive()) {
conn = createNewConnection(serverName);
}
return conn;
}
public void returnConnection(String serverName, McpConnection conn) {
if (conn.isAlive()) {
pools.get(serverName).offer(conn);
} else {
// 连接已死,静默丢弃,下次会创建新连接
log.warn("MCP连接已断开,丢弃: {}", serverName);
}
}
}2. 超时和熔断
MCP工具调用本质上是同步阻塞的,必须配置超时:
@Around("@annotation(McpToolCall)")
public Object handleMcpCall(ProceedingJoinPoint pjp) throws Throwable {
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
try {
return pjp.proceed();
} catch (Throwable e) {
throw new CompletionException(e);
}
});
try {
return future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new McpTimeoutException("MCP工具调用超时");
}
}3. 工具调用结果的缓存
某些只读工具(比如查询公司通讯录)的结果可以缓存,减少重复调用:
@Cacheable(value = "mcp-tool-results",
key = "#toolName + ':' + #arguments.hashCode()",
condition = "@mcpToolRegistry.isCacheable(#toolName)")
public McpToolResult cachedToolCall(String toolName, Map<String, Object> arguments) {
return mcpClientService.callTool(toolName, arguments);
}深度思考:MCP解决了什么,没解决什么
MCP在工具集成标准化这件事上做得很好,但它不是银弹。有几个问题值得清醒认识:
MCP没有解决工具发现问题。 MCP定义了工具怎么被调用,但没有定义怎么发现可用的工具。企业内部有几百个MCP Server,你怎么知道哪个Server有你需要的工具?这需要一个额外的工具注册表/目录服务。
MCP的安全模型还不完善。 当前版本的MCP对工具调用的权限控制很简单,企业级使用需要在外层额外做权限校验。这一点官方也承认,说后续版本会加强。
stdio传输的限制。 stdio传输简单,但不适合高并发场景,也不适合跨机器部署。HTTP+SSE传输好一些,但SSE本身的长连接在负载均衡场景下有问题。
工具调用的可观测性。 MCP协议里没有内置的追踪/度量支持,你需要在Client层自己加。
尽管如此,MCP的出现方向是对的。统一的工具接口标准,对整个AI生态的长远发展有价值——就像REST API标准化了Web服务的集成方式一样,MCP有潜力标准化AI工具集成。至于现在的不完善,都是可以在工程层面弥补的。
