AI 应用的 Schema 演进——向量和关系型数据的协同变更
AI 应用的 Schema 演进——向量和关系型数据的协同变更
写这篇文章的时候,我脑子里一直在想一个具体的场景:
一个企业知识库产品,上线运行了半年,突然产品说要支持"文档分类标签"功能——用户可以给文档打多个标签,搜索的时候可以按标签过滤。
听起来很简单对吧?加个字段,写个接口,上线。
然而在 AI 应用里,这件事要复杂得多。因为涉及到的不只是关系型数据库的 Schema 变更,还包括:
- 向量库里每个 chunk 的 metadata 结构要更新
- 已有的几百万条向量数据要补充 tag 字段
- 如果将来 tag 也要参与语义搜索("找所有关于'财务'标签的文档"),那 tag 信息需要融合进 Embedding
- 向量的维度或者生成策略可能因此改变
这就是 AI 应用的 Schema 演进——你不只是在迁移数据库表,你在迁移"知识结构"。
为什么 AI 应用的 Schema 演进特别难
和传统 CRUD 应用的 Schema 演进相比,AI 应用有几个独特的挑战:
1. 向量不可增量更新 关系型数据库加个字段,只需要 ALTER TABLE 然后批量 UPDATE 新字段就好。但向量没法这么做——如果 Embedding 的输入发生变化(比如增加了 tag 信息),向量必须整体重新生成,不能只改"某一部分"。
2. 向量库的 Schema 变更代价高 Milvus、Qdrant 这类向量库,修改 Collection 的字段结构通常需要重建 Collection(某些版本支持动态 Schema,但限制很多)。重建意味着数据迁移。
3. 新旧 Schema 的数据可比较性问题 迁移期间,数据库里同时存在"旧 Schema 的数据"和"新 Schema 的数据",它们在向量空间里不可直接比较。应用层需要感知并处理这个过渡状态。
4. 迁移时间长 重新生成几百万条向量,可能需要几十小时,这期间系统要持续服务。
Schema 版本化:核心设计思路
解决思路是引入 Schema 版本化机制——把当前在用的 Schema 版本显式地管理起来,让系统在任何时间点都知道每条数据属于哪个 Schema 版本,查询时按版本路由。
@Data
@Builder
@Entity
@Table(name = "document_schema_version")
public class DocumentSchemaVersion {
@Id
private String id;
private int version; // Schema 版本号,整数递增
private String description; // 这个版本变了什么
private String embeddingModel; // 对应的 Embedding 模型
private String chunkStrategy; // 分块策略(fixed/sentence/semantic)
private int chunkSize; // 分块大小
private int chunkOverlap; // 分块重叠
@Column(columnDefinition = "JSON")
private String metadataSchema; // JSON Schema,描述 metadata 的结构
@Column(columnDefinition = "JSON")
private String embeddingInputTemplate; // 如何构建 Embedding 的输入文本
private boolean isActive; // 是否是当前活跃版本
private Instant createdAt;
private Instant activatedAt;
// Schema 版本对应的向量 Collection 名称
public String getCollectionName(String tenantId) {
return String.format("docs_%s_v%d", tenantId, version);
}
}每个 Schema 版本对应一个独立的向量 Collection,这样新旧版本的数据完全隔离:
v1: docs_{tenantId}_v1 ← 旧 Schema,正在使用
v2: docs_{tenantId}_v2 ← 新 Schema,正在迁移中Embedding 输入模板:Schema 的核心
Schema 变更里最关键的部分是"Embedding 输入如何构建"。我用一个模板来描述这个逻辑:
/**
* Embedding 输入构建器
* 根据不同的 Schema 版本,用不同的方式构建 Embedding 的输入文本
*/
@Component
public class EmbeddingInputBuilder {
/**
* 构建文档 chunk 的 Embedding 输入
*/
public String buildInput(DocumentChunk chunk, DocumentSchemaVersion schema) {
String template = schema.getEmbeddingInputTemplate();
// 模板示例(v1):只用正文内容
// template = "{{content}}"
// 模板示例(v2):加入标题和标签,提升语义
// template = "标题:{{title}}\n标签:{{tags}}\n内容:{{content}}"
// 模板示例(v3):加入文档类型和摘要
// template = "类型:{{doc_type}}\n摘要:{{summary}}\n标题:{{title}}\n内容:{{content}}"
return fillTemplate(template, chunk);
}
private String fillTemplate(String template, DocumentChunk chunk) {
Map<String, String> vars = new HashMap<>();
vars.put("content", chunk.getContent());
vars.put("title", chunk.getMetadata("title", ""));
vars.put("tags", String.join(", ", chunk.getTagList()));
vars.put("doc_type", chunk.getMetadata("doc_type", "unknown"));
vars.put("summary", chunk.getMetadata("summary", ""));
String result = template;
for (Map.Entry<String, String> entry : vars.entrySet()) {
result = result.replace("{{" + entry.getKey() + "}}", entry.getValue());
}
return result;
}
}这个设计的好处是:当 Schema 版本变化时,只需要在数据库里更新模板字符串,不需要改代码。
Schema 演进流程
Schema 变更的具体实现
下面以"增加 tags 支持"这个具体需求为例,走一遍完整的变更流程:
第一步:定义新 Schema 版本
// 在数据库里注册 v2 Schema
DocumentSchemaVersion v2 = DocumentSchemaVersion.builder()
.version(2)
.description("增加标签支持,Embedding 输入中融入标签信息")
.embeddingModel("text-embedding-3-large")
.chunkStrategy("sentence")
.chunkSize(512)
.chunkOverlap(50)
.metadataSchema("""
{
"type": "object",
"properties": {
"title": {"type": "string"},
"doc_type": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"author": {"type": "string"},
"created_at": {"type": "string", "format": "date"}
}
}
""")
.embeddingInputTemplate("标签:{{tags}}\\n标题:{{title}}\\n内容:{{content}}")
.isActive(false)
.createdAt(Instant.now())
.build();
schemaVersionRepo.save(v2);第二步:创建 v2 Collection
@Service
public class CollectionManager {
@Autowired
private MilvusClient milvusClient;
public void createCollectionForSchema(String tenantId, DocumentSchemaVersion schema) {
String collectionName = schema.getCollectionName(tenantId);
// 根据 Schema 版本的 Embedding 模型确定维度
int dimension = getDimensionForModel(schema.getEmbeddingModel());
// 定义字段(根据 metadataSchema 动态构建)
List<FieldType> fields = new ArrayList<>();
fields.add(FieldType.newBuilder()
.withName("id")
.withDataType(DataType.VarChar)
.withMaxLength(64)
.withPrimaryKey(true)
.build());
fields.add(FieldType.newBuilder()
.withName("embedding")
.withDataType(DataType.FloatVector)
.withDimension(dimension)
.build());
// 根据 metadataSchema 动态添加过滤字段
fields.add(FieldType.newBuilder()
.withName("doc_id")
.withDataType(DataType.VarChar)
.withMaxLength(64)
.build());
fields.add(FieldType.newBuilder()
.withName("tenant_id")
.withDataType(DataType.VarChar)
.withMaxLength(64)
.build());
// v2 新增:tags 字段(用逗号分隔的字符串,便于过滤)
fields.add(FieldType.newBuilder()
.withName("tags")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build());
CollectionSchemaParam schemaParam = CollectionSchemaParam.newBuilder()
.withFieldTypes(fields)
.build();
CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
.withCollectionName(collectionName)
.withSchema(schemaParam)
.build();
milvusClient.createCollection(createParam);
// 创建索引
createIndex(collectionName);
log.info("Created collection {} for schema v{}", collectionName, schema.getVersion());
}
private int getDimensionForModel(String model) {
return switch (model) {
case "text-embedding-ada-002" -> 1536;
case "text-embedding-3-large" -> 3072;
case "text-embedding-3-small" -> 1536;
default -> throw new IllegalArgumentException("Unknown model: " + model);
};
}
}第三步:存量迁移 Job
这里的难点在于旧数据里可能没有 tags 字段(因为当时还不支持标签功能)。迁移时要从关系型数据库里读取文档的最新状态(包括用户事后补充的标签),然后用新模板重新生成 Embedding:
@Service
public class SchemaMigrationService {
@Autowired
private DocumentRepository documentRepo;
@Autowired
private DocumentSchemaVersionRepository schemaRepo;
@Autowired
private EmbeddingInputBuilder inputBuilder;
@Autowired
private EmbeddingServiceFactory embeddingFactory;
@Autowired
private VectorStoreService vectorStore;
/**
* 将某租户的文档从 v1 Schema 迁移到 v2 Schema
*/
public void migrateToNewSchema(String tenantId, int fromVersion, int toVersion) {
DocumentSchemaVersion toSchema = schemaRepo.findByVersion(toVersion);
EmbeddingService embService = embeddingFactory.getService(toSchema.getEmbeddingModel());
String targetCollection = toSchema.getCollectionName(tenantId);
// 分页处理所有文档
int pageSize = 50;
int page = 0;
while (true) {
Page<Document> docPage = documentRepo.findByTenantId(
tenantId, PageRequest.of(page, pageSize)
);
if (docPage.isEmpty()) break;
for (Document doc : docPage.getContent()) {
try {
migrateDocument(doc, toSchema, embService, targetCollection, tenantId);
} catch (Exception e) {
log.error("Failed to migrate document {}: {}", doc.getId(), e.getMessage());
// 记录失败,后续可以重试
migrationFailureRepo.record(doc.getId(), tenantId, e.getMessage());
}
}
page++;
log.info("Migrated page {}/{} for tenant {}",
page, docPage.getTotalPages(), tenantId);
}
}
private void migrateDocument(Document doc, DocumentSchemaVersion targetSchema,
EmbeddingService embService,
String targetCollection, String tenantId) {
// 检查是否已迁移(幂等)
if (vectorStore.existsByDocId(targetCollection, doc.getId(), tenantId)) {
return;
}
// 读取文档的所有 chunk(原始文本保存在关系型数据库)
List<DocumentChunk> chunks = documentRepo.findChunksByDocId(doc.getId());
// 给每个 chunk 设置最新的 metadata(包括新增的 tags)
for (DocumentChunk chunk : chunks) {
chunk.setTagList(doc.getTags()); // 从关系型 DB 读取最新的标签
chunk.setMetadata("title", doc.getTitle());
chunk.setMetadata("doc_type", doc.getDocType());
}
// 用新 Schema 的模板构建 Embedding 输入
List<String> embeddingInputs = chunks.stream()
.map(chunk -> inputBuilder.buildInput(chunk, targetSchema))
.collect(Collectors.toList());
// 生成新的 Embedding
List<float[]> newEmbeddings = embService.embedBatch(embeddingInputs);
for (int i = 0; i < chunks.size(); i++) {
chunks.get(i).setEmbedding(newEmbeddings.get(i));
}
// 写入新 Collection
vectorStore.insertChunks(targetCollection, chunks, tenantId);
}
}第四步:感知 Schema 版本的查询
查询时,需要知道当前应该用哪个 Schema 版本来处理请求:
@Service
public class SchemaAwareQueryService {
@Autowired
private DocumentSchemaVersionRepository schemaRepo;
@Autowired
private EmbeddingInputBuilder inputBuilder;
@Autowired
private EmbeddingServiceFactory embeddingFactory;
@Autowired
private VectorStoreService vectorStore;
public List<DocumentChunk> search(String tenantId, String queryText,
List<String> tagFilter, int topK) {
// 获取当前活跃的 Schema 版本
DocumentSchemaVersion activeSchema = schemaRepo.findActiveByTenantId(tenantId);
// 用当前 Schema 的模型生成查询向量
// 注意:查询的 Embedding 输入也要和文档的 Embedding 输入用一样的模板
// 这里查询模板和文档模板略有不同——不包含 content 占位符
String queryInput = buildQueryInput(queryText, tagFilter, activeSchema);
EmbeddingService embService = embeddingFactory.getService(activeSchema.getEmbeddingModel());
float[] queryVector = embService.embed(queryInput);
// 构建过滤条件(如果有标签过滤)
String filterExpr = buildFilterExpr(tenantId, tagFilter);
String collectionName = activeSchema.getCollectionName(tenantId);
return vectorStore.search(collectionName, queryVector, filterExpr, topK, tenantId);
}
private String buildQueryInput(String queryText, List<String> tags,
DocumentSchemaVersion schema) {
// 查询 Embedding 的输入也需要和文档 Embedding 的输入格式一致
// 但查询通常没有 content,所以用 queryText 代替
DocumentChunk pseudoChunk = DocumentChunk.builder()
.content(queryText)
.tagList(tags != null ? tags : List.of())
.build();
return inputBuilder.buildInput(pseudoChunk, schema);
}
private String buildFilterExpr(String tenantId, List<String> tagFilter) {
StringBuilder sb = new StringBuilder();
sb.append(String.format("tenant_id == \"%s\"", tenantId));
if (tagFilter != null && !tagFilter.isEmpty()) {
// 简单的标签过滤:tags 字段包含指定标签(用 like 匹配)
for (String tag : tagFilter) {
sb.append(String.format(" && tags like \"%%%s%%\"", tag));
}
}
return sb.toString();
}
}关系型数据库的协同变更
向量库的变更和关系型数据库的变更需要协调进行。原则是:关系型数据库先变更(向后兼容),向量库后跟进。
-- 关系型数据库 Schema 变更(向后兼容,新字段允许 NULL)
ALTER TABLE document ADD COLUMN tags JSON NULL DEFAULT NULL;
ALTER TABLE document_chunk ADD COLUMN schema_version INT NOT NULL DEFAULT 1;
-- 索引:支持按 schema_version 查询(方便分版本迁移)
CREATE INDEX idx_document_chunk_schema_version ON document_chunk(schema_version);
-- 迁移进度表
CREATE TABLE schema_migration_progress (
id VARCHAR(64) PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
from_version INT NOT NULL,
to_version INT NOT NULL,
total_docs BIGINT NOT NULL,
processed_docs BIGINT NOT NULL DEFAULT 0,
failed_docs BIGINT NOT NULL DEFAULT 0,
status ENUM('RUNNING', 'PAUSED', 'COMPLETED', 'FAILED') NOT NULL,
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP NULL,
INDEX idx_tenant_status (tenant_id, status)
);一个真实的踩坑经历
我在做"增加 tags 支持"这个 Schema 演进时,犯了一个错误:
迁移完成之后,新的查询改用了"标签:xxx 标题:yyy 内容:zzz"这个模板来生成 Embedding。但是查询的时候,我忘了把查询输入也改成相同格式——查询输入还是直接用的原始问题文本,没有附加标签信息。
结果就是:文档侧的 Embedding 融合了标签信息,查询侧的 Embedding 没有,两者在向量空间里的分布不一致,召回质量反而比迁移前还差了一些。
这个问题最终是通过把查询 Embedding 的输入格式也统一到和文档一样的模板来解决的:查询"关于财务合规的文件"时,Embedding 输入变成"标签:(空)\n 标题:(空)\n 内容:关于财务合规的文件",这样就和文档 Embedding 的构建方式对齐了。
教训:Schema 版本管理一定要包含"Embedding 输入的构建方式",而且文档侧和查询侧必须共用同一套规则。
小结
AI 应用的 Schema 演进,核心思路是:
- Schema 版本化:每个 Schema 版本对应独立的向量 Collection,新旧版本完全隔离
- Embedding 输入模板化:把"如何构建 Embedding 输入"从代码里抽出来,放进 Schema 版本配置,文档侧和查询侧共用
- 关系型 DB 先行:关系型数据库的变更先于向量库,保持向后兼容
- 渐进切流:迁移期间双写,按覆盖率逐步切换查询到新 Schema
Schema 演进是 AI 应用生命周期中的必经之路,产品需求永远在变。建立这套机制的成本,远小于每次变更都"推倒重来"的成本。
