Article 2099: Engineering Text-to-SQL for Production (Letting Business Users Query Data in Natural Language)
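Audience: engineers responsible for data platforms or BI systems | Reading time: about 20 minutes | Key takeaway: a production-grade path to Text-to-SQL, covering schema compression, SQL validation, security controls, and strategies for multi-table joins
"Can business users query data themselves, instead of asking the data team every time?"
Almost every company has this request, and Text-to-SQL is currently the most practical way to meet it. But the gap between "call an API and get SQL back" and "let real users query a production database" is much wider than you might expect.
A bad SQL query does more than return no data. It can turn into a slow query that drags the database down, or it can expose data the user should not see. This article works through the engineering and security problems of Text-to-SQL in detail.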
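System Architecture
A question passes through five stages:
1. Vector retrieval selects the relevant tables.
2. The LLM generates SQL.
3. A validator checks the SQL.
4. A read-only data source executes it.
5. The LLM explains the results in natural language.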
Schema Compression: Avoiding Context Overflow
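Under the hood, the retrieval step in the service below is top-K ranking by vector similarity with a score floor (`maxResults(topK)` and `minScore(0.6)`). The sketch below shows that idea using only the JDK. It assumes the embeddings are already available as `float[]` vectors and that the store ranks by cosine similarity; many stores do, but check yours. `SchemaRanker` and the toy two-dimensional vectors are hypothetical.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical helper: rank tables by cosine similarity to the question embedding
class SchemaRanker {
    static double cosine(float[] a, float[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        // a zero vector has no direction, so treat it as unrelated
        return (na == 0 || nb == 0) ? 0 : dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    /** Up to topK table names with similarity >= minScore, best match first. */
    static List<String> topK(float[] question, Map<String, float[]> tableEmbeddings,
                             int topK, double minScore) {
        return tableEmbeddings.entrySet().stream()
                .map(e -> Map.entry(e.getKey(), cosine(question, e.getValue())))
                .filter(e -> e.getValue() >= minScore)
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .limit(topK)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Suppose the question vector points at `orders`. Then `topK(q, tables, 2, 0.6)` keeps `orders` and `order_items` and drops `users`, whose similarity falls below the floor. The floor matters as much as K: without it, an unrelated question would still pull in K tables.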
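import dev.langchain4j.data.document.Metadata;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingSearchRequest;
import dev.langchain4j.store.embedding.EmbeddingStore;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Schema compression is the most important engineering problem in Text-to-SQL.
*
* Problem: a real business database has hundreds of tables, and putting all of them
* into the prompt exceeds the LLM's context window.
* Solution: use vector retrieval to find the tables relevant to the question and
* send only their schemas.
*/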
@Service
@RequiredArgsConstructor
@Slf4j
public class SchemaSelectionService {
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> schemaEmbeddingStore;
private final TableMetadataRepository metadataRepo;
/**
* Selects the Top-K tables most relevant to the question
*/
public List<TableSchema> selectRelevantTables(String question, int topK) {
// 1. Embed the question
Embedding questionEmbedding = embeddingModel.embed(question).content();
// 2. Retrieve relevant tables (each table's embedding combines its name, description, column names and business keywords)
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
.queryEmbedding(questionEmbedding)
.maxResults(topK)
.minScore(0.6)
.build();
List<EmbeddingMatch<TextSegment>> matches =
schemaEmbeddingStore.search(request).matches();
// 3. Load the full table schemas
List<TableSchema> schemas = matches.stream()
.map(m -> metadataRepo.getTableSchema(
m.embedded().metadata().getString("tableName")))
.filter(Objects::nonNull)
.toList();
log.debug("Schema selection: question={}, relevant tables={}", question,
schemas.stream().map(TableSchema::tableName).toList());
return schemas;
}
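/**
* Builds the compressed schema description that goes into the prompt
*
* Rather than listing every column, it skips large fields (BLOB/TEXT/JSON),
* reducing tokens while keeping the information needed to write queries
*/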
public String buildSchemaDescription(List<TableSchema> tables) {
StringBuilder sb = new StringBuilder();
for (TableSchema table : tables) {
sb.append("Table: ").append(table.tableName());
if (table.description() != null && !table.description().isBlank()) {
sb.append(" -- ").append(table.description());
}
sb.append("\n");
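// Columns: large BLOB/TEXT/JSON fields are skipped because they are rarely used in query conditions.
// A stricter filter could keep only primary keys, foreign keys and frequently queried columns.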
table.columns().stream()
.filter(col -> !isLargeField(col.dataType()))
.forEach(col -> {
sb.append(" ").append(col.columnName())
.append(" (").append(col.dataType()).append(")");
if (col.description() != null && !col.description().isBlank()) {
sb.append(" -- ").append(col.description());
}
if (col.isPrimaryKey()) sb.append(" [PK]");
if (col.isForeignKey()) sb.append(" [FK -> ").append(col.referencedTable()).append("]");
sb.append("\n");
});
// Example queries, if any (these help the LLM a lot when generating SQL)
if (table.exampleQueries() != null && !table.exampleQueries().isEmpty()) {
sb.append(" Example: ").append(table.exampleQueries().get(0)).append("\n");
}
sb.append("\n");
}
return sb.toString();
}
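/**
* Precomputes schema embeddings for all tables (offline job)
*
* Rerun whenever table structures change. addAll() appends, so remove the previous
* entries first (or rebuild the store); otherwise stale and duplicate table entries
* accumulate in the search results.
*/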
public void indexAllSchemas() {
List<TableSchema> allTables = metadataRepo.getAllTableSchemas();
List<TextSegment> segments = allTables.stream()
.map(table -> {
// Condense the table's semantic information into one piece of text for retrieval
String content = buildEmbeddingContent(table);
return TextSegment.from(
content,
Metadata.from("tableName", table.tableName())
);
})
.toList();
List<Embedding> embeddings = embeddingModel.embedAll(segments).content();
schemaEmbeddingStore.addAll(embeddings, segments);
log.info("Schema index built: {} tables", allTables.size());
}
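private String buildEmbeddingContent(TableSchema table) {
// Null-safe: a missing description or keyword list must not end up as the literal text "null"
return String.join(" ",
table.tableName(),
Objects.toString(table.description(), ""),
table.columns().stream().map(TableSchema.Column::columnName)
.collect(Collectors.joining(" ")),
table.businessKeywords() == null ? "" : String.join(" ", table.businessKeywords())
);
}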
private boolean isLargeField(String dataType) {
String type = dataType.toUpperCase();
return type.contains("BLOB") || type.contains("TEXT") || type.contains("JSON");
}
// Records provide the accessor style used above (tableName(), columns(), isPrimaryKey()).
// Lombok's @Data would generate getTableName()-style getters instead.
@Builder
public record TableSchema(
String tableName,
String description,
List<Column> columns,
List<String> businessKeywords, // business keywords that help retrieval
List<String> exampleQueries
) {
@Builder
public record Column(
String columnName,
String dataType,
String description,
boolean isPrimaryKey,
boolean isForeignKey,
String referencedTable
) {}
}
}
SQL Generator
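The generator below sends the entire conversation history with every request, so a long session keeps growing the prompt. One option is to keep only the most recent turns. The sketch below assumes that approach. `HistoryWindow`, its `Turn` record and the window size are hypothetical; the output follows the same user/SQL layout the generator uses.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical helper: include only the last maxTurns turns so the prompt stays bounded
class HistoryWindow {
    record Turn(String question, String generatedSql) {}

    static String build(List<Turn> history, int maxTurns) {
        if (history == null || history.isEmpty() || maxTurns <= 0) return "None";
        int from = Math.max(0, history.size() - maxTurns);   // index of the oldest turn kept
        return history.subList(from, history.size()).stream()
                .map(t -> "User: " + t.question() + "\nSQL: " + t.generatedSql())
                .collect(Collectors.joining("\n---\n"));
    }
}
```

Follow-up questions ("and last month?") usually depend on the latest one or two turns, so a small window tends to keep the useful context. This is a judgment call to validate against real sessions.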
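import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.model.chat.ChatLanguageModel;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Core SQL generation service
*/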
@Service
@RequiredArgsConstructor
@Slf4j
public class TextToSqlService {
private final ChatLanguageModel llm;
private final SchemaSelectionService schemaSelector;
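private static final String SQL_GENERATION_PROMPT = """
You are a senior SQL engineer. Generate a SQL query for the user's natural-language question.
**Database dialect**: %s
**Available tables**:
%s
**Conversation history** (if any):
%s
**User question**: %s
**SQL generation rules**:
1. Generate SELECT statements only; never generate INSERT/UPDATE/DELETE/DROP or any other write operation
2. Always include a LIMIT clause (LIMIT 1000 by default; if the user specifies a number, use it, but never more than 10000)
3. Write dates in standard format ('2024-01-01')
4. Use explicit table aliases in multi-table joins
5. Use LIKE for fuzzy matches and = for exact matches
6. If the question is ambiguous, choose the most conservative interpretation
Return JSON:
{
"sql": "SELECT ...",
"explanation": "what this SQL does, in natural language",
"assumptions": ["any assumptions made where something was unclear"],
"needsConfirmation": false
}
Set needsConfirmation to true if those assumptions should be confirmed by the user before the query runs.
Return only the JSON.
""";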
/**
* Generates SQL for a natural-language question
*/
public SqlGenerationResult generateSql(SqlGenerationRequest request) {
// 1. Select the relevant schemas
List<SchemaSelectionService.TableSchema> tables =
schemaSelector.selectRelevantTables(request.getQuestion(), 8);
if (tables.isEmpty()) {
return SqlGenerationResult.failed("No data tables related to the question were found");
}
String schemaDescription = schemaSelector.buildSchemaDescription(tables);
// 2. Build the prompt
String historyContext = buildHistoryContext(request.getConversationHistory());
String prompt = SQL_GENERATION_PROMPT.formatted(
request.getDatabaseDialect(),
schemaDescription,
historyContext,
request.getQuestion()
);
// 3. Call the LLM
String response;
try {
response = llm.generate(prompt);
} catch (Exception e) {
log.error("LLM call failed: {}", e.getMessage(), e);
return SqlGenerationResult.failed("The SQL generation service is temporarily unavailable; please try again later");
}
// 4. Parse the result
return parseSqlResult(response, tables);
}
private String buildHistoryContext(List<ConversationTurn> history) {
if (history == null || history.isEmpty()) return "None";
return history.stream()
.map(turn -> String.format("User: %s\nSQL: %s",
turn.question(), turn.generatedSql()))
.collect(Collectors.joining("\n---\n"));
}
// ObjectMapper is thread-safe once configured, so one shared instance is enough
private static final ObjectMapper MAPPER = new ObjectMapper();
private SqlGenerationResult parseSqlResult(
String response, List<SchemaSelectionService.TableSchema> tables) {
try {
String json = extractJson(response);
JsonNode node = MAPPER.readTree(json);
String sql = node.path("sql").asText("").trim();
String explanation = node.path("explanation").asText("");
List<String> assumptions = new ArrayList<>();
for (JsonNode a : node.path("assumptions")) {
assumptions.add(a.asText());
}
boolean needsConfirmation = node.path("needsConfirmation").asBoolean(false);
if (sql.isBlank()) {
return SqlGenerationResult.failed("No valid SQL was generated");
}
// usedTables = the tables offered to the model, not necessarily the ones the SQL references
return SqlGenerationResult.success(sql, explanation, assumptions,
needsConfirmation, tables.stream().map(SchemaSelectionService.TableSchema::tableName).toList());
} catch (Exception e) {
log.warn("Failed to parse the SQL result: {}", e.getMessage());
return SqlGenerationResult.failed("Could not parse the generated SQL; please rephrase your question");
}
}
private String extractJson(String s) {
int start = s.indexOf('{');
int end = s.lastIndexOf('}');
return (start >= 0 && end > start) ? s.substring(start, end + 1) : s;
}
public record SqlGenerationResult(
boolean success, String sql, String explanation,
List<String> assumptions, boolean needsConfirmation,
List<String> usedTables, String errorMessage
) {
static SqlGenerationResult success(
String sql, String explanation, List<String> assumptions,
boolean needsConfirmation, List<String> tables) {
return new SqlGenerationResult(true, sql, explanation,
assumptions, needsConfirmation, tables, null);
}
static SqlGenerationResult failed(String error) {
return new SqlGenerationResult(false, null, null,
List.of(), false, List.of(), error);
}
}
@Data
@Builder
public static class SqlGenerationRequest {
private String question;
private String databaseDialect;
private List<ConversationTurn> conversationHistory;
private String userId;
}
public record ConversationTurn(String question, String generatedSql) {}
}
SQL Security Validation
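Two regex details matter for checks like the ones below.

- `String.matches` must match the entire input, and `.` does not cross line breaks by default. As a result, `sql.matches(".*\\bDELETE\\b.*")` returns false for any SQL that spans several lines. `Matcher.find` has no such problem.
- In MySQL's `LIMIT offset, count` form, the row count is the second number.

The following JDK-only sketch handles both, plus a statement count that ignores a trailing semicolon. `SqlChecks` and its shortened keyword list are hypothetical. Regex checks like these remain a heuristic, not a SQL parser.

```java
import java.util.*;
import java.util.regex.*;

// Hypothetical helper: line-safe keyword detection, statement counting and LIMIT parsing
class SqlChecks {
    private static final List<String> DANGEROUS = List.of("INSERT", "UPDATE", "DELETE", "DROP");
    // "LIMIT n" or MySQL's "LIMIT offset, n"; group 2 is set only in the two-number form
    private static final Pattern LIMIT = Pattern.compile(
            "\\bLIMIT\\s+(\\d+)(?:\\s*,\\s*(\\d+))?", Pattern.CASE_INSENSITIVE);

    static boolean containsDangerousKeyword(String sql) {
        String upper = sql.toUpperCase(Locale.ROOT);
        for (String kw : DANGEROUS) {
            // find() scans the whole input, line breaks included; \b keeps UPDATE_TIME from matching
            if (Pattern.compile("\\b" + kw + "\\b").matcher(upper).find()) return true;
        }
        return false;
    }

    /** Naive: a semicolon inside a string literal is also counted as a separator. */
    static int statementCount(String sql) {
        return (int) Arrays.stream(sql.split(";")).filter(s -> !s.isBlank()).count();
    }

    /** Row count of the first LIMIT clause, or -1 if there is none. */
    static long limitRows(String sql) {
        Matcher m = LIMIT.matcher(sql);
        if (!m.find()) return -1;
        String rows = m.group(2) != null ? m.group(2) : m.group(1);
        // 19+ digits could overflow a long; anything that long is far over any sane cap
        return rows.length() > 18 ? Long.MAX_VALUE : Long.parseLong(rows);
    }
}
```

For example, `statementCount("SELECT 1;  ")` is 1, while `limitRows("SELECT * FROM t LIMIT 20, 50000")` is 50000 rather than 20.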
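import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.*;

/**
* SQL security validation
*
* This is the most important safety layer: it blocks dangerous SQL generated by the LLM,
* including write operations, expensive queries and unauthorized access.
* It complements the read-only database account rather than replacing it.
*/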
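@Service
@Slf4j
public class SqlSecurityValidator {
// Dangerous keywords (checked against the upper-cased SQL)
private static final Set<String> DANGEROUS_KEYWORDS = Set.of(
"INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER",
"CREATE", "GRANT", "REVOKE", "EXEC", "EXECUTE", "CALL",
"LOAD", "OUTFILE", "INTO DUMPFILE"
);
// Maximum rows a single query may return
private static final int MAX_LIMIT = 10000;
// Maximum queries per user per hour (declared but not enforced in this class)
private static final int MAX_QUERIES_PER_HOUR = 100;
// Tables each user may access: userId -> lower-case table names
private final Map<String, Set<String>> userTablePermissions;
// A final field needs a constructor; the map is assumed to be loaded from
// the existing permission system
public SqlSecurityValidator(Map<String, Set<String>> userTablePermissions) {
this.userTablePermissions = userTablePermissions;
}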
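// Matches "LIMIT n" and MySQL's "LIMIT offset, n"; the row count is the last number
private static final java.util.regex.Pattern LIMIT_PATTERN = java.util.regex.Pattern.compile(
"\\bLIMIT\\s+(\\d+)(?:\\s*,\\s*(\\d+))?", java.util.regex.Pattern.CASE_INSENSITIVE);
/**
* Full security validation
*/
public SqlValidationResult validate(String sql, String userId) {
List<String> violations = new ArrayList<>();
String upperSql = sql.toUpperCase(Locale.ROOT);
// 1. Read queries only: the statement must start with SELECT (or WITH for a CTE)
String head = upperSql.stripLeading();
if (!head.startsWith("SELECT") && !head.startsWith("WITH")) {
violations.add("Only SELECT queries are allowed");
}
// 2. Dangerous operations. find() scans the whole string, whereas matches(".*\\bX\\b.*")
// fails on any multi-line SQL because '.' does not match line breaks by default
for (String keyword : DANGEROUS_KEYWORDS) {
// \b requires a whole word, so UPDATE_TIME does not count as UPDATE
if (java.util.regex.Pattern.compile("\\b" + keyword + "\\b").matcher(upperSql).find()) {
violations.add("Contains a forbidden operation: " + keyword);
}
}
if (!violations.isEmpty()) {
log.warn("SQL security validation failed: userId={}, violations={}, sql={}",
userId, violations, sql);
return SqlValidationResult.failed(violations);
}
// 3. Comments (SQL comments can be used to slip content past the checks)
if (sql.contains("--") || sql.contains("/*") || sql.contains("*/")) {
violations.add("SQL comments are not allowed");
}
// 4. Multiple statements; a trailing semicolon or whitespace is not a second statement
long statementCount = Arrays.stream(sql.split(";"))
.filter(s -> !s.isBlank())
.count();
if (statementCount > 1) {
violations.add("Multiple SQL statements are not allowed");
}
// 5. LIMIT (prevents unbounded result sets; MySQL/PostgreSQL LIMIT syntax assumed)
java.util.regex.Matcher m = LIMIT_PATTERN.matcher(sql);
if (!m.find()) {
violations.add("SQL must contain a LIMIT clause");
} else {
String rowCount = m.group(2) != null ? m.group(2) : m.group(1);
// more than 9 digits is certainly over MAX_LIMIT and could overflow Integer.parseInt
if (rowCount.length() > 9 || Integer.parseInt(rowCount) > MAX_LIMIT) {
violations.add("LIMIT exceeds the maximum allowed value: " + MAX_LIMIT);
}
}
// 6. Table permissions. Fail closed: a user with no configured permissions
// may not access any table
Set<String> allowedTables = userTablePermissions.getOrDefault(userId, Set.of());
for (String table : extractTableNames(sql)) {
if (!allowedTables.contains(table)) {
violations.add("No permission to access table: " + table);
}
}
if (!violations.isEmpty()) {
return SqlValidationResult.failed(violations);
}
return SqlValidationResult.passed();
}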
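/**
* Simple table-name extraction (regex-based, not a full SQL parser).
* Known gaps:
* - Comma joins (FROM a, b) yield only the first table.
* - Quoted names (`orders`) are missed.
* - "EXTRACT(YEAR FROM col)" yields a column name.
* Tables that are missed bypass the permission check, so a real parser
* (e.g. JSqlParser) is safer in production.
*/
private Set<String> extractTableNames(String sql) {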
Set<String> tables = new HashSet<>();
// FROM clause
java.util.regex.Matcher fromMatcher = java.util.regex.Pattern
.compile("FROM\\s+([\\w.]+)", java.util.regex.Pattern.CASE_INSENSITIVE)
.matcher(sql);
while (fromMatcher.find()) {
tables.add(fromMatcher.group(1).toLowerCase());
}
// JOIN clause
java.util.regex.Matcher joinMatcher = java.util.regex.Pattern
.compile("JOIN\\s+([\\w.]+)", java.util.regex.Pattern.CASE_INSENSITIVE)
.matcher(sql);
while (joinMatcher.find()) {
tables.add(joinMatcher.group(1).toLowerCase());
}
return tables;
}
public record SqlValidationResult(boolean passed, List<String> violations) {
static SqlValidationResult passed() {
return new SqlValidationResult(true, List.of());
}
static SqlValidationResult failed(List<String> violations) {
return new SqlValidationResult(false, violations);
}
}
}
SQL Execution and Result Handling
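`SqlSecurityValidator` declares `MAX_QUERIES_PER_HOUR = 100`, but nothing in the code shown enforces it. One option is a per-user sliding one-hour window, checked before a query runs. The sketch below is a minimal single-node, in-memory version. `HourlyQuotaLimiter` and the injectable clock are hypothetical. With several application instances, the counters would need a shared store instead.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical in-memory sliding-window limiter: at most maxPerHour queries per user
class HourlyQuotaLimiter {
    private static final long WINDOW_MS = 60 * 60 * 1000L;
    private final int maxPerHour;
    private final LongSupplier clock;   // current time in ms; injectable so tests can control it
    private final Map<String, Deque<Long>> history = new ConcurrentHashMap<>();

    HourlyQuotaLimiter(int maxPerHour, LongSupplier clock) {
        this.maxPerHour = maxPerHour;
        this.clock = clock;
    }

    /** Returns true and records the query if the user is under quota, otherwise false. */
    boolean tryAcquire(String userId) {
        long now = clock.getAsLong();
        Deque<Long> times = history.computeIfAbsent(userId, k -> new ArrayDeque<>());
        synchronized (times) {
            // drop timestamps that have left the one-hour window
            while (!times.isEmpty() && now - times.peekFirst() >= WINDOW_MS) {
                times.pollFirst();
            }
            if (times.size() >= maxPerHour) return false;
            times.addLast(now);
            return true;
        }
    }
}
```

In production the clock would be `System::currentTimeMillis`. A refused request can return a message such as "hourly query limit reached" instead of reaching the database at all.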
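import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.ResultSetMetaData;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* SQL execution engine
*
* Sandboxed execution: a dedicated read-only data source, a query timeout and a row cap
*/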
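@Service
@Slf4j
public class SqlExecutionService {
// Per-query timeout (guards against slow queries)
private static final int QUERY_TIMEOUT_SECONDS = 30;
// Hard cap on fetched rows, matching the validator's MAX_LIMIT
private static final int MAX_ROWS = 10000;
private final JdbcTemplate readOnlyJdbcTemplate;
private final SqlSecurityValidator securityValidator;
// Build the JdbcTemplate from the dedicated read-only DataSource (a database user with
// SELECT privileges only) instead of injecting an unqualified JdbcTemplate, which may
// resolve to the application's read-write one
public SqlExecutionService(@Qualifier("readOnlyDataSource") DataSource readOnlyDataSource,
SqlSecurityValidator securityValidator) {
this.readOnlyJdbcTemplate = new JdbcTemplate(readOnlyDataSource);
// Passed to the driver with every statement
this.readOnlyJdbcTemplate.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
this.readOnlyJdbcTemplate.setMaxRows(MAX_ROWS);
this.securityValidator = securityValidator;
}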
/**
* Executes the SQL and returns the result
*/
public QueryExecutionResult execute(String sql, String userId) {
// 1. Security validation
SqlSecurityValidator.SqlValidationResult validation =
securityValidator.validate(sql, userId);
if (!validation.passed()) {
return QueryExecutionResult.failed(
"SQL failed security validation: " + String.join("; ", validation.violations()));
}
// 2. Execute the SQL (subject to the query timeout)
long startMs = System.currentTimeMillis();
try {
List<Map<String, Object>> rows = readOnlyJdbcTemplate.query(
sql,
(rs, rowNum) -> {
Map<String, Object> row = new LinkedHashMap<>();
ResultSetMetaData meta = rs.getMetaData();
int cols = meta.getColumnCount();
for (int i = 1; i <= cols; i++) {
row.put(meta.getColumnLabel(i), rs.getObject(i));
}
return row;
}
);
long elapsed = System.currentTimeMillis() - startMs;
log.info("SQL executed: userId={}, rows={}, elapsed={}ms",
userId, rows.size(), elapsed);
// Log slow queries separately
if (elapsed > 5000) {
log.warn("Slow query: userId={}, elapsed={}ms, sql={}", userId, elapsed, sql);
}
return QueryExecutionResult.success(rows, elapsed);
} catch (QueryTimeoutException e) {
log.warn("SQL timed out: userId={}, sql={}", userId, sql);
return QueryExecutionResult.failed("The query timed out (over " + QUERY_TIMEOUT_SECONDS + " seconds); try narrowing its scope");
} catch (DataAccessException e) {
log.error("SQL execution failed: userId={}, error={}, sql={}", userId, e.getMessage(), sql);
// Give the user a friendly message without exposing internal database errors
String userMessage = translateSqlError(e);
return QueryExecutionResult.failed(userMessage);
}
}
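/**
* Translates a database error into a user-friendly message
*/
private String translateSqlError(DataAccessException e) {
// Inspect the driver's own message: the Spring wrapper's getMessage() may not include
// it (recent Spring versions no longer append the cause's message)
Throwable cause = e.getMostSpecificCause();
String msg = cause.getMessage() != null ? cause.getMessage().toLowerCase() : "";
// Covers MySQL ("doesn't exist", "Unknown column") and PostgreSQL ("does not exist") wording
boolean missing = msg.contains("not exist") || msg.contains("doesn't exist")
|| msg.contains("not found") || msg.contains("unknown");
if ((msg.contains("table") || msg.contains("relation")) && missing) {
return "The queried table does not exist; try rephrasing your question";
}
if (msg.contains("column") && missing) {
return "The queried field does not exist; try rephrasing your question";
}
if (msg.contains("syntax")) {
return "The generated SQL has a syntax error; please rephrase your question";
}
if (msg.contains("access denied") || msg.contains("permission denied")) {
return "You do not have access to this data; please contact an administrator";
}
return "The query failed; please try again later or ask in a different way";
}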
public record QueryExecutionResult(
boolean success, List<Map<String, Object>> rows,
long elapsedMs, String errorMessage, int rowCount
) {
static QueryExecutionResult success(List<Map<String, Object>> rows, long elapsed) {
return new QueryExecutionResult(true, rows, elapsed, null, rows.size());
}
static QueryExecutionResult failed(String error) {
return new QueryExecutionResult(false, List.of(), 0, error, 0);
}
}
}
Natural-Language Explanation of Results
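Deciding which columns are numeric from the first row alone misses any column whose first value is NULL. An average on its own also says little about spread. The sketch below is a variant that scans every row and reports min, max and average per numeric column, using the JDK's `DoubleSummaryStatistics`. `NumericSummary` is a hypothetical name. ID-like numeric columns would still need to be excluded separately, since their statistics are meaningless.

```java
import java.util.*;

// Hypothetical helper: min/max/average per numeric column, for the result summary sent to the LLM
class NumericSummary {
    static Map<String, DoubleSummaryStatistics> summarize(List<Map<String, Object>> rows) {
        Map<String, DoubleSummaryStatistics> stats = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            row.forEach((column, value) -> {
                // every row is inspected, so a NULL in the first row does not hide the column
                if (value instanceof Number n) {
                    stats.computeIfAbsent(column, k -> new DoubleSummaryStatistics())
                         .accept(n.doubleValue());
                }
            });
        }
        return stats;
    }

    static String format(Map<String, DoubleSummaryStatistics> stats) {
        StringBuilder sb = new StringBuilder();
        stats.forEach((column, s) -> sb.append(String.format(Locale.ROOT,
                "  %s: min=%.2f, max=%.2f, avg=%.2f\n",
                column, s.getMin(), s.getMax(), s.getAverage())));
        return sb.toString();
    }
}
```

Consider rows where `amount` is NULL, 10 and 30.5. The summary line becomes `amount: min=10.00, max=30.50, avg=20.25`, and the non-numeric `region` column is left out.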
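import dev.langchain4j.model.chat.ChatLanguageModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;

/**
* Result explanation service
*
* Turns query results into a natural-language description so that business users
* who don't know SQL can understand them. Note that a sample of the result rows
* is sent to the LLM.
*/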
@Service
@RequiredArgsConstructor
@Slf4j
public class ResultExplanationService {
private final ChatLanguageModel llm;
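/**
* Generates a natural-language explanation of the query result
*/
public String explain(String originalQuestion, String sql,
List<Map<String, Object>> results) {
if (results.isEmpty()) {
return "The query returned no results; no matching data was found.";
}
// Prepare a result summary (not the full data)
String resultSummary = buildResultSummary(results);
String prompt = """
The user ran a data query. Explain the result in natural language.
User question: %s
SQL executed: %s
Result summary:
%s
Describe the result in 1-3 concise sentences that directly answer the user's question.
- For aggregates (COUNT/SUM etc.), state the numbers directly
- For lists, say how many rows there are and what the key information is
- Do not repeat the SQL
""".formatted(originalQuestion, sql, resultSummary);
try {
return llm.generate(prompt).trim();
} catch (Exception e) {
// Fall back to a basic description if the LLM call fails
return String.format("The query returned %d rows.", results.size());
}
}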
private String buildResultSummary(List<Map<String, Object>> results) {
StringBuilder sb = new StringBuilder();
sb.append("Total: ").append(results.size()).append(" rows\n");
// Show at most the first 5 rows
int preview = Math.min(5, results.size());
for (int i = 0; i < preview; i++) {
sb.append(" ").append(results.get(i)).append("\n");
}
if (results.size() > preview) {
sb.append(" ... (").append(results.size() - preview).append(" more rows)\n");
}
// Add averages for numeric columns (column types are taken from the first row;
// explain() has already returned for an empty result, so get(0) is safe)
Map<String, Object> first = results.get(0);
first.forEach((key, value) -> {
if (value instanceof Number) {
// Average over this column's numeric values
OptionalDouble avg = results.stream()
.map(r -> r.get(key))
.filter(v -> v instanceof Number)
.mapToDouble(v -> ((Number) v).doubleValue())
.average();
avg.ifPresent(a ->
sb.append(" ").append(key).append(" average: ")
.append(String.format("%.2f", a)).append("\n"));
}
});
return sb.toString();
}
}
End-to-End Orchestration
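The orchestrator below runs a fixed sequence that stops at the first failing stage and treats a confirmation request as an early exit. The sketch below strips that control flow of Spring and passes each stage in as a function, so it can be exercised with stubs. `Pipeline`, its records and the status strings are hypothetical simplifications of the real request and response types.

```java
import java.util.*;
import java.util.function.*;

// Hypothetical, dependency-free model of the orchestrator's control flow
class Pipeline {
    record Generated(boolean ok, String sql, boolean needsConfirmation, String error) {}
    record Executed(boolean ok, List<Map<String, Object>> rows, String error) {}

    static String run(String question,
                      Function<String, Generated> generate,
                      Function<String, Executed> execute,
                      BiFunction<String, List<Map<String, Object>>, String> explain) {
        Generated g = generate.apply(question);                       // Step 1: generate SQL
        if (!g.ok()) return "failed: " + g.error();
        if (g.needsConfirmation()) return "needs_confirmation: " + g.sql(); // Step 2: early exit
        Executed e = execute.apply(g.sql());                          // Step 3: validate + execute
        if (!e.ok()) return "execution_failed: " + e.error();
        return "success: " + explain.apply(question, e.rows());       // Step 4: explain
    }
}
```

Keeping the order fixed means a query that needs confirmation never reaches the database. Validation always sits between generation and execution.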
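import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

/**
* Orchestrates the complete Text-to-SQL flow
*/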
@Service
@RequiredArgsConstructor
@Slf4j
public class TextToSqlOrchestrator {
private final TextToSqlService sqlGenerator;
private final SqlExecutionService executionService;
private final ResultExplanationService explanationService;
/**
* The complete natural-language query flow
*/
public NlQueryResponse query(NlQueryRequest request) {
// NlQueryRequest is a record, so its accessors are question(), userId()... rather than getX()
log.info("NL query: userId={}, question={}",
request.userId(), request.question());
// Step 1: generate SQL
TextToSqlService.SqlGenerationResult genResult = sqlGenerator.generateSql(
TextToSqlService.SqlGenerationRequest.builder()
.question(request.question())
.databaseDialect(request.dialect())
.conversationHistory(request.history())
.userId(request.userId())
.build()
);
if (!genResult.success()) {
return NlQueryResponse.failed(genResult.errorMessage());
}
// Step 2: if the LLM flagged assumptions that need confirming, ask the user first
if (genResult.needsConfirmation()) {
return NlQueryResponse.needsConfirmation(
genResult.sql(), genResult.explanation(), genResult.assumptions());
}
// Step 3: execute the SQL (security validation runs inside execute())
SqlExecutionService.QueryExecutionResult execResult =
executionService.execute(genResult.sql(), request.userId());
if (!execResult.success()) {
return NlQueryResponse.executionFailed(
genResult.sql(), genResult.explanation(), execResult.errorMessage());
}
// Step 4: explain the result in natural language
String explanation = explanationService.explain(
request.question(), genResult.sql(), execResult.rows());
return NlQueryResponse.success(
genResult.sql(), explanation, execResult.rows(), execResult.elapsedMs());
}
public record NlQueryRequest(
String question, String userId, String dialect,
List<TextToSqlService.ConversationTurn> history
) {}
public record NlQueryResponse(
String status, String sql, String explanation,
List<Map<String, Object>> data, long elapsedMs,
String errorMessage, List<String> assumptions
) {
static NlQueryResponse success(String sql, String explanation,
List<Map<String, Object>> data, long elapsed) {
return new NlQueryResponse("success", sql, explanation, data, elapsed, null, List.of());
}
static NlQueryResponse failed(String error) {
return new NlQueryResponse("failed", null, null, List.of(), 0, error, List.of());
}
static NlQueryResponse executionFailed(String sql, String explanation, String error) {
return new NlQueryResponse("execution_failed", sql, explanation,
List.of(), 0, error, List.of());
}
static NlQueryResponse needsConfirmation(String sql, String explanation, List<String> assumptions) {
return new NlQueryResponse("needs_confirmation", sql, explanation,
List.of(), 0, null, assumptions);
}
}
}
Practical Advice
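Schema Quality Determines SQL Quality
The most overlooked work in Text-to-SQL is commenting tables and columns. A field with a clear description (create_time -- order creation time) yields far more accurate SQL than an uncommented one (ct). Before rolling out Text-to-SQL, spend a week filling in the database comments. The investment pays off.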
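Before filling in comments, it helps to know where the gaps are. The sketch below is a small audit that lists columns without a description and reports overall coverage. The `Column` record is a hypothetical stand-in for the schema metadata. In a real database that metadata could be read from the catalog; for example, JDBC's `DatabaseMetaData.getColumns` exposes column comments in its `REMARKS` column, though drivers differ in how they populate it.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical audit: which columns would reach the prompt without a description?
class CommentAudit {
    record Column(String table, String name, String description) {}

    static List<String> missingDescriptions(List<Column> columns) {
        return columns.stream()
                .filter(c -> c.description() == null || c.description().isBlank())
                .map(c -> c.table() + "." + c.name())
                .collect(Collectors.toList());
    }

    /** Share of columns that have a description, between 0 and 1. */
    static double coverage(List<Column> columns) {
        if (columns.isEmpty()) return 1.0;
        return 1.0 - (double) missingDescriptions(columns).size() / columns.size();
    }
}
```

Running this per table makes it easy to start with the tables that appear most often in retrieval results.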
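Use Self-Correction for Complex SQL
Multi-table joins are easy to get wrong. After generating the SQL, you can have the LLM check it once more: "Does this SQL correctly answer the question? Are there any logic errors?" A simple self-review pass like this can cut the error rate by roughly 30-40%.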
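The self-review can run as a bounded loop:

1. Generate the SQL.
2. Ask the model whether the SQL answers the question.
3. Regenerate with its critique only when it reports a problem.

The sketch below abstracts both LLM calls as functions. `SelfReview`, the "OK" reply convention and the round cap are assumptions, not the author's implementation. The regenerated SQL still goes through the security validator afterwards.

```java
import java.util.*;
import java.util.function.*;

// Hypothetical generate-review-regenerate loop; both LLM calls are passed in as functions
class SelfReview {
    /**
     * generate: (question, critique) -> SQL; the critique is null on the first attempt
     * review:   (question, sql) -> "OK" if the SQL answers the question, otherwise a critique
     */
    static String generateWithReview(String question,
                                     BiFunction<String, String, String> generate,
                                     BiFunction<String, String, String> review,
                                     int maxRounds) {
        String sql = generate.apply(question, null);
        for (int round = 0; round < maxRounds; round++) {
            String verdict = review.apply(question, sql);
            if ("OK".equalsIgnoreCase(verdict.trim())) {
                return sql;                          // reviewer found no problem
            }
            sql = generate.apply(question, verdict); // retry with the critique in the prompt
        }
        return sql; // round cap reached: return the latest attempt unreviewed
    }
}
```

Capping the rounds bounds both latency and LLM cost. One or two rounds is a reasonable starting point to tune.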
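Slow Queries Are the Biggest Operational Risk
After launch, our biggest headache was not wrong SQL but slow queries. Business users ask for things like "show me all orders", which becomes a full table scan and can drag the database down. Estimate the cost before execution (EXPLAIN) and reject any query above a threshold. We now also mark indexed columns in the schema description, so the LLM prefers conditions on indexed columns.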
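The sketch below is a minimal EXPLAIN gate. It assumes MySQL's traditional tabular EXPLAIN output, where each plan row has:

- `table`: the table being read.
- `type`: the access method (`ALL` means a full table scan).
- `rows`: the optimizer's row estimate, not an exact count.

The plan could be fetched on the read-only connection with `JdbcTemplate.queryForList("EXPLAIN " + sql)`. `ExplainGate`, both thresholds and the crude multiply-the-estimates join rule are assumptions to tune. Other databases report plans and costs in different formats.

```java
import java.util.*;

// Hypothetical cost gate over MySQL-style EXPLAIN rows (keys "table", "type", "rows")
class ExplainGate {
    private final long maxFullScanRows;   // full scans above this estimate are rejected
    private final long maxJoinProduct;    // limit on the product of row estimates

    ExplainGate(long maxFullScanRows, long maxJoinProduct) {
        this.maxFullScanRows = maxFullScanRows;
        this.maxJoinProduct = maxJoinProduct;
    }

    /** Empty list if the plan is acceptable, otherwise the reasons to reject it. */
    List<String> check(List<Map<String, Object>> plan) {
        List<String> reasons = new ArrayList<>();
        long product = 1;
        for (Map<String, Object> step : plan) {
            long rows = step.get("rows") instanceof Number n ? n.longValue() : 0L;
            if ("ALL".equalsIgnoreCase(String.valueOf(step.get("type"))) && rows > maxFullScanRows) {
                reasons.add("full table scan on " + step.get("table") + " (~" + rows + " rows)");
            }
            // multiply the estimates (a rough nested-loop size), saturating instead of overflowing
            if (rows > 0) {
                product = product > Long.MAX_VALUE / rows ? Long.MAX_VALUE : product * rows;
            }
        }
        if (product > maxJoinProduct) {
            reasons.add("estimated result of the join is too large (~" + product + " row combinations)");
        }
        return reasons;
    }
}
```

In the execution service, this check would run after validation and before the real query. A non-empty list means the query is refused, and the reasons can be shown to the user as a hint to narrow the request.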
