第2473篇:AI在数据仓库中的应用——自然语言查询和数据探索
2026/4/30大约 6 分钟
第2473篇:AI在数据仓库中的应用——自然语言查询和数据探索
适读人群:数据工程师、后端工程师、数据分析师 | 阅读时长:约16分钟 | 核心价值:用Text-to-SQL技术让非技术人员也能直接用自然语言查询数据仓库
我们公司的数据分析师有个日常困境:业务部门需要数据支持,发过来的需求往往是"能不能帮我看看上个月华东区的客单价趋势"或者"最近一周哪些商品的退货率异常高"。
这类需求,对于懂SQL的工程师来说10分钟能出结果。但因为SQL请求要走工单系统,优先级排队,往往要等一两天。业务同事等不起,要么找人走关系插队,要么放弃了。
如果业务同事能直接用自然语言查询,不需要等数据工程师中转,会怎样?
这就是Text-to-SQL要解决的问题。
Text-to-SQL的难点
Text-to-SQL看起来简单——把自然语言翻译成SQL——但实际做起来有几个关键难点:
难点一:Schema理解。LLM需要知道数据仓库有哪些表、哪些字段、字段之间的关系。如果Schema不提供,LLM会"幻想"不存在的表名和字段。
难点二:业务语义消歧。"客单价"在技术上是什么?是order_amount / order_count还是avg(order_amount)?这些业务定义在代码里没有,需要额外维护业务词汇表。
难点三:复杂查询。涉及多表JOIN、窗口函数、条件聚合的复杂查询,LLM生成的SQL正确率会显著下降。
难点四:SQL安全。生成的SQL不能是DROP TABLE这类危险操作,也不能访问不应该被访问的数据。
系统架构
核心实现
1. Schema向量化存储
把数据库Schema向量化,便于语义检索相关的表和字段:
@Service
public class SchemaVectorizer {
private final EmbeddingClient embeddingClient;
private final VectorStore vectorStore;
private final DataSource dataSource;
/**
* 初始化时把所有表/字段的描述向量化并存入向量数据库
*/
@PostConstruct
public void initializeSchemaIndex() {
List<Document> schemaDocuments = new ArrayList<>();
try (Connection conn = dataSource.getConnection()) {
DatabaseMetaData metaData = conn.getMetaData();
// 获取所有表
ResultSet tables = metaData.getTables(null, "dw", null, new String[]{"TABLE", "VIEW"});
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
// 获取表的列信息
List<String> columnDescriptions = new ArrayList<>();
ResultSet columns = metaData.getColumns(null, "dw", tableName, null);
while (columns.next()) {
String colName = columns.getString("COLUMN_NAME");
String colType = columns.getString("TYPE_NAME");
String colComment = columns.getString("REMARKS");
columnDescriptions.add(String.format("%s (%s)%s",
colName, colType,
colComment != null ? ": " + colComment : ""));
}
// 构建表的描述文档
String tableDescription = String.format(
"表名: %s\n描述: %s\n字段:\n%s",
tableName,
tableComment != null ? tableComment : "无描述",
String.join("\n", columnDescriptions)
);
Document doc = new Document(
tableDescription,
Map.of("tableName", tableName, "type", "table_schema")
);
schemaDocuments.add(doc);
}
}
// 批量向量化并存储
vectorStore.add(schemaDocuments);
log.info("已向量化 {} 个表的Schema信息", schemaDocuments.size());
}
/**
* 根据用户问题,检索最相关的表
*/
public List<TableSchema> retrieveRelevantTables(String userQuery, int topK) {
List<Document> relevantDocs = vectorStore.similaritySearch(
SearchRequest.query(userQuery).withTopK(topK)
);
return relevantDocs.stream()
.map(doc -> parseTableSchema(doc))
.collect(toList());
}
}2. 业务词汇表
维护业务术语到技术定义的映射:
@Component
public class BusinessGlossary {
// 业务词汇->SQL片段/计算规则的映射
// 这个字典需要数据分析师和业务人员共同维护
private final Map<String, String> glossaryMap;
public BusinessGlossary() {
this.glossaryMap = new HashMap<>();
// 业务指标定义
glossaryMap.put("客单价", "SUM(order_amount) / COUNT(DISTINCT order_id)");
glossaryMap.put("客均消费", "SUM(order_amount) / COUNT(DISTINCT user_id)");
glossaryMap.put("GMV", "SUM(order_amount)");
glossaryMap.put("转化率", "COUNT(DISTINCT order_id) / COUNT(DISTINCT session_id) * 100");
glossaryMap.put("退货率", "SUM(return_quantity) / SUM(order_quantity) * 100");
glossaryMap.put("复购率", "COUNT(DISTINCT CASE WHEN order_count > 1 THEN user_id END) / COUNT(DISTINCT user_id)");
// 时间范围定义
glossaryMap.put("上个月", "DATE_FORMAT(NOW(), '%Y-%m-01') - INTERVAL 1 MONTH 到 DATE_FORMAT(NOW(), '%Y-%m-01') - INTERVAL 1 DAY");
glossaryMap.put("本季度", "DATE_FORMAT(NOW(), '%Y-') + QUARTER(NOW()) 季度");
// 地区定义
glossaryMap.put("华东区", "province IN ('上海', '江苏', '浙江', '安徽', '江西', '山东', '福建')");
glossaryMap.put("华南区", "province IN ('广东', '广西', '海南')");
}
public String enrichQuery(String userQuery) {
String enriched = userQuery;
for (Map.Entry<String, String> entry : glossaryMap.entrySet()) {
if (userQuery.contains(entry.getKey())) {
enriched = enriched + "\n[业务定义] " + entry.getKey() + " = " + entry.getValue();
}
}
return enriched;
}
public String getRelevantDefinitions(String userQuery) {
return glossaryMap.entrySet().stream()
.filter(e -> userQuery.contains(e.getKey()))
.map(e -> e.getKey() + " 的定义: " + e.getValue())
.collect(joining("\n"));
}
}3. Text-to-SQL生成器
@Service
public class TextToSQLService {
private final ChatClient chatClient;
private final SchemaVectorizer schemaVectorizer;
private final BusinessGlossary glossary;
private final SQLValidator sqlValidator;
public SQLQueryResult translateAndExecute(String userQuestion) {
// 1. 检索相关Schema
List<TableSchema> relevantTables = schemaVectorizer.retrieveRelevantTables(userQuestion, 10);
// 2. 获取业务词汇定义
String businessDefs = glossary.getRelevantDefinitions(userQuestion);
// 3. 生成SQL
String generatedSQL = generateSQL(userQuestion, relevantTables, businessDefs);
// 4. 验证SQL安全性
ValidationResult validation = sqlValidator.validate(generatedSQL);
if (!validation.isSafe()) {
throw new UnsafeQueryException("生成的SQL包含不安全操作: " + validation.getReason());
}
// 5. 执行SQL(只读权限的连接)
QueryResult result = executeReadOnlyQuery(generatedSQL);
// 6. 生成自然语言解读
String interpretation = interpretResult(userQuestion, generatedSQL, result);
return SQLQueryResult.builder()
.originalQuestion(userQuestion)
.generatedSQL(generatedSQL)
.result(result)
.interpretation(interpretation)
.build();
}
private String generateSQL(
String question,
List<TableSchema> schemas,
String businessDefs) {
StringBuilder schemaContext = new StringBuilder();
schemas.forEach(schema -> {
schemaContext.append("-- 表: ").append(schema.getTableName()).append("\n");
schemaContext.append("-- 描述: ").append(schema.getDescription()).append("\n");
schemaContext.append(schema.getCreateStatement()).append("\n\n");
});
String prompt = """
根据以下数据库Schema和用户问题,生成正确的SQL查询。
数据库Schema:
%s
业务术语定义:
%s
用户问题: %s
要求:
1. 只生成SELECT语句,不生成DML或DDL
2. 日期处理用MySQL兼容的函数
3. 结果集不超过10000行(加LIMIT限制)
4. 对聚合查询,结果要有意义的排序
5. 字段命名要有业务含义(使用AS起别名)
只返回SQL语句,不要解释。
""".formatted(schemaContext, businessDefs, question);
return chatClient.call(new Prompt(
List.of(
new SystemMessage("你是一个专业的数据分析师,精通SQL和业务数据分析。生成的SQL必须可以直接执行。"),
new UserMessage(prompt)
),
OpenAiChatOptions.builder()
.withModel("gpt-4o")
.withTemperature(0.1f)
.build()
)).getResult().getOutput().getContent()
.replaceAll("```sql\n?", "")
.replaceAll("```\n?", "")
.trim();
}
private String interpretResult(String question, String sql, QueryResult result) {
if (result.isEmpty()) {
return "查询结果为空,没有符合条件的数据。";
}
String interpretPrompt = """
用户问了:%s
查询结果(前5行):
%s
总行数: %d
请用1-3句话,用业务语言解读这个查询结果的含义。
直接说结论,不要重复描述数据。
""".formatted(question, result.toMarkdownTable(5), result.getTotalRows());
return chatClient.call(new Prompt(new UserMessage(interpretPrompt)))
.getResult().getOutput().getContent();
}
}4. SQL安全验证
@Component
public class SQLValidator {
private static final Set<String> DANGEROUS_KEYWORDS = Set.of(
"DROP", "DELETE", "TRUNCATE", "ALTER", "CREATE",
"INSERT", "UPDATE", "REPLACE", "GRANT", "REVOKE"
);
public ValidationResult validate(String sql) {
String upperSQL = sql.toUpperCase().trim();
// 必须以SELECT开头
if (!upperSQL.startsWith("SELECT")) {
return ValidationResult.unsafe("SQL必须以SELECT开头");
}
// 检查危险关键字
for (String keyword : DANGEROUS_KEYWORDS) {
if (upperSQL.contains(keyword)) {
return ValidationResult.unsafe("包含危险关键字: " + keyword);
}
}
// 检查是否访问了非DW schema的表
if (upperSQL.contains("INFORMATION_SCHEMA") ||
upperSQL.contains("PERFORMANCE_SCHEMA") ||
upperSQL.contains("MYSQL.")) {
return ValidationResult.unsafe("不允许访问系统表");
}
// 检查是否有超大结果集风险(没有WHERE或LIMIT)
boolean hasLimit = upperSQL.contains("LIMIT");
boolean hasWhere = upperSQL.contains("WHERE");
if (!hasLimit && !hasWhere) {
return ValidationResult.warn("建议添加WHERE条件或LIMIT,避免查询全表");
}
return ValidationResult.safe();
}
}一些实际效果和局限
效果好的场景:
- 简单的聚合查询:SUM、COUNT、AVG,按时间维度分组
- 趋势分析:某指标在一段时间内的变化
- 排名类查询:哪些商品/用户/地区的某指标最高/最低
效果差的场景:
- 跨5个以上表的复杂JOIN
- 涉及自定义的复杂业务规则(比如"活跃用户"的定义在代码里,不在数据库里)
- 需要理解数据质量问题(空值、异常值的处理方式)
对于效果差的场景,我们的做法是:让LLM把它能处理的部分翻译成SQL,剩余的复杂逻辑提示用户需要数据工程师协助。
