多租户 AI 应用架构——数据隔离和资源隔离怎么做
多租户 AI 应用架构——数据隔离和资源隔离怎么做
适读人群:AI 应用开发者、SaaS 产品技术负责人 | 阅读时长:约 14 分钟 | 核心价值:SaaS 类 AI 应用多租户架构的完整实践方案
我在做的一个 AI SaaS 产品,最初做了个很傻的决定:所有租户的数据都放在一个向量数据库里,用 metadata 字段来区分租户。
这个设计在我们只有三个租户的时候运行得很好。等到了几十个租户,问题就来了。
有一次一个租户的 embedding 数据量暴增(他们做了一次大批量导入),导致向量检索的响应时间突然变慢,其他租户的查询也受影响。客户投诉过来,我才意识到:在向量数据库这层,没有做任何资源隔离。
那次事故让我重新想了整个多租户架构。这篇文章就是我踩坑之后的思考。
多租户 AI 应用面临的特殊挑战
普通 SaaS 的多租户问题,业界有很成熟的方案。但 AI 应用有几个特有的挑战:
向量数据库的多租户。 关系型数据库的多租户已经有 Schema 隔离、Row Level Security 等方案,但向量数据库对多租户的支持参差不齐,而且向量检索本身的计算资源隔离更难做。
API Key 的安全管理。 每个租户可能要用不同的 LLM Provider(有的企业有合规要求),也可能有自己的 OpenAI 账号希望和我们的系统集成。
Token 消耗的成本分摊。 一次 LLM 调用消耗了多少 Token,要精准分摊到对应的租户,这是 SaaS 计费的基础。
数据隐私边界。 租户 A 的私有知识库,绝对不能被租户 B 的查询检索到。这个错误的成本不只是技术问题,是法律问题。
多租户隔离的三个层次
在设计方案之前,先要想清楚你需要哪个层次的隔离:
隔离级别 实现方式 适用场景 成本
─────────────────────────────────────────────────────────────
共享模型 数据打标签区分 小团队/初创/低敏感 最低
逻辑隔离 独立 Schema/命名空间 中等敏感度/标准 SaaS 中等
物理隔离 独立实例/独立集群 高度敏感/金融/政府 最高我们产品针对不同价格档位的客户,采用了混合策略:基础版走逻辑隔离,企业版走物理隔离(独立 Namespace 或独立实例)。
向量数据库的多租户设计
以 Milvus 为例(我们最终选了 Milvus,原因是它的 Collection/Partition 概念对多租户支持比较友好):
方案一:Collection 级别隔离
每个租户一个独立的 Collection。
from pymilvus import MilvusClient
class TenantVectorStore:
def __init__(self, milvus_uri: str):
self.client = MilvusClient(uri=milvus_uri)
def get_collection_name(self, tenant_id: str) -> str:
# 每个租户一个独立的 Collection
return f"tenant_{tenant_id}_knowledge"
def ensure_collection(self, tenant_id: str):
collection_name = self.get_collection_name(tenant_id)
if not self.client.has_collection(collection_name):
self.client.create_collection(
collection_name=collection_name,
dimension=1536, # text-embedding-3-small 的维度
metric_type="COSINE",
auto_id=True
)
def insert(self, tenant_id: str, texts: list[str], embeddings: list[list[float]], metadata: list[dict]):
self.ensure_collection(tenant_id)
collection_name = self.get_collection_name(tenant_id)
data = [
{
"vector": emb,
"text": text,
"metadata": str(meta),
"tenant_id": tenant_id # 冗余存储,用于审计
}
for text, emb, meta in zip(texts, embeddings, metadata)
]
self.client.insert(collection_name=collection_name, data=data)
def search(self, tenant_id: str, query_vector: list[float], top_k: int = 5) -> list[dict]:
collection_name = self.get_collection_name(tenant_id)
if not self.client.has_collection(collection_name):
return []
results = self.client.search(
collection_name=collection_name,
data=[query_vector],
output_fields=["text", "metadata"],
limit=top_k
)
return [
{"text": r["entity"]["text"], "score": r["distance"]}
for r in results[0]
]Collection 隔离的好处是完全物理隔离,一个租户的数据量不会影响另一个租户的查询性能。缺点是 Collection 数量过多(几千个租户)时,Milvus 的管理开销会增加。
方案二:Partition 级别隔离(推荐中小规模)
一个 Collection,每个租户一个 Partition。
def get_partition_name(self, tenant_id: str) -> str:
return f"tenant_{tenant_id}"
def ensure_partition(self, collection_name: str, tenant_id: str):
partition_name = self.get_partition_name(tenant_id)
partitions = self.client.list_partitions(collection_name)
if partition_name not in partitions:
self.client.create_partition(
collection_name=collection_name,
partition_name=partition_name
)
def search_in_partition(self, tenant_id: str, query_vector: list[float]) -> list[dict]:
results = self.client.search(
collection_name="shared_knowledge",
data=[query_vector],
partition_names=[self.get_partition_name(tenant_id)], # 只在本租户的分区里搜
limit=5
)
return resultsAPI Key 管理:不能让租户的 Key 和你的 Key 混在一起
这是安全边界问题,必须设计清楚。
我们设计了三种模式:
模式一:平台托管(默认)
租户 → 我们的系统 → 平台的 API Key → LLM Provider
模式二:租户自带 Key(BYOK - Bring Your Own Key)
租户 → 我们的系统 → 租户的 API Key(加密存储)→ LLM Provider
模式三:私有化部署
租户 → 他们内部部署的我们的系统 → 他们自己的 LLM 环境对于 BYOK 模式,API Key 的加密存储是关键:
@Service
@RequiredArgsConstructor
public class TenantApiKeyService {
private final AesGcmEncryptor encryptor; // AES-256-GCM 加密
private final TenantApiKeyRepository repository;
/**
* 保存租户的 API Key(加密存储,绝不明文落库)
*/
public void saveTenantApiKey(String tenantId, String provider, String apiKey) {
// 加密,使用租户ID作为额外的关联数据(AAD)
String encrypted = encryptor.encrypt(apiKey, tenantId.getBytes());
TenantApiKey entity = TenantApiKey.builder()
.tenantId(tenantId)
.provider(provider)
.encryptedKey(encrypted)
.keyHash(DigestUtils.sha256Hex(apiKey)) // 存哈希用于后续验证
.createdAt(Instant.now())
.build();
repository.save(entity);
}
/**
* 获取租户的 API Key(解密,仅在调用时短暂明文存在内存里)
*/
public Optional<String> getTenantApiKey(String tenantId, String provider) {
return repository.findByTenantIdAndProvider(tenantId, provider)
.map(key -> encryptor.decrypt(key.getEncryptedKey(), tenantId.getBytes()));
}
}在调用 LLM 时,根据租户配置动态选择使用哪个 Key:
@Service
@RequiredArgsConstructor
public class TenantAwareLlmClient {
private final TenantApiKeyService keyService;
private final String platformApiKey; // 平台自己的 Key
public LlmResponse call(String tenantId, LlmRequest request) {
// 优先用租户自己的 Key
String apiKey = keyService.getTenantApiKey(tenantId, request.getProvider())
.orElse(platformApiKey);
return doCall(apiKey, request);
}
}成本分摊:Token 计费要精准
SaaS 产品的核心是计费,AI 应用的计费单位是 Token。这个数据必须精准,否则要么自己亏钱,要么账单不合理让客户跑路。
@Component
@RequiredArgsConstructor
public class TenantCostTracker {
private final RedisTemplate<String, Long> redisTemplate;
private final CostRecordRepository repository;
// 实时更新 Redis 里的 Token 计数(用于实时用量查看)
public void recordUsage(String tenantId, String model, TokenUsage usage) {
String dailyKey = buildDailyKey(tenantId, model);
String monthlyKey = buildMonthlyKey(tenantId, model);
redisTemplate.opsForValue().increment(dailyKey + ":prompt", usage.getPromptTokens());
redisTemplate.opsForValue().increment(dailyKey + ":completion", usage.getCompletionTokens());
redisTemplate.opsForValue().increment(monthlyKey + ":prompt", usage.getPromptTokens());
redisTemplate.opsForValue().increment(monthlyKey + ":completion", usage.getCompletionTokens());
// 异步持久化到数据库
persistAsync(tenantId, model, usage);
}
// 计算费用(按模型的实际定价)
public BigDecimal calculateCost(String model, long promptTokens, long completionTokens) {
ModelPricing pricing = getPricing(model);
BigDecimal promptCost = BigDecimal.valueOf(promptTokens)
.divide(BigDecimal.valueOf(1_000_000))
.multiply(pricing.getPromptPricePerMToken());
BigDecimal completionCost = BigDecimal.valueOf(completionTokens)
.divide(BigDecimal.valueOf(1_000_000))
.multiply(pricing.getCompletionPricePerMToken());
return promptCost.add(completionCost);
}
// 检查租户是否超出配额
public boolean isOverQuota(String tenantId) {
TenantPlan plan = planService.getPlan(tenantId);
long monthlyUsage = getMonthlyTokenUsage(tenantId);
return monthlyUsage > plan.getMonthlyTokenLimit();
}
private String buildDailyKey(String tenantId, String model) {
String date = LocalDate.now().format(DateTimeFormatter.ISO_DATE);
return String.format("cost:%s:%s:%s", tenantId, model, date);
}
}数据隔离的防御性编程
有一个很重要的原则:不要只靠逻辑隔离,还要在代码层面加防御。
@Service
public class TenantDataAccessService {
/**
* 获取文档时,强制校验 tenantId
* 不能只靠调用方传正确的 tenantId,要在这层再验证
*/
public Document getDocument(String tenantId, String documentId) {
Document doc = documentRepository.findById(documentId)
.orElseThrow(() -> new DocumentNotFoundException(documentId));
// 关键防御:验证文档归属
if (!doc.getTenantId().equals(tenantId)) {
// 记录安全日志,这可能是越权访问
securityLogger.warn("Tenant {} attempted to access document {} owned by tenant {}",
tenantId, documentId, doc.getTenantId());
throw new AccessDeniedException("Document does not belong to tenant " + tenantId);
}
return doc;
}
}在 JPA 层面,可以用 Hibernate Filter 做行级别的租户过滤:
@Entity
@FilterDef(name = "tenantFilter", parameters = @ParamDef(name = "tenantId", type = String.class))
@Filter(name = "tenantFilter", condition = "tenant_id = :tenantId")
public class KnowledgeDocument {
@Id
private String id;
private String tenantId;
// ...
}
// 在 Repository 或 Service 层激活 Filter
@Component
public class TenantFilterConfig {
@Autowired
private EntityManager entityManager;
public void enableTenantFilter(String tenantId) {
Session session = entityManager.unwrap(Session.class);
session.enableFilter("tenantFilter").setParameter("tenantId", tenantId);
}
}资源隔离:防止一个租户影响所有人
除了数据隔离,还要防止资源占用的相互影响("吵闹的邻居"问题)。
方案:租户级别的速率限制
@Component
@RequiredArgsConstructor
public class TenantRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
public boolean tryAcquire(String tenantId, String resource) {
TenantPlan plan = planService.getPlan(tenantId);
int rateLimit = plan.getApiCallsPerMinute();
String key = String.format("ratelimit:%s:%s:%d",
tenantId, resource,
Instant.now().getEpochSecond() / 60 // 按分钟窗口
);
Long count = redisTemplate.opsForValue().increment(key);
if (count == 1) {
// 第一次设置过期时间
redisTemplate.expire(key, Duration.ofMinutes(2));
}
return count <= rateLimit;
}
}对于向量检索的资源隔离,更进一步的方案是给不同级别的租户分配不同的 Milvus 实例(或者 Namespace),但这个成本高,只给大客户做。
我踩过的坑
回到最开始的问题:所有租户数据放一个向量数据库,用 metadata 过滤。
这个方案最大的问题除了性能隔离,还有一个安全问题:如果过滤条件写错了,或者有 bug,一个租户可能会查到另一个租户的数据。
我们现在的架构:基础版租户走 Partition 隔离,企业版租户走独立 Collection,超大客户(数据量超过 500 万向量)走独立的 Milvus 实例。
这个分层策略成本和安全之间的平衡,比一刀切的方案要好得多。
