从零开发一个 MCP Server——10 步从概念到实际运行
从零开发一个 MCP Server——10 步从概念到实际运行
适读人群:想给 Claude 接入自己数据或工具的工程师,有 Python 基础 | 阅读时长:约18分钟 | 核心价值:完整可运行的 MCP Server 开发教程,不是文档复述,有真实的 MySQL 查询接入
我的团队里有一个内部知识库,Confluence 上面有几百篇文档:架构设计、接口规范、故障复盘报告、业务流程说明。
每次要查什么,我要打开 Confluence,搜索,翻几篇文档,找到我想要的内容,再复制到 Claude 里分析或者参考。这个流程有时候要 5-10 分钟。
我决定做一个 MCP Server,直接把这个知识库接入 Claude。让 Claude 在我问问题的时候,自己去知识库里找相关的文档,而不是我来回切换窗口。
这篇文章就是这个 MCP Server 的完整开发记录。我把它做成了一个通用的"本地知识库+MySQL"双数据源 MCP Server,可以照着做。
前置条件
你需要:
- Python 3.10+
- 安装
mcp、pymysql、anthropic包 - Claude Code(配置 MCP Server 的地方)
- 一个 MySQL 数据库(或者你要接入的其他数据源)
pip install "mcp[cli]" pymysql python-dotenv第1步:理解 MCP Server 的核心结构
一个 MCP Server 做三件事:
- 声明能力(list_tools):告诉 Claude "我能帮你干什么"
- 处理调用(call_tool):Claude 说"帮我干这个",你去实际执行
- 通过 stdio 通信:Claude Code 和你的 Server 通过标准输入输出交换消息
整个架构非常简单,没有 HTTP、没有端口监听,就是 stdin/stdout。
第2步:创建项目结构
mkdir knowledge-mcp-server
cd knowledge-mcp-server
# 目录结构
# knowledge-mcp-server/
# ├── server.py # 主服务文件
# ├── tools/
# │ ├── __init__.py
# │ ├── knowledge_base.py # 知识库查询工具
# │ └── database.py # 数据库查询工具
# ├── .env # 环境变量(不提交到 git)
# └── requirements.txtmkdir tools
touch tools/__init__.py tools/knowledge_base.py tools/database.py
touch server.py .env requirements.txt第3步:配置环境变量
.env 文件:
# 数据库配置
DB_HOST=localhost
DB_PORT=3306
DB_USER=readonly_user
DB_PASSWORD=your_password_here
DB_NAME=your_database
# 知识库路径(本地文档目录)
KNOWLEDGE_BASE_PATH=/path/to/your/docs第4步:实现数据库查询工具
# tools/database.py
import pymysql
import os
import json
import logging
from typing import Optional
from dbutils.pooled_db import PooledDB
logger = logging.getLogger(__name__)
# 连接池,避免每次查询都新建连接
_pool: Optional[PooledDB] = None
def get_pool() -> PooledDB:
global _pool
if _pool is None:
_pool = PooledDB(
creator=pymysql,
maxconnections=5,
mincached=1,
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", 3306)),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"),
database=os.getenv("DB_NAME"),
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor
)
return _pool
def execute_query(sql: str, params: tuple = None, max_rows: int = 100) -> dict:
"""
执行只读 SQL 查询
max_rows: 最大返回行数,防止返回太多数据撑爆 Claude 上下文
"""
# 安全检查:只允许 SELECT 语句
sql_stripped = sql.strip().upper()
if not sql_stripped.startswith("SELECT") and not sql_stripped.startswith("SHOW") and not sql_stripped.startswith("DESCRIBE"):
return {"error": "只允许 SELECT/SHOW/DESCRIBE 语句"}
pool = get_pool()
conn = pool.connection()
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
rows = cursor.fetchmany(max_rows)
total_rows = cursor.rowcount
return {
"rows": rows,
"count": len(rows),
"has_more": total_rows > max_rows if total_rows >= 0 else False
}
except pymysql.Error as e:
logger.error(f"SQL 执行失败: {e}, SQL: {sql}")
return {"error": f"查询失败: {str(e)}"}
finally:
conn.close()
def get_table_schema(table_name: str) -> dict:
"""获取表结构,让 Claude 知道表里有什么字段"""
result = execute_query(f"DESCRIBE `{table_name}`")
if "error" in result:
return result
schema_info = []
for row in result["rows"]:
schema_info.append({
"field": row["Field"],
"type": row["Type"],
"nullable": row["Null"] == "YES",
"key": row["Key"],
"default": row["Default"]
})
return {"table": table_name, "columns": schema_info}
def list_tables() -> dict:
"""列出所有可用的表"""
result = execute_query("SHOW TABLES")
if "error" in result:
return result
tables = [list(row.values())[0] for row in result["rows"]]
return {"tables": tables}第5步:实现知识库查询工具
# tools/knowledge_base.py
import os
import json
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
KNOWLEDGE_BASE_PATH = os.getenv("KNOWLEDGE_BASE_PATH", "./docs")
def _read_file_safe(file_path: Path) -> Optional[str]:
"""读取文件,处理编码问题"""
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except UnicodeDecodeError:
try:
with open(file_path, "r", encoding="gbk") as f:
return f.read()
except Exception:
return None
except Exception as e:
logger.error(f"读取文件失败 {file_path}: {e}")
return None
def search_knowledge_base(query: str, max_results: int = 5) -> dict:
"""
在知识库里搜索相关文档
简单实现:关键词匹配(实际项目可以接入向量搜索)
"""
base_path = Path(KNOWLEDGE_BASE_PATH)
if not base_path.exists():
return {"error": f"知识库路径不存在: {KNOWLEDGE_BASE_PATH}"}
# 收集所有 md/txt 文件
doc_files = list(base_path.rglob("*.md")) + list(base_path.rglob("*.txt"))
if not doc_files:
return {"results": [], "message": "知识库为空"}
# 关键词分割
keywords = [kw.strip().lower() for kw in query.split() if len(kw.strip()) > 1]
if not keywords:
return {"error": "查询关键词太短"}
# 计算每个文件的匹配分数
scored_results = []
for file_path in doc_files:
content = _read_file_safe(file_path)
if content is None:
continue
content_lower = content.lower()
# 标题(文件名)匹配权重更高
filename_lower = file_path.stem.lower()
score = 0
for keyword in keywords:
# 文件名匹配
if keyword in filename_lower:
score += 10
# 内容匹配(统计出现次数)
score += content_lower.count(keyword)
if score > 0:
# 提取前 500 个字符作为摘要
preview = content[:500].strip()
relative_path = str(file_path.relative_to(base_path))
scored_results.append({
"score": score,
"path": relative_path,
"title": file_path.stem,
"preview": preview
})
# 按分数排序,取前 N 个
scored_results.sort(key=lambda x: x["score"], reverse=True)
top_results = scored_results[:max_results]
# 去掉分数字段,不需要暴露给 Claude
for r in top_results:
del r["score"]
return {
"query": query,
"results": top_results,
"total_found": len(scored_results)
}
def get_document(doc_path: str) -> dict:
"""
获取知识库中某个文档的完整内容
doc_path 是相对于知识库根目录的路径
"""
base_path = Path(KNOWLEDGE_BASE_PATH)
full_path = base_path / doc_path
# 安全检查:防止路径穿越
try:
full_path = full_path.resolve()
base_path = base_path.resolve()
if not str(full_path).startswith(str(base_path)):
return {"error": "非法路径"}
except Exception:
return {"error": "路径解析失败"}
if not full_path.exists():
return {"error": f"文档不存在: {doc_path}"}
content = _read_file_safe(full_path)
if content is None:
return {"error": "文档读取失败"}
# 如果文档很长,只返回前 3000 个字符,告知 Claude 文档被截断
max_content_length = 3000
truncated = len(content) > max_content_length
return {
"path": doc_path,
"title": full_path.stem,
"content": content[:max_content_length] if truncated else content,
"truncated": truncated,
"total_length": len(content)
}
def list_documents(directory: str = "") -> dict:
"""列出知识库中的文档目录结构"""
base_path = Path(KNOWLEDGE_BASE_PATH)
target_path = base_path / directory if directory else base_path
if not target_path.exists():
return {"error": f"目录不存在: {directory}"}
items = []
for item in sorted(target_path.iterdir()):
relative = str(item.relative_to(base_path))
if item.is_dir():
items.append({"type": "directory", "name": item.name, "path": relative})
elif item.suffix in [".md", ".txt", ".rst"]:
items.append({"type": "file", "name": item.name, "path": relative})
return {"directory": directory or "/", "items": items}第6步:实现主 Server 文件
# server.py
import asyncio
import json
import logging
import os
from typing import Any
from dotenv import load_dotenv
load_dotenv()
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, CallToolResult
from tools.database import execute_query, get_table_schema, list_tables
from tools.knowledge_base import search_knowledge_base, get_document, list_documents
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
server = Server("knowledge-db-server")
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="search_knowledge",
description="在内部知识库中搜索相关文档。支持关键词搜索,返回最相关的文档摘要列表。用于查找架构文档、接口规范、业务流程说明等内部资料。",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,支持多个关键词空格分隔"
},
"max_results": {
"type": "integer",
"description": "返回结果数量,默认5,最多10",
"default": 5
}
},
"required": ["query"]
}
),
Tool(
name="get_document",
description="获取知识库中某个文档的完整内容。使用 search_knowledge 找到文档路径后,用这个工具获取详细内容。",
inputSchema={
"type": "object",
"properties": {
"doc_path": {
"type": "string",
"description": "文档相对路径,如 'architecture/user-service.md'"
}
},
"required": ["doc_path"]
}
),
Tool(
name="list_documents",
description="列出知识库中的文档目录结构,了解知识库里有哪些分类的文档。",
inputSchema={
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "目录路径,留空则列出根目录",
"default": ""
}
}
}
),
Tool(
name="query_database",
description="执行 SQL 查询(仅支持 SELECT/SHOW/DESCRIBE 语句)。用于查询业务数据、分析数据库状态。",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL 查询语句,只允许只读操作"
},
"max_rows": {
"type": "integer",
"description": "最大返回行数,默认50,最大200",
"default": 50
}
},
"required": ["sql"]
}
),
Tool(
name="get_table_schema",
description="获取数据库表的字段结构,了解表有哪些字段和类型,在写 SQL 前先查看表结构。",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "表名"
}
},
"required": ["table_name"]
}
),
Tool(
name="list_tables",
description="列出数据库中所有可用的表,在不清楚有哪些表的时候先调用这个。",
inputSchema={
"type": "object",
"properties": {}
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
logger.info(f"调用工具: {name}, 参数: {json.dumps(arguments, ensure_ascii=False)}")
try:
if name == "search_knowledge":
result = search_knowledge_base(
query=arguments["query"],
max_results=min(arguments.get("max_results", 5), 10)
)
elif name == "get_document":
result = get_document(arguments["doc_path"])
elif name == "list_documents":
result = list_documents(arguments.get("directory", ""))
elif name == "query_database":
result = execute_query(
sql=arguments["sql"],
max_rows=min(arguments.get("max_rows", 50), 200)
)
elif name == "get_table_schema":
result = get_table_schema(arguments["table_name"])
elif name == "list_tables":
result = list_tables()
else:
return CallToolResult(
content=[TextContent(type="text", text=f"未知工具: {name}")],
isError=True
)
output = json.dumps(result, ensure_ascii=False, default=str, indent=2)
return CallToolResult(content=[TextContent(type="text", text=output)])
except Exception as e:
logger.exception(f"工具执行异常: {name}")
return CallToolResult(
content=[TextContent(type="text", text=f"执行失败: {str(e)}")],
isError=True
)
async def main():
logger.info("启动 Knowledge+DB MCP Server")
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="knowledge-db-server",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=None,
experimental_capabilities={}
)
)
)
if __name__ == "__main__":
asyncio.run(main())第7步:本地测试
在接入 Claude Code 之前,先本地测试 Server 能不能正常启动:
python server.py如果没有报错,说明 Server 能正常启动(它会等待 stdin 输入,这是正常的)。
用 MCP CLI 测试工具调用:
# 安装 mcp cli
pip install "mcp[cli]"
# 列出工具
mcp list-tools server.py
# 测试调用
mcp call-tool server.py list_tables '{}'第8步:配置到 Claude Code
编辑 ~/.claude/claude_desktop_config.json:
{
"mcpServers": {
"knowledge-db": {
"command": "python",
"args": ["/absolute/path/to/knowledge-mcp-server/server.py"],
"env": {
"DB_HOST": "your-db-host",
"DB_PORT": "3306",
"DB_USER": "readonly_user",
"DB_PASSWORD": "your-password",
"DB_NAME": "your_database",
"KNOWLEDGE_BASE_PATH": "/path/to/your/docs"
}
}
}
}第9步:验证运行
重启 Claude Code,然后输入:
列出知识库里有哪些文档目录如果 Claude 能自动调用 list_documents 工具并返回你的文档目录,说明接入成功了。
第10步:常见问题处理
Claude 不调用工具:检查工具的 description 是否清晰,Claude 根据 description 判断是否需要调用这个工具。
工具调用报错:打开日志文件,cat /tmp/mcp_server.log(如果你配置了文件日志),看具体错误。
返回数据太多:减少 max_rows 或者 max_content_length,避免单次工具调用返回太多数据。
连接数据库失败:先用 pymysql 单独测试数据库连接,确认网络和权限都没问题,再排查 MCP Server 层面的问题。
这套 MCP Server 跑起来之后,我查内部文档的效率提升明显。以前要手动搜、手动复制的东西,现在 Claude 自己去取了。
