第1795篇:数据库Schema的AI优化建议——索引推荐与查询分析
第1795篇:数据库Schema的AI优化建议——索引推荐与查询分析
数据库优化是一门玄学,这是很多工程师的共识。同样一条SQL,在这个项目里能跑1ms,换个表结构就变成了10秒。加个索引可能把查询从5秒降到50ms,但也可能把插入从1ms拖慢到50ms。
我见过太多「凭感觉加索引」的情况。有的表有十几个索引,结果大部分索引从来没被用过,只是在每次写入时带来额外开销。有的表每天几十万次的热点查询没有合适的索引,靠MySQL硬扛全表扫描。
数据库优化需要系统性分析,而不是凭经验拍脑袋。这是AI能真正帮上忙的地方。
为什么数据库优化需要AI辅助
传统的数据库优化流程是:
- 打开慢查询日志
- 找到慢SQL
- 用EXPLAIN分析
- 凭经验判断加什么索引
- 测试效果
- 重复
这个流程的问题是步骤3到4之间有一个巨大的经验鸿沟。EXPLAIN的输出信息量很大,但要正确解读它,需要相当深的MySQL内核知识。「Using filesort」「Using temporary」「type: ALL」,这些词对于缺乏经验的工程师来说就是天书。
AI可以帮你:
- 解读EXPLAIN输出,用人话解释瓶颈在哪
- 基于Schema和查询模式推荐索引
- 指出查询写法的问题(比如隐式类型转换导致索引失效)
- 预测新索引对其他查询的影响
整体方案
第一步:Schema信息收集
@Repository
public class SchemaInfoRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public DatabaseSchema extractSchema(String databaseName) {
DatabaseSchema schema = new DatabaseSchema();
schema.setDatabaseName(databaseName);
// 获取所有表
List<String> tables = jdbcTemplate.queryForList(
"SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = ?",
String.class, databaseName);
for (String tableName : tables) {
TableInfo tableInfo = extractTableInfo(databaseName, tableName);
schema.addTable(tableInfo);
}
return schema;
}
private TableInfo extractTableInfo(String dbName, String tableName) {
TableInfo tableInfo = new TableInfo();
tableInfo.setTableName(tableName);
// 获取表的基本信息
Map<String, Object> tableStats = jdbcTemplate.queryForMap(
"""
SELECT TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH, AVG_ROW_LENGTH,
TABLE_COMMENT, CREATE_TIME
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
""", dbName, tableName);
tableInfo.setEstimatedRows(((Number) tableStats.get("TABLE_ROWS")).longValue());
tableInfo.setDataSize(((Number) tableStats.get("DATA_LENGTH")).longValue());
tableInfo.setIndexSize(((Number) tableStats.get("INDEX_LENGTH")).longValue());
tableInfo.setComment((String) tableStats.get("TABLE_COMMENT"));
// 获取列信息
List<Map<String, Object>> columns = jdbcTemplate.queryForList(
"""
SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_KEY,
COLUMN_DEFAULT, EXTRA, COLUMN_COMMENT, DATA_TYPE,
CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
""", dbName, tableName);
for (Map<String, Object> col : columns) {
ColumnInfo column = new ColumnInfo();
column.setName((String) col.get("COLUMN_NAME"));
column.setType((String) col.get("COLUMN_TYPE"));
column.setNullable("YES".equals(col.get("IS_NULLABLE")));
column.setComment((String) col.get("COLUMN_COMMENT"));
tableInfo.addColumn(column);
}
// 获取现有索引
List<Map<String, Object>> indexRows = jdbcTemplate.queryForList(
"""
SELECT INDEX_NAME, COLUMN_NAME, NON_UNIQUE, SEQ_IN_INDEX,
CARDINALITY, INDEX_TYPE, COMMENT
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY INDEX_NAME, SEQ_IN_INDEX
""", dbName, tableName);
// 将索引行组合成索引对象
Map<String, IndexInfo> indexMap = new LinkedHashMap<>();
for (Map<String, Object> row : indexRows) {
String indexName = (String) row.get("INDEX_NAME");
IndexInfo idx = indexMap.computeIfAbsent(indexName, k -> {
IndexInfo newIdx = new IndexInfo();
newIdx.setName(k);
newIdx.setUnique(((Number) row.get("NON_UNIQUE")).intValue() == 0);
newIdx.setType((String) row.get("INDEX_TYPE"));
return newIdx;
});
idx.addColumn((String) row.get("COLUMN_NAME"),
((Number) row.get("CARDINALITY")).longValue());
}
tableInfo.setIndexes(new ArrayList<>(indexMap.values()));
return tableInfo;
}
public List<SlowQuery> getTopSlowQueries(int topN, int thresholdSeconds) {
return jdbcTemplate.query(
"""
SELECT sql_text, query_time, rows_examined, rows_sent,
db, start_time, lock_time
FROM mysql.slow_log
WHERE query_time > ?
ORDER BY query_time DESC
LIMIT ?
""",
(rs, rowNum) -> {
SlowQuery sq = new SlowQuery();
sq.setSqlText(rs.getString("sql_text"));
sq.setQueryTimeSeconds(rs.getDouble("query_time"));
sq.setRowsExamined(rs.getLong("rows_examined"));
sq.setRowsSent(rs.getLong("rows_sent"));
return sq;
},
thresholdSeconds, topN);
}
public ExplainResult explainQuery(String sql) {
List<Map<String, Object>> rows = jdbcTemplate.queryForList("EXPLAIN " + sql);
return parseExplainResult(rows);
}
}第二步:AI分析引擎
@Service
public class AISchemaOptimizer {
private final ClaudeApiClient claudeClient;
private final SchemaInfoRepository schemaRepo;
public OptimizationReport analyzeAndSuggest(String databaseName) {
// 收集数据
DatabaseSchema schema = schemaRepo.extractSchema(databaseName);
List<SlowQuery> slowQueries = schemaRepo.getTopSlowQueries(20, 1);
// 对每个慢查询获取执行计划
List<SlowQueryWithExplain> queriesWithExplain = slowQueries.stream()
.map(sq -> {
try {
ExplainResult explain = schemaRepo.explainQuery(sq.getSqlText());
return new SlowQueryWithExplain(sq, explain);
} catch (Exception e) {
log.warn("获取执行计划失败: {}", e.getMessage());
return new SlowQueryWithExplain(sq, null);
}
})
.collect(Collectors.toList());
// AI分析
return performAIAnalysis(schema, queriesWithExplain);
}
private OptimizationReport performAIAnalysis(
DatabaseSchema schema, List<SlowQueryWithExplain> slowQueries) {
String prompt = buildAnalysisPrompt(schema, slowQueries);
String response = claudeClient.complete(prompt);
return parseOptimizationReport(response);
}
private String buildAnalysisPrompt(DatabaseSchema schema,
List<SlowQueryWithExplain> slowQueries) {
return String.format("""
你是一位MySQL数据库优化专家。请分析以下数据库Schema和慢查询,提供优化建议。
## 数据库Schema
%s
## 慢查询列表(按执行时间降序)
%s
## 分析要求
### 1. 索引优化建议
对每个慢查询分析:
- 为什么慢(全表扫描/索引效率低/回表次数多)
- 建议添加的索引(给出具体的CREATE INDEX语句)
- 说明这个索引如何改善查询
- 评估对写入性能的影响
### 2. 现有索引分析
- 哪些现有索引可能是冗余的(被其他索引覆盖)
- 哪些索引的基数(Cardinality)太低,效果有限
- 联合索引的列顺序是否合理
### 3. 查询优化建议
- 是否有可以改写以更好利用索引的SQL
- 是否有可以用覆盖索引避免回表的场景
- 是否有不必要的全表扫描可以通过业务逻辑优化规避
### 4. Schema结构优化
- 字段类型是否合理(有没有用VARCHAR存数字的情况)
- 是否有可以通过分区提升性能的大表
- 是否有需要归档的历史数据
请以JSON格式返回结果:
{
"index_recommendations": [
{
"priority": "HIGH/MEDIUM/LOW",
"target_table": "表名",
"sql": "CREATE INDEX语句",
"rationale": "为什么添加这个索引",
"affected_queries": ["受益的SQL片段"],
"estimated_improvement": "预估改善幅度",
"write_impact": "对写入性能的影响评估"
}
],
"redundant_indexes": [
{
"index_name": "索引名",
"table": "表名",
"reason": "冗余原因",
"safe_to_drop": true/false,
"validation_sql": "验证是否真的冗余的SQL"
}
],
"query_rewrites": [
{
"original_sql": "原始SQL",
"rewritten_sql": "优化后SQL",
"improvement_reason": "改善原因",
"notes": "注意事项"
}
],
"schema_issues": [
{
"severity": "HIGH/MEDIUM/LOW",
"table": "表名",
"issue": "问题描述",
"recommendation": "改善建议"
}
],
"summary": {
"total_issues_found": 数量,
"estimated_query_improvement": "总体预估改善",
"top_3_quick_wins": ["最快能见效的3个优化点"]
}
}
只返回JSON,不要其他文字。
""",
formatSchema(schema),
formatSlowQueries(slowQueries));
}
private String formatSchema(DatabaseSchema schema) {
StringBuilder sb = new StringBuilder();
for (TableInfo table : schema.getTables()) {
sb.append(String.format("\n### %s(估计行数:%,d,数据大小:%s)\n",
table.getTableName(),
table.getEstimatedRows(),
formatSize(table.getDataSize())));
if (table.getComment() != null && !table.getComment().isEmpty()) {
sb.append("用途:").append(table.getComment()).append("\n");
}
sb.append("\n**字段:**\n");
for (ColumnInfo col : table.getColumns()) {
sb.append(String.format("- `%s` %s%s%s\n",
col.getName(),
col.getType(),
col.isNullable() ? " NULL" : " NOT NULL",
col.getComment() != null ? " (" + col.getComment() + ")" : ""));
}
sb.append("\n**现有索引:**\n");
for (IndexInfo idx : table.getIndexes()) {
sb.append(String.format("- %s (%s) - 列:%s,基数:%s\n",
idx.getName(),
idx.isUnique() ? "UNIQUE" : "普通",
String.join(", ", idx.getColumnNames()),
idx.getCardinalities().toString()));
}
}
return sb.toString();
}
private String formatSlowQueries(List<SlowQueryWithExplain> queries) {
StringBuilder sb = new StringBuilder();
int idx = 1;
for (SlowQueryWithExplain query : queries) {
sb.append(String.format("\n#### 慢查询 #%d(执行时间:%.2f秒,扫描行数:%,d)\n",
idx++,
query.getSlowQuery().getQueryTimeSeconds(),
query.getSlowQuery().getRowsExamined()));
sb.append("```sql\n").append(query.getSlowQuery().getSqlText()).append("\n```\n");
if (query.getExplainResult() != null) {
sb.append("\nEXPLAIN输出:\n");
sb.append("| id | select_type | table | type | key | rows | Extra |\n");
sb.append("|---|---|---|---|---|---|---|\n");
for (ExplainRow row : query.getExplainResult().getRows()) {
sb.append(String.format("| %s | %s | %s | %s | %s | %s | %s |\n",
row.getId(), row.getSelectType(), row.getTable(),
row.getType(), nullToStr(row.getKey()),
row.getRows(), nullToStr(row.getExtra())));
}
}
}
return sb.toString();
}
}第三步:安全验证与执行
AI给的索引建议不能直接跑,需要先验证安全性:
@Service
public class IndexSuggestionValidator {
private final JdbcTemplate jdbcTemplate;
public ValidationResult validate(IndexRecommendation recommendation) {
ValidationResult result = new ValidationResult();
result.setRecommendation(recommendation);
// 1. 检查表是否存在
if (!tableExists(recommendation.getTargetTable())) {
result.setValid(false);
result.addError("目标表不存在: " + recommendation.getTargetTable());
return result;
}
// 2. 检查列是否存在
List<String> missingColumns = checkColumnsExist(
recommendation.getTargetTable(),
recommendation.getIndexColumns());
if (!missingColumns.isEmpty()) {
result.setValid(false);
result.addError("以下列不存在: " + String.join(", ", missingColumns));
return result;
}
// 3. 检查是否有同名索引
if (indexExists(recommendation.getTargetTable(), recommendation.getIndexName())) {
result.setValid(false);
result.addWarning("同名索引已存在,建议先确认是否是相同的索引");
}
// 4. 估算索引大小(简单估算)
long tableRows = getTableRows(recommendation.getTargetTable());
long estimatedIndexSize = estimateIndexSize(recommendation, tableRows);
if (estimatedIndexSize > 1024 * 1024 * 1024) { // 超过1GB
result.addWarning(String.format("预估索引大小%.1fGB,创建过程可能耗时较长,建议在低峰期执行",
estimatedIndexSize / 1024.0 / 1024 / 1024));
}
// 5. 检查是否有类似的现有索引(避免重复)
List<String> similarIndexes = findSimilarIndexes(
recommendation.getTargetTable(), recommendation.getIndexColumns());
if (!similarIndexes.isEmpty()) {
result.addWarning("发现功能相似的现有索引: " + String.join(", ", similarIndexes) +
",请确认新索引是否真的必要");
}
result.setValid(result.getErrors().isEmpty());
return result;
}
// 生成安全的ALTER TABLE语句(Online DDL,避免锁表)
public String generateSafeAlterStatement(IndexRecommendation recommendation) {
return String.format(
"ALTER TABLE %s ADD INDEX %s (%s), ALGORITHM=INPLACE, LOCK=NONE;",
recommendation.getTargetTable(),
recommendation.getIndexName(),
String.join(", ", recommendation.getIndexColumns()));
}
}一个真实优化案例
分享一个让我印象深刻的案例。有个订单查询接口,生产环境慢查询每天几千次,查询时间平均3秒。
慢SQL:
SELECT o.*, u.nickname, u.phone
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE o.status = 1
AND o.merchant_id = 123
AND o.created_at BETWEEN '2024-01-01' AND '2024-01-31'
ORDER BY o.created_at DESC
LIMIT 20;EXPLAIN显示:
| id | type | table | key | rows | Extra |
|----|-------|-------|------|---------|-----------------|
| 1 | ref | o | NULL | 1523847 | Using filesort |
| 1 | eq_ref| u | PK | 1 | NULL |orders表全表扫描了152万行,还有filesort。
AI的分析:
根因:orders表缺少覆盖 (merchant_id, status, created_at) 组合查询的联合索引。当前查询需要扫描该商户的全部订单后再做日期过滤。
建议索引:
CREATE INDEX idx_merchant_status_created ON orders (merchant_id, status, created_at DESC);原因:merchant_id区分度高(每个商户独立),将其作为最左前缀可快速定位目标数据。status作为等值过滤条件放中间。created_at放最后,可直接支持ORDER BY消除filesort。
对写入的影响:orders表写入量较大,这个索引会增加每次写入约5-10ms的开销。如果orders表每秒写入超过5000条,需要评估这个影响。建议先在压测环境验证。
注意:如果查询中status的值选择性很低(比如大部分订单都是status=1),索引效果会打折扣。可以考虑将status放在最后,改为 (merchant_id, created_at, status) 并在WHERE条件中利用索引跳过。
加了这个索引后,同样的查询从3秒降到了18ms,扫描行数从152万降到了42行。这个效果完全符合AI的预测。
索引冗余分析
AI还帮我们发现了不少冗余索引。一个电商项目的分析结果:
发现的冗余索引(节选):
[冗余] users表 idx_email
现有:idx_email (email)
被覆盖:idx_email_status (email, status)
冗余原因:idx_email_status 是前者的超集,MySQL会优先使用后者
安全删除:是(先用 sys.schema_unused_indexes 验证3天未使用)
[冗余] orders表 idx_created_at
现有:idx_created_at (created_at)
潜在问题:orders表日期范围查询几乎都会同时带 merchant_id 条件,
单独的created_at索引很少独立使用,而idx_merchant_created已覆盖大多数场景
建议:先查询 information_schema.INDEX_STATISTICS 验证使用情况再决定这种分析帮我们最终删掉了11个冗余索引,orders表写入性能提升了约15%。
持续监控与自动建议
把这个能力做成定时任务,每周自动分析一次:
@Component
public class WeeklySchemaReviewJob {
@Autowired
private AISchemaOptimizer optimizer;
@Autowired
private NotificationService notificationService;
@Scheduled(cron = "0 0 2 * * MON") // 每周一凌晨2点
public void runWeeklyReview() {
log.info("开始每周Schema优化分析");
try {
OptimizationReport report = optimizer.analyzeAndSuggest("production_db");
// 过滤出高优先级建议
List<IndexRecommendation> highPriority = report.getIndexRecommendations()
.stream()
.filter(r -> "HIGH".equals(r.getPriority()))
.collect(Collectors.toList());
if (!highPriority.isEmpty()) {
// 发送通知给DBA和技术负责人
notificationService.sendSlackMessage(
"#database-ops",
buildSlackMessage(highPriority, report.getSummary())
);
// 自动创建Jira工单
for (IndexRecommendation rec : highPriority) {
createJiraTicket(rec);
}
}
// 保存完整报告
saveReport(report);
} catch (Exception e) {
log.error("每周Schema分析失败", e);
notificationService.alertOncall("数据库优化分析任务失败: " + e.getMessage());
}
}
}数据库优化的本质是一个信息不对称问题:数据库知道哪里慢,但不会告诉你;工程师知道业务场景,但不一定懂优化原理。AI在这里扮演的是「翻译官」角色——把数据库的执行计划翻译成工程师能理解的优化建议,再把业务访问模式翻译成数据库能高效执行的索引结构。
