AI 应用的多版本管理——模型、Prompt、知识库的版本协调
AI 应用的多版本管理——模型、Prompt、知识库的版本协调
适读人群:维护 AI 应用的工程师 | 阅读时长:约14分钟 | 核心价值:建立可回滚、可审计的 AI 版本管理体系,避免升级事故
去年9月,我们把知识库 RAG 用的 embedding 模型从 text-embedding-ada-002 换成了 text-embedding-3-large。
换模型之前做了评测,新模型检索质量明显更好,我们很满意,直接推上了生产。
然后用户开始反映 AI 回答质量下降了。
我们花了两天查问题,最后发现:embedding 模型换了,但知识库里的向量还是老模型生成的。检索时用新模型生成 query 向量,但知识库里是老模型的向量,两者的向量空间不一致,检索结果一团糟。
修复很简单:重新用新模型对知识库做全量 embedding。但这需要时间,在这段时间里,用户一直在用坏掉的系统。
那次事故之后,我认真搭了一套版本管理体系。这篇文章把这套体系完整写出来。
AI 应用里有哪些"版本"
传统应用的版本管理:主要是代码版本(git)。
AI 应用要同时管理多个组件的版本:
组件 变更原因 变更频率
----------------------------------------------------------
大语言模型(LLM) 模型提供商发布新版 每1-3个月
Embedding 模型 检索质量优化 每3-6个月
System Prompt 功能改进、问题修复 每1-2周
RAG Prompt 检索和生成效果调优 每1-2周
知识库内容 业务文档更新 每天
知识库向量索引 embedding 模型变更时重建 和 embedding 模型同步
应用代码 功能开发 每周这些组件之间有依赖关系:知识库向量依赖 embedding 模型版本;Prompt 往往针对特定的 LLM 版本优化;如果你的 Prompt 用了新版 LLM 的特定能力,回滚到老模型时 Prompt 也要同步回滚。
这些依赖关系是事故的根源。
版本管理的数据库设计
先把版本信息存起来:
-- 模型版本表
CREATE TABLE model_configs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL, -- 配置名称,如 'production'
llm_model VARCHAR(100) NOT NULL, -- 如 'gpt-4o-2024-11-20'
embed_model VARCHAR(100) NOT NULL, -- 如 'text-embedding-3-large'
llm_params JSONB NOT NULL DEFAULT '{}', -- temperature, max_tokens 等
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by VARCHAR(100),
note TEXT -- 变更原因
);
-- Prompt 版本表
CREATE TABLE prompt_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
prompt_key VARCHAR(100) NOT NULL, -- 如 'system_prompt', 'rag_prompt'
version VARCHAR(20) NOT NULL, -- 语义版本,如 '2.3.1'
content TEXT NOT NULL,
compatible_llms TEXT[] NOT NULL DEFAULT '{}', -- 兼容的 LLM 列表
is_active BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by VARCHAR(100),
note TEXT,
UNIQUE(prompt_key, version)
);
-- 知识库索引版本表
CREATE TABLE kb_index_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
kb_id VARCHAR(100) NOT NULL,
version VARCHAR(20) NOT NULL,
embed_model VARCHAR(100) NOT NULL, -- 用哪个 embedding 模型生成的
doc_count INT NOT NULL DEFAULT 0,
chunk_count INT NOT NULL DEFAULT 0,
build_status VARCHAR(20) NOT NULL DEFAULT 'building',
-- 'building' | 'ready' | 'deprecated'
built_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(kb_id, version)
);
-- 活跃配置表:记录当前生产环境用的各版本
CREATE TABLE active_config (
id SERIAL PRIMARY KEY,
model_config_id UUID NOT NULL REFERENCES model_configs(id),
kb_index_id UUID REFERENCES kb_index_versions(id),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by VARCHAR(100)
);
-- 这个表只有一行,代表当前生产配置
-- 版本切换历史(审计用)
CREATE TABLE config_change_log (
id BIGSERIAL PRIMARY KEY,
from_model_config UUID REFERENCES model_configs(id),
to_model_config UUID REFERENCES model_configs(id),
from_kb_index UUID REFERENCES kb_index_versions(id),
to_kb_index UUID REFERENCES kb_index_versions(id),
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
changed_by VARCHAR(100),
reason TEXT
);Prompt 版本管理
Prompt 的版本管理是重点,因为它变得最频繁。
from enum import Enum
class PromptRegistry:
"""Prompt 版本注册中心"""
def __init__(self, db):
self.db = db
self._cache = {} # 内存缓存,减少 DB 查询
async def get_active_prompt(self, prompt_key: str, llm_model: str) -> str:
"""获取指定 key 的当前激活 Prompt,同时检查 LLM 兼容性"""
cache_key = f"{prompt_key}:{llm_model}"
if cache_key in self._cache:
return self._cache[cache_key]
row = await self.db.fetchrow(
"""SELECT content, compatible_llms, version
FROM prompt_versions
WHERE prompt_key = $1 AND is_active = TRUE
ORDER BY created_at DESC LIMIT 1""",
prompt_key
)
if not row:
raise ValueError(f"No active prompt found for key: {prompt_key}")
# 检查 LLM 兼容性
compatible = row['compatible_llms']
if compatible and llm_model not in compatible:
# 当前 LLM 不在兼容列表里,找一个兼容的版本
alt_row = await self.db.fetchrow(
"""SELECT content, version
FROM prompt_versions
WHERE prompt_key = $1
AND $2 = ANY(compatible_llms)
ORDER BY created_at DESC LIMIT 1""",
prompt_key, llm_model
)
if alt_row:
import logging
logging.warning(
f"Active prompt {prompt_key} v{row['version']} not compatible "
f"with {llm_model}, falling back to v{alt_row['version']}"
)
content = alt_row['content']
else:
# 没有兼容的版本,用当前激活版本并记录警告
import logging
logging.error(
f"No compatible prompt for {prompt_key} with {llm_model}, "
f"using active version {row['version']} anyway"
)
content = row['content']
else:
content = row['content']
# 缓存5分钟
self._cache[cache_key] = content
asyncio.get_event_loop().call_later(300, lambda: self._cache.pop(cache_key, None))
return content
async def register_prompt(
self,
prompt_key: str,
version: str,
content: str,
compatible_llms: list[str],
note: str = None,
activate_immediately: bool = False
):
"""注册新的 Prompt 版本"""
async with self.db.transaction():
await self.db.execute(
"""INSERT INTO prompt_versions
(prompt_key, version, content, compatible_llms, note)
VALUES ($1, $2, $3, $4, $5)""",
prompt_key, version, content, compatible_llms, note
)
if activate_immediately:
await self.activate_prompt(prompt_key, version)
async def activate_prompt(self, prompt_key: str, version: str):
"""激活指定版本的 Prompt"""
async with self.db.transaction():
# 先把当前激活的版本取消激活
await self.db.execute(
"""UPDATE prompt_versions SET is_active = FALSE
WHERE prompt_key = $1 AND is_active = TRUE""",
prompt_key
)
# 激活新版本
affected = await self.db.execute(
"""UPDATE prompt_versions SET is_active = TRUE
WHERE prompt_key = $1 AND version = $2""",
prompt_key, version
)
if affected == 0:
raise ValueError(f"Prompt {prompt_key} v{version} not found")
# 清缓存
keys_to_delete = [k for k in self._cache if k.startswith(f"{prompt_key}:")]
for k in keys_to_delete:
del self._cache[k]安全的模型升级流程
从那次事故里我总结出了一个模型升级的标准流程,现在每次升级都按这个流程走:
class ModelUpgradeManager:
async def plan_embedding_model_upgrade(
self,
new_embed_model: str,
kb_ids: list[str]
) -> dict:
"""
规划 embedding 模型升级方案。
在实际执行前先做检查,生成升级计划。
"""
current_config = await self.db.fetchrow(
"""SELECT mc.embed_model, kiv.id as kb_index_id, kiv.version as kb_version
FROM active_config ac
JOIN model_configs mc ON ac.model_config_id = mc.id
LEFT JOIN kb_index_versions kiv ON ac.kb_index_id = kiv.id
ORDER BY ac.updated_at DESC LIMIT 1"""
)
plan = {
"current_embed_model": current_config['embed_model'],
"target_embed_model": new_embed_model,
"affected_kbs": kb_ids,
"steps": [
"1. 用新 embedding 模型对所有 KB 重新建索引(后台,不影响生产)",
"2. 等新索引 build 完成并验证质量",
"3. 原子切换:同时更新 model_config 和 kb_index,指向新版本",
"4. 观察24小时,若有问题立即回滚"
],
"rollback_point": {
"model_config_id": current_config.get('model_config_id'),
"kb_index_id": current_config.get('kb_index_id')
}
}
return plan
async def rebuild_kb_index(
self,
kb_id: str,
new_embed_model: str,
new_version: str
) -> str:
"""
用新 embedding 模型重建 KB 索引。
在后台新集合里建,不影响当前生产集合。
返回新索引的 ID。
"""
# 创建新索引版本记录
index_id = str(uuid4())
await self.db.execute(
"""INSERT INTO kb_index_versions
(id, kb_id, version, embed_model, build_status)
VALUES ($1, $2, $3, $4, 'building')""",
index_id, kb_id, new_version, new_embed_model
)
# 在向量库里创建新集合(命名区分版本)
new_collection = f"kb_{kb_id}_{new_version}"
await self.qdrant.create_collection(
collection_name=new_collection,
vectors_config={"size": 3072, "distance": "Cosine"} # text-embedding-3-large 是 3072 维
)
# 获取所有文档并重新 embedding
docs = await self.db.fetch(
"SELECT id, content FROM documents WHERE kb_id = $1 AND status = 'active'",
kb_id
)
chunk_count = 0
for doc in docs:
chunks = self._split_chunks(doc['content'])
embeddings = await self.embedding_service.batch_embed(
[c['text'] for c in chunks],
model=new_embed_model
)
points = [
{
"id": f"{doc['id']}_chunk_{i}",
"vector": emb,
"payload": {"document_id": str(doc['id']), "chunk_index": i}
}
for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
]
await self.qdrant.upsert(
collection_name=new_collection,
points=points
)
chunk_count += len(chunks)
# 更新索引状态
await self.db.execute(
"""UPDATE kb_index_versions
SET build_status = 'ready', chunk_count = $2,
doc_count = $3, built_at = NOW()
WHERE id = $1""",
index_id, chunk_count, len(docs)
)
return index_id
async def atomic_switch_to_new_config(
self,
new_model_config_id: str,
new_kb_index_id: str,
changed_by: str,
reason: str
):
"""
原子切换到新配置。
记录切换历史,支持回滚。
"""
async with self.db.transaction():
# 记录当前配置(用于回滚)
current = await self.db.fetchrow(
"SELECT model_config_id, kb_index_id FROM active_config ORDER BY updated_at DESC LIMIT 1"
)
# 切换配置
await self.db.execute(
"""UPDATE active_config
SET model_config_id = $1, kb_index_id = $2,
updated_at = NOW(), updated_by = $3""",
new_model_config_id, new_kb_index_id, changed_by
)
# 记录变更历史
await self.db.execute(
"""INSERT INTO config_change_log
(from_model_config, to_model_config, from_kb_index, to_kb_index,
changed_by, reason)
VALUES ($1, $2, $3, $4, $5, $6)""",
current['model_config_id'] if current else None,
new_model_config_id,
current['kb_index_id'] if current else None,
new_kb_index_id,
changed_by, reason
)
# 清除应用层缓存
await self.cache.delete("active_model_config")
await self.cache.delete("active_kb_index")
async def rollback(self, steps: int = 1):
"""回滚到之前的配置"""
history = await self.db.fetch(
"""SELECT from_model_config, from_kb_index, changed_at
FROM config_change_log
ORDER BY changed_at DESC LIMIT $1""",
steps
)
if not history or not history[-1]['from_model_config']:
raise ValueError("No rollback point available")
rollback_target = history[-1]
await self.atomic_switch_to_new_config(
new_model_config_id=str(rollback_target['from_model_config']),
new_kb_index_id=str(rollback_target['from_kb_index']) if rollback_target['from_kb_index'] else None,
changed_by="system_rollback",
reason=f"Rollback triggered at {datetime.utcnow().isoformat()}"
)兼容性矩阵:防止版本冲突
有了版本管理,还需要一个"兼容性矩阵"来记录哪些版本可以一起用:
# 在配置文件或数据库里维护这个矩阵
COMPATIBILITY_MATRIX = {
"gpt-4o-2024-11-20": {
"system_prompt": ["3.x.x"], # 3.x.x 的 system prompt 适配这个模型
"rag_prompt": ["2.x.x", "3.x.x"],
},
"gpt-4o-2024-08-06": {
"system_prompt": ["2.x.x", "3.x.x"],
"rag_prompt": ["2.x.x"],
},
"claude-3-5-sonnet-20241022": {
"system_prompt": ["3.x.x"],
"rag_prompt": ["3.x.x"],
}
}
def check_compatibility(llm_model: str, prompt_key: str, prompt_version: str) -> bool:
"""检查给定的 LLM 和 Prompt 版本是否兼容"""
import re
model_compat = COMPATIBILITY_MATRIX.get(llm_model, {})
allowed_patterns = model_compat.get(prompt_key, [])
for pattern in allowed_patterns:
# 把 x 替换成 .*,做版本范围匹配
regex_pattern = pattern.replace(".", r"\.").replace(r"\.x", r"\.\d+")
if re.match(f"^{regex_pattern}$", prompt_version):
return True
return False那次事故之后,我们所有的模型升级都要先过这个检查。embedding 模型变了,必须同时重建知识库索引,不允许只改其中一个。
现在回想起来,那次事故其实是个低级错误:两个互相依赖的组件,只改了一个。版本管理的本质,就是把这些隐性依赖显性化,让你在操作时不能忽视它们。
