在生产环境换 Embedding 模型——不停机迁移的工程实践
在生产环境换 Embedding 模型——不停机迁移的工程实践
去年年初,text-embedding-ada-002 还是主流,我们系统里几百万条向量都是用这个模型生成的。然后 OpenAI 发布了 text-embedding-3-large,同等成本下检索质量提升了 20% 以上。
按理说这是好事,升级就好了。但实际情况是:Embedding 模型换了,所有向量都必须重新生成。不是迁移一下数据库表结构,是把几百万条向量全部删掉重建。
而且这件事必须在线上不停服的情况下完成。
我们当时的数据量:
- 约 380 万条向量(覆盖 12 万份文档)
- 平均每天新增 3000 条文档
- 系统 7x24 小时对外提供查询服务,SLA 要求 99.9%
这篇文章把整个迁移过程从头到尾拆开讲。
为什么不能直接换
最暴力的方案:停服 → 删光旧向量 → 重新 Embedding 全量数据 → 重启服务。
这个方案的问题显而易见:380 万条向量,按每批 100 条调用 Embedding API,需要 38000 次 API 调用,按并发 10 线程、每批 2 秒估算,大概需要 21 小时。你让服务停 21 小时吗?
另一个暴力方案:保持服务运行,直接切换 Embedding 模型,同时异步重建向量。这个问题更大:新旧向量在同一个向量空间里是不可比较的。用新模型生成的查询向量,去和旧模型生成的文档向量做相似度计算,得到的结果完全没有意义。
所以需要一套精心设计的迁移方案。
核心思路:双写 + 渐进切流
整个迁移分四个阶段:
阶段一:准备
- 创建新的向量集合(使用新模型的维度,
text-embedding-3-large是 3072 维 vsada-002的 1536 维) - 部署同时支持两个模型的服务版本
阶段二:双写
- 新文档写入时,同时生成旧模型向量(写入旧集合)和新模型向量(写入新集合)
- 存量数据后台异步迁移到新集合
阶段三:灰度切流
- 当新集合的向量覆盖率达到一定比例(先 20%,再 50%,再 80%,再 100%),逐步将查询流量切到新集合
- 设置质量对比评估,确认新集合的召回质量不低于旧集合
阶段四:清理
- 存量数据全部迁移完成、流量完全切到新集合后,清理旧集合
迁移的工程实现
集合版本管理
首先建立一个向量集合的版本管理机制:
@Data
@Builder
@Entity
@Table(name = "vector_collection_config")
public class VectorCollectionConfig {
@Id
private String id;
private String tenantId;
private String collectionName;
private String embeddingModel; // "text-embedding-ada-002" 或 "text-embedding-3-large"
private int embeddingDimension; // 1536 或 3072
private CollectionStatus status; // ACTIVE / MIGRATING / DEPRECATED
private double trafficWeight; // 查询流量权重,0.0 到 1.0
private double coverageRate; // 已覆盖的文档比例
private Instant createdAt;
private Instant deprecatedAt;
public enum CollectionStatus {
ACTIVE, // 当前主用
MIGRATING, // 迁移中(新集合)
DEPRECATED // 已废弃(旧集合,等待清理)
}
}双写服务
@Service
public class DualWriteVectorService {
@Autowired
private VectorCollectionConfigRepository collectionConfigRepo;
@Autowired
private EmbeddingServiceFactory embeddingFactory;
@Autowired
private VectorStoreService vectorStore;
/**
* 双写模式:同时写入所有 ACTIVE 和 MIGRATING 状态的集合
*/
public void insertDocument(String tenantId, DocumentChunk chunk) {
List<VectorCollectionConfig> activeCollections = collectionConfigRepo
.findByTenantIdAndStatusIn(tenantId,
List.of(CollectionStatus.ACTIVE, CollectionStatus.MIGRATING));
List<CompletableFuture<Void>> futures = activeCollections.stream()
.map(config -> CompletableFuture.runAsync(() -> {
// 用对应集合的 Embedding 模型生成向量
EmbeddingService embService = embeddingFactory.getService(config.getEmbeddingModel());
float[] vector = embService.embed(chunk.getContent());
// 写入对应的集合
vectorStore.insert(config.getCollectionName(), chunk, vector, tenantId);
log.debug("Wrote chunk {} to collection {}", chunk.getId(), config.getCollectionName());
}))
.collect(Collectors.toList());
// 等待所有写入完成
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
// 任何一个集合写入失败,都要告警(但不能因此影响主流程)
log.error("Dual write partially failed for chunk {}: {}", chunk.getId(), e.getMessage());
alertService.sendAlert("DUAL_WRITE_FAILURE", chunk.getId());
}
}
/**
* 查询时按流量权重路由到不同集合
*/
public List<SearchResult> search(String tenantId, float[] queryVector,
String queryEmbeddingModel, int topK) {
// 找到使用相同 Embedding 模型的集合
List<VectorCollectionConfig> candidateCollections = collectionConfigRepo
.findByTenantIdAndEmbeddingModel(tenantId, queryEmbeddingModel);
// 按流量权重随机选一个
VectorCollectionConfig selected = weightedRandom(candidateCollections);
return vectorStore.search(selected.getCollectionName(), queryVector, topK, tenantId);
}
private VectorCollectionConfig weightedRandom(List<VectorCollectionConfig> configs) {
double totalWeight = configs.stream().mapToDouble(VectorCollectionConfig::getTrafficWeight).sum();
double random = Math.random() * totalWeight;
double cumulative = 0;
for (VectorCollectionConfig config : configs) {
cumulative += config.getTrafficWeight();
if (random <= cumulative) {
return config;
}
}
return configs.get(configs.size() - 1);
}
}存量数据迁移 Job
这是最核心的部分——把旧集合里的数据全部重新生成新 Embedding 并写入新集合:
@Component
public class EmbeddingMigrationJob {
@Autowired
private DocumentRepository documentRepo;
@Autowired
private VectorStoreService vectorStore;
@Autowired
private EmbeddingServiceFactory embeddingFactory;
@Autowired
private MigrationProgressRepository progressRepo;
// 控制迁移速度:不能把 Embedding API 的速率限制全部占满
private final Semaphore rateLimiter = new Semaphore(5); // 最大 5 并发
@Scheduled(fixedDelay = 60_000) // 每分钟检查一次
public void runMigration() {
List<MigrationTask> pendingTasks = progressRepo.findPendingTasks();
for (MigrationTask task : pendingTasks) {
if (task.getStatus() != MigrationStatus.RUNNING) continue;
migrateBatch(task);
}
}
/**
* 分批迁移,每批 100 条文档
*/
private void migrateBatch(MigrationTask task) {
int batchSize = 100;
long offset = task.getLastProcessedOffset();
// 从旧集合分页读取文档 ID
List<String> documentIds = documentRepo.findIdsByTenantId(
task.getTenantId(), offset, batchSize
);
if (documentIds.isEmpty()) {
// 迁移完成
task.setStatus(MigrationStatus.COMPLETED);
progressRepo.save(task);
log.info("Migration completed for tenant {}", task.getTenantId());
updateCoverageRate(task.getTenantId(), 1.0);
return;
}
EmbeddingService newEmbService = embeddingFactory.getService(task.getTargetModel());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String docId : documentIds) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
rateLimiter.acquire(); // 限速
try {
migrateDocument(docId, task, newEmbService);
} finally {
rateLimiter.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
futures.add(future);
}
// 等待这批完成
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.warn("Batch migration timeout for tenant {}, will retry", task.getTenantId());
return; // 超时就下次再来,不强制等
} catch (Exception e) {
log.error("Batch migration error for tenant {}: {}", task.getTenantId(), e.getMessage());
}
// 更新进度
task.setLastProcessedOffset(offset + documentIds.size());
progressRepo.save(task);
// 更新覆盖率
long totalDocs = documentRepo.countByTenantId(task.getTenantId());
double coverageRate = (double)(offset + documentIds.size()) / totalDocs;
updateCoverageRate(task.getTenantId(), coverageRate);
log.info("Migration progress for tenant {}: {}/{} ({:.1f}%)",
task.getTenantId(), offset + documentIds.size(), totalDocs, coverageRate * 100);
}
private void migrateDocument(String docId, MigrationTask task, EmbeddingService embService) {
// 检查新集合里是否已有(断点续传)
if (vectorStore.existsInCollection(task.getTargetCollection(), docId)) {
return;
}
// 从关系型数据库读取原始文本
List<DocumentChunk> chunks = documentRepo.findChunksByDocumentId(docId);
if (chunks.isEmpty()) return;
// 批量生成新 Embedding
List<String> texts = chunks.stream().map(DocumentChunk::getContent).collect(Collectors.toList());
List<float[]> newEmbeddings = embService.embedBatch(texts);
// 写入新集合
for (int i = 0; i < chunks.size(); i++) {
DocumentChunk chunk = chunks.get(i);
chunk.setEmbedding(newEmbeddings.get(i));
}
vectorStore.insertChunks(task.getTargetCollection(), chunks, task.getTenantId());
}
private void updateCoverageRate(String tenantId, double rate) {
collectionConfigRepo.updateCoverageRate(tenantId, "MIGRATING", rate);
// 根据覆盖率自动调整流量权重(这是渐进切流的核心)
double newWeight = calculateTrafficWeight(rate);
collectionConfigRepo.updateTrafficWeight(tenantId, "MIGRATING", newWeight);
log.info("Updated traffic weight for tenant {} new collection: {:.2f}", tenantId, newWeight);
}
/**
* 覆盖率到流量权重的映射策略
* 不是线性的:覆盖率 50% 时只给 20% 流量,覆盖率 100% 时给 100% 流量
* 这样保守一点,优先保证质量
*/
private double calculateTrafficWeight(double coverageRate) {
if (coverageRate < 0.2) return 0.0;
if (coverageRate < 0.5) return 0.1;
if (coverageRate < 0.8) return 0.3;
if (coverageRate < 0.95) return 0.7;
return 1.0;
}
}查询路由:感知当前模型阶段
查询的时候,客户端发来的是文本,需要先 Embed,然后在向量库里搜索。迁移期间,这个"先 Embed"用的是哪个模型,决定了要查哪个集合:
@Service
public class RAGQueryService {
@Autowired
private VectorCollectionConfigRepository collectionConfigRepo;
@Autowired
private EmbeddingServiceFactory embeddingFactory;
@Autowired
private DualWriteVectorService vectorService;
public List<DocumentChunk> retrieve(String tenantId, String query, int topK) {
// 获取当前应该使用的 Embedding 模型(取流量权重最高的那个集合对应的模型)
VectorCollectionConfig primaryCollection = collectionConfigRepo
.findPrimaryByTenantId(tenantId);
String embeddingModel = primaryCollection.getEmbeddingModel();
EmbeddingService embService = embeddingFactory.getService(embeddingModel);
// 用对应模型 Embed 查询
float[] queryVector = embService.embed(query);
// 查询(内部按流量权重路由)
List<SearchResult> results = vectorService.search(
tenantId, queryVector, embeddingModel, topK
);
return results.stream().map(this::toChunk).collect(Collectors.toList());
}
}一致性校验:迁移完成后的验证
迁移完成后,必须验证新集合的数据和旧集合一致:
@Service
public class MigrationConsistencyChecker {
public ConsistencyReport check(String tenantId, String oldCollection, String newCollection) {
// 随机抽样 1000 条文档
List<String> sampleIds = documentRepo.sampleDocumentIds(tenantId, 1000);
int missingCount = 0;
int dimensionMismatchCount = 0;
for (String docId : sampleIds) {
// 检查新集合中是否存在
boolean existsInNew = vectorStore.existsInCollection(newCollection, docId);
if (!existsInNew) {
missingCount++;
log.warn("Document {} missing in new collection", docId);
continue;
}
// 检查维度是否正确
int dimension = vectorStore.getDimension(newCollection, docId);
if (dimension != 3072) { // text-embedding-3-large 的维度
dimensionMismatchCount++;
log.warn("Document {} has wrong dimension: {}", docId, dimension);
}
}
double missingRate = (double) missingCount / sampleIds.size();
double mismatchRate = (double) dimensionMismatchCount / sampleIds.size();
boolean isHealthy = missingRate < 0.001 && mismatchRate == 0; // 允许 0.1% 的缺失
return ConsistencyReport.builder()
.tenantId(tenantId)
.sampleSize(sampleIds.size())
.missingCount(missingCount)
.dimensionMismatchCount(dimensionMismatchCount)
.missingRate(missingRate)
.isHealthy(isHealthy)
.build();
}
/**
* 召回质量对比:用一批测试 query,比较新旧集合的召回质量
*/
public QualityComparisonReport compareQuality(String tenantId,
List<String> testQueries,
List<List<String>> groundTruth) {
double oldMRR = 0, newMRR = 0;
for (int i = 0; i < testQueries.size(); i++) {
String query = testQueries.get(i);
List<String> relevant = groundTruth.get(i);
// 旧集合查询
List<SearchResult> oldResults = queryCollection(tenantId, query,
"text-embedding-ada-002", 10);
oldMRR += computeMRR(oldResults, relevant);
// 新集合查询
List<SearchResult> newResults = queryCollection(tenantId, query,
"text-embedding-3-large", 10);
newMRR += computeMRR(newResults, relevant);
}
oldMRR /= testQueries.size();
newMRR /= testQueries.size();
double improvement = (newMRR - oldMRR) / oldMRR * 100;
return QualityComparisonReport.builder()
.oldModelMRR(oldMRR)
.newModelMRR(newMRR)
.improvementPercent(improvement)
.isNewModelBetter(newMRR > oldMRR)
.build();
}
private double computeMRR(List<SearchResult> results, List<String> relevant) {
for (int rank = 0; rank < results.size(); rank++) {
if (relevant.contains(results.get(rank).getDocumentId())) {
return 1.0 / (rank + 1);
}
}
return 0.0;
}
}实际迁移结果
我们在一个周末启动了迁移任务(避开高峰期),整个过程:
- 双写阶段持续了 3 天(同时新文档双写,存量后台迁移)
- 存量 380 万条向量的迁移,用了约 36 小时(限速到 5 并发,避免影响线上查询)
- 渐进切流从 10% 到 100% 用了 2 天
- 整个过程服务可用性 100%,没有任何用户感知到影响
质量对比结果:用内部的 100 条测试 query 评估,新模型的 MRR@10 从 0.71 提升到 0.84,提升了 18.3%。
成本方面:text-embedding-3-large 比 ada-002 贵一点,但召回质量提升后,整个系统需要召回的 chunk 数量减少了(因为前几个结果就已经很准确),LLM 处理的 context 变短,最终总成本反而下降了约 8%。
小结
Embedding 模型迁移的核心难点不是技术复杂度,而是如何在不影响服务可用性的情况下完成一个涉及多存储、耗时几十小时的大规模数据重建工作。
解决方案的关键点:
- 双写保证新数据不落空:迁移期间所有新文档同时写入新旧两个集合
- 渐进切流而非一刀切:覆盖率不够高时,新集合只承担小比例流量
- 一致性校验是必须项:迁移完成后,随机抽样验证覆盖率和质量
- 断点续传:迁移 Job 必须支持中断后从断点继续,不能从头重来
