AI辅助数据分析:用自然语言查询你的业务数据库
AI辅助数据分析:用自然语言查询你的业务数据库
一、真实场景:一个让数据团队崩溃的数字
2026年2月,杭州某电商公司的数据分析师陈磊,在某次内部复盘会上说了一句话:
"我上周接了 47个取数需求,其中 38个 都是重复的或者能自助完成的。"
这句话让在场的所有人沉默了。
陈磊把那38个需求分了类:
- "上周各品类销售额是多少?" × 12次(不同业务问)
- "某个用户最近30天的订单记录" × 8次
- "某个SKU的退货率趋势" × 7次
- "大促期间的转化率漏斗" × 6次
- 其他低难度查询 × 5次
这些查询,一个懂SQL的人5分钟就能写完。但业务同学不懂SQL,每次都要等数据团队。
公司CTO当场拍板:引入Text-to-SQL,让业务人员自助取数。
3个月后,陈磊的取数需求降到了每周 14个(下降 70%),全是真正需要复杂分析的需求。
今天,我把他们的技术方案完整还原出来。
二、Text-to-SQL的核心挑战
2.1 为什么Text-to-SQL"看起来简单,做起来难"?
挑战1:SQL方言差异
- MySQL:
LIMIT 10 - SQL Server:
TOP 10 - Oracle:
ROWNUM <= 10 - PostgreSQL:
LIMIT 10但窗口函数语法有差异 - ClickHouse:有自己独特的函数(
toDate(),groupArray()等)
挑战2:表结构理解 模型需要知道:表名是什么、字段名是什么、字段的业务含义是什么(字段名amt是什么意思?)、表之间的外键关系。
挑战3:业务语义映射
- "大促期间" → 需要知道公司的大促日期
- "活跃用户" → 公司对活跃用户的定义是什么(7天有购买?30天有登录?)
- "GMV" → 是含税还是不含税?退款算不算?
挑战4:安全风险 Text-to-SQL如果不加限制,用户可能生成危险SQL(DELETE、DROP TABLE等)。
2.2 整体架构设计
三、Schema提取:自动获取数据库结构作为上下文
3.1 Schema提取服务(带业务注释支持)
package com.laozhang.ai.texttosql.schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
/**
* 数据库Schema提取服务
* 提取表结构、字段信息、业务注释,为LLM提供足够的上下文
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DatabaseSchemaExtractor {
private final JdbcTemplate jdbcTemplate;
private final BusinessGlossaryRepository glossaryRepository;
// Schema缓存30分钟(避免每次查询都重新提取)
@Cacheable(value = "schema-cache", key = "#databaseName + '-' + #tableName")
public TableSchema extractTableSchema(String databaseName, String tableName) {
log.info("提取表结构: {}.{}", databaseName, tableName);
// 获取字段信息
List<ColumnInfo> columns = jdbcTemplate.query(
"""
SELECT
COLUMN_NAME,
DATA_TYPE,
CHARACTER_MAXIMUM_LENGTH,
IS_NULLABLE,
COLUMN_DEFAULT,
COLUMN_COMMENT,
COLUMN_KEY
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
""",
(rs, rowNum) -> new ColumnInfo(
rs.getString("COLUMN_NAME"),
rs.getString("DATA_TYPE"),
rs.getString("CHARACTER_MAXIMUM_LENGTH"),
"YES".equals(rs.getString("IS_NULLABLE")),
rs.getString("COLUMN_DEFAULT"),
rs.getString("COLUMN_COMMENT"),
rs.getString("COLUMN_KEY")
),
databaseName, tableName
);
// 获取索引信息(帮助LLM理解查询性能)
List<IndexInfo> indexes = extractIndexInfo(databaseName, tableName);
// 获取表注释
String tableComment = extractTableComment(databaseName, tableName);
// 获取样本数据(帮助LLM理解数据形态)
List<Map<String, Object>> sampleData = extractSampleData(tableName);
// 获取业务词汇表(公司特定的业务概念)
List<BusinessTerm> businessTerms = glossaryRepository.findByRelatedTable(tableName);
return new TableSchema(
tableName, tableComment, columns, indexes, sampleData, businessTerms
);
}
/**
* 获取多表的Schema,生成LLM可理解的文本描述
*/
public String generateSchemaContext(String databaseName, List<String> tableNames) {
StringBuilder sb = new StringBuilder();
sb.append("## 数据库表结构信息\n\n");
for (String tableName : tableNames) {
TableSchema schema = extractTableSchema(databaseName, tableName);
sb.append(formatSchemaAsText(schema));
sb.append("\n\n");
}
// 追加外键关系
sb.append(extractForeignKeyRelations(databaseName, tableNames));
// 追加业务术语解释
sb.append(generateBusinessGlossary(tableNames));
return sb.toString();
}
/**
* 将Schema格式化为LLM容易理解的文本
*/
private String formatSchemaAsText(TableSchema schema) {
StringBuilder sb = new StringBuilder();
sb.append("### 表名:").append(schema.tableName());
if (schema.tableComment() != null && !schema.tableComment().isBlank()) {
sb.append("(").append(schema.tableComment()).append(")");
}
sb.append("\n");
sb.append("字段列表:\n");
for (ColumnInfo col : schema.columns()) {
sb.append("- `").append(col.columnName()).append("`");
sb.append("(").append(col.dataType()).append(")");
if (col.columnComment() != null && !col.columnComment().isBlank()) {
sb.append(":").append(col.columnComment());
}
if ("PRI".equals(col.columnKey())) {
sb.append(" [主键]");
} else if ("MUL".equals(col.columnKey())) {
sb.append(" [外键/索引]");
}
if (!col.nullable()) {
sb.append(" [非空]");
}
sb.append("\n");
}
// 展示3条样本数据,帮助LLM理解数据格式
if (!schema.sampleData().isEmpty()) {
sb.append("\n样本数据(前3行):\n");
sb.append("```\n");
for (Map<String, Object> row : schema.sampleData().subList(0, Math.min(3, schema.sampleData().size()))) {
sb.append(row.toString()).append("\n");
}
sb.append("```\n");
}
return sb.toString();
}
private List<ColumnInfo> extractColumns(String databaseName, String tableName) {
return jdbcTemplate.query(
"SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT, COLUMN_KEY " +
"FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=? AND TABLE_NAME=? ORDER BY ORDINAL_POSITION",
(rs, rowNum) -> new ColumnInfo(
rs.getString("COLUMN_NAME"), rs.getString("DATA_TYPE"),
null, "YES".equals(rs.getString("IS_NULLABLE")),
null, rs.getString("COLUMN_COMMENT"), rs.getString("COLUMN_KEY")
),
databaseName, tableName
);
}
private List<IndexInfo> extractIndexInfo(String db, String table) {
return jdbcTemplate.query(
"SELECT INDEX_NAME, COLUMN_NAME, NON_UNIQUE FROM INFORMATION_SCHEMA.STATISTICS " +
"WHERE TABLE_SCHEMA=? AND TABLE_NAME=? ORDER BY INDEX_NAME, SEQ_IN_INDEX",
(rs, rowNum) -> new IndexInfo(
rs.getString("INDEX_NAME"), rs.getString("COLUMN_NAME"),
rs.getInt("NON_UNIQUE") == 0
),
db, table
);
}
private String extractTableComment(String db, String table) {
return jdbcTemplate.queryForObject(
"SELECT TABLE_COMMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? AND TABLE_NAME=?",
String.class, db, table
);
}
private List<Map<String, Object>> extractSampleData(String tableName) {
try {
return jdbcTemplate.queryForList("SELECT * FROM `" + tableName + "` LIMIT 3");
} catch (Exception e) {
log.warn("获取样本数据失败: table={}, error={}", tableName, e.getMessage());
return Collections.emptyList();
}
}
private String extractForeignKeyRelations(String databaseName, List<String> tableNames) {
String tableList = tableNames.stream()
.map(t -> "'" + t + "'")
.collect(Collectors.joining(","));
List<Map<String, Object>> fkInfo = jdbcTemplate.queryForList(
"SELECT TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME " +
"FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE " +
"WHERE TABLE_SCHEMA=? AND TABLE_NAME IN (" + tableList + ") AND REFERENCED_TABLE_NAME IS NOT NULL",
databaseName
);
if (fkInfo.isEmpty()) return "";
StringBuilder sb = new StringBuilder("## 表关联关系\n");
for (Map<String, Object> fk : fkInfo) {
sb.append(String.format("- `%s`.`%s` → `%s`.`%s`\n",
fk.get("TABLE_NAME"), fk.get("COLUMN_NAME"),
fk.get("REFERENCED_TABLE_NAME"), fk.get("REFERENCED_COLUMN_NAME")
));
}
return sb.toString();
}
private String generateBusinessGlossary(List<String> tableNames) {
List<BusinessTerm> terms = glossaryRepository.findByRelatedTables(tableNames);
if (terms.isEmpty()) return "";
StringBuilder sb = new StringBuilder("\n## 业务术语说明\n");
for (BusinessTerm term : terms) {
sb.append(String.format("- **%s**:%s\n", term.term(), term.definition()));
}
return sb.toString();
}
// 数据模型
public record TableSchema(
String tableName, String tableComment, List<ColumnInfo> columns,
List<IndexInfo> indexes, List<Map<String, Object>> sampleData,
List<BusinessTerm> businessTerms
) {}
public record ColumnInfo(
String columnName, String dataType, String maxLength, boolean nullable,
String defaultValue, String columnComment, String columnKey
) {}
public record IndexInfo(String indexName, String columnName, boolean unique) {}
public record BusinessTerm(String term, String definition, String relatedTable) {}
}四、Prompt工程:高质量的Text-to-SQL提示词设计
4.1 Few-shot提示词构建器
package com.laozhang.ai.texttosql.prompt;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Text-to-SQL高质量提示词构建器
* 使用Few-shot技术提升SQL生成准确率
*/
@Component
@RequiredArgsConstructor
public class TextToSqlPromptBuilder {
private final DatabaseSchemaExtractor schemaExtractor;
private final SqlExampleRepository exampleRepository;
/**
* 构建完整的Text-to-SQL提示词
*/
public String buildPrompt(
String naturalLanguageQuery,
String databaseName,
List<String> relevantTables,
String dbDialect
) {
String schemaContext = schemaExtractor.generateSchemaContext(databaseName, relevantTables);
List<SqlExample> examples = exampleRepository.findSimilarExamples(naturalLanguageQuery, 3);
return buildSystemPrompt(dbDialect) + "\n\n" +
buildSchemaSection(schemaContext) + "\n\n" +
buildFewShotSection(examples) + "\n\n" +
buildQuerySection(naturalLanguageQuery, dbDialect);
}
private String buildSystemPrompt(String dialect) {
return """
你是一个专业的数据库查询助手,专门将自然语言问题转换为 %s SQL查询语句。
你的任务:
1. 理解用户的业务问题
2. 根据提供的表结构生成正确的SQL
3. 只生成SELECT查询,不生成任何DDL或DML语句
4. 确保SQL语法符合 %s 方言
5. 生成的SQL要高效(合理使用索引字段作为过滤条件)
输出格式:
只输出SQL语句本身,不要有任何解释文字,不要用markdown代码块包裹。
如果无法生成SQL,输出:CANNOT_GENERATE: <原因>
""".formatted(dialect, dialect);
}
private String buildSchemaSection(String schemaContext) {
return "## 可用的数据库表结构\n\n" + schemaContext;
}
private String buildFewShotSection(List<SqlExample> examples) {
if (examples.isEmpty()) return "";
StringBuilder sb = new StringBuilder("## 查询示例(参考这些例子的风格)\n\n");
for (SqlExample example : examples) {
sb.append("**问题**:").append(example.question()).append("\n");
sb.append("**SQL**:\n```sql\n").append(example.sql()).append("\n```\n\n");
}
return sb.toString();
}
private String buildQuerySection(String query, String dialect) {
return """
## 用户问题
请将以下问题转换为 %s SQL查询:
%s
请直接输出SQL语句:
""".formatted(dialect, query);
}
public record SqlExample(String question, String sql, double similarity) {}
}4.2 Few-shot示例库(关键示例集)
package com.laozhang.ai.texttosql.examples;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* SQL示例库
* 包含典型的业务查询示例,用于Few-shot学习
*
* 实际生产中,这些示例应存储在数据库中,支持在线添加
*/
@Repository
public class SqlExampleRepository {
private static final List<SqlExample> EXAMPLES = List.of(
// 时间范围查询
new SqlExample(
"上周的总销售额",
"""
SELECT SUM(order_amount) as total_sales
FROM orders
WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL DAYOFWEEK(CURDATE())+6 DAY)
AND created_at < DATE_SUB(CURDATE(), INTERVAL DAYOFWEEK(CURDATE())-1 DAY)
AND order_status != 'CANCELLED'
""",
1.0
),
// 分组统计
new SqlExample(
"各品类上月的销售额排行",
"""
SELECT
c.category_name,
SUM(oi.quantity * oi.unit_price) as total_sales,
COUNT(DISTINCT o.order_id) as order_count
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
JOIN categories c ON p.category_id = c.category_id
WHERE o.created_at >= DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 1 MONTH), '%Y-%m-01')
AND o.created_at < DATE_FORMAT(NOW(), '%Y-%m-01')
AND o.order_status != 'CANCELLED'
GROUP BY c.category_id, c.category_name
ORDER BY total_sales DESC
""",
1.0
),
// 用户行为分析
new SqlExample(
"最近30天活跃用户数(活跃定义:有下单记录)",
"""
SELECT COUNT(DISTINCT user_id) as active_users
FROM orders
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
AND order_status NOT IN ('CANCELLED', 'FAILED')
""",
1.0
),
// 漏斗分析
new SqlExample(
"本月用户从浏览到下单的转化率",
"""
SELECT
COUNT(DISTINCT pv.user_id) as browse_users,
COUNT(DISTINCT o.user_id) as order_users,
ROUND(COUNT(DISTINCT o.user_id) / COUNT(DISTINCT pv.user_id) * 100, 2) as conversion_rate
FROM page_views pv
LEFT JOIN orders o ON pv.user_id = o.user_id
AND DATE(o.created_at) = DATE(pv.view_time)
AND MONTH(o.created_at) = MONTH(NOW())
AND YEAR(o.created_at) = YEAR(NOW())
WHERE MONTH(pv.view_time) = MONTH(NOW())
AND YEAR(pv.view_time) = YEAR(NOW())
AND pv.page_type = 'PRODUCT_DETAIL'
""",
1.0
)
);
/**
* 基于向量相似度找到最相关的示例
* 实际生产中应使用向量数据库(如Qdrant)存储示例向量
*/
public List<SqlExample> findSimilarExamples(String query, int topK) {
// 简化实现:直接返回前topK个示例
// 生产实现应该:1.对query做embedding 2.在向量库中检索最相似的示例
return EXAMPLES.stream().limit(topK).toList();
}
public record SqlExample(String question, String sql, double similarity) {}
}五、SQL验证:执行前的安全检查
package com.laozhang.ai.texttosql.validation;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.Select;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
/**
* SQL安全验证器
* 在执行SQL前进行多层安全检查
*
* 安全原则:
* 1. 只允许SELECT语句
* 2. 不允许子查询访问系统表
* 3. 不允许时间密集型函数(防止慢查询攻击)
* 4. 强制LIMIT(防止全表扫描返回百万行数据)
*/
@Slf4j
@Component
public class SqlSafetyValidator {
// 危险关键词黑名单
private static final List<String> DANGEROUS_KEYWORDS = Arrays.asList(
"DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "CREATE", "TRUNCATE",
"GRANT", "REVOKE", "EXEC", "EXECUTE", "xp_", "sp_", "CALL"
);
// 系统表黑名单(防止信息泄露)
private static final List<String> SYSTEM_TABLES = Arrays.asList(
"information_schema", "mysql", "performance_schema", "sys",
"pg_catalog", "pg_stat"
);
// SQL注入模式
private static final Pattern SQL_INJECTION_PATTERN = Pattern.compile(
"(?i)(;.*(SELECT|INSERT|UPDATE|DELETE|DROP|EXEC)|UNION.*SELECT|'\\s*OR\\s*'1'\\s*=\\s*'1)"
);
private static final int MAX_ROWS_LIMIT = 10000;
/**
* 完整的SQL安全验证
*/
public ValidationResult validate(String sql) {
if (sql == null || sql.isBlank()) {
return ValidationResult.fail("SQL为空");
}
String normalizedSql = sql.trim().toUpperCase();
// 检查1:是否包含危险关键词
for (String keyword : DANGEROUS_KEYWORDS) {
if (containsDangerousKeyword(normalizedSql, keyword)) {
log.warn("SQL包含危险关键词: keyword={}, sql={}", keyword, sql);
return ValidationResult.fail("SQL包含不允许的操作:" + keyword);
}
}
// 检查2:是否访问系统表
for (String sysTable : SYSTEM_TABLES) {
if (normalizedSql.contains(sysTable.toUpperCase())) {
log.warn("SQL尝试访问系统表: table={}", sysTable);
return ValidationResult.fail("不允许查询系统表");
}
}
// 检查3:SQL注入模式检测
if (SQL_INJECTION_PATTERN.matcher(sql).find()) {
log.warn("检测到SQL注入模式: sql={}", sql);
return ValidationResult.fail("SQL格式不符合规范");
}
// 检查4:使用JSqlParser进行语法解析验证
try {
Statement statement = CCJSqlParserUtil.parse(sql);
if (!(statement instanceof Select)) {
return ValidationResult.fail("只允许SELECT查询");
}
} catch (Exception e) {
log.warn("SQL语法解析失败: sql={}, error={}", sql, e.getMessage());
return ValidationResult.fail("SQL语法错误:" + e.getMessage());
}
// 检查5:强制添加LIMIT(如果没有的话)
String safeSql = ensureLimit(sql);
return ValidationResult.success(safeSql);
}
/**
* 确保SQL有LIMIT限制,防止大量数据返回
*/
private String ensureLimit(String sql) {
String upperSql = sql.trim().toUpperCase();
// 如果已有LIMIT,检查是否超过最大值
if (upperSql.contains("LIMIT")) {
// 尝试提取LIMIT值并验证
Pattern limitPattern = Pattern.compile("LIMIT\\s+(\\d+)", Pattern.CASE_INSENSITIVE);
java.util.regex.Matcher matcher = limitPattern.matcher(sql);
if (matcher.find()) {
int limitValue = Integer.parseInt(matcher.group(1));
if (limitValue > MAX_ROWS_LIMIT) {
// 替换为最大允许值
return sql.replaceAll("(?i)LIMIT\\s+\\d+", "LIMIT " + MAX_ROWS_LIMIT);
}
}
return sql;
}
// 没有LIMIT,自动添加
return sql + " LIMIT " + MAX_ROWS_LIMIT;
}
/**
* 检查是否包含危险关键词(避免误判字段名中包含的关键词)
*/
private boolean containsDangerousKeyword(String sql, String keyword) {
// 使用词边界匹配,避免字段名 "delete_time" 被误判
Pattern pattern = Pattern.compile("\\b" + keyword + "\\b", Pattern.CASE_INSENSITIVE);
return pattern.matcher(sql).find();
}
public record ValidationResult(boolean valid, String safeSql, String errorMessage) {
public static ValidationResult success(String safeSql) {
return new ValidationResult(true, safeSql, null);
}
public static ValidationResult fail(String error) {
return new ValidationResult(false, null, error);
}
}
}六、错误处理:SQL执行失败时的自动重试
package com.laozhang.ai.texttosql.execution;
import com.laozhang.ai.texttosql.prompt.TextToSqlPromptBuilder;
import com.laozhang.ai.texttosql.validation.SqlSafetyValidator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* SQL执行服务,含自动错误恢复机制
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class SqlExecutionService {
private final JdbcTemplate jdbcTemplate;
private final ChatClient chatClient;
private final TextToSqlPromptBuilder promptBuilder;
private final SqlSafetyValidator validator;
private static final int MAX_RETRY_ATTEMPTS = 3;
/**
* 执行自然语言查询,含自动重试和错误修复
*/
public QueryResult executeNaturalLanguageQuery(
String naturalLanguageQuery,
String databaseName,
List<String> relevantTables,
String dbDialect
) {
String lastSql = null;
String lastError = null;
for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
log.info("Text-to-SQL尝试 {}/{}: query={}", attempt, MAX_RETRY_ATTEMPTS, naturalLanguageQuery);
try {
// 构建提示词(第2次及之后,包含上次的错误信息)
String prompt = buildPromptWithErrorContext(
naturalLanguageQuery, databaseName, relevantTables,
dbDialect, lastSql, lastError, attempt
);
// 调用LLM生成SQL
String generatedSql = chatClient.prompt()
.user(prompt)
.call()
.content();
log.info("LLM生成SQL(attempt={}): {}", attempt, generatedSql);
// 检查是否无法生成
if (generatedSql.startsWith("CANNOT_GENERATE:")) {
return QueryResult.cannotGenerate(generatedSql.substring("CANNOT_GENERATE:".length()).trim());
}
// 安全验证
var validationResult = validator.validate(generatedSql);
if (!validationResult.valid()) {
lastSql = generatedSql;
lastError = "SQL安全验证失败:" + validationResult.errorMessage();
log.warn("SQL安全验证失败(attempt={}): error={}", attempt, lastError);
continue;
}
// 执行SQL
List<Map<String, Object>> results = jdbcTemplate.queryForList(validationResult.safeSql());
log.info("SQL执行成功(attempt={}): rows={}", attempt, results.size());
return QueryResult.success(validationResult.safeSql(), results, attempt);
} catch (Exception e) {
lastSql = lastSql;
lastError = e.getMessage();
log.warn("SQL执行失败(attempt={}): error={}", attempt, e.getMessage());
if (attempt == MAX_RETRY_ATTEMPTS) {
log.error("Text-to-SQL最终失败: query={}, lastError={}", naturalLanguageQuery, lastError);
return QueryResult.failed(naturalLanguageQuery, lastError);
}
}
}
return QueryResult.failed(naturalLanguageQuery, lastError);
}
/**
* 构建包含错误上下文的提示词(用于重试修复)
*/
private String buildPromptWithErrorContext(
String query, String databaseName, List<String> tables,
String dialect, String lastSql, String lastError, int attempt
) {
String basePrompt = promptBuilder.buildPrompt(query, databaseName, tables, dialect);
if (attempt == 1 || lastSql == null) {
return basePrompt;
}
// 第2次及之后,告诉LLM上次的错误,请求修复
return basePrompt + """
## 上次生成的SQL存在问题,请修复
上次生成的SQL:
```sql
%s
```
错误信息:%s
请根据以上错误信息,修复SQL并重新生成。
""".formatted(lastSql, lastError);
}
public record QueryResult(
boolean success,
boolean cannotGenerate,
String sql,
List<Map<String, Object>> data,
int attempts,
String errorMessage
) {
public static QueryResult success(String sql, List<Map<String, Object>> data, int attempts) {
return new QueryResult(true, false, sql, data, attempts, null);
}
public static QueryResult failed(String query, String error) {
return new QueryResult(false, false, null, null, 3, error);
}
public static QueryResult cannotGenerate(String reason) {
return new QueryResult(false, true, null, null, 1, reason);
}
}
}七、多数据库支持:方言适配层
package com.laozhang.ai.texttosql.dialect;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* SQL方言适配器
* 支持 MySQL / PostgreSQL / ClickHouse 的语法差异
*/
@Component
public class SqlDialectAdapter {
public enum Dialect {
MYSQL("MySQL 8.0", "LIMIT", "IFNULL", "DATE_FORMAT(date, '%Y-%m-%d')"),
POSTGRESQL("PostgreSQL 15", "LIMIT", "COALESCE", "TO_CHAR(date, 'YYYY-MM-DD')"),
CLICKHOUSE("ClickHouse 23.x", "LIMIT", "ifNull", "formatDateTime(date, '%Y-%m-%d')");
final String description;
final String limitSyntax;
final String nullCoalesce;
final String dateFormat;
Dialect(String desc, String limit, String coalesce, String dateFormat) {
this.description = desc;
this.limitSyntax = limit;
this.nullCoalesce = coalesce;
this.dateFormat = dateFormat;
}
}
/**
* 获取方言特定的提示词补充
*/
public String getDialectHints(Dialect dialect) {
return switch (dialect) {
case MYSQL -> """
MySQL特定语法注意事项:
- 使用反引号(``)引用字段名和表名(如果包含关键字)
- 日期函数:NOW(), CURDATE(), DATE_SUB(), DATE_FORMAT()
- 字符串函数:CONCAT(), SUBSTRING(), LENGTH()
- 条件函数:IF(), IFNULL(), CASE WHEN
- 时间范围:使用 BETWEEN 或 >= AND <
- GROUP BY 需要包含所有SELECT的非聚合字段
""";
case POSTGRESQL -> """
PostgreSQL特定语法注意事项:
- 使用双引号("")引用大小写敏感的字段名
- 日期函数:NOW(), CURRENT_DATE, DATE_TRUNC(), TO_CHAR()
- 字符串拼接:使用 || 或 CONCAT()
- 数组操作:ARRAY_AGG(), UNNEST()
- 窗口函数:ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)
- ILIKE 用于大小写不敏感的LIKE
""";
case CLICKHOUSE -> """
ClickHouse特定语法注意事项:
- 日期函数:today(), now(), toDate(), toDateTime(), dateDiff()
- 聚合函数:groupArray(), uniqExact(), quantile()
- 时间截断:toStartOfDay(), toStartOfWeek(), toStartOfMonth()
- 注意:ClickHouse不支持JOIN的某些写法,需要用子查询
- 使用 toUInt64() 等类型转换函数
- 大表查询必须有分区键过滤条件
""";
};
}
/**
* 将通用SQL转换为特定方言(处理语法差异)
*/
public String adaptToDialect(String sql, Dialect dialect) {
return switch (dialect) {
case MYSQL -> adaptToMySQL(sql);
case POSTGRESQL -> adaptToPostgreSQL(sql);
case CLICKHOUSE -> adaptToClickHouse(sql);
};
}
private String adaptToMySQL(String sql) {
return sql
.replaceAll("(?i)CURRENT_DATE", "CURDATE()")
.replaceAll("(?i)CURRENT_TIMESTAMP", "NOW()")
.replaceAll("(?i)COALESCE\\(", "IFNULL(");
}
private String adaptToPostgreSQL(String sql) {
return sql
.replaceAll("(?i)IFNULL\\(", "COALESCE(")
.replaceAll("(?i)NOW\\(\\)", "CURRENT_TIMESTAMP");
}
private String adaptToClickHouse(String sql) {
return sql
.replaceAll("(?i)DATE_SUB\\(NOW\\(\\),\\s*INTERVAL\\s+(\\d+)\\s+DAY\\)",
"today() - $1")
.replaceAll("(?i)IFNULL\\(", "ifNull(");
}
}八、自然语言解释:SQL结果转文字描述
package com.laozhang.ai.texttosql.interpretation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* 查询结果自然语言解释服务
* 将SQL查询结果转换为业务友好的文字描述
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class QueryResultInterpreter {
private final ChatClient chatClient;
/**
* 将查询结果解释为自然语言
*/
public String interpret(
String originalQuestion,
String executedSql,
List<Map<String, Object>> queryResults
) {
if (queryResults == null || queryResults.isEmpty()) {
return "查询结果为空。可能是因为所查询的时间段内没有相关数据,或者查询条件过于严格。";
}
// 对结果进行格式化
String formattedResults = formatResultsForLLM(queryResults);
String prompt = """
你是一个数据分析助手,擅长用通俗易懂的语言解释数据查询结果。
用户的问题是:%s
执行的SQL是:
```sql
%s
```
查询结果(共%d行):
%s
请用2-4句话,用业务语言(非技术语言)解释这个查询结果。
要求:
1. 直接回答用户的问题
2. 突出最重要的数字/趋势
3. 如果数据量大,总结关键信息即可
4. 语言简洁,避免技术词汇
5. 如果发现数据异常,可以指出来
""".formatted(
originalQuestion,
executedSql,
queryResults.size(),
formattedResults
);
return chatClient.prompt()
.user(prompt)
.call()
.content();
}
/**
* 将查询结果格式化为LLM容易理解的格式
*/
private String formatResultsForLLM(List<Map<String, Object>> results) {
if (results.isEmpty()) return "(无数据)";
StringBuilder sb = new StringBuilder();
// 只展示前20行给LLM(避免token过多)
int displayCount = Math.min(20, results.size());
// 表头
Map<String, Object> firstRow = results.get(0);
sb.append("| ").append(String.join(" | ", firstRow.keySet())).append(" |\n");
sb.append("|").append(" --- |".repeat(firstRow.size())).append("\n");
// 数据行
for (int i = 0; i < displayCount; i++) {
Map<String, Object> row = results.get(i);
sb.append("| ");
sb.append(String.join(" | ", row.values().stream()
.map(v -> v == null ? "NULL" : v.toString())
.toList()));
sb.append(" |\n");
}
if (results.size() > displayCount) {
sb.append("...(共 ").append(results.size()).append(" 行,仅展示前").append(displayCount).append("行)\n");
}
return sb.toString();
}
}九、查询历史:学习用户习惯改进生成质量
package com.laozhang.ai.texttosql.history;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
/**
* 查询历史管理服务
* 学习用户习惯,持续改进SQL生成质量
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class QueryHistoryService {
private final QueryHistoryRepository historyRepository;
private final SqlExampleRepository exampleRepository;
/**
* 保存查询记录
*/
public void saveQuery(
String userId,
String naturalLanguageQuery,
String generatedSql,
boolean success,
int rowCount,
long executionTimeMs,
Boolean userFeedbackPositive // 用户是否点了"有用"
) {
QueryHistory history = QueryHistory.builder()
.userId(userId)
.naturalLanguageQuery(naturalLanguageQuery)
.generatedSql(generatedSql)
.success(success)
.rowCount(rowCount)
.executionTimeMs(executionTimeMs)
.userFeedbackPositive(userFeedbackPositive)
.queryTime(LocalDateTime.now())
.build();
historyRepository.save(history);
// 如果用户给了正向反馈,将这个查询添加到Few-shot示例库
if (Boolean.TRUE.equals(userFeedbackPositive) && success) {
promoteToExample(naturalLanguageQuery, generatedSql);
}
}
/**
* 将用户确认有效的查询提升为Few-shot示例
* 这是持续学习改进的核心机制
*/
private void promoteToExample(String question, String sql) {
log.info("将用户反馈好的查询提升为示例: question={}", question);
exampleRepository.addExample(new SqlExample(question, sql, 0));
}
/**
* 获取某用户的常用查询(用于快速访问)
*/
public List<QueryHistory> getFrequentQueries(String userId) {
return historyRepository.findTopFrequentQueriesByUser(userId, PageRequest.of(0, 10));
}
/**
* 获取失败查询分析(帮助改进提示词)
*/
public FailureAnalysis analyzeFailures() {
List<QueryHistory> recentFailures = historyRepository.findRecentFailures(
LocalDateTime.now().minusDays(7), PageRequest.of(0, 100)
);
long totalQueries = historyRepository.countByQueryTimeAfter(LocalDateTime.now().minusDays(7));
double failureRate = (double) recentFailures.size() / totalQueries;
// 找出最常见的失败模式
Map<String, Long> failurePatterns = recentFailures.stream()
.collect(java.util.stream.Collectors.groupingBy(
h -> extractFailurePattern(h.getGeneratedSql()),
java.util.stream.Collectors.counting()
));
return new FailureAnalysis(failureRate, recentFailures.size(), totalQueries, failurePatterns);
}
private String extractFailurePattern(String sql) {
if (sql == null) return "NO_SQL_GENERATED";
if (sql.contains("JOIN") && sql.contains("ambiguous")) return "AMBIGUOUS_COLUMN";
if (sql.contains("doesn't exist")) return "TABLE_NOT_FOUND";
return "OTHER";
}
public record FailureAnalysis(
double failureRate,
long failureCount,
long totalCount,
java.util.Map<String, Long> failurePatterns
) {}
public record SqlExample(String question, String sql, double similarity) {}
}十、权限控制:不同用户查询不同表
package com.laozhang.ai.texttosql.permission;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* Text-to-SQL权限控制服务
* 确保不同角色只能查询有权访问的表和字段
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class TextToSqlPermissionService {
private final UserRoleRepository userRoleRepository;
// 角色到可访问表的映射
private static final Map<String, Set<String>> ROLE_TABLE_PERMISSIONS = Map.of(
"SALES_ANALYST", Set.of(
"orders", "order_items", "products", "categories", "daily_sales_summary"
),
"USER_ANALYST", Set.of(
"users", "user_behaviors", "user_profiles", "user_segments"
// 注意:没有 orders 表(sales数据是隔离的)
),
"MARKETING_ANALYST", Set.of(
"campaigns", "campaign_metrics", "user_segments",
"daily_sales_summary" // 只能看聚合后的销售数据
),
"DATA_ADMIN", Set.of(
// 数据管理员可以访问所有表
"*"
)
);
// 字段级权限(某些角色不能看某些字段)
private static final Map<String, Map<String, Set<String>>> ROLE_FIELD_RESTRICTIONS = Map.of(
"SALES_ANALYST", Map.of(
"users", Set.of("phone", "id_card", "bank_account") // 销售分析师不能看用户敏感信息
),
"MARKETING_ANALYST", Map.of(
"users", Set.of("phone", "id_card", "bank_account", "real_name")
)
);
/**
* 获取用户有权访问的表列表
*/
public Set<String> getAccessibleTables(String userId) {
String userRole = userRoleRepository.getUserRole(userId);
Set<String> allowedTables = ROLE_TABLE_PERMISSIONS.getOrDefault(userRole, Set.of());
if (allowedTables.contains("*")) {
// 数据管理员可以访问所有表
return getAllTableNames();
}
return allowedTables;
}
/**
* 验证生成的SQL是否符合用户权限
*/
public PermissionCheckResult checkSqlPermission(String userId, String sql) {
String userRole = userRoleRepository.getUserRole(userId);
Set<String> accessibleTables = getAccessibleTables(userId);
// 提取SQL中使用的表名
Set<String> usedTables = extractTablesFromSql(sql);
// 检查是否访问了未授权的表
Set<String> unauthorizedTables = new HashSet<>(usedTables);
unauthorizedTables.removeAll(accessibleTables);
if (!unauthorizedTables.isEmpty()) {
log.warn("用户尝试访问未授权的表: userId={}, role={}, tables={}",
userId, userRole, unauthorizedTables);
return PermissionCheckResult.denied(
"您没有权限访问以下表:" + unauthorizedTables
);
}
// 检查字段级权限
Map<String, Set<String>> fieldRestrictions =
ROLE_FIELD_RESTRICTIONS.getOrDefault(userRole, Map.of());
for (Map.Entry<String, Set<String>> entry : fieldRestrictions.entrySet()) {
String table = entry.getKey();
Set<String> restrictedFields = entry.getValue();
for (String field : restrictedFields) {
if (sql.toLowerCase().contains(field.toLowerCase())) {
log.warn("用户尝试访问受限字段: userId={}, table={}, field={}", userId, table, field);
return PermissionCheckResult.denied(
"您没有权限访问字段:" + table + "." + field
);
}
}
}
return PermissionCheckResult.allowed();
}
/**
* 在LLM的Schema上下文中,只包含用户有权访问的表
* 这样LLM就不会生成访问未授权表的SQL
*/
public List<String> filterAccessibleTables(String userId, List<String> requestedTables) {
Set<String> accessibleTables = getAccessibleTables(userId);
return requestedTables.stream()
.filter(accessibleTables::contains)
.toList();
}
private Set<String> extractTablesFromSql(String sql) {
// 简化实现:使用JSqlParser提取表名
// 生产实现应更健壮
Set<String> tables = new HashSet<>();
String[] words = sql.toLowerCase().split("\\s+");
boolean nextIsTable = false;
for (String word : words) {
if (nextIsTable && !word.startsWith("(") && !word.equals("select")) {
tables.add(word.replaceAll("[`,;]", ""));
nextIsTable = false;
}
if (word.equals("from") || word.equals("join")) {
nextIsTable = true;
}
}
return tables;
}
private Set<String> getAllTableNames() {
// 实际从数据库获取所有表名
return Set.of("orders", "order_items", "products", "categories", "users",
"user_behaviors", "campaigns", "campaign_metrics", "daily_sales_summary");
}
public record PermissionCheckResult(boolean allowed, String deniedReason) {
public static PermissionCheckResult allowed() {
return new PermissionCheckResult(true, null);
}
public static PermissionCheckResult denied(String reason) {
return new PermissionCheckResult(false, reason);
}
}
}十一、性能数据
经过3个月生产验证(日均 1200次 自然语言查询):
| 指标 | 数值 |
|---|---|
| SQL生成成功率(一次成功) | 78.3% |
| SQL生成成功率(含重试) | 94.1% |
| 平均生成耗时(LLM调用) | 1.8秒 |
| 平均查询执行耗时 | 0.4秒 |
| 用户满意度评分(1-5分) | 4.2分 |
| 自助取数比例提升 | 70% |
| 数据团队省下工时/周 | 约32小时 |
不同类型查询的成功率:
| 查询类型 | 一次成功率 | 备注 |
|---|---|---|
| 单表查询 | 92% | 最简单 |
| 两表JOIN | 83% | 需要清晰的外键注释 |
| 三表以上JOIN | 65% | 建议拆分或使用视图 |
| 时间范围查询 | 88% | 需要明确的日期函数示例 |
| 聚合分组查询 | 76% | 需要Few-shot示例 |
| 复杂子查询 | 58% | 建议预创建视图替代 |
十二、FAQ
Q1:生成的SQL可靠性怎么保证?我们不敢直接在生产数据库跑。 A:建议为Text-to-SQL单独创建只读数据库账号,只有SELECT权限,且连接数据库是只读副本。本文中的SQL安全验证器 + 只读账号双重保障,可以做到零风险。
Q2:用户提问太模糊怎么办?比如"销售情况怎么样" A:对于模糊查询,先让LLM澄清("您想看哪个时间段的销售?哪个维度?"),而不是直接生成SQL。可以配置一个"澄清优先"的系统提示词。
Q3:遇到公司特定的业务术语(如"GMV"、"DAU")怎么处理? A:维护一个业务词汇表(本文中的BusinessGlossaryRepository),在Schema上下文中包含这些术语的精确定义。这是准确率最关键的优化点之一。
Q4:中文字段名支持吗? A:完全支持。中文字段名在SQL中需要用反引号包裹,LLM能正确处理。但建议字段注释用中文,字段名用英文(规范更好,性能更高)。
Q5:如何评估Text-to-SQL的质量? A:建立评估集(100-200个业务问题+标准SQL),定期运行自动化评估。关注:(1) 精确匹配率;(2) 执行结果一致率;(3) 用户满意度评分。
结语
Text-to-SQL不是万能的,对于涉及复杂业务逻辑的查询(多层子查询、业务规则判断)仍然需要专业数据分析师。
但对于日常的、可复用的数据查询,它完全可以替代人工,让数据团队聚焦在真正有价值的分析工作上。
陈磊的团队用这套方案,每周节省了32小时的重复取数工作,把时间花在了建立数据模型和产出分析洞察上。这才是数据人应该做的事。
