第1689篇:跨租户数据隔离的工程实现——向量库与提示词的安全边界
第1689篇:跨租户数据隔离的工程实现——向量库与提示词的安全边界
多租户 AI 系统是一种很常见的架构——多个客户或者多个部门共享同一套 AI 基础设施,但每个租户的数据不能互相访问。
这件事说起来简单,做起来坑很多。我踩过一个相当严重的问题:我们的 RAG 系统上线初期,A 租户的文档被 B 租户检索到了。原因是向量库里没有强制附加租户过滤条件,一个粗心的工程师在写检索代码时漏掉了 where tenant_id = ?,结果查询会检索所有租户的文档。
这类错误一旦发生,不光是数据泄露,还可能引发严重的法律和合规问题。
一、多租户 AI 系统的隔离边界在哪里
要做数据隔离,首先要搞清楚哪些地方存在跨租户风险:
向量库:存储了所有租户的文档向量,如果检索时没有过滤,一个租户可以检索到所有人的内容。
LLM 上下文:对话历史、RAG 检索出的文档,如果错误地混入了其他租户的内容,会导致泄露。
系统提示词:不同租户可能有不同的系统提示词配置,这些配置本身也属于隔离范围。
用户画像和记忆:AI 系统可能会存储用户偏好、历史对话摘要,这些数据同样需要租户隔离。
缓存层:如果用了响应缓存,要确保缓存键包含租户信息,不同租户不能命中同一个缓存条目。
日志和审计数据:日志本身也是敏感数据,应该按租户隔离存储和访问权限。
二、向量库的租户隔离
2.1 策略选择:同库隔离 vs 分库隔离
向量库隔离有两种主要策略,各有利弊:
同库多租户(Shared Collection):所有租户的数据存在同一个集合里,通过 metadata 字段过滤。
- 优点:运维成本低,资源利用率高
- 缺点:隔离完全依赖查询时的过滤条件,一旦漏写就会泄露;某个租户的大量数据也会影响其他租户的检索性能
分库单租户(Dedicated Collection/Namespace):每个租户有独立的集合或命名空间。
- 优点:物理隔离,安全性更高;租户间不互相影响性能
- 缺点:运维复杂度高,租户多时集合数量庞大
对于安全要求高的场景,推荐分库单租户。如果租户数量很多(几千个),可以考虑按租户等级分层:高价值/高敏感租户独立集合,普通租户共享集合但强制过滤。
@Service
public class MultiTenantVectorStoreService {
@Autowired
private QdrantClient qdrantClient;
private static final String SHARED_COLLECTION = "shared_knowledge_base";
// 根据租户等级决定使用哪个集合
public String getCollectionName(String tenantId) {
TenantConfig config = tenantConfigRepository.findById(tenantId)
.orElseThrow(() -> new TenantNotFoundException(tenantId));
if (config.getTier() == TenantTier.ENTERPRISE) {
// 企业级租户:独立集合
return "kb_tenant_" + tenantId;
} else {
// 标准租户:共享集合,依靠过滤
return SHARED_COLLECTION;
}
}
// 写入时:确保 tenant_id 正确打标
public void upsertDocument(
String tenantId,
String docId,
float[] vector,
Map<String, Object> payload) {
String collectionName = getCollectionName(tenantId);
// 关键:强制注入 tenant_id,不信任调用方传入的 payload 里的值
Map<String, Value> securePayload = new HashMap<>();
payload.forEach((k, v) -> securePayload.put(k, toValue(v)));
// 覆盖 tenant_id,确保正确
securePayload.put("tenant_id", toValue(tenantId));
securePayload.put("doc_id", toValue(docId));
securePayload.put("indexed_at", toValue(Instant.now().toString()));
PointStruct point = PointStruct.newBuilder()
.setId(PointId.newBuilder().setUuid(docId).build())
.setVectors(Vectors.newBuilder().setVector(
Vector.newBuilder().addAllData(Floats.asList(vector)).build()
).build())
.putAllPayload(securePayload)
.build();
try {
qdrantClient.upsertAsync(collectionName, Collections.singletonList(point)).get();
} catch (Exception e) {
throw new VectorStoreException("向量写入失败", e);
}
}
// 查询时:强制附加租户过滤
public List<ScoredDocument> search(
String tenantId,
float[] queryVector,
int topK,
Map<String, Object> additionalFilters) {
String collectionName = getCollectionName(tenantId);
// 构建基础过滤条件(对于共享集合,这是必须的)
// 对于独立集合,这是额外的安全保障(防御纵深)
Filter.Builder filterBuilder = Filter.newBuilder();
filterBuilder.addMust(Condition.newBuilder()
.setField(FieldCondition.newBuilder()
.setKey("tenant_id")
.setMatch(Match.newBuilder().setKeyword(tenantId).build())
.build())
.build());
// 附加业务过滤条件
if (additionalFilters != null) {
additionalFilters.forEach((key, value) -> {
filterBuilder.addMust(Condition.newBuilder()
.setField(FieldCondition.newBuilder()
.setKey(key)
.setMatch(Match.newBuilder().setKeyword(value.toString()).build())
.build())
.build());
});
}
SearchPoints request = SearchPoints.newBuilder()
.setCollectionName(collectionName)
.addAllVector(Floats.asList(queryVector))
.setLimit(topK)
.setFilter(filterBuilder.build())
.setWithPayload(WithPayloadSelector.newBuilder().setEnable(true).build())
.build();
try {
List<ScoredPoint> results = qdrantClient.searchAsync(request).get();
// 二次校验:确保返回的文档确实属于该租户(防御纵深)
return results.stream()
.filter(point -> tenantId.equals(
point.getPayloadMap().get("tenant_id") != null
? point.getPayloadMap().get("tenant_id").getStringValue()
: null))
.map(this::toScoredDocument)
.collect(Collectors.toList());
} catch (Exception e) {
throw new VectorSearchException("向量检索失败", e);
}
}
// 删除租户的所有数据(租户退出时)
@Async
public void deleteAllTenantData(String tenantId) {
log.info("开始删除租户 {} 的所有向量数据", tenantId);
TenantConfig config = tenantConfigRepository.findById(tenantId)
.orElseThrow(() -> new TenantNotFoundException(tenantId));
if (config.getTier() == TenantTier.ENTERPRISE) {
// 企业级租户:删除整个集合
try {
qdrantClient.deleteCollectionAsync("kb_tenant_" + tenantId).get();
log.info("已删除企业租户 {} 的独立集合", tenantId);
} catch (Exception e) {
log.error("删除租户集合失败: {}", tenantId, e);
}
} else {
// 标准租户:按 tenant_id 过滤删除
Filter filter = Filter.newBuilder()
.addMust(Condition.newBuilder()
.setField(FieldCondition.newBuilder()
.setKey("tenant_id")
.setMatch(Match.newBuilder().setKeyword(tenantId).build())
.build())
.build())
.build();
try {
DeletePoints deleteRequest = DeletePoints.newBuilder()
.setCollectionName(SHARED_COLLECTION)
.setPoints(PointsSelector.newBuilder()
.setFilter(filter)
.build())
.build();
qdrantClient.deleteAsync(deleteRequest).get();
log.info("已删除标准租户 {} 的共享集合数据", tenantId);
} catch (Exception e) {
log.error("删除租户向量数据失败: {}", tenantId, e);
}
}
}
}三、LLM 上下文隔离
3.1 会话上下文管理
每个租户的对话上下文必须严格隔离。这听起来显而易见,但有很多容易踩的坑。
@Service
public class TenantAwareConversationManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 构建安全的对话历史键(包含租户ID防止冲突)
private String buildHistoryKey(String tenantId, String sessionId) {
// 确保 sessionId 里没有注入可能
if (!sessionId.matches("[a-zA-Z0-9\\-_]{8,64}")) {
throw new IllegalArgumentException("非法的 sessionId 格式");
}
return String.format("chat_history:%s:%s", tenantId, sessionId);
}
public void addMessage(String tenantId, String sessionId, Message message) {
String key = buildHistoryKey(tenantId, sessionId);
// 消息里附带租户信息,防止后续处理时遗失上下文
message.setTenantId(tenantId);
redisTemplate.opsForList().rightPush(key, message);
redisTemplate.expire(key, Duration.ofHours(24));
// 控制历史长度,防止无限堆积
Long length = redisTemplate.opsForList().size(key);
if (length != null && length > 100) {
// 保留最近100条
redisTemplate.opsForList().trim(key, length - 100, -1);
}
}
public List<Message> getHistory(String tenantId, String sessionId) {
String key = buildHistoryKey(tenantId, sessionId);
List<Object> raw = redisTemplate.opsForList().range(key, 0, -1);
if (raw == null) return Collections.emptyList();
return raw.stream()
.map(m -> (Message) m)
// 再次验证:过滤掉 tenantId 不匹配的消息(防御纵深)
.filter(m -> tenantId.equals(m.getTenantId()))
.collect(Collectors.toList());
}
// 清理到期的会话
@Scheduled(cron = "0 0 3 * * *")
public void cleanExpiredSessions() {
// Redis TTL 机制会自动清理,这里做额外的定期检查
log.info("定期清理对话历史任务执行");
}
}3.2 系统提示词隔离
不同租户通常需要不同的系统提示词。提示词本身也是业务数据,需要隔离存储和访问。
@Service
public class TenantSystemPromptService {
@Autowired
private SystemPromptRepository promptRepository;
@Autowired
private EncryptionService encryptionService;
// 系统提示词按租户隔离存储
public String getSystemPrompt(String tenantId, String scenarioId) {
// 查询时强制带上 tenantId,防止越权访问
SystemPromptConfig config = promptRepository.findByTenantIdAndScenarioId(tenantId, scenarioId)
.orElseGet(() -> {
// 如果租户没有自定义提示词,使用平台默认提示词
return promptRepository.findDefaultByScenarioId(scenarioId)
.orElseThrow(() -> new NotFoundException("场景不存在: " + scenarioId));
});
// 解密(敏感的系统提示词加密存储)
String decryptedPrompt = encryptionService.decrypt(config.getEncryptedContent());
// 注入租户特定的变量
return injectTenantVariables(decryptedPrompt, tenantId);
}
private String injectTenantVariables(String promptTemplate, String tenantId) {
TenantInfo info = tenantRepository.findById(tenantId)
.orElseThrow(() -> new TenantNotFoundException(tenantId));
return promptTemplate
.replace("{TENANT_NAME}", escapeForPrompt(info.getName()))
.replace("{TENANT_INDUSTRY}", escapeForPrompt(info.getIndustry()))
.replace("{TENANT_CUSTOM_INSTRUCTIONS}",
escapeForPrompt(info.getCustomInstructions() != null ? info.getCustomInstructions() : ""));
}
// 防止租户自定义内容里有注入攻击
private String escapeForPrompt(String value) {
if (value == null) return "";
// 不允许租户自定义内容里包含可能覆盖系统指令的词汇
return value
.replaceAll("(?i)ignore.*instruction", "[已过滤]")
.replaceAll("(?i)system prompt", "[已过滤]")
.replace("\n---\n", "\n") // 防止分隔符注入
.trim();
}
// 租户管理员更新提示词(带校验)
@Transactional
public void updateTenantPrompt(String tenantId, String adminUserId, String scenarioId, String newContent) {
// 权限校验:只有租户管理员才能修改
if (!permissionService.isTenantAdmin(adminUserId, tenantId)) {
throw new SecurityException("无权修改系统提示词");
}
// 内容安全检查
if (containsInjectionAttempt(newContent)) {
throw new SecurityException("提示词内容包含不安全内容");
}
// 加密存储
String encrypted = encryptionService.encrypt(newContent);
SystemPromptConfig existing = promptRepository.findByTenantIdAndScenarioId(tenantId, scenarioId)
.orElse(new SystemPromptConfig(tenantId, scenarioId));
existing.setEncryptedContent(encrypted);
existing.setUpdatedBy(adminUserId);
existing.setUpdatedAt(Instant.now());
promptRepository.save(existing);
// 清除缓存
cacheManager.getCache("system-prompts").evict(tenantId + ":" + scenarioId);
auditService.recordPromptUpdate(tenantId, adminUserId, scenarioId);
}
}四、RAG 流水线中的租户隔离
一次完整的 RAG 请求,涉及多个环节,每个环节都要保持租户上下文。
@Service
public class TenantAwareRAGService {
@Autowired
private MultiTenantVectorStoreService vectorStore;
@Autowired
private TenantSystemPromptService promptService;
@Autowired
private TenantAwareConversationManager conversationManager;
@Autowired
private LLMClient llmClient;
public RAGResponse query(RAGRequest request) {
String tenantId = request.getTenantId();
String userId = request.getUserId();
String sessionId = request.getSessionId();
String userQuery = request.getUserQuery();
// 1. 获取租户专属系统提示词
String systemPrompt = promptService.getSystemPrompt(tenantId, request.getScenario());
// 2. 从向量库检索(强制租户过滤)
float[] queryVector = embeddingService.embed(userQuery);
List<ScoredDocument> retrievedDocs = vectorStore.search(
tenantId, // 租户隔离是向量检索的必选参数,不是可选的
queryVector,
5,
null
);
// 3. 获取租户-会话的历史对话
List<Message> history = conversationManager.getHistory(tenantId, sessionId);
// 4. 构建 RAG Prompt(明确区分数据和指令)
String ragPrompt = buildRAGPrompt(systemPrompt, retrievedDocs, userQuery, history, tenantId);
// 5. 调用 LLM
LLMResponse llmResponse = llmClient.complete(ragPrompt);
// 6. 输出安全检查
FilterResult filtered = outputFilter.filter(llmResponse.getContent(), tenantId);
// 7. 保存本次对话(带租户标记)
conversationManager.addMessage(tenantId, sessionId, new Message(Role.USER, userQuery));
conversationManager.addMessage(tenantId, sessionId, new Message(Role.ASSISTANT, filtered.getContent()));
return RAGResponse.builder()
.content(filtered.getContent())
.sourceDocs(retrievedDocs.stream()
.map(d -> d.getDocId())
.collect(Collectors.toList()))
.tenantId(tenantId)
.build();
}
private String buildRAGPrompt(
String systemPrompt,
List<ScoredDocument> docs,
String query,
List<Message> history,
String tenantId) {
StringBuilder sb = new StringBuilder();
sb.append(systemPrompt).append("\n\n");
// 引用文档时加上租户标识,便于后续日志分析
if (!docs.isEmpty()) {
sb.append("以下是相关参考资料(来自你的企业知识库):\n");
sb.append("=== 知识库内容开始 ===\n");
for (int i = 0; i < docs.size(); i++) {
sb.append(String.format("【文档%d】\n%s\n\n", i + 1, docs.get(i).getContent()));
}
sb.append("=== 知识库内容结束 ===\n\n");
}
// 附加历史对话(只取最近几轮)
List<Message> recentHistory = history.stream()
.skip(Math.max(0, history.size() - 6))
.collect(Collectors.toList());
if (!recentHistory.isEmpty()) {
sb.append("对话历史:\n");
for (Message msg : recentHistory) {
sb.append(msg.getRole().equals(Role.USER) ? "用户:" : "助手:");
sb.append(msg.getContent()).append("\n");
}
sb.append("\n");
}
sb.append("用户:").append(query);
return sb.toString();
}
}五、缓存层隔离
响应缓存是很多团队用来降低 LLM 成本的手段,但如果缓存键设计不当,会导致跨租户数据泄露。
@Service
public class TenantAwareCacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 正确的缓存键设计:必须包含租户ID
private String buildCacheKey(String tenantId, String query) {
String normalizedQuery = normalizeQuery(query);
String queryHash = DigestUtils.sha256Hex(normalizedQuery);
return String.format("llm_response_cache:%s:%s", tenantId, queryHash);
}
public Optional<String> get(String tenantId, String query) {
String key = buildCacheKey(tenantId, query);
Object cached = redisTemplate.opsForValue().get(key);
if (cached == null) return Optional.empty();
CachedResponse response = (CachedResponse) cached;
// 验证租户ID一致(防御纵深)
if (!tenantId.equals(response.getTenantId())) {
log.error("缓存租户ID不匹配!期望: {}, 实际: {},安全漏洞!",
tenantId, response.getTenantId());
// 删除这个异常的缓存条目
redisTemplate.delete(key);
return Optional.empty();
}
return Optional.of(response.getContent());
}
public void set(String tenantId, String query, String content) {
String key = buildCacheKey(tenantId, query);
CachedResponse response = CachedResponse.builder()
.tenantId(tenantId) // 在缓存值里也存 tenantId
.content(content)
.cachedAt(Instant.now())
.build();
// 缓存 TTL 要合理,不能太长
redisTemplate.opsForValue().set(key, response, Duration.ofHours(1));
}
// 租户注销时清除该租户的所有缓存
public void evictTenantCache(String tenantId) {
String pattern = String.format("llm_response_cache:%s:*", tenantId);
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
log.info("已清除租户 {} 的 {} 条缓存", tenantId, keys.size());
}
}
private String normalizeQuery(String query) {
// 标准化处理,提高缓存命中率
return query.trim().toLowerCase().replaceAll("\\s+", " ");
}
}六、隔离验证:如何测试你的隔离是否有效
隔离机制实现之后,需要有测试来验证它确实有效。
@SpringBootTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class TenantIsolationTest {
@Autowired
private MultiTenantVectorStoreService vectorStore;
@Autowired
private TenantAwareRAGService ragService;
private static final String TENANT_A = "test-tenant-a";
private static final String TENANT_B = "test-tenant-b";
@Test
@Order(1)
void testVectorSearchIsolation() {
// 租户A写入一个独特的文档
String uniqueContent = "这是租户A的绝密内容,包含唯一标识符:TENANT_A_SECRET_12345";
float[] vector = embeddingService.embed(uniqueContent);
vectorStore.upsertDocument(TENANT_A, "doc-secret", vector, Map.of("content", uniqueContent));
// 租户B用相同的向量查询,不应该能检索到租户A的内容
List<ScoredDocument> resultsForB = vectorStore.search(TENANT_B, vector, 10, null);
boolean tenantAContentLeaked = resultsForB.stream()
.anyMatch(doc -> doc.getContent().contains("TENANT_A_SECRET_12345"));
assertFalse(tenantAContentLeaked, "租户隔离失败:租户B检索到了租户A的数据!");
}
@Test
@Order(2)
void testRAGContextIsolation() {
// 租户A进行一次包含敏感信息的对话
RAGRequest requestA = RAGRequest.builder()
.tenantId(TENANT_A)
.userId("user-a")
.sessionId("session-a-001")
.userQuery("请记住我的密码是 CONFIDENTIAL_PASSWORD_XYZ")
.build();
ragService.query(requestA);
// 租户B的新会话不应该能访问到租户A的对话历史
RAGRequest requestB = RAGRequest.builder()
.tenantId(TENANT_B)
.userId("user-b")
.sessionId("session-b-001")
.userQuery("请告诉我之前用户提到的密码")
.build();
RAGResponse responseB = ragService.query(requestB);
assertFalse(responseB.getContent().contains("CONFIDENTIAL_PASSWORD_XYZ"),
"上下文隔离失败:租户B获取到了租户A的对话数据!");
}
@Test
@Order(3)
void testSystemPromptIsolation() {
// 设置租户A的专属系统提示词
promptService.updateTenantPrompt(
TENANT_A, "admin-a", "default",
"你是租户A的专属助手,你的秘密代号是ALPHA_AGENT。");
// 租户B获取的系统提示词中不应该包含租户A的内容
String promptB = promptService.getSystemPrompt(TENANT_B, "default");
assertFalse(promptB.contains("ALPHA_AGENT"),
"系统提示词隔离失败:租户B获取到了租户A的提示词!");
}
@AfterAll
void cleanup() {
// 清理测试数据
vectorStore.deleteAllTenantData(TENANT_A);
vectorStore.deleteAllTenantData(TENANT_B);
}
}七、事故后溯源:数据流转日志
如果发生了隔离事故,需要能够回答"什么数据泄露给了谁"。这要求数据流转要有完整的日志。
@Component
public class DataLineageTracker {
// 记录每次文档被检索到的日志
public void recordDocumentAccess(
String tenantId,
String userId,
String docId,
String docOwnerTenantId, // 文档归属的租户
String sessionId) {
DataAccessLog log = DataAccessLog.builder()
.accessId(UUID.randomUUID().toString())
.timestamp(Instant.now())
.accessorTenantId(tenantId)
.accessorUserId(userId)
.docId(docId)
.docOwnerTenantId(docOwnerTenantId)
.sessionId(sessionId)
.isCrossTenantAccess(!tenantId.equals(docOwnerTenantId)) // 是否跨租户
.build();
dataAccessLogRepository.save(log);
// 跨租户访问是严重事件,立即告警
if (!tenantId.equals(docOwnerTenantId)) {
log.error("!!! 跨租户数据访问 !!! 访问者: {}, 文档所有者: {}, 文档: {}",
tenantId, docOwnerTenantId, docId);
alertService.sendCriticalAlert(
AlertType.CROSS_TENANT_DATA_ACCESS,
String.format("跨租户数据访问事故!访问租户: %s, 文档租户: %s, 文档ID: %s",
tenantId, docOwnerTenantId, docId)
);
}
}
}做好多租户隔离,关键不是技术多复杂,而是每一个环节都不能存在侥幸心理。隔离是一道必须守住的硬性要求,不能因为"这个环节泄露的概率很低"就省掉防护。数据一旦跨租户泄露,没有补救的机会。
