自然语言转 SQL 的生产级实现——让业务自己查数据
大约 7 分钟
自然语言转 SQL 的生产级实现——让业务自己查数据
适读人群:Java/Python工程师、做内部工具的开发者 | 阅读时长:约15分钟 | 核心价值:Text2SQL从demo到生产的真实工程差距,含完整可运行代码
有一天我们的产品经理发消息问我:"老张,能不能做个东西,让我自己能查数据,不用每次来烦你?"
我当时心里一乐,这需求好。然后做了一个下午的 demo 给他看,他觉得很神奇,说"就要这个!"。
然后我花了三周才把这个东西做成一个我敢让他真正用的版本。
demo 和生产之间的差距,在 Text2SQL 这个场景里特别大。这篇文章就来说清楚这个差距在哪,以及我是怎么填上的。
demo 很简单,问题出在哪里
一个最基础的 Text2SQL demo 大概是这样的:
import anthropic
client = anthropic.Anthropic()
def text2sql_demo(question: str, schema: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"数据库表结构:\n{schema}\n\n问题:{question}\n\n请生成对应的SQL查询语句。"
}]
)
return response.content[0].text十几行,跑起来就能用。但这个 demo 用于生产有几个致命问题:
- 没有安全边界:业务可以查任何表,可以执行
DROP TABLE,可以用SELECT * FROM user WHERE 1=1把所有用户数据拖出来 - 结果无法理解:业务拿到一堆数字不知道什么意思,问你问题的频率可能更高
- SQL 执行没有限制:没有超时、没有行数限制,一个全表扫描能把数据库打挂
- 没有查询日志:出了问题不知道谁查了什么
这四个问题,每一个在生产里单独发生都是事故。
生产级方案的完整设计
我用 Python 实现,核心是四个模块:权限控制、SQL 生成、安全沙箱、结果解释。
第一层:Schema 权限控制
业务用户不应该看到系统表、用户隐私表的结构。我们做一个"授权 Schema"的概念:
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class TablePermission:
table_name: str
allowed_columns: Optional[List[str]] = None # None 表示所有列
description: str = "" # 给AI看的业务描述,不是技术注释
class SchemaManager:
def __init__(self):
# 这里配置哪些表、哪些列可以被查询
self._permissions: Dict[str, TablePermission] = {}
def register_table(self, permission: TablePermission):
self._permissions[permission.table_name] = permission
def get_authorized_schema(self, db_connection) -> str:
"""生成只包含授权表和列的Schema描述,专门为AI优化"""
schema_parts = []
for table_name, perm in self._permissions.items():
columns = self._get_columns(db_connection, table_name, perm.allowed_columns)
schema_parts.append(
f"表名: {table_name}\n"
f"业务说明: {perm.description}\n"
f"字段: {', '.join(columns)}\n"
)
return "\n---\n".join(schema_parts)
def is_table_allowed(self, table_name: str) -> bool:
return table_name.lower() in self._permissions
def _get_columns(self, conn, table_name: str, allowed_cols: Optional[List[str]]) -> List[str]:
# 实际从数据库获取列定义,这里简化
cursor = conn.cursor()
cursor.execute(f"DESCRIBE {table_name}")
all_columns = [row[0] for row in cursor.fetchall()]
if allowed_cols is None:
return all_columns
return [c for c in all_columns if c in allowed_cols]第二层:SQL 生成(带约束的 Prompt)
import anthropic
import re
class SQLGenerator:
def __init__(self):
self.client = anthropic.Anthropic()
def generate(self, question: str, authorized_schema: str) -> str:
system_prompt = """你是一个数据查询助手。根据用户的问题生成MySQL查询语句。
严格规则:
1. 只能使用 SELECT 语句,不允许 INSERT/UPDATE/DELETE/DROP/CREATE/ALTER 等任何修改语句
2. 只能查询提供的表结构里列出的表和字段
3. 必须在 SELECT 语句中加上 LIMIT,不超过1000行
4. 不允许使用子查询访问未授权的表
5. 只输出SQL语句本身,不要任何解释,不要Markdown代码块标记
如果问题无法用现有表结构回答,输出:CANNOT_ANSWER: <原因>"""
response = self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
system=system_prompt,
messages=[{
"role": "user",
"content": f"数据库表结构:\n{authorized_schema}\n\n用户问题:{question}"
}]
)
return response.content[0].text.strip()第三层:SQL 安全沙箱(最关键的一层)
不能只靠 AI 遵守规则——模型可能被精心构造的问题绕过。必须有独立的代码层验证:
import sqlglot
import sqlglot.expressions as exp
class SQLSandbox:
def __init__(self, schema_manager: SchemaManager):
self.schema_manager = schema_manager
def validate(self, sql: str) -> tuple[bool, str]:
"""
返回 (is_valid, error_message)
"""
if sql.startswith("CANNOT_ANSWER:"):
return False, sql[14:].strip()
try:
parsed = sqlglot.parse_one(sql, dialect="mysql")
except Exception as e:
return False, f"SQL解析失败: {str(e)}"
# 规则1:只允许 SELECT
if not isinstance(parsed, exp.Select):
return False, "只允许 SELECT 查询"
# 规则2:检查所有被引用的表是否在授权列表里
tables_used = [
table.name.lower()
for table in parsed.find_all(exp.Table)
]
for table in tables_used:
if not self.schema_manager.is_table_allowed(table):
return False, f"表 '{table}' 没有查询权限"
# 规则3:必须有 LIMIT
if not parsed.find(exp.Limit):
# 自动补充 LIMIT,而不是报错
sql = sql.rstrip(";") + " LIMIT 1000"
parsed = sqlglot.parse_one(sql, dialect="mysql")
# 规则4:检查 LIMIT 值不超过上限
limit_expr = parsed.find(exp.Limit)
if limit_expr:
limit_val = int(limit_expr.expression.this)
if limit_val > 1000:
return False, f"LIMIT {limit_val} 超过最大限制 1000"
return True, sql # 返回可能被修改后的 SQL
def execute_safe(self, sql: str, conn, timeout_seconds: int = 10):
"""带超时的安全执行"""
cursor = conn.cursor()
# MySQL 设置语句级超时(需要 MySQL 5.7.8+)
cursor.execute(f"SET SESSION MAX_EXECUTION_TIME={timeout_seconds * 1000}")
try:
cursor.execute(sql)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return {"columns": columns, "rows": rows, "count": len(rows)}
except Exception as e:
raise RuntimeError(f"查询执行失败: {str(e)}")第四层:结果解释(让业务真正看懂)
返回裸数据是不够的,业务需要看懂这些数字意味着什么:
class ResultExplainer:
def __init__(self):
self.client = anthropic.Anthropic()
def explain(self, question: str, sql: str, result: dict) -> str:
if result["count"] == 0:
return "查询没有找到符合条件的数据。"
# 只取前10行给AI解释,避免token浪费
preview_rows = result["rows"][:10]
preview = "\n".join([
str(dict(zip(result["columns"], row)))
for row in preview_rows
])
response = self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""用户问题:{question}
查询返回了 {result['count']} 条数据,以下是前几条:
{preview}
请用简洁的中文解释这个查询结果的含义,帮助非技术人员理解数据说明了什么。不要提SQL,直接说业务结论。"""
}]
)
return response.content[0].text.strip()组合成完整的服务
import sqlite3 # 示例用sqlite3,生产替换成mysql-connector
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
filename="text2sql_audit.log",
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s"
)
class Text2SQLService:
def __init__(self, db_connection):
self.conn = db_connection
self.schema_manager = SchemaManager()
self.generator = SQLGenerator()
self.sandbox = SQLSandbox(self.schema_manager)
self.explainer = ResultExplainer()
def setup_permissions(self):
"""配置允许查询的表和字段"""
self.schema_manager.register_table(TablePermission(
table_name="orders",
allowed_columns=["id", "user_id", "amount", "status", "created_at"],
description="订单表,记录用户的购买订单,status字段值:pending=待支付, paid=已支付, cancelled=已取消"
))
self.schema_manager.register_table(TablePermission(
table_name="products",
allowed_columns=["id", "name", "price", "category", "stock"],
description="商品表,记录在售商品信息"
))
# 注意:user 表不在授权列表,业务无法查询用户隐私数据
def query(self, question: str, operator: str) -> dict:
"""
operator: 操作人,用于审计日志
"""
start_time = datetime.now()
log_entry = {
"operator": operator,
"question": question,
"sql": None,
"status": "pending"
}
try:
# 第一步:获取授权Schema
authorized_schema = self.schema_manager.get_authorized_schema(self.conn)
# 第二步:生成SQL
raw_sql = self.generator.generate(question, authorized_schema)
log_entry["sql"] = raw_sql
# 第三步:安全验证
is_valid, validated_sql_or_error = self.sandbox.validate(raw_sql)
if not is_valid:
log_entry["status"] = "blocked"
logging.warning(f"{log_entry}")
return {
"success": False,
"error": f"无法回答这个问题:{validated_sql_or_error}",
"sql": None
}
# 第四步:执行
result = self.sandbox.execute_safe(validated_sql_or_error, self.conn)
# 第五步:生成解释
explanation = self.explainer.explain(question, validated_sql_or_error, result)
log_entry["status"] = "success"
log_entry["rows_returned"] = result["count"]
elapsed = (datetime.now() - start_time).total_seconds()
log_entry["elapsed_seconds"] = elapsed
logging.info(f"{log_entry}")
return {
"success": True,
"explanation": explanation,
"data": result,
"sql": validated_sql_or_error # 对技术人员展示,可选
}
except Exception as e:
log_entry["status"] = "error"
log_entry["error"] = str(e)
logging.error(f"{log_entry}")
return {
"success": False,
"error": "查询执行遇到问题,请联系技术同学",
"sql": None
}demo 和生产的差距总结
我把这几周踩的坑整理成一张表:
| 问题 | demo 的处理 | 生产的处理 |
|---|---|---|
| 权限控制 | 无 | Schema 授权列表,列级别控制 |
| SQL 注入防护 | 依赖模型指令 | sqlglot 静态分析,代码层强制校验 |
| 查询超时 | 无 | MySQL MAX_EXECUTION_TIME |
| 行数限制 | 依赖模型指令 | 自动补 LIMIT,代码层验证 |
| 结果可读性 | 裸数据 | AI 生成业务语言解释 |
| 审计日志 | 无 | 完整记录操作人/问题/SQL/结果 |
| 错误提示 | 报错堆栈 | 业务友好的错误描述 |
这个差距,就是"让 PM 觉得很神奇"和"我敢让 PM 真正用"之间的距离。
还有一件事值得说
这个系统上线一个月后,PM 开始每天自己查数据,给我发消息的频率确实降了很多。但也出现了一个副作用:他开始发现一些以前从来不知道的数据异常,然后又来找我。
这让我意识到:让业务自己查数据,会打开一扇你没预料到的门。 要做好准备应对随之而来的"为什么数据是这样"的问题。
从工程角度看,这其实是好事——问题被发现了,总比一直藏着要好。
