MCP 协议实战——给 Claude 接上你的私有数据
MCP 协议实战——给 Claude 接上你的私有数据
适读人群:有自己的私有数据源(数据库、内部 API、文档库)想接入 Claude 的工程师 | 阅读时长:约16分钟 | 核心价值:跳过官方文档的表面,讲工程实践里的真实问题和解决方案
去年我们团队有一个需求,说起来不复杂:让 Claude 能回答关于我们内部系统的问题。
比如:"给我查一下 user_id=10086 的用户最近 30 天的订单状态",或者"我们的 payment-service 今天有没有报过 5xx 错误"。
这类问题 Claude 本身完全回答不了,因为它不知道我们的数据。普通的方法是:我去数据库查一下,把结果粘贴给 Claude,让它帮我分析。这能用,但很麻烦,每次要手动查询再粘贴。
MCP(Model Context Protocol)就是解决这个问题的机制:让 Claude 能直接连接你的数据源,需要什么数据它自己去取。
这篇文章讲我自己实现一个 MCP Server 的过程——接的是内部 MySQL 数据库,不是 Hello World 级别的,是真实项目里在用的。
MCP 是什么,一句话
MCP 是 Anthropic 制定的一个协议,规定了 AI 模型和外部数据/工具之间的通信方式。
你写一个 MCP Server,把你的数据暴露出来;Claude 作为 MCP Client,在需要数据时调用你的 Server。整个过程对 Claude 来说是透明的——它不需要知道你的数据库是什么、API 是什么,它只知道"我可以调用这个工具来获取数据"。
官方文档讲了很多抽象概念。我直接讲工程实现。
我要做什么
目标:写一个 MCP Server,让 Claude 可以:
- 查询用户信息(按 user_id 或 phone)
- 查询用户的订单列表(支持时间范围过滤)
- 查询某个订单的详情和物流状态
- 查询系统的错误日志(按服务名和时间范围)
这四个能力覆盖了我日常最常查的问题。
技术选型:Python + FastAPI,MCP SDK 用 Anthropic 官方的 mcp 包。
开发过程
环境准备
pip install mcp anthropic pymysql fastapi uvicornMCP 的 Python SDK 不复杂,核心是 mcp.server.Server 类,你在这个类上注册你的 tools(工具函数),Claude 就能调用它们。
核心代码结构
# mcp_server.py
import json
import pymysql
import logging
from datetime import datetime
from typing import Any, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, CallToolResult
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 数据库连接配置(实际项目里用环境变量)
DB_CONFIG = {
"host": "your-db-host",
"port": 3306,
"user": "readonly_user",
"password": "your-password",
"database": "your_database",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor
}
def get_db_connection():
"""获取数据库连接,带重试"""
try:
conn = pymysql.connect(**DB_CONFIG)
return conn
except Exception as e:
logger.error(f"数据库连接失败: {e}")
raise
def execute_query(sql: str, params: tuple = None) -> list[dict]:
"""执行查询,返回结果列表"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
return cursor.fetchall()
finally:
conn.close()
# 初始化 MCP Server
server = Server("internal-data-server")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""声明这个 Server 提供哪些工具"""
return [
Tool(
name="get_user_info",
description="根据用户ID或手机号查询用户基本信息",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "integer",
"description": "用户ID(与 phone 二选一)"
},
"phone": {
"type": "string",
"description": "手机号(与 user_id 二选一)"
}
},
"anyOf": [
{"required": ["user_id"]},
{"required": ["phone"]}
]
}
),
Tool(
name="get_user_orders",
description="查询用户的订单列表,支持按时间范围和状态过滤",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "integer",
"description": "用户ID"
},
"start_date": {
"type": "string",
"description": "开始日期,格式 YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式 YYYY-MM-DD"
},
"status": {
"type": "string",
"description": "订单状态,可选值:PENDING/PAID/SHIPPED/COMPLETED/CANCELLED"
},
"limit": {
"type": "integer",
"description": "返回条数,默认20,最多100",
"default": 20
}
},
"required": ["user_id"]
}
),
Tool(
name="get_order_detail",
description="查询单个订单的详情,包括商品信息和物流状态",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "订单号"
}
},
"required": ["order_id"]
}
),
Tool(
name="query_error_logs",
description="查询系统错误日志,按服务名和时间范围过滤",
inputSchema={
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "服务名,如 payment-service, user-service"
},
"start_time": {
"type": "string",
"description": "开始时间,格式 YYYY-MM-DD HH:MM:SS"
},
"end_time": {
"type": "string",
"description": "结束时间,格式 YYYY-MM-DD HH:MM:SS"
},
"error_level": {
"type": "string",
"description": "错误级别,可选:ERROR/FATAL,默认 ERROR",
"default": "ERROR"
},
"limit": {
"type": "integer",
"description": "返回条数,默认50",
"default": 50
}
},
"required": ["service_name", "start_time", "end_time"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
"""处理工具调用"""
try:
if name == "get_user_info":
result = _get_user_info(arguments)
elif name == "get_user_orders":
result = _get_user_orders(arguments)
elif name == "get_order_detail":
result = _get_order_detail(arguments)
elif name == "query_error_logs":
result = _query_error_logs(arguments)
else:
return CallToolResult(
content=[TextContent(type="text", text=f"未知工具: {name}")],
isError=True
)
return CallToolResult(
content=[TextContent(type="text", text=json.dumps(result, ensure_ascii=False, default=str))]
)
except Exception as e:
logger.error(f"工具执行失败 {name}: {e}")
return CallToolResult(
content=[TextContent(type="text", text=f"查询失败: {str(e)}")],
isError=True
)
def _get_user_info(args: dict) -> dict:
if "user_id" in args:
rows = execute_query(
"SELECT id, nickname, phone, email, status, created_at FROM users WHERE id = %s",
(args["user_id"],)
)
else:
rows = execute_query(
"SELECT id, nickname, phone, email, status, created_at FROM users WHERE phone = %s",
(args["phone"],)
)
if not rows:
return {"found": False, "message": "用户不存在"}
user = rows[0]
# 手机号脱敏
if user.get("phone"):
phone = user["phone"]
user["phone"] = phone[:3] + "****" + phone[-4:]
return {"found": True, "user": user}
def _get_user_orders(args: dict) -> dict:
sql = """
SELECT o.id as order_id, o.status, o.total_amount, o.created_at,
p.name as product_name, p.sku_code
FROM orders o
LEFT JOIN order_items oi ON o.id = oi.order_id
LEFT JOIN products p ON oi.product_id = p.id
WHERE o.user_id = %s
"""
params = [args["user_id"]]
if args.get("start_date"):
sql += " AND o.created_at >= %s"
params.append(args["start_date"] + " 00:00:00")
if args.get("end_date"):
sql += " AND o.created_at <= %s"
params.append(args["end_date"] + " 23:59:59")
if args.get("status"):
sql += " AND o.status = %s"
params.append(args["status"])
limit = min(args.get("limit", 20), 100)
sql += f" ORDER BY o.created_at DESC LIMIT {limit}"
rows = execute_query(sql, tuple(params))
return {"total": len(rows), "orders": rows}
def _get_order_detail(args: dict) -> dict:
order_rows = execute_query(
"""
SELECT o.*, u.nickname as user_name
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE o.id = %s
""",
(args["order_id"],)
)
if not order_rows:
return {"found": False, "message": "订单不存在"}
order = order_rows[0]
items = execute_query(
"""
SELECT oi.quantity, oi.unit_price, p.name, p.sku_code
FROM order_items oi
LEFT JOIN products p ON oi.product_id = p.id
WHERE oi.order_id = %s
""",
(args["order_id"],)
)
order["items"] = items
logistics = execute_query(
"SELECT carrier, tracking_number, status, updated_at FROM logistics WHERE order_id = %s ORDER BY updated_at DESC LIMIT 1",
(args["order_id"],)
)
order["logistics"] = logistics[0] if logistics else None
return {"found": True, "order": order}
def _query_error_logs(args: dict) -> dict:
limit = min(args.get("limit", 50), 200)
rows = execute_query(
"""
SELECT service_name, level, message, stack_trace, created_at
FROM error_logs
WHERE service_name = %s
AND level = %s
AND created_at BETWEEN %s AND %s
ORDER BY created_at DESC
LIMIT %s
""",
(
args["service_name"],
args.get("error_level", "ERROR"),
args["start_time"],
args["end_time"],
limit
)
)
return {"total": len(rows), "logs": rows}
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="internal-data-server",
server_version="1.0.0"
)
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())配置到 Claude Code
把 MCP Server 配置到 Claude Code 的 ~/.claude/claude_desktop_config.json:
{
"mcpServers": {
"internal-data": {
"command": "python",
"args": ["/path/to/mcp_server.py"],
"env": {
"DB_HOST": "your-db-host",
"DB_PASSWORD": "your-password"
}
}
}
}踩过的真实坑
坑一:数据库连接池问题
每次工具调用都新建一个数据库连接,在高频调用下会撑爆连接池。
解决:换成 DBUtils 的 PooledDB 或者 SQLAlchemy 的连接池管理。
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymysql,
maxconnections=10,
mincached=2,
**DB_CONFIG
)
def get_db_connection():
return pool.connection()坑二:大结果集导致 Claude 上下文溢出
有一次查了一个用户的全部订单,返回了 500 多条,Claude 的上下文直接被撑满,后续对话全部混乱。
解决:在 MCP Server 层面强制限制返回条数,并在 tool description 里说明最大返回量,让 Claude 知道可以分批查。
坑三:日期格式不统一
数据库里存的是 datetime 类型,JSON 序列化之后的格式和 Claude 期望的格式不一样,导致 Claude 在解析时间时出错。
解决:在 json.dumps 里加 default=str,把所有无法序列化的类型(包括 datetime)转成字符串。
坑四:只读权限不够
生产数据库的只读用户默认只有 SELECT 权限,但某些视图或存储过程需要额外权限。在测试环境正常,生产环境报权限错误。
解决:在开发时就用生产环境的只读账户测试,不要依赖有 DBA 权限的账户开发。
实际效果
接好之后,我现在可以直接问 Claude:
"帮我查一下手机号 138xxxx1234 的用户,最近 7 天有没有未完成的订单"
Claude 会自动调用 get_user_info 获取 user_id,再调用 get_user_orders 查询订单,然后给我整理好的结果,不需要我手动查数据库再粘贴了。
这节省的不只是查询时间,是上下文切换的成本。以前查完数据再粘贴给 Claude,这个过程打断了思路。现在 Claude 自己处理数据查询,我可以专注在分析问题上。
