AI Application Database Optimization: Hybrid Storage of Vector and Relational Data in Practice
Two Databases, One Tangled Mess
In October 2025, at a Shanghai company building legal AI for enterprises, tech lead Zhou Lei came across a block of code during review that made him frown:
// An endpoint that "finds related legal clauses"
public List<LegalClause> searchRelatedClauses(String query, String tenantId) {
    // Step 1: ask Qdrant for the IDs of the most similar vectors
    List<String> clauseIds = qdrantClient.search(query, 20) // topK = 20
        .stream().map(ScoredPoint::getId).toList();
    // Step 2: ask MySQL for the business data behind those IDs
    List<LegalClause> clauses = mysqlClient.findByIds(clauseIds);
    // Step 3: filter by tenant ID in memory
    // (the Qdrant query above could not enforce tenant isolation)
    clauses = clauses.stream()
        .filter(c -> c.getTenantId().equals(tenantId))
        .toList();
    // Step 4: not enough rows survived the filter? Back to Qdrant for more
    if (clauses.size() < 10) {
        List<String> moreIds = qdrantClient.search(query, 100)...; // topK = 100
        // ...followed by yet another round of MySQL queries
    }
    return clauses;
}
The problem with this code: three round trips (Qdrant → MySQL → possibly Qdrant again), 200-400ms per request, and users saying "the AI feels a bit slow".
The deeper problems:
- Two databases to maintain: ops, backups, transactions, and monitoring all doubled
- Data consistency: rows present in Qdrant but missing in MySQL (or the reverse) regularly produced ghost data
- No compound queries: no way to filter by vector similarity and business conditions (tenant, time, category) in a single pass
- Low development velocity: every feature needed two sets of query code
Zhou Lei's call: drop Qdrant and migrate everything to PostgreSQL + PGVector.
After the migration:
| Metric | Before (Qdrant + MySQL) | After (PGVector) |
|---|---|---|
| P99 query latency | 380ms | 35ms |
| Code complexity | Two data-access layers | One |
| Data consistency incidents | 2-3 per week | 0 |
| Ops components | 3 (Qdrant+MySQL+Redis) | 2 (PG+Redis) |
| Compound query support | None | Native |
1. The Core Value of Hybrid Storage: Why PostgreSQL
1.1 Where Vector and Relational Data Naturally Meet
1.2 PostgreSQL's Strengths
Vector capabilities:
- PGVector HNSW index, P99 < 20ms (up to ~1M rows)
- Native cosine, inner-product, and L2 distance
- Vectors and relational data modified in the same transaction
Relational capabilities:
- ACID transactions
- Multi-tenant isolation (row-level security policies)
- Complex JOIN queries
- Mature backup/restore/ops tooling
Hybrid query capability:
- One SQL statement doing vector search plus business filtering
- No repeated network round trips
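A note on the arithmetic behind the queries that follow: PGVector's `<=>` operator returns cosine distance, and similarity is computed as `1 - distance`. A minimal, database-free Java sketch of that relation (toy vectors, illustrative class name):

```java
// Cosine similarity/distance as PGVector's <=> operator defines them:
// distance = 1 - cosine similarity.
public class CosineDemo {
    public static double cosineSimilarity(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // The "1 - (embedding <=> query)" expression used throughout this article
    public static double cosineDistance(double[] a, double[] b) {
        return 1.0 - cosineSimilarity(a, b);
    }
}
```

For unit vectors the two quantities always sum to 1, which is why thresholding similarity at 0.75 is the same as thresholding distance at 0.25.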
2. Full Schema Design: An Enterprise Knowledge Base
2.1 Database Schema
-- ===== Create the PGVector extension =====
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- fuzzy full-text matching
-- ===== Tenants =====
CREATE TABLE tenants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    plan VARCHAR(50) NOT NULL DEFAULT 'free', -- free/pro/enterprise
    status VARCHAR(20) NOT NULL DEFAULT 'active',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ===== Knowledge bases =====
CREATE TABLE knowledge_bases (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'active',
    doc_count INTEGER NOT NULL DEFAULT 0,
    chunk_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ===== Documents =====
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    kb_id UUID NOT NULL REFERENCES knowledge_bases(id) ON DELETE CASCADE,
    title VARCHAR(500) NOT NULL,
    file_path TEXT,
    file_type VARCHAR(50), -- pdf/word/txt/markdown
    file_size BIGINT,
    content_hash CHAR(64), -- SHA-256, used for incremental-update detection
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- pending/processing/active/error
    chunk_count INTEGER NOT NULL DEFAULT 0,
    token_count INTEGER NOT NULL DEFAULT 0,
    metadata JSONB NOT NULL DEFAULT '{}', -- custom metadata
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    indexed_at TIMESTAMPTZ -- last vectorization time
);
-- ===== Document chunks (the core table; holds the vectors) =====
CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL, -- denormalized to avoid a JOIN
    kb_id UUID NOT NULL, -- denormalized
    doc_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL, -- position within the document
    content TEXT NOT NULL,
    content_hash CHAR(32), -- MD5, for incremental updates
    embedding VECTOR(1536), -- OpenAI text-embedding-3-small dimension
    token_count INTEGER,
    metadata JSONB NOT NULL DEFAULT '{}',
    -- business filter columns (denormalized to avoid JOINs)
    doc_title VARCHAR(500),
    doc_file_type VARCHAR(50),
    doc_created_at TIMESTAMPTZ,
    -- timestamps
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_tenant FOREIGN KEY (tenant_id) REFERENCES tenants(id),
    CONSTRAINT fk_kb FOREIGN KEY (kb_id) REFERENCES knowledge_bases(id)
);
-- ===== Conversations =====
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    user_id VARCHAR(255) NOT NULL,
    kb_id UUID REFERENCES knowledge_bases(id),
    title VARCHAR(500),
    message_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ===== Messages =====
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    tenant_id UUID NOT NULL,
    role VARCHAR(20) NOT NULL, -- user/assistant/system
    content TEXT NOT NULL,
    token_count INTEGER,
    -- retrieval context (the chunks RAG cited)
    retrieved_chunks JSONB, -- retrieved chunk IDs and similarity scores
    -- cost tracking
    model VARCHAR(100),
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    latency_ms INTEGER,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
2.2 Index Strategy
-- ===== Vector index (the most important one) =====
-- Option A: a single global HNSW index (under ~5M rows)
CREATE INDEX CONCURRENTLY idx_chunks_embedding_hnsw
ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 128);
-- Option B: per-partition indexes after partitioning by tenant (over ~5M rows, multi-tenant)
-- see the partitioned-table design below
-- ===== Business indexes =====
-- tenant isolation (nearly every query filters on tenant_id)
CREATE INDEX idx_chunks_tenant_kb
ON document_chunks (tenant_id, kb_id);
-- document-level lookups
CREATE INDEX idx_chunks_doc
ON document_chunks (doc_id, chunk_index);
-- time-range filtering
CREATE INDEX idx_chunks_created_at
ON document_chunks (tenant_id, doc_created_at);
-- full-text search (for hybrid retrieval)
CREATE INDEX idx_chunks_content_gin
ON document_chunks USING gin (to_tsvector('chinese', content));
-- note: the 'chinese' configuration requires the zhparser extension for word segmentation;
-- alternatively, use a pg_trgm GIN index for fuzzy matching
-- ===== documents table indexes =====
CREATE INDEX idx_documents_tenant_kb
ON documents (tenant_id, kb_id, status);
CREATE INDEX idx_documents_hash
ON documents (content_hash)
WHERE content_hash IS NOT NULL;
-- ===== JSONB metadata indexes (create as needed) =====
-- when you frequently filter on one specific metadata field
CREATE INDEX idx_chunks_metadata_category
ON document_chunks ((metadata->>'category'))
WHERE metadata->>'category' IS NOT NULL;
3. Compound Queries: Combining Vector Similarity with Business Filters
3.1 Basic Compound Queries
-- ===== The most common case: vector search + tenant isolation =====
EXPLAIN ANALYZE
SELECT
    dc.id,
    dc.content,
    dc.doc_title,
    dc.metadata,
    1 - (dc.embedding <=> $1::vector) AS similarity
FROM document_chunks dc
WHERE dc.tenant_id = $2
  AND dc.kb_id = $3
  AND 1 - (dc.embedding <=> $1::vector) > 0.75 -- similarity threshold
ORDER BY dc.embedding <=> $1::vector
LIMIT 10;
-- ===== Careful: a similarity filter in WHERE may bypass the vector index! =====
-- Safer rewrite (search first, filter afterwards):
SELECT *
FROM (
    SELECT
        dc.id,
        dc.content,
        dc.doc_title,
        dc.metadata,
        1 - (dc.embedding <=> $1::vector) AS similarity
    FROM document_chunks dc
    WHERE dc.tenant_id = $2
      AND dc.kb_id = $3
    ORDER BY dc.embedding <=> $1::vector
    LIMIT 50 -- over-fetch, then filter by similarity
) sub
WHERE similarity > 0.75
ORDER BY similarity DESC
LIMIT 10;
3.2 Complex Compound Query Examples
-- ===== Vector search with time-range and file-type filters =====
SELECT
    dc.id,
    dc.content,
    dc.doc_title,
    dc.doc_file_type,
    dc.metadata,
    1 - (dc.embedding <=> $1::vector) AS similarity
FROM document_chunks dc
WHERE dc.tenant_id = $2
  AND dc.kb_id = ANY($3::uuid[]) -- search across multiple knowledge bases
  AND dc.doc_file_type = ANY($4::text[]) -- file-type filter
  AND dc.doc_created_at >= $5 -- time range
  AND dc.doc_created_at <= $6
ORDER BY dc.embedding <=> $1::vector
LIMIT 20;
-- ===== Hybrid Search: vector + full-text in one query =====
-- RRF (Reciprocal Rank Fusion) merges the two result lists
WITH vector_results AS (
    SELECT
        id,
        content,
        1 - (embedding <=> $1::vector) AS vector_score,
        ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS vector_rank
    FROM document_chunks
    WHERE tenant_id = $2 AND kb_id = $3
    ORDER BY embedding <=> $1::vector
    LIMIT 60
),
text_results AS (
    SELECT
        id,
        content,
        ts_rank(to_tsvector('simple', content), plainto_tsquery('simple', $4)) AS text_score,
        ROW_NUMBER() OVER (
            ORDER BY ts_rank(
                to_tsvector('simple', content),
                plainto_tsquery('simple', $4)
            ) DESC
        ) AS text_rank
    FROM document_chunks
    WHERE tenant_id = $2 AND kb_id = $3
      AND to_tsvector('simple', content) @@ plainto_tsquery('simple', $4)
    ORDER BY text_score DESC
    LIMIT 60
),
-- RRF merge (k = 60 is the usual constant)
rrf_scores AS (
    SELECT
        COALESCE(v.id, t.id) AS id,
        COALESCE(v.content, t.content) AS content,
        COALESCE(1.0 / (60 + v.vector_rank), 0) +
        COALESCE(1.0 / (60 + t.text_rank), 0) AS rrf_score
    FROM vector_results v
    FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT id, content, rrf_score
FROM rrf_scores
ORDER BY rrf_score DESC
LIMIT 10;
3.3 Compound Queries with Spring Data JPA
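Before moving to the JPA layer, the RRF formula from the SQL above — score = Σ 1/(k + rank) with k = 60 and 1-based ranks, exactly what ROW_NUMBER() produces — can be sanity-checked in plain Java. A minimal sketch (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RrfDemo {
    static final int K = 60; // same constant as in the SQL CTE

    /** Merge two 1-based rank maps (id -> rank) with Reciprocal Rank Fusion. */
    public static Map<String, Double> fuse(Map<String, Integer> vectorRanks,
                                           Map<String, Integer> textRanks) {
        Map<String, Double> scores = new HashMap<>();
        Set<String> ids = new HashSet<>(vectorRanks.keySet());
        ids.addAll(textRanks.keySet());
        for (String id : ids) {
            double s = 0;
            // an id missing from one list simply contributes 0,
            // mirroring COALESCE(..., 0) in the SQL
            if (vectorRanks.containsKey(id)) s += 1.0 / (K + vectorRanks.get(id));
            if (textRanks.containsKey(id))   s += 1.0 / (K + textRanks.get(id));
            scores.put(id, s);
        }
        return scores;
    }
}
```

A document ranked in both lists always beats one ranked equally high in only one list, which is the behavior that makes RRF a robust default fusion method.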
/**
 * Vector search repository (native queries).
 */
@Repository
public interface DocumentChunkRepository
        extends JpaRepository<DocumentChunk, UUID> {
    /**
     * Basic vector search (tenant-isolated).
     */
    @Query(value = """
        SELECT
            dc.id,
            dc.content,
            dc.doc_title,
            dc.metadata,
            1 - (dc.embedding <=> CAST(:embedding AS vector)) AS similarity
        FROM document_chunks dc
        WHERE dc.tenant_id = :tenantId
          AND dc.kb_id = :kbId
        ORDER BY dc.embedding <=> CAST(:embedding AS vector)
        LIMIT :limit
        """,
        nativeQuery = true)
    List<ChunkSearchResult> searchByVector(
        @Param("tenantId") UUID tenantId,
        @Param("kbId") UUID kbId,
        @Param("embedding") String embedding, // "[0.1, 0.2, ...]" literal
        @Param("limit") int limit
    );
    /**
     * Vector search with time-range and file-type filters.
     */
    @Query(value = """
        SELECT
            dc.id,
            dc.content,
            dc.doc_title,
            dc.metadata,
            1 - (dc.embedding <=> CAST(:embedding AS vector)) AS similarity
        FROM document_chunks dc
        WHERE dc.tenant_id = :tenantId
          AND dc.kb_id = :kbId
          AND (:fileType IS NULL OR dc.doc_file_type = :fileType)
          AND (:startDate IS NULL OR dc.doc_created_at >= :startDate)
          AND (:endDate IS NULL OR dc.doc_created_at <= :endDate)
        ORDER BY dc.embedding <=> CAST(:embedding AS vector)
        LIMIT :limit
        """,
        nativeQuery = true)
    List<ChunkSearchResult> searchWithFilters(
        @Param("tenantId") UUID tenantId,
        @Param("kbId") UUID kbId,
        @Param("embedding") String embedding,
        @Param("fileType") String fileType,
        @Param("startDate") Instant startDate,
        @Param("endDate") Instant endDate,
        @Param("limit") int limit
    );
    /**
     * Incremental update: delete a document's old chunks.
     */
    @Modifying
    @Query("DELETE FROM DocumentChunk dc WHERE dc.docId = :docId")
    void deleteByDocId(@Param("docId") UUID docId);
    /**
     * Fetch a document's content hash (incremental-update check).
     */
    @Query("SELECT d.contentHash FROM Document d WHERE d.id = :docId")
    Optional<String> findDocumentHashByDocId(@Param("docId") UUID docId);
}
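The `content_hash` column that `findDocumentHashByDocId` compares against is a SHA-256 hex digest per the schema (`CHAR(64)`). A minimal sketch of the hashing side of the incremental check, database-free and with illustrative names:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class ContentHash {
    /** SHA-256 hex digest, matching the CHAR(64) documents.content_hash column. */
    public static String sha256Hex(String content) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(64);
            for (byte b : digest) sb.append(String.format("%02x", b));
            return sb.toString();
        } catch (Exception e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** Re-chunk and re-embed only when the stored hash differs from the new one. */
    public static boolean needsReindex(String storedHash, String newContent) {
        return storedHash == null || !storedHash.equals(sha256Hex(newContent));
    }
}
```

Skipping unchanged documents this way is what keeps re-ingestion cheap: the expensive embedding calls only run for content whose hash actually moved.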
/**
 * Projection for vector search results.
 */
public interface ChunkSearchResult {
    UUID getId();
    String getContent();
    String getDocTitle();
    String getMetadata();
    Double getSimilarity();
}
4. Partitioned Tables: Optimizing Large Tables in Multi-Tenant Scenarios
4.1 Partitioning by Tenant
-- ===== Partitioned-table design (for very large multi-tenant datasets) =====
-- parent table (declarative partitioning)
CREATE TABLE document_chunks_partitioned (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    kb_id UUID NOT NULL,
    doc_id UUID NOT NULL,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    content_hash CHAR(32),
    embedding VECTOR(1536),
    metadata JSONB NOT NULL DEFAULT '{}',
    doc_title VARCHAR(500),
    doc_file_type VARCHAR(50),
    doc_created_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY LIST (tenant_id);
-- dedicated partition for a large (VIP) tenant
CREATE TABLE chunks_tenant_vip001
PARTITION OF document_chunks_partitioned
FOR VALUES IN ('00000000-0000-0000-0000-000000000001');
-- small and mid-size tenants grouped together (hash partitioning)
-- tenant_id hashes into one of several buckets
CREATE TABLE document_chunks_partitioned_hash (
    LIKE document_chunks INCLUDING ALL
) PARTITION BY HASH (tenant_id);
CREATE TABLE chunks_hash_0
PARTITION OF document_chunks_partitioned_hash
FOR VALUES WITH (MODULUS 8, REMAINDER 0);
-- ... repeat for partitions 0 through 7
-- one HNSW index per partition (smaller and faster)
CREATE INDEX idx_chunks_vip001_embedding
ON chunks_tenant_vip001
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 128);
4.2 Partitioning by Time (for Logs and Messages)
-- ===== Monthly partitions (messages table) =====
CREATE TABLE messages_partitioned (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL,
    tenant_id UUID NOT NULL,
    role VARCHAR(20) NOT NULL,
    content TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);
-- create one partition per month (automation script)
DO $$
DECLARE
    start_date DATE := '2024-01-01';
    end_date DATE := '2026-12-01';
    partition_date DATE := start_date;
    partition_name TEXT;
    next_date DATE;
BEGIN
    WHILE partition_date < end_date LOOP
        next_date := partition_date + INTERVAL '1 month';
        partition_name := 'messages_' || TO_CHAR(partition_date, 'YYYY_MM');
        EXECUTE FORMAT(
            'CREATE TABLE IF NOT EXISTS %I PARTITION OF messages_partitioned
             FOR VALUES FROM (%L) TO (%L)',
            partition_name,
            partition_date::TIMESTAMPTZ,
            next_date::TIMESTAMPTZ
        );
        partition_date := next_date;
    END LOOP;
END $$;
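The loop above names partitions `messages_YYYY_MM`. When the application side needs the same names — for example to check that next month's partition already exists before a bulk load — the naming rule can be mirrored in Java. A small sketch (class name is illustrative):

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

public class PartitionNames {
    // same rule as TO_CHAR(partition_date, 'YYYY_MM') in the DO block
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy_MM");

    /** Name of the monthly messages partition covering the given month. */
    public static String messagesPartition(YearMonth month) {
        return "messages_" + month.format(FMT);
    }
}
```

Keeping this rule in exactly one place on each side (one SQL script, one Java helper) avoids the classic drift bug where a renamed partition breaks maintenance jobs silently.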
-- indexes (each partition inherits the parent table's index definition)
CREATE INDEX idx_messages_conversation_created
ON messages_partitioned (conversation_id, created_at DESC);
5. Read/Write Splitting: Writes to the Primary, Reads from the Replica
5.1 Multi-DataSource Configuration in Spring Boot
/**
 * Read/write splitting datasource configuration.
 * Primary: handles writes (INSERT/UPDATE/DELETE).
 * Replica: handles vector search and other read-only queries.
 */
@Configuration
@EnableTransactionManagement
@Slf4j
public class ReadWriteDataSourceConfig {
    /**
     * Primary (write) datasource.
     */
    @Bean("masterDataSource")
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        HikariConfig config = new HikariConfig();
        config.setPoolName("Master-Pool");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionInitSql(
            "SET hnsw.ef_search = 64; SET work_mem = '256MB';"
        );
        return new HikariDataSource(config);
    }
    /**
     * Replica (read / vector search) datasource.
     */
    @Bean("slaveDataSource")
    @ConfigurationProperties("spring.datasource.slave")
    public DataSource slaveDataSource() {
        HikariConfig config = new HikariConfig();
        config.setPoolName("Slave-Pool");
        config.setMaximumPoolSize(40); // the replica carries most of the read traffic
        config.setMinimumIdle(10);
        config.setReadOnly(true); // read-only connections
        config.setConnectionInitSql(
            "SET hnsw.ef_search = 64; " +
            "SET work_mem = '512MB';" // more working memory on the replica
        );
        return new HikariDataSource(config);
    }
    /**
     * Routing datasource (switches on the transaction's read-only flag).
     */
    @Bean
    @Primary
    public DataSource routingDataSource(
            @Qualifier("masterDataSource") DataSource master,
            @Qualifier("slaveDataSource") DataSource slave) {
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", master);
        dataSourceMap.put("slave", slave);
        AbstractRoutingDataSource routing = new AbstractRoutingDataSource() {
            @Override
            protected Object determineCurrentLookupKey() {
                // read-only transactions go to the replica
                boolean isReadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                String dataSourceKey = isReadOnly ? "slave" : "master";
                log.debug("Routing to: {}", dataSourceKey);
                return dataSourceKey;
            }
        };
        routing.setTargetDataSources(dataSourceMap);
        routing.setDefaultTargetDataSource(master);
        routing.afterPropertiesSet();
        return routing;
    }
}
/**
 * Service usage: @Transactional(readOnly = true) routes to the replica.
 */
@Service
@Slf4j
public class KnowledgeBaseQueryService {
    @Autowired
    private DocumentChunkRepository chunkRepository;
    /**
     * Vector search (routed to the replica).
     */
    @Transactional(readOnly = true)
    public List<ChunkSearchResult> vectorSearch(
            UUID tenantId, UUID kbId,
            float[] queryEmbedding, int topK) {
        String embeddingStr = floatArrayToString(queryEmbedding);
        return chunkRepository.searchByVector(tenantId, kbId, embeddingStr, topK);
    }
    /**
     * Writes (routed to the primary).
     */
    @Transactional
    public void saveChunks(List<DocumentChunk> chunks) {
        chunkRepository.saveAll(chunks);
    }
    /**
     * Format a float[] as a pgvector literal, e.g. "[0.1,0.2,0.3]".
     * (Referenced above but missing from the original listing.)
     */
    private String floatArrayToString(float[] v) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < v.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(v[i]);
        }
        return sb.append(']').toString();
    }
}
6. HikariCP Configuration: Optimal Settings for Vector Query Workloads
6.1 Pool-Sizing Formula
Optimal PostgreSQL connection count = (core count × 2) + effective spindle count
For an 8-core SSD server: (8 × 2) + 1 = 17
Suggested setting: 20 (a little headroom)
Application-side pool sizing:
Primary (writes): max_pool = 20 (bounded by the PostgreSQL-side limit)
Replica (reads): max_pool = 40 (when multiple replica instances share the load)
Note: each connection costs PostgreSQL roughly 5-10MB of memory;
20 connections × 10MB = 200MB
6.2 Full HikariCP Configuration
# application.yml
spring:
  datasource:
    master:
      jdbc-url: jdbc:postgresql://master:5432/aidb?sslmode=require
      username: ${DB_USER}
      password: ${DB_PASSWORD}
      driver-class-name: org.postgresql.Driver
      hikari:
        pool-name: Master-Pool
        maximum-pool-size: 20
        minimum-idle: 5
        connection-timeout: 3000     # fail if no connection within 3 seconds
        idle-timeout: 600000         # reclaim connections idle for 10 minutes
        max-lifetime: 1800000        # 30-minute maximum connection lifetime
        keepalive-time: 30000        # 30-second keepalive
        validation-timeout: 1000     # 1-second validation timeout
        connection-init-sql: |
          SET hnsw.ef_search = 64;
          SET work_mem = '256MB';
          SET statement_timeout = '30s'; -- 30-second statement timeout
        connection-test-query: SELECT 1
        leak-detection-threshold: 10000  # warn when a connection is held >10s (leak hunting)
    slave:
      jdbc-url: jdbc:postgresql://slave:5432/aidb?sslmode=require&targetServerType=preferSecondary
      username: ${DB_USER}
      password: ${DB_PASSWORD}
      hikari:
        pool-name: Slave-Pool
        maximum-pool-size: 40
        minimum-idle: 10
        connection-timeout: 3000
        connection-init-sql: |
          SET hnsw.ef_search = 100;
          SET work_mem = '512MB';
          SET statement_timeout = '10s'; -- stricter 10-second timeout for reads
          SET default_transaction_read_only = 'on'; -- enforce read-only
7. Query Optimization: Analyzing Slow Queries with EXPLAIN ANALYZE
7.1 Common Slow-Query Patterns and Their Fixes
-- ===== Problem 1: the vector search does a full table scan =====
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT id, content,
    1 - (embedding <=> '[...]'::vector) AS similarity
FROM document_chunks
WHERE tenant_id = 'xxx'
ORDER BY embedding <=> '[...]'::vector
LIMIT 10;
-- bad sign in the output: Seq Scan (the index is not used)
-- causes: ef_search is unset, or the table is tiny (the planner skips ANN below ~10k rows)
-- fix:
SET hnsw.ef_search = 64;
-- for small datasets (<10k rows) a brute-force scan is actually faster; HNSW is unnecessary
-- ===== Problem 2: LIMIT versus ORDER BY on vector distance =====
-- wrong (cannot use the vector index):
SELECT * FROM document_chunks
WHERE tenant_id = 'xxx'
  AND 1 - (embedding <=> '[...]'::vector) > 0.8 -- distance condition inside WHERE
LIMIT 10;
-- the distance computation in the WHERE clause keeps this query off the HNSW index
-- correct:
SELECT * FROM (
    SELECT *, 1 - (embedding <=> '[...]'::vector) AS similarity
    FROM document_chunks
    WHERE tenant_id = 'xxx'
    ORDER BY embedding <=> '[...]'::vector
    LIMIT 50 -- over-fetch first
) sub
WHERE similarity > 0.8
LIMIT 10;
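The same over-fetch-then-filter pattern can also live application-side when the SQL has to stay index-friendly: ask for more candidates than needed, drop those below the threshold, keep the top K. A sketch (record name and threshold are illustrative):

```java
import java.util.Comparator;
import java.util.List;

public class PostFilter {
    public record Hit(String id, double similarity) {}

    /**
     * Candidates come back distance-ordered from the over-fetched query;
     * keep only those above the threshold, then cut to the requested topK.
     */
    public static List<Hit> filterTopK(List<Hit> candidates, double minSimilarity, int topK) {
        return candidates.stream()
                .filter(h -> h.similarity() > minSimilarity)
                .sorted(Comparator.comparingDouble(Hit::similarity).reversed())
                .limit(topK)
                .toList();
    }
}
```

The trade-off is the same as in the SQL version: an over-fetch factor that is too small risks returning fewer than topK rows after filtering, one that is too large wastes index work.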
-- ===== Problem 3: strict filters starve the HNSW candidate set =====
-- when business filters discard most rows (say 95%), the candidate set HNSW
-- returns may not contain enough qualifying rows to satisfy the LIMIT
-- fix: raise the LIMIT and filter again in the application
-- or: use IVFFlat with filtering (IVFFlat tolerates filters better)
7.2 Slow-Query Monitoring in Spring Boot
/**
 * PostgreSQL slow-query monitor.
 */
@Component
@Slf4j
public class SlowQueryMonitor {
    private final JdbcTemplate jdbcTemplate;
    private static final int SLOW_QUERY_THRESHOLD_MS = 100;
    public SlowQueryMonitor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    /**
     * Poll the slow-query stats every 5 minutes.
     */
    @Scheduled(fixedRate = 300000)
    public void reportSlowQueries() {
        try {
            // reads pg_stat_statements (the extension must be enabled)
            String sql = """
                SELECT
                    LEFT(query, 200) AS query_preview,
                    calls,
                    total_exec_time::numeric(10,2) AS total_ms,
                    mean_exec_time::numeric(10,2) AS avg_ms,
                    max_exec_time::numeric(10,2) AS max_ms,
                    rows / NULLIF(calls, 0) AS avg_rows
                FROM pg_stat_statements
                WHERE mean_exec_time > ?
                  AND userid = (SELECT usesysid FROM pg_user WHERE usename = current_user)
                ORDER BY mean_exec_time DESC
                LIMIT 10
                """;
            List<Map<String, Object>> slowQueries = jdbcTemplate.queryForList(
                sql, SLOW_QUERY_THRESHOLD_MS);
            if (!slowQueries.isEmpty()) {
                log.warn("Found {} slow queries (mean time > {}ms):",
                    slowQueries.size(), SLOW_QUERY_THRESHOLD_MS);
                for (Map<String, Object> query : slowQueries) {
                    log.warn("  mean: {}ms | max: {}ms | calls: {} | SQL: {}",
                        query.get("avg_ms"),
                        query.get("max_ms"),
                        query.get("calls"),
                        query.get("query_preview"));
                }
            } else {
                log.debug("No slow queries (threshold: {}ms)", SLOW_QUERY_THRESHOLD_MS);
            }
        } catch (Exception e) {
            log.error("Slow-query monitoring failed (is pg_stat_statements enabled?)", e);
        }
    }
    /**
     * Ad-hoc EXPLAIN ANALYZE (dev/test environments only).
     */
    public String explainAnalyze(String sql, Object... params) {
        String explainSql = "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) " + sql;
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(explainSql, params);
        return rows.stream()
            .map(row -> (String) row.get("QUERY PLAN"))
            .collect(Collectors.joining("\n"));
    }
}
8. Row-Level Security: Best Practices for Multi-Tenant Isolation
8.1 PostgreSQL RLS Configuration
-- ===== Row Level Security =====
-- tenant isolation enforced by the database itself: even if the application
-- forgets a WHERE clause, it cannot read another tenant's rows
-- enable RLS
ALTER TABLE document_chunks ENABLE ROW LEVEL SECURITY;
ALTER TABLE document_chunks FORCE ROW LEVEL SECURITY;
-- policy: users only see rows belonging to their own tenant
CREATE POLICY tenant_isolation ON document_chunks
FOR ALL
TO app_role
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- admin policy (sees all rows)
CREATE POLICY admin_bypass ON document_chunks
FOR ALL
TO admin_role
USING (true);
-- ===== Setting the current tenant ID from the application =====
-- run at the start of each request (SET LOCAL only takes effect inside a transaction)
SET LOCAL app.current_tenant_id = '550e8400-e29b-41d4-a716-446655440000';
-- ===== The Spring side =====
/**
 * Multi-tenant RLS interceptor:
 * records the current tenant ID for each request.
 */
@Component
@Slf4j
public class TenantContextInterceptor implements HandlerInterceptor {
    private final DataSource dataSource;
    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) throws Exception {
        String tenantId = extractTenantId(request);
        if (tenantId != null) {
            TenantContext.setCurrentTenant(tenantId);
        }
        return true;
    }
    @Override
    public void afterCompletion(HttpServletRequest request,
                                HttpServletResponse response,
                                Object handler, Exception ex) {
        TenantContext.clear();
    }
}
/**
 * Sets the tenant context whenever a connection is handed out.
 * Integrates with HikariCP.
 */
@Component
public class TenantAwareDataSource extends AbstractDataSource {
    private final DataSource delegate;
    @Override
    public Connection getConnection() throws SQLException {
        Connection conn = delegate.getConnection();
        String tenantId = TenantContext.getCurrentTenant();
        if (tenantId != null) {
            // parameterized set_config instead of string concatenation:
            // splicing tenantId into the SQL text would be an injection risk
            // (session-scoped here; reset the setting when the connection returns to the pool)
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT set_config('app.current_tenant_id', ?, false)")) {
                ps.setString(1, tenantId);
                ps.execute();
            }
        }
        return conn;
    }
}
9. Scaling Strategy: Smooth Expansion as Data Grows
9.1 Scaling Decision Tree
9.2 Online Scaling Scripts
-- ===== Rebuilding a large table's index online (no downtime) =====
-- 1. check current index sizes
SELECT
    indexname,
    pg_size_pretty(pg_relation_size(indexname::regclass)) AS index_size,
    idx_scan AS scan_count,
    idx_tup_read AS tuples_read
FROM pg_stat_user_indexes
WHERE tablename = 'document_chunks'
ORDER BY pg_relation_size(indexname::regclass) DESC;
-- 2. rebuild the HNSW index online (no table lock)
-- create the new index first
CREATE INDEX CONCURRENTLY idx_chunks_embedding_hnsw_v2
ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 256); -- bumped parameters
-- 3. once the new index is ready, drop the old one
DROP INDEX CONCURRENTLY idx_chunks_embedding_hnsw;
-- rename the new index
ALTER INDEX idx_chunks_embedding_hnsw_v2
RENAME TO idx_chunks_embedding_hnsw;
-- ===== Data migration: from the old table to the partitioned table =====
-- zero-downtime migration: dual-write + catch-up copying
BEGIN;
-- migration progress table
CREATE TABLE IF NOT EXISTS migration_state (
    table_name VARCHAR(100) PRIMARY KEY,
    last_migrated_id UUID,
    migrated_count BIGINT DEFAULT 0,
    started_at TIMESTAMPTZ DEFAULT NOW(),
    completed BOOLEAN DEFAULT FALSE
);
INSERT INTO migration_state (table_name) VALUES ('document_chunks')
ON CONFLICT DO NOTHING;
COMMIT;
/**
 * Helper service for online database scaling.
 */
@Service
@Slf4j
public class DatabaseMigrationService {
    private final JdbcTemplate jdbcTemplate;
    public DatabaseMigrationService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    /**
     * Migrate rows to the partitioned table in batches (no downtime).
     * 1000 rows per batch, leaving headroom for normal traffic.
     */
    @Scheduled(fixedRate = 5000) // one batch every 5 seconds
    @Transactional
    public void migrateToPartitionedTable() {
        // where did the previous batch stop?
        String lastIdQuery = "SELECT last_migrated_id FROM migration_state WHERE table_name = 'document_chunks'";
        UUID lastMigratedId = jdbcTemplate.queryForObject(lastIdQuery, UUID.class);
        // copy the next batch of 1000 rows
        // note: ON CONFLICT (id) requires a unique index on the target table
        // that includes the partition key
        String migrateSql = """
            INSERT INTO document_chunks_partitioned
            SELECT * FROM document_chunks
            WHERE id > COALESCE(?, '00000000-0000-0000-0000-000000000000'::uuid)
            ORDER BY id
            LIMIT 1000
            ON CONFLICT (id) DO NOTHING
            RETURNING id
            """;
        List<UUID> migratedIds = jdbcTemplate.queryForList(migrateSql, UUID.class, lastMigratedId);
        if (migratedIds.isEmpty()) {
            log.info("Data migration complete!");
            jdbcTemplate.update(
                "UPDATE migration_state SET completed = true WHERE table_name = 'document_chunks'");
            return;
        }
        UUID newLastId = migratedIds.get(migratedIds.size() - 1);
        // record progress
        jdbcTemplate.update(
            "UPDATE migration_state SET last_migrated_id = ?, migrated_count = migrated_count + ? WHERE table_name = 'document_chunks'",
            newLastId, migratedIds.size());
        log.info("Migration progress | batch: {} | last ID: {}", migratedIds.size(), newLastId);
    }
}
10. Full Performance Test Data
10.1 Zhou Lei's System: Performance Before and After the Migration
===== Test environment =====
Server: 16 cores, 64GB RAM, PostgreSQL 16
Data volume: 1.5M document_chunks rows
Vector dimension: 1536 (OpenAI text-embedding-3-small)
HNSW parameters: m=16, ef_construction=128, ef_search=64
Test tools: pgbench + k6
===== Query performance =====
| Query type | Before (Qdrant+MySQL) | After (PGVector) | Speedup |
|---|---|---|---|
| Basic vector search (Top-10) | 380ms | 18ms | 21x |
| Tenant-filtered vector search | 390ms | 22ms | 18x |
| Hybrid query (vector + time range) | 650ms (two round trips) | 35ms | 19x |
| Single-chunk write | 5ms | 4ms | - |
| Batch write (100 chunks) | 120ms | 85ms | 1.4x |
===== Load test (100 concurrent) =====
| Metric | Before | After |
|---|---|---|
| P50 latency | 320ms | 15ms |
| P95 latency | 580ms | 28ms |
| P99 latency | 1200ms | 52ms |
| QPS | 85 | 420 |
| Error rate | 3.2% | 0.1% |
===== Operational metrics =====
| Metric | Before | After |
|---|---|---|
| Data consistency incidents | 2-3 per week | 0 |
| Component count | 4 | 2 (PG+Redis) |
| Backup complexity | High (two systems) | Low (one) |
| Application code volume | - | reduced by ~40% |
FAQ
Q1: Compared with Qdrant, at what data volume should you consider switching away from PGVector?
A: As general guidance: up to ~5M rows, PGVector is entirely sufficient — simple to operate and fast. Beyond 5M rows, or once P99 latency exceeds 50ms, consider Qdrant (quantization saves memory) or Citus (horizontal sharding). In multi-tenant setups with strict isolation requirements, PGVector's row-level security is a natural advantage and can carry a larger dataset.
Q2: Does keeping vector and relational data in one table hurt ordinary SQL performance?
A: The vector column has almost no effect on non-vector queries — not because PostgreSQL is columnar (it is a row store), but because large values like a 6KB vector are TOASTed out of the main heap, so queries that never touch the column never read it. What does change: the table grows (roughly +6KB per row), so full table scans get slower — but with the right indexes you won't be doing full scans.
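The "+6KB per row" figure follows directly from the vector layout: pgvector stores each dimension as a 4-byte float (a small per-value header is ignored here). A trivial check:

```java
public class VectorSize {
    /** pgvector stores each dimension as a 4-byte float. */
    public static int embeddingBytes(int dimensions) {
        return dimensions * 4;
    }

    public static void main(String[] args) {
        System.out.println(embeddingBytes(1536)); // 6144 bytes, i.e. ~6KB per row
    }
}
```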
Q3: With read/write splitting, can a freshly written vector be searched on the replica immediately?
A: There is lag (streaming replication is usually <100ms, and <10ms within one data center). For a RAG system, a new document already goes through seconds of processing after ingestion (chunking, embedding), so the added replication lag is acceptable. If the business requires writes to be queryable immediately, route queries for just-written documents directly to the primary.
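The "route just-written documents to the primary" idea from this answer reduces to a pure routing decision on how recently the document was indexed. A minimal sketch — the 5-second safety window, class name, and "master"/"slave" keys (matching the routing datasource earlier) are illustrative:

```java
import java.time.Duration;
import java.time.Instant;

public class FreshnessRouter {
    /** Assume the replica has caught up after this window (illustrative value). */
    private static final Duration REPLICATION_SAFETY_WINDOW = Duration.ofSeconds(5);

    /**
     * Route to the primary when the document was indexed too recently
     * for the replica to be guaranteed to have it; otherwise use the replica.
     */
    public static String routeFor(Instant indexedAt, Instant now) {
        if (indexedAt != null
                && Duration.between(indexedAt, now).compareTo(REPLICATION_SAFETY_WINDOW) < 0) {
            return "master";
        }
        return "slave";
    }
}
```

In practice the `indexedAt` timestamp is already available here — it is the `documents.indexed_at` column from the schema.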
Q4: Does PostgreSQL's VACUUM affect the vector index?
A: Yes. After heavy DELETE/UPDATE traffic the HNSW index bloats with dead tuples, and VACUUM is needed to reclaim the space and clean dead nodes out of the index. Recommendations: 1) configure autovacuum to reclaim aggressively; 2) on frequently changing tables, lower the autovacuum_vacuum_scale_factor trigger threshold.
Q5: In multi-tenant setups where tenant sizes vary wildly (a big customer with 1M rows vs. a small one with 100), how do you optimize?
A: This is exactly what partitioning is for — give big customers a dedicated partition with its own HNSW index, and let small customers share a common partition. The big customer's index stays small and sharp, so queries are fast; small customers have so little data that even a shared index is quick. In code, pick the query strategy dynamically from the tenant's plan field.
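The plan-driven strategy switch from this answer can be sketched as a simple lookup; the enum values mirror the `tenants.plan` column (`free/pro/enterprise`), while the routing rule and table names below are illustrative, keyed to the partition layout from section 4:

```java
public class TenantSearchStrategy {
    public enum Plan { FREE, PRO, ENTERPRISE }

    /**
     * Enterprise tenants hit their dedicated partition (with its own HNSW index);
     * everyone else goes through the shared hash partitions.
     */
    public static String tableFor(Plan plan, String tenantKey) {
        if (plan == Plan.ENTERPRISE) {
            return "chunks_tenant_" + tenantKey;     // dedicated partition
        }
        return "document_chunks_partitioned_hash";   // shared hash partitions
    }
}
```

Keeping the decision in one place means a tenant upgraded to enterprise starts using its dedicated partition the moment the plan column changes, with no query-code edits.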
Summary
"Vector and relational data together in one PostgreSQL" is not a compromise — for datasets up to the low millions of rows, it is the optimal architecture.
What it delivers:
- Performance: repeated network round trips disappear; a compound query finishes in a single SQL statement
- Data consistency: one transaction keeps vectors and business data in sync
- Simpler operations: one fewer database component, one fewer backup/monitoring/alerting stack
- Development speed: no second data-access layer to maintain
When to split them again: past ~5 million rows, TB-scale vector storage requirements, or a PostgreSQL server already starved for resources. Migrating to Qdrant at that point is far less risky, because the business data sitting alongside the vectors in PGVector gives the migration a consistent reference — no more ghost data.
