AI 应用的多租户架构——数据隔离和资源隔离怎么一起做
AI 应用的多租户架构——数据隔离和资源隔离怎么一起做
去年我接手了一个 SaaS 化的 AI 知识库产品,早期版本的做法是所有租户的数据全扔在同一个 Milvus Collection 里,每条记录带个 tenant_id 字段,查的时候加个过滤条件。产品刚上线三个月,数据量还小,这么搞完全没问题。
直到有一天,一个付费的大客户突然发邮件说:他们的知识库查询结果里出现了明显不属于他们的内容片段。我当时看到这封邮件,冷汗直接出来了。
排查下来是一个边界条件的 bug——向量相似度搜索时,过滤器在某些极端情况下没有正确生效,导致跨租户的数据被召回。这件事让我彻底意识到:多租户 AI 应用的隔离,远比你想象的复杂。数据隔离不只是加个 tenant_id,背后涉及向量存储隔离、Prompt 个性化隔离、资源配额隔离三个维度,缺一不可。
这篇文章我把这套架构完整讲清楚。
多租户的本质问题:三个维度的隔离
先把问题想清楚。多租户 AI 应用面临的隔离挑战,可以分成三层:
第一层:数据隔离 租户 A 的知识库内容,绝对不能被租户 B 检索到。这是最基本的安全要求,也是出问题最严重的那层。
第二层:行为隔离 不同租户可以有不同的系统 Prompt、不同的模型参数配置、不同的 RAG 策略。租户 A 可能要求 AI 用非常正式的语气,租户 B 可能是个游戏公司,要求 AI 活泼一点。
第三层:资源隔离 高付费租户不能被低付费租户的流量拖垮。Token 消耗配额、并发调用上限、向量库写入频率,都需要单独控制。
这三层加在一起,才构成一个生产级别的多租户 AI 系统。
向量数据库的隔离策略:独立 Collection vs 共享 Collection
这是架构决策中最核心的一个问题,我见过很多团队在这里踩坑。
方案一:每租户独立 Collection
每个租户在向量数据库里拥有一个完全独立的 Collection(或者 Namespace)。
优点明显:
- 数据天然隔离,不可能发生跨租户数据泄露
- 可以针对不同租户使用不同的索引配置(比如大客户用 HNSW,小客户用 IVF_FLAT)
- 删除某个租户的数据极其简单,直接 drop collection
缺点也很明显:
- 资源开销大,每个 Collection 都有独立的内存开销
- 管理复杂度高,租户数量多了之后,Collection 数量也会爆炸
- 冷启动慢,不常用的租户 Collection 可能被换出内存,首次查询延迟高
适用场景:租户数量 < 1000,且每个租户的数据量都比较大(> 10万条),付费层级差异显著(有大量高价值客户愿意为独立资源付费)。
方案二:共享 Collection + Metadata Filter
所有租户的数据放在同一个或少数几个 Collection 里,每条向量记录携带 tenant_id 字段,查询时强制加过滤条件。
优点:
- 资源利用率高
- 管理简单,无需为每个新租户创建 Collection
- 适合租户数量多、单租户数据量小的场景
缺点:
- 安全依赖于过滤逻辑的正确性,一旦有 bug 就是跨租户泄露
- 大租户和小租户的查询性能会互相影响
- 删除某个租户数据需要批量操作,效率低
适用场景:租户数量 > 10000,单租户数据量小(< 1万条),对数据隔离的安全要求不是顶级严苛的场景。
方案三:混合模式(我实际用的方案)
这是我在实际项目里落地的方案:按租户规模和付费等级分层。
- 企业级(Enterprise)租户:独立 Collection
- 专业级(Professional)租户:按地区分组,同一地区的专业级租户共享一个 Collection
- 免费/基础级租户:全部共享一个大的 Collection
这样既保证了高价值客户的严格隔离,又控制了整体资源成本。
架构示意:
Enterprise Tenant A → collection_ent_a
Enterprise Tenant B → collection_ent_b
Pro Tenant (Asia) → collection_pro_asia (多租户共享,带 tenant_id 过滤)
Free Tenants → collection_free_global (多租户共享,带 tenant_id 过滤)Mermaid:多租户隔离架构
TenantContext:核心上下文对象设计
在代码层面,我用一个 TenantContext 贯穿整个请求生命周期。这个对象包含了当前请求所有需要的租户信息,避免到处传参。
@Data
@Builder
public class TenantContext {
// 租户基础信息
private String tenantId;
private String tenantName;
private TierLevel tierLevel; // ENTERPRISE / PROFESSIONAL / FREE
// 向量库配置
private String collectionName; // 实际使用的 Collection 名称
private boolean useMetadataFilter; // 是否需要 tenant_id 过滤
// LLM 配置
private String preferredModel; // 该租户配置的首选模型
private String systemPrompt; // 租户自定义的系统提示词
private Map<String, Object> modelParams; // temperature、max_tokens 等
// 配额信息
private long dailyTokenLimit;
private long usedTokensToday;
private int concurrentLimit;
// 审计信息
private String requestId;
private String userId;
private Instant requestTime;
public enum TierLevel {
ENTERPRISE, PROFESSIONAL, FREE
}
// 是否超出配额
public boolean isOverQuota() {
return usedTokensToday >= dailyTokenLimit;
}
// 剩余可用 Token
public long remainingTokens() {
return Math.max(0, dailyTokenLimit - usedTokensToday);
}
}然后用 ThreadLocal 做请求级别的上下文传递:
@Component
public class TenantContextHolder {
private static final ThreadLocal<TenantContext> CONTEXT_HOLDER =
new InheritableThreadLocal<>();
public static void set(TenantContext context) {
CONTEXT_HOLDER.set(context);
}
public static TenantContext get() {
TenantContext ctx = CONTEXT_HOLDER.get();
if (ctx == null) {
throw new IllegalStateException("TenantContext not initialized for current thread");
}
return ctx;
}
public static Optional<TenantContext> getOptional() {
return Optional.ofNullable(CONTEXT_HOLDER.get());
}
public static void clear() {
CONTEXT_HOLDER.remove();
}
}用一个拦截器在请求进来时初始化上下文:
@Component
public class TenantContextInterceptor implements HandlerInterceptor {
@Autowired
private TenantService tenantService;
@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 从 JWT Token 中解析租户 ID
String tenantId = extractTenantIdFromToken(request);
if (tenantId == null) {
response.sendError(HttpStatus.UNAUTHORIZED.value(), "Missing tenant context");
return false;
}
// 加载租户完整上下文(可以加缓存)
TenantContext context = tenantService.loadTenantContext(tenantId, request);
TenantContextHolder.set(context);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) {
TenantContextHolder.clear(); // 必须清理,防止内存泄漏
}
private String extractTenantIdFromToken(HttpServletRequest request) {
String authHeader = request.getHeader("Authorization");
if (authHeader == null || !authHeader.startsWith("Bearer ")) {
return null;
}
// JWT 解析逻辑(省略)
return jwtUtils.extractTenantId(authHeader.substring(7));
}
}向量查询的强制隔离实现
基于 TenantContext,封装一个向量查询服务,确保隔离逻辑不会被绕过:
@Service
public class TenantAwareVectorService {
@Autowired
private MilvusClient milvusClient;
/**
* 安全的向量相似度搜索
* 内部强制添加租户隔离条件,外部调用方无需(也不应该)关心隔离细节
*/
public List<DocumentChunk> search(float[] queryVector, int topK) {
TenantContext ctx = TenantContextHolder.get();
SearchParam.Builder paramBuilder = SearchParam.newBuilder()
.withCollectionName(ctx.getCollectionName())
.withFloatVectors(List.of(queryVector))
.withTopK(topK)
.withVectorFieldName("embedding")
.withMetricType(MetricType.COSINE);
// 关键:如果是共享 Collection,强制添加 tenant_id 过滤
if (ctx.isUseMetadataFilter()) {
String filterExpr = String.format("tenant_id == \"%s\"", ctx.getTenantId());
paramBuilder.withExpr(filterExpr);
}
SearchParam searchParam = paramBuilder.build();
R<SearchResults> response = milvusClient.search(searchParam);
if (response.getStatus() != R.Status.Success.getCode()) {
throw new VectorSearchException("Vector search failed: " + response.getMessage());
}
return parseSearchResults(response.getData(), ctx.getTenantId());
}
/**
* 向量写入,自动打标 tenant_id
*/
public void insert(List<DocumentChunk> chunks) {
TenantContext ctx = TenantContextHolder.get();
List<InsertParam.Field> fields = new ArrayList<>();
// 强制注入 tenant_id
List<String> tenantIds = chunks.stream()
.map(c -> ctx.getTenantId())
.collect(Collectors.toList());
fields.add(new InsertParam.Field("tenant_id", tenantIds));
// 其他字段
List<Long> ids = chunks.stream().map(DocumentChunk::getId).collect(Collectors.toList());
fields.add(new InsertParam.Field("id", ids));
List<List<Float>> vectors = chunks.stream()
.map(DocumentChunk::getEmbedding)
.collect(Collectors.toList());
fields.add(new InsertParam.Field("embedding", vectors));
List<String> contents = chunks.stream()
.map(DocumentChunk::getContent)
.collect(Collectors.toList());
fields.add(new InsertParam.Field("content", contents));
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(ctx.getCollectionName())
.withFields(fields)
.build();
R<MutationResult> response = milvusClient.insert(insertParam);
if (response.getStatus() != R.Status.Success.getCode()) {
throw new VectorInsertException("Vector insert failed: " + response.getMessage());
}
}
private List<DocumentChunk> parseSearchResults(SearchResults results, String expectedTenantId) {
List<DocumentChunk> chunks = new ArrayList<>();
// 双重校验:即使过滤器生效,也在代码层面验证 tenant_id
for (SearchResult result : results.getResults()) {
String tenantId = result.getFieldValue("tenant_id").toString();
if (!expectedTenantId.equals(tenantId)) {
// 这种情况不应该发生,一旦发生立即告警
log.error("SECURITY ALERT: Cross-tenant data leak detected! " +
"Expected: {}, Got: {}", expectedTenantId, tenantId);
continue; // 跳过这条结果,并触发安全告警
}
chunks.add(convertToChunk(result));
}
return chunks;
}
}注意 parseSearchResults 里的双重校验——即使数据库层面的过滤器正确生效了,代码层面也再校验一遍。这个双重校验在正常情况下永远不会触发,但一旦触发,说明有严重的安全问题,必须立即告警。
Prompt 个性化隔离
这一层相对容易实现,但也容易忘记做。不同租户可以有完全不同的 AI 人格设定:
@Service
public class TenantAwareLLMService {
@Autowired
private OpenAiChatClient chatClient;
@Autowired
private TenantPromptRepository promptRepository;
public String chat(String userMessage, List<DocumentChunk> context) {
TenantContext ctx = TenantContextHolder.get();
// 1. 加载租户专属 System Prompt
String systemPrompt = buildSystemPrompt(ctx, context);
// 2. 使用租户配置的模型参数
ChatOptions options = ChatOptions.builder()
.withModel(ctx.getPreferredModel())
.withTemperature(getTemperature(ctx))
.withMaxTokens(getMaxTokens(ctx))
.build();
// 3. 构建消息
List<Message> messages = new ArrayList<>();
messages.add(new SystemMessage(systemPrompt));
messages.add(new UserMessage(userMessage));
// 4. 调用(配额检查在这里做)
checkAndDeductQuota(ctx, estimateTokens(messages));
ChatResponse response = chatClient.call(
new Prompt(messages, options)
);
// 5. 记录实际消耗
int actualTokens = response.getMetadata().getUsage().getTotalTokens();
quotaService.recordUsage(ctx.getTenantId(), actualTokens);
return response.getResult().getOutput().getContent();
}
private String buildSystemPrompt(TenantContext ctx, List<DocumentChunk> context) {
// 加载租户自定义的 Prompt 模板
String promptTemplate = promptRepository
.findByTenantId(ctx.getTenantId())
.map(TenantPrompt::getTemplate)
.orElse(DEFAULT_SYSTEM_PROMPT);
// 注入 RAG 检索到的上下文
String contextStr = context.stream()
.map(DocumentChunk::getContent)
.collect(Collectors.joining("\n\n---\n\n"));
return promptTemplate.replace("{{CONTEXT}}", contextStr);
}
private float getTemperature(TenantContext ctx) {
Object temp = ctx.getModelParams().get("temperature");
return temp != null ? Float.parseFloat(temp.toString()) : 0.7f;
}
private int getMaxTokens(TenantContext ctx) {
Object maxTokens = ctx.getModelParams().get("max_tokens");
return maxTokens != null ? Integer.parseInt(maxTokens.toString()) : 2048;
}
private void checkAndDeductQuota(TenantContext ctx, int estimatedTokens) {
if (ctx.isOverQuota()) {
throw new QuotaExceededException(
String.format("Tenant %s has exceeded daily token quota (%d/%d)",
ctx.getTenantId(), ctx.getUsedTokensToday(), ctx.getDailyTokenLimit())
);
}
// 预扣配额(实际消耗后再修正)
quotaService.preDeduct(ctx.getTenantId(), estimatedTokens);
}
}配额管理:Token 桶限流
配额管理用 Redis + Token 桶算法来实现,支持两个维度的限制:每日 Token 总量限制(防止成本超支)和每秒并发请求数限制(防止单租户把共享资源打满)。
@Service
public class QuotaService {
@Autowired
private RedisTemplate<String, Long> redisTemplate;
private static final String DAILY_TOKEN_KEY = "quota:daily:tokens:%s:%s"; // tenantId:date
private static final String RATE_LIMIT_KEY = "quota:rate:%s"; // tenantId
/**
* 检查并预扣 Token 配额
*/
public void preDeduct(String tenantId, int tokens) {
String date = LocalDate.now().toString();
String key = String.format(DAILY_TOKEN_KEY, tenantId, date);
// Lua 脚本保证原子性:检查 + 扣减
String luaScript = """
local current = tonumber(redis.call('GET', KEYS[1]) or 0)
local limit = tonumber(ARGV[1])
local tokens = tonumber(ARGV[2])
if current + tokens > limit then
return -1
end
redis.call('INCRBY', KEYS[1], tokens)
redis.call('EXPIRE', KEYS[1], 86400)
return current + tokens
""";
TenantContext ctx = TenantContextHolder.get();
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
List.of(key),
String.valueOf(ctx.getDailyTokenLimit()),
String.valueOf(tokens)
);
if (result == null || result == -1) {
throw new QuotaExceededException("Daily token quota exceeded for tenant: " + tenantId);
}
}
/**
* 修正实际消耗(预扣和实际消耗有偏差时修正)
*/
public void reconcile(String tenantId, int estimated, int actual) {
int diff = actual - estimated;
if (diff != 0) {
String date = LocalDate.now().toString();
String key = String.format(DAILY_TOKEN_KEY, tenantId, date);
redisTemplate.opsForValue().increment(key, diff);
}
}
/**
* 速率限制:使用滑动窗口
*/
public boolean checkRateLimit(String tenantId, int maxRps) {
String key = String.format(RATE_LIMIT_KEY, tenantId);
long now = System.currentTimeMillis();
long windowStart = now - 1000; // 1秒窗口
// 清理过期记录
redisTemplate.opsForZSet().removeRangeByScore(key, 0, windowStart);
// 统计当前窗口内的请求数
Long count = redisTemplate.opsForZSet().count(key, windowStart, now);
if (count != null && count >= maxRps) {
return false; // 超限
}
// 记录当前请求
redisTemplate.opsForZSet().add(key, String.valueOf(now), now);
redisTemplate.expire(key, 2, TimeUnit.SECONDS);
return true;
}
/**
* 查询某租户今日已用 Token
*/
public long getUsedTokensToday(String tenantId) {
String date = LocalDate.now().toString();
String key = String.format(DAILY_TOKEN_KEY, tenantId, date);
Long value = redisTemplate.opsForValue().get(key);
return value != null ? value : 0L;
}
}审计日志:每次 AI 调用必须可追溯
多租户系统里,审计日志不是可选项,是必须项。一旦出现争议(客户说"AI 回答了不该回答的内容"),没有审计日志你没法解释。
@Aspect
@Component
public class AICallAuditAspect {
@Autowired
private AuditLogRepository auditLogRepository;
@Around("@annotation(com.example.annotation.AICall)")
public Object auditAICall(ProceedingJoinPoint joinPoint) throws Throwable {
TenantContext ctx = TenantContextHolder.getOptional().orElse(null);
String requestId = ctx != null ? ctx.getRequestId() : UUID.randomUUID().toString();
AICallAuditLog log = AICallAuditLog.builder()
.requestId(requestId)
.tenantId(ctx != null ? ctx.getTenantId() : "unknown")
.userId(ctx != null ? ctx.getUserId() : "unknown")
.method(joinPoint.getSignature().getName())
.startTime(Instant.now())
.build();
try {
Object result = joinPoint.proceed();
log.setStatus("SUCCESS");
log.setEndTime(Instant.now());
return result;
} catch (Exception e) {
log.setStatus("FAILED");
log.setErrorMessage(e.getMessage());
log.setEndTime(Instant.now());
throw e;
} finally {
// 异步写入,不影响主流程
auditLogRepository.saveAsync(log);
}
}
}一些实际踩过的坑
坑一:ThreadLocal 在异步调用时的传递问题
用了 @Async 或者 CompletableFuture 之后,ThreadLocal 的 TenantContext 会丢失。解决方案是用 InheritableThreadLocal,或者在异步调用前显式传递 context:
TenantContext ctx = TenantContextHolder.get();
CompletableFuture.runAsync(() -> {
TenantContextHolder.set(ctx); // 显式设置
try {
doAsyncWork();
} finally {
TenantContextHolder.clear();
}
});坑二:向量库的 Collection 缓存
Milvus 有个细节:Collection 加载到内存是有时间的,如果某个小租户的 Collection 长时间不访问被 evict 掉了,下次访问的第一个请求会有 2-3 秒的延迟。对于 Enterprise 客户这是不可接受的。
解决方案是对 Enterprise 租户的 Collection 做定期 keep-alive(每隔 30 分钟做一次轻量级的查询,让 Collection 保持在内存里)。
坑三:删除租户数据时的一致性
删除一个租户时,需要同时清理:向量库里的数据、关系型数据库里的元数据、Redis 里的配额缓存、审计日志(根据合规要求决定是否保留)。这个操作必须用 Saga 模式处理,任何一步失败都要有补偿机制,不能留下脏数据。
小结
多租户 AI 应用的隔离设计,核心是分层思考:
- 存储层:根据租户规模和付费等级选择独立 Collection 或共享 Collection + 强制过滤,高价值客户不应该和免费用户共享存储资源
- 服务层:用 TenantContext 贯穿所有 AI 调用,Prompt 配置、模型参数、配额信息全部走这一套上下文
- 资源层:Token 配额 + 速率限制双重保护,防止单租户拖垮整个系统
- 安全层:过滤逻辑 + 代码双重校验,审计日志全量记录
这套架构在我们的项目里支撑了从 0 到 800+ 租户的增长,没有再出现过跨租户数据泄露的问题。
