RAG Data Permission Management: Row-Level Security for an Enterprise Knowledge Base
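The salary leak: a real security incident
In October 2025, an internet company in Beijing had an incident that left its HR director in a cold sweat.
The company had just launched an internal AI assistant for its knowledge base. The assistant was connected to the HR document library, the employee handbook, project documents and other internal material. On the third day after launch, Xiao Zhao, an ordinary employee, was asking the assistant about the annual leave policy and casually added:
"What's the salary range for the P7 level at our company?"
The AI assistant answered without hesitation:
"According to the company compensation framework, the P7 monthly salary range is 35,000-55,000 yuan. Performance bonuses are calculated separately, and the year-end bonus is roughly 3-6 months' salary. P8 is..."
Xiao Zhao was stunned and posted a screenshot to his department's group chat. Within an hour, the screenshot had spread through the whole company.
The HR director nearly fell off the chair when the report came in. The compensation structure is a confidential document that only the HR department and senior executives are allowed to see. When the company built its RAG system, however, it had dumped every document into a single vector database with no permission isolation at all.
Whoever asked whatever question, the AI searched every document, regardless of whether the person asking was allowed to see it.
The incident never became public, but an internal project was formally opened. Adding row-level security to the RAG system had to be finished before the service could be restored.
The job went to Lao Song, the company's Java architect. He spent two weeks researching approaches, and this article is his hands-on write-up.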
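Conclusions first (TL;DR)
| Approach | Best for | Implementation complexity | Performance overhead | Security |
|---|---|---|---|---|
| Metadata filtering | Simple permission model (3-5 roles) | Low | Low | Medium |
| Multiple vector stores | Fixed permission levels, large data volume | Medium | Low | High |
| Hybrid (recommended) | Enterprise scale, complex permissions | Medium-high | Low-medium | High |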
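Core principle: permission checks must be enforced in the vector retrieval layer. Do not rely on the LLM to "refuse questions the user has no right to ask", because an LLM cannot enforce access control reliably.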
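The core challenge of RAG permission management
Access control in traditional web applications is mature. With Spring Security + RBAC, a user calls an API, the role is checked first, and a user without permission gets a 403.
A RAG system differs in one key way:
Users do not request a specific document. They ask a question in natural language, and the system decides on its own which documents to retrieve.
That creates two distinctive challenges: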
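Challenge 1: retrieval is opaque
When a user asks about "the company's benefits policy", vector search might return all of the following at once:
- Employee handbook (visible to all staff)
- Compensation and benefits rules (visible to HR)
- Executive compensation plan (visible to executives only)
All three documents are highly similar to the query. Similarity scores carry no access information, so a plain vector search cannot tell them apart.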
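A toy example makes the point concrete. The sketch below is plain Java, and the titles, the three-dimensional vectors and the visibility labels are all made up for illustration. It ranks documents by cosine similarity the way a vector index would, and nothing in the ranking looks at who may read what:

```java
import java.util.Comparator;
import java.util.List;

// Toy document: the visibility label is carried along but never consulted by the ranking.
record ToyDoc(String title, String visibility, double[] vector) {}

final class NaiveRetriever {
    // Cosine similarity between two vectors of equal length.
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Returns the k most similar documents, ranked purely by similarity.
    static List<ToyDoc> topK(double[] query, List<ToyDoc> corpus, int k) {
        return corpus.stream()
                .sorted(Comparator.comparingDouble((ToyDoc d) -> cosine(query, d.vector())).reversed())
                .limit(k)
                .toList();
    }
}
```

With the vectors {0.9, 0.1, 0.0} for the handbook, {0.8, 0.3, 0.1} for the HR rules and {0.85, 0.2, 0.05} for the executive plan, and the query {1.0, 0.2, 0.0}, the executive plan scores about 0.998. That is slightly above the handbook (about 0.996), so the most sensitive document wins on pure similarity.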
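Challenge 2: the LLM cannot enforce permissions
Some teams try to solve this by adding a line to the prompt: "Do not answer questions about salaries."
That is not reliable, because such instructions can be bypassed. Suppose a user asks "How does my expected salary compare with the market?" The LLM may then leak salary information indirectly.
The right approach is to keep documents the user is not allowed to see out of the LLM's context in the first place.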
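One way to make that rule explicit in code is to put a single gate between retrieval and prompt assembly, so the context string can only be built from chunks that passed the check. The sketch below uses the hypothetical names RetrievedChunk and ContextGate, and it assumes that an empty role set means "visible to everyone":

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical retrieved chunk: allowedRoles is empty when everyone may read it.
record RetrievedChunk(String id, String text, Set<String> allowedRoles) {}

final class ContextGate {
    // Keeps only chunks the caller may read; anything dropped here never reaches the LLM.
    static List<RetrievedChunk> permitted(List<RetrievedChunk> chunks, Set<String> userRoles) {
        return chunks.stream()
                .filter(c -> c.allowedRoles().isEmpty()
                        || c.allowedRoles().stream().anyMatch(userRoles::contains))
                .toList();
    }

    // The prompt context is assembled exclusively from permitted chunks.
    static String buildContext(List<RetrievedChunk> chunks, Set<String> userRoles) {
        return permitted(chunks, userRoles).stream()
                .map(c -> "[" + c.id() + "] " + c.text())
                .collect(Collectors.joining("\n"));
    }
}
```

A gate like this is a backstop. The primary filter still belongs in the vector query, because filtering after top-k retrieval can leave the user with fewer results than requested.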
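Approach 1: metadata filtering
How it works
Store permission metadata with each document in the vector database, such as the list of roles allowed to access it. At search time, pass the user's roles as a filter condition.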
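The filter itself is just a boolean expression derived from the user's attributes. The sketch below builds such an expression in Milvus syntax for a set of roles. It assumes allowed_roles is a Milvus ARRAY<VARCHAR> field in which an empty array means "everyone". Because the role values are spliced into the expression text, it rejects role names outside a conservative character set rather than trying to escape them. MilvusRoleFilter is a hypothetical helper name:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class MilvusRoleFilter {
    // Role names are interpolated into the expression, so accept only a conservative charset.
    private static final Pattern SAFE = Pattern.compile("[A-Za-z0-9_]+");

    static String forRoles(List<String> roles) {
        for (String r : roles) {
            if (!SAFE.matcher(r).matches()) {
                throw new IllegalArgumentException("Unexpected role name: " + r);
            }
        }
        // Documents with an empty allowed_roles array are treated as visible to everyone.
        String publicDocs = "array_length(allowed_roles) == 0";
        if (roles.isEmpty()) {
            return publicDocs;
        }
        String quoted = roles.stream()
                .map(r -> "\"" + r + "\"")
                .collect(Collectors.joining(", "));
        return "(" + publicDocs + " || array_contains_any(allowed_roles, [" + quoted + "]))";
    }
}
```

For the roles ROLE_EMPLOYEE and ROLE_HR, this produces `(array_length(allowed_roles) == 0 || array_contains_any(allowed_roles, ["ROLE_EMPLOYEE", "ROLE_HR"]))`. A user with no roles at all only sees unrestricted documents, so a missing role claim fails closed.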
Milvus metadata filtering example:
```python
# Store permission metadata when the document is written
entity = {
    "id": "doc_001",
    "vector": embedding,
    "content": "The P7 salary range is ...",
    "allowed_roles": ["ROLE_HR", "ROLE_ADMIN"],  # permission metadata
    "department": "HR",
    "classification": "CONFIDENTIAL",
}
```
At search time:
```python
results = collection.search(
    data=[query_vector],
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"nprobe": 32}},
    limit=10,
    # Filter: only return documents visible to ROLE_EMPLOYEE
    expr='array_contains(allowed_roles, "ROLE_EMPLOYEE")',
)
```
Java implementation
```java
// DocumentPermissionMetadata.java
package com.example.rag.security;

import lombok.Builder;
import lombok.Data;

import java.util.List;

/**
 * Document permission metadata.
 * Stored in the vector database together with the document.
 */
@Data
@Builder
public class DocumentPermissionMetadata {

    /** Classification levels from lowest to highest. */
    private static final List<String> LEVELS = List.of("PUBLIC", "INTERNAL", "CONFIDENTIAL", "SECRET");

    /** Document ID */
    private String documentId;
    /** Roles allowed to access the document (ROLE_ADMIN, ROLE_HR, ROLE_EMPLOYEE, ...) */
    private List<String> allowedRoles;
    /** Departments allowed to access the document (empty means every department) */
    private List<String> allowedDepartments;
    /** Data classification (PUBLIC, INTERNAL, CONFIDENTIAL, SECRET) */
    private String classification;
    /** Whether only the owner may see the document */
    private boolean ownerOnly;
    /** Document owner (only used when ownerOnly = true) */
    private String ownerId;

    /**
     * Returns whether the given user may access this document.
     */
    public boolean isAccessibleBy(UserContext user) {
        // Administrators are unrestricted
        if (user.getRoles().contains("ROLE_ADMIN")) {
            return true;
        }
        // Owner-only documents
        if (ownerOnly) {
            return user.getUserId().equals(ownerId);
        }
        // Classification check: the document must not be above the user's clearance
        if (classification != null && !isWithinClearance(classification, user.getDataClassificationLevel())) {
            return false;
        }
        // Role check
        if (allowedRoles != null && !allowedRoles.isEmpty()) {
            boolean hasRole = user.getRoles().stream()
                    .anyMatch(allowedRoles::contains);
            if (!hasRole) return false;
        }
        // Department check
        if (allowedDepartments != null && !allowedDepartments.isEmpty()) {
            if (!allowedDepartments.contains(user.getDepartment())) {
                return false;
            }
        }
        return true;
    }

    private static boolean isWithinClearance(String documentLevel, String userLevel) {
        int doc = LEVELS.indexOf(documentLevel);
        int clearance = LEVELS.indexOf(userLevel == null ? "PUBLIC" : userLevel);
        // Unknown labels fail closed
        return doc >= 0 && clearance >= 0 && doc <= clearance;
    }
}
```
```java
// UserContext.java
package com.example.rag.security;

import lombok.Builder;
import lombok.Data;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;

import java.util.List;

/**
 * Context of the current user (extracted from Spring Security).
 */
@Data
@Builder
public class UserContext {
    private String userId;
    private String username;
    private String department;
    private List<String> roles;
    private String dataClassificationLevel; // highest classification level the user may see

    /**
     * Builds the current user's context from the Spring Security context.
     */
    public static UserContext fromSecurityContext() {
        Authentication auth = SecurityContextHolder.getContext().getAuthentication();
        if (auth == null || !auth.isAuthenticated()) {
            throw new AccessDeniedException("Not authenticated");
        }
        // Assumes the principal is our custom UserDetails implementation
        if (auth.getPrincipal() instanceof RagUserDetails userDetails) {
            return UserContext.builder()
                    .userId(userDetails.getUserId())
                    .username(userDetails.getUsername())
                    .department(userDetails.getDepartment())
                    .roles(userDetails.getAuthorities().stream()
                            .map(GrantedAuthority::getAuthority)
                            .toList())
                    .dataClassificationLevel(userDetails.getDataClassificationLevel())
                    .build();
        }
        throw new IllegalStateException("Unknown principal type");
    }
}
```
```java
// PermissionAwareVectorSearch.java
package com.example.rag.security;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Permission-aware vector search.
 *
 * Injects the user's permission filter into the vector query,
 * so that only documents the user may access are retrieved.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PermissionAwareVectorSearch {

    private final VectorStore vectorStore;

    /**
     * Similarity search with permission filtering.
     *
     * @param query query text
     * @param user  current user context
     * @param topK  maximum number of results
     * @return relevant documents the user is allowed to access
     */
    public List<Document> searchWithPermission(String query, UserContext user, int topK) {
        log.debug("User [{}] running permission-aware search: query={}", user.getUsername(),
                query.substring(0, Math.min(50, query.length())));

        // Build the permission filter expression
        String filterExpression = buildPermissionFilter(user);
        log.debug("Permission filter expression: {}", filterExpression);

        SearchRequest searchRequest = SearchRequest.query(query)
                .withTopK(topK)
                .withSimilarityThreshold(0.5);
        // An empty filter (administrators) must not be passed on as an expression
        if (!filterExpression.isEmpty()) {
            // NOTE: the expression uses Milvus-native syntax (array_contains_any, array_length).
            // Spring AI's withFilterExpression(String) parses Spring AI's own portable filter
            // syntax, which has no array functions, so this expression has to be sent through
            // the Milvus SDK (e.g. SearchParam.withExpr) or translated for your client.
            searchRequest = searchRequest.withFilterExpression(filterExpression);
        }
        List<Document> results = vectorStore.similaritySearch(searchRequest);

        // Second permission check (defense in depth, in case the metadata filter is bypassed)
        List<Document> filteredResults = results.stream()
                .filter(doc -> isDocumentAccessible(doc, user))
                .collect(Collectors.toList());
        if (filteredResults.size() < results.size()) {
            int blocked = results.size() - filteredResults.size();
            log.warn("Second permission check blocked {} documents (user: {})", blocked, user.getUsername());
        }
        return filteredResults;
    }

    /**
     * Builds the Milvus metadata filter expression.
     * Role and department values come from the signed JWT; validate them before
     * interpolation if they can come from anywhere else.
     */
    private String buildPermissionFilter(UserContext user) {
        // Administrators are not filtered
        if (user.getRoles().contains("ROLE_ADMIN")) {
            return "";
        }
        List<String> clauses = new ArrayList<>();

        // Classification filter
        String classificationFilter = buildClassificationFilter(user);
        if (!classificationFilter.isEmpty()) {
            clauses.add(classificationFilter);
        }

        // Role filter: allowed_roles is empty (visible to everyone)
        // or contains at least one of the user's roles
        if (user.getRoles().isEmpty()) {
            clauses.add("array_length(allowed_roles) == 0");
        } else {
            String roles = user.getRoles().stream()
                    .map(role -> "\"" + role + "\"")
                    .collect(Collectors.joining(", "));
            clauses.add("(array_length(allowed_roles) == 0 || array_contains_any(allowed_roles, [" + roles + "]))");
        }

        // Department filter: users without a department only see department-agnostic documents
        if (user.getDepartment() == null) {
            clauses.add("array_length(allowed_departments) == 0");
        } else {
            clauses.add(String.format(
                    "(array_length(allowed_departments) == 0 || array_contains(allowed_departments, \"%s\"))",
                    user.getDepartment()));
        }
        return String.join(" && ", clauses);
    }

    private String buildClassificationFilter(UserContext user) {
        // Map the user's highest clearance to the document levels they may read;
        // a missing clearance is treated as PUBLIC
        String level = user.getDataClassificationLevel() == null ? "PUBLIC" : user.getDataClassificationLevel();
        return switch (level) {
            case "SECRET" -> ""; // all levels
            case "CONFIDENTIAL" -> "classification != \"SECRET\"";
            case "INTERNAL" -> "classification in [\"PUBLIC\", \"INTERNAL\"]";
            default -> "classification == \"PUBLIC\"";
        };
    }

    private boolean isDocumentAccessible(Document doc, UserContext user) {
        // Administrators can always access
        if (user.getRoles().contains("ROLE_ADMIN")) return true;
        // Re-check the role metadata returned with the document. allowed_roles must be among
        // the fields returned by the search, otherwise this check silently passes.
        Object allowedRolesObj = doc.getMetadata() == null ? null : doc.getMetadata().get("allowed_roles");
        if (allowedRolesObj == null) return true; // no restriction recorded: visible to everyone
        if (allowedRolesObj instanceof List<?> allowedRoles) {
            // An empty list also means "visible to everyone", matching the filter above
            return allowedRoles.isEmpty()
                    || user.getRoles().stream().anyMatch(allowedRoles::contains);
        }
        // Unexpected metadata format: deny (fail closed)
        return false;
    }
}
```
Approach 2: multiple vector stores
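How it works
Create a separate vector store for each permission level (a separate Milvus collection or even a separate vector database instance). At search time, query only the stores the user is allowed to access.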
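If the levels form a strict hierarchy, the set of stores to query can be derived from a single clearance value instead of from individual roles. The sketch below assumes the four levels used throughout this article. Classification and StoreSelector are illustrative names:

```java
import java.util.Arrays;
import java.util.List;

// Levels declared from lowest to highest, so enum order encodes the hierarchy.
enum Classification { PUBLIC, INTERNAL, CONFIDENTIAL, SECRET }

final class StoreSelector {
    // A user cleared for a level may query every store at or below that level.
    static List<Classification> accessibleLevels(Classification clearance) {
        return Arrays.stream(Classification.values())
                .filter(level -> level.compareTo(clearance) <= 0)
                .toList();
    }
}
```

Deriving levels from a clearance ordering avoids gaps that role-by-role rules can produce. For example, a role-based rule might grant an HR user who lacks ROLE_EMPLOYEE access to CONFIDENTIAL but not to INTERNAL.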
```java
// MultiStoreVectorSearch.java
package com.example.rag.security;

import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Permission-aware search across multiple vector stores.
 *
 * Each classification level lives in its own vector collection.
 * A search only queries the collections the user may access, in parallel.
 */
@Slf4j
@Service
public class MultiStoreVectorSearch {

    // One vector store per classification level
    private final Map<String, VectorStore> storesByClassification;

    public MultiStoreVectorSearch(
            @Qualifier("publicVectorStore") VectorStore publicStore,
            @Qualifier("internalVectorStore") VectorStore internalStore,
            @Qualifier("confidentialVectorStore") VectorStore confidentialStore,
            @Qualifier("secretVectorStore") VectorStore secretStore) {
        storesByClassification = new LinkedHashMap<>();
        storesByClassification.put("PUBLIC", publicStore);
        storesByClassification.put("INTERNAL", internalStore);
        storesByClassification.put("CONFIDENTIAL", confidentialStore);
        storesByClassification.put("SECRET", secretStore);
    }

    /**
     * Queries every vector store the user may access, in parallel.
     */
    public List<Document> searchWithPermission(String query, UserContext user, int topKPerStore) {
        List<String> accessibleLevels = getAccessibleLevels(user);
        log.debug("User [{}] accessible levels: {}", user.getUsername(), accessibleLevels);

        // Query all accessible stores in parallel.
        // supplyAsync without an executor runs on ForkJoinPool.commonPool(); consider a dedicated pool.
        List<CompletableFuture<List<Document>>> futures = accessibleLevels.stream()
                .filter(storesByClassification::containsKey)
                .map(level -> CompletableFuture.supplyAsync(() -> {
                    try {
                        VectorStore store = storesByClassification.get(level);
                        List<Document> results = store.similaritySearch(
                                SearchRequest.query(query)
                                        .withTopK(topKPerStore)
                                        .withSimilarityThreshold(0.5));
                        // Tag each document with its classification level
                        results.forEach(doc -> {
                            if (doc.getMetadata() != null) {
                                doc.getMetadata().put("_classification", level);
                            }
                        });
                        return results;
                    } catch (Exception e) {
                        log.warn("Search in the {} vector store failed: {}", level, e.getMessage());
                        return Collections.<Document>emptyList();
                    }
                }))
                .collect(Collectors.toList());

        // Wait for all results and merge them. Scores from different collections are only
        // comparable if they share the same embedding model and metric; re-rank here if needed.
        List<Document> allResults = futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
        log.info("Multi-store search finished: queried {} levels, {} documents returned",
                accessibleLevels.size(), allResults.size());
        return allResults;
    }

    private List<String> getAccessibleLevels(UserContext user) {
        // Administrators can access every level
        if (user.getRoles().contains("ROLE_ADMIN")) {
            return List.of("PUBLIC", "INTERNAL", "CONFIDENTIAL", "SECRET");
        }
        List<String> levels = new ArrayList<>();
        levels.add("PUBLIC"); // everyone can read PUBLIC
        // Roles are assumed to be cumulative: HR, finance and executive users also hold ROLE_EMPLOYEE
        if (user.getRoles().contains("ROLE_EMPLOYEE")) {
            levels.add("INTERNAL");
        }
        if (user.getRoles().contains("ROLE_HR") ||
                user.getRoles().contains("ROLE_FINANCE")) {
            levels.add("CONFIDENTIAL");
        }
        if (user.getRoles().contains("ROLE_EXECUTIVE")) {
            levels.add("SECRET");
        }
        return levels;
    }
}
```
Spring Security integration
JWT token + permission context
```java
// RagUserDetails.java
package com.example.rag.security;

import lombok.Builder;
import lombok.Data;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

@Data
@Builder
public class RagUserDetails implements UserDetails {

    private String userId;
    private String username;
    private String password;
    private String department;
    private String dataClassificationLevel; // highest classification level the user may access
    private List<String> roles;
    private boolean enabled;
    private boolean accountNonExpired;
    private boolean accountNonLocked;
    private boolean credentialsNonExpired;

    @Override
    public Collection<? extends GrantedAuthority> getAuthorities() {
        return roles.stream()
                .map(SimpleGrantedAuthority::new)
                .collect(Collectors.toList());
    }

    @Override
    public boolean isEnabled() { return enabled; }

    @Override
    public boolean isAccountNonExpired() { return accountNonExpired; }

    @Override
    public boolean isAccountNonLocked() { return accountNonLocked; }

    @Override
    public boolean isCredentialsNonExpired() { return credentialsNonExpired; }
}
```
```java
// JwtAuthenticationFilter.java
package com.example.rag.security;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
@RequiredArgsConstructor
public class JwtAuthenticationFilter extends OncePerRequestFilter {

    private final String jwtSecret;

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain filterChain) throws ServletException, IOException {
        String token = extractToken(request);
        if (token != null) {
            try {
                // jjwt 0.11.x API; jjwt 0.12 replaces it with Jwts.parser().verifyWith(key)...parseSignedClaims(token)
                Claims claims = Jwts.parserBuilder()
                        .setSigningKey(jwtSecret.getBytes(StandardCharsets.UTF_8))
                        .build()
                        .parseClaimsJws(token)
                        .getBody();
                // Tokens invalidated by PermissionSyncService should also be rejected here,
                // by checking the revocation marker it writes to Redis.

                String userId = claims.getSubject();
                String username = claims.get("username", String.class);
                String department = claims.get("department", String.class);
                String classificationLevel = claims.get("classificationLevel", String.class);
                @SuppressWarnings("unchecked")
                List<String> roles = claims.get("roles", List.class);

                RagUserDetails userDetails = RagUserDetails.builder()
                        .userId(userId)
                        .username(username)
                        .department(department)
                        .dataClassificationLevel(classificationLevel != null ? classificationLevel : "INTERNAL")
                        .roles(roles != null ? roles : List.of("ROLE_EMPLOYEE"))
                        .enabled(true)
                        .accountNonExpired(true)
                        .accountNonLocked(true)
                        .credentialsNonExpired(true)
                        .build();

                var authentication = new UsernamePasswordAuthenticationToken(
                        userDetails,
                        null,
                        userDetails.getAuthorities()
                );
                SecurityContextHolder.getContext().setAuthentication(authentication);
            } catch (Exception e) {
                log.warn("JWT parsing failed: {}", e.getMessage());
                response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
                return;
            }
        }
        filterChain.doFilter(request, response);
    }

    private String extractToken(HttpServletRequest request) {
        String header = request.getHeader("Authorization");
        if (header != null && header.startsWith("Bearer ")) {
            return header.substring(7);
        }
        return null;
    }
}
```
```java
// SecurityConfig.java
package com.example.rag.security;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;

@Configuration
@EnableWebSecurity
@EnableMethodSecurity
@RequiredArgsConstructor
public class SecurityConfig {

    @Value("${jwt.secret}")
    private String jwtSecret;

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
                .csrf(csrf -> csrf.disable())
                .sessionManagement(session ->
                        session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
                .authorizeHttpRequests(auth -> auth
                        .requestMatchers("/actuator/health", "/actuator/prometheus").permitAll()
                        .requestMatchers("/api/rag/**").authenticated()
                        .requestMatchers("/api/admin/**").hasRole("ADMIN")
                        .anyRequest().authenticated()
                )
                .addFilterBefore(
                        new JwtAuthenticationFilter(jwtSecret),
                        UsernamePasswordAuthenticationFilter.class
                );
        return http.build();
    }
}
```
Row-level security in practice: a complete enterprise knowledge base
```java
// EnterpriseRagService.java
package com.example.rag.service;

import com.example.rag.audit.AuditLogService;
import com.example.rag.security.PermissionAwareVectorSearch;
import com.example.rag.security.UserContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.document.Document;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Enterprise RAG service (permission control + audit logging).
 * RagResponse is an application DTO built with Lombok @Builder and is not shown here.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class EnterpriseRagService {

    /** Simple keyword heuristic; extend it for the languages your users ask in. */
    private static final List<String> SENSITIVE_KEYWORDS = List.of(
            "salary", "wage", "performance", "promotion", "confidential", "secret");

    private final PermissionAwareVectorSearch vectorSearch;
    private final ChatClient chatClient;
    private final AuditLogService auditLogService;

    /**
     * Permission-aware knowledge-base Q&A.
     */
    public RagResponse query(String question) {
        // Get the current user from the Spring Security context
        UserContext user = UserContext.fromSecurityContext();
        long startTime = System.currentTimeMillis();
        try {
            // 1. Permission-aware retrieval (the key step: only permitted documents are retrieved)
            List<Document> documents = vectorSearch.searchWithPermission(question, user, 20);

            // 2. Audit the retrieval
            auditLogService.logRetrieval(user, question, documents);

            if (documents.isEmpty()) {
                boolean likelyRestricted = isLikelyPermissionIssue(question);
                return RagResponse.builder()
                        .question(question)
                        .answer("No relevant information found." + (likelyRestricted
                                ? " To view restricted content, please contact the responsible department head."
                                : ""))
                        .hasPermissionIssue(likelyRestricted)
                        .build();
            }

            // 3. Build the annotated context
            String context = buildAnnotatedContext(documents);

            // 4. Ask the LLM for an answer
            String answer = chatClient.prompt()
                    .system(buildSystemPrompt(user))
                    .user(String.format("Relevant documents:\n%s\n\nQuestion: %s", context, question))
                    .call()
                    .content();
            long elapsed = System.currentTimeMillis() - startTime;

            // 5. Audit the answer
            auditLogService.logAnswer(user, question, answer, documents, elapsed);

            return RagResponse.builder()
                    .question(question)
                    .answer(answer)
                    .sourceDocuments(documents)
                    .userId(user.getUserId())
                    .latencyMs(elapsed)
                    .build();
        } catch (Exception e) {
            log.error("Query failed (user: {}): {}", user.getUsername(), e.getMessage(), e);
            auditLogService.logError(user, question, e.getMessage());
            throw e;
        }
    }

    /**
     * Guesses whether an empty result is probably caused by missing permissions
     * (a simple heuristic; real systems can be more sophisticated).
     */
    private boolean isLikelyPermissionIssue(String question) {
        String lower = question.toLowerCase();
        return SENSITIVE_KEYWORDS.stream().anyMatch(lower::contains);
    }

    private String buildSystemPrompt(UserContext user) {
        return String.format("""
                You are the company's internal knowledge-base assistant. The current user is %s (%s department).
                Answer strictly from the documents provided and do not speculate about anything they do not mention.
                If the documents do not contain the answer, tell the user so explicitly.
                """, user.getUsername(), user.getDepartment());
    }

    private String buildAnnotatedContext(List<Document> documents) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < documents.size(); i++) {
            Document doc = documents.get(i);
            sb.append(String.format("[Document %d]", i + 1));
            if (doc.getMetadata() != null) {
                Object title = doc.getMetadata().get("title");
                Object source = doc.getMetadata().get("source");
                if (title != null) sb.append(" \"").append(title).append("\"");
                if (source != null) sb.append(" source: ").append(source);
            }
            sb.append("\n").append(doc.getContent()).append("\n\n");
        }
        return sb.toString();
    }
}
```
Department isolation: same question, different answers
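Scenario
Employees in different departments who ask the same question should see different content. For example:
- A finance employee asking about "Q3 results" sees the detailed financial figures
- A sales employee asking about "Q3 results" sees sales attainment but not profit margins
- A regular employee asking about "Q3 results" sees the official public statement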
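The three cases above come down to one rule: show department-specific documents first, then company-wide ones, and never show documents tagged only for another department. The sketch below applies that rule to an in-memory candidate list. In production, the same conditions go into the vector query. DeptDoc and DepartmentView are hypothetical names, and an empty department set is assumed to mean company-wide:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical candidate document: an empty department set means company-wide.
record DeptDoc(String id, String title, Set<String> departments) {}

final class DepartmentView {
    // Department-specific documents first, then company-wide ones, de-duplicated by id.
    static List<String> titlesFor(List<DeptDoc> hits, String department) {
        Map<String, DeptDoc> merged = new LinkedHashMap<>();
        for (DeptDoc d : hits) {
            if (department != null && d.departments().contains(department)) {
                merged.putIfAbsent(d.id(), d);
            }
        }
        for (DeptDoc d : hits) {
            if (d.departments().isEmpty()) {
                merged.putIfAbsent(d.id(), d);
            }
        }
        // Documents tagged only for other departments are never added.
        return merged.values().stream().map(DeptDoc::title).toList();
    }
}
```

For the same candidate list, a finance user gets the detailed figures plus the official statement, a sales user gets the attainment report plus the official statement, and anyone else gets only the official statement.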
```java
// DepartmentAwareRetrieval.java
package com.example.rag.service;

import com.example.rag.security.UserContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
@RequiredArgsConstructor
public class DepartmentAwareRetrieval {

    private final VectorStore vectorStore;

    /**
     * Department-aware retrieval: the same query shows different content to different departments.
     *
     * Strategy:
     * 1. Retrieve the user's department-specific documents
     * 2. Retrieve company-wide documents (no department restriction)
     * 3. Merge, with department documents first, and de-duplicate
     *
     * The filters use Milvus-native syntax (see PermissionAwareVectorSearch). In production,
     * combine them with the role and classification filters; this class only handles departments.
     */
    public List<Document> searchForDepartment(String query, UserContext user, int topK) {
        int perQuery = Math.max(1, topK / 2);
        List<Document> results = new ArrayList<>();

        // Department-specific documents go first so they rank ahead of company-wide ones
        if (user.getDepartment() != null) {
            String deptFilter = String.format(
                    "array_contains(allowed_departments, \"%s\")", user.getDepartment());
            results.addAll(vectorStore.similaritySearch(
                    SearchRequest.query(query)
                            .withTopK(perQuery)
                            .withFilterExpression(deptFilter)));
        }

        // Company-wide documents (visible to every department)
        String universalFilter = "array_length(allowed_departments) == 0";
        results.addAll(vectorStore.similaritySearch(
                SearchRequest.query(query)
                        .withTopK(perQuery)
                        .withFilterExpression(universalFilter)));

        // De-duplicate by document ID, keeping first-seen order (defensive: with these two
        // filters the result sets should not overlap)
        Map<String, Document> unique = new LinkedHashMap<>();
        results.forEach(doc -> unique.putIfAbsent(doc.getId(), doc));
        return unique.values().stream()
                .limit(topK)
                .toList();
    }
}
```
Syncing data when permissions change
Handling user permission changes
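A per-user "blacklisted" flag has an awkward side effect: it also blocks the fresh token the user receives after the change. Storing a revocation timestamp and comparing it with each token's iat (issued-at) claim avoids that problem. The sketch below keeps the timestamps in memory as a stand-in for Redis. TokenRevocationRegistry and its methods are illustrative names:

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a Redis key per user holding "tokens issued before this are invalid".
final class TokenRevocationRegistry {
    private final Map<String, Instant> revokedBefore = new ConcurrentHashMap<>();

    // Called on a permission change: every token issued up to this instant becomes invalid.
    void revokeAllIssuedBefore(String userId, Instant cutoff) {
        // Keep the later cutoff if two changes race.
        revokedBefore.merge(userId, cutoff,
                (existing, incoming) -> incoming.isAfter(existing) ? incoming : existing);
    }

    // Called by the JWT filter with the token's subject and its "iat" claim.
    boolean isAccepted(String userId, Instant issuedAt) {
        Instant cutoff = revokedBefore.get(userId);
        return cutoff == null || issuedAt.isAfter(cutoff);
    }
}
```

JWT iat values have one-second precision, so a token issued in the same second as the cutoff is rejected. That errs on the safe side and at worst forces one extra login. In Redis, the entry only needs to live as long as the maximum token lifetime.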
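```java
// PermissionSyncService.java
package com.example.rag.security;

import com.example.rag.audit.AuditLog;
import com.example.rag.audit.AuditLogRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Permission change synchronization.
 *
 * When a user's permissions change we must:
 * 1. Clear the user's query cache (so results cached under the old permissions are not served)
 * 2. Invalidate the user's existing access tokens
 * 3. Write an audit record of the permission change
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PermissionSyncService {

    /** Must be at least the maximum JWT lifetime (assumed to be 24 hours here). */
    private static final long MAX_TOKEN_LIFETIME_HOURS = 24;

    private final RedisTemplate<String, Object> redisTemplate;
    private final AuditLogRepository auditLogRepository;

    /**
     * Listens for user permission change events.
     * UserPermissionChangedEvent is an application event and is not shown here.
     */
    @Async
    @EventListener
    public void handlePermissionChanged(UserPermissionChangedEvent event) {
        log.info("User permissions changed: userId={}, old roles={}, new roles={}",
                event.getUserId(), event.getOldRoles(), event.getNewRoles());

        // 1. Clear the user's RAG query cache
        clearUserQueryCache(event.getUserId());

        // 2. Invalidate the user's existing JWTs
        invalidateUserTokens(event.getUserId());

        // 3. Write the audit record
        auditLogRepository.save(AuditLog.builder()
                .eventType("PERMISSION_CHANGED")
                .userId(event.getUserId())
                .operatorId(event.getOperatorId())
                .detail(String.format("Roles changed from %s to %s", event.getOldRoles(), event.getNewRoles()))
                .timestamp(LocalDateTime.now())
                .build());
    }

    private void clearUserQueryCache(String userId) {
        // KEYS blocks Redis while it walks the whole keyspace; prefer SCAN on large instances
        String cachePattern = "rag:cache:" + userId + ":*";
        Set<String> keys = redisTemplate.keys(cachePattern);
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
            log.info("Cleared {} cached queries for user {}", keys.size(), userId);
        }
    }

    private void invalidateUserTokens(String userId) {
        // Store the revocation time instead of a plain "blacklisted" flag: the JWT filter must
        // reject this user's tokens whose "iat" (issued-at) is earlier than this value, while
        // tokens issued after the change (carrying the new roles) keep working.
        String key = "rag:token:revoked-before:" + userId;
        redisTemplate.opsForValue().set(key, System.currentTimeMillis(), MAX_TOKEN_LIFETIME_HOURS, TimeUnit.HOURS);
        log.info("Revoked all tokens issued so far for user {}", userId);
    }
}
```
Audit logging: who queried what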
```java
// AuditLogService.java
package com.example.rag.audit;

import com.example.rag.security.UserContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Audit logging for RAG queries.
 *
 * Records:
 * - who (user ID + department)
 * - when (timestamp)
 * - what was asked (query text)
 * - what was retrieved (document titles)
 * - what the AI answered (answer summary)
 *
 * These records support compliance requirements such as GDPR and China's MLPS Level 3.
 * The @Async methods need @EnableAsync and an executor bean named "auditExecutor".
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class AuditLogService {

    private final AuditLogRepository auditLogRepository;

    /**
     * Records a retrieval asynchronously (does not block the main flow).
     */
    @Async("auditExecutor")
    public void logRetrieval(UserContext user, String query, List<Document> retrievedDocs) {
        try {
            String docTitles = retrievedDocs.stream()
                    .map(doc -> {
                        Object title = doc.getMetadata() != null ?
                                doc.getMetadata().get("title") : null;
                        return title != null ? title.toString() : "unknown document";
                    })
                    .collect(Collectors.joining(", "));

            AuditLog auditLog = AuditLog.builder()
                    .eventType("RAG_RETRIEVAL")
                    .userId(user.getUserId())
                    .username(user.getUsername())
                    .department(user.getDepartment())
                    .queryText(truncate(query, 500)) // truncate very long queries
                    .retrievedDocCount(retrievedDocs.size())
                    .retrievedDocTitles(docTitles)
                    .timestamp(LocalDateTime.now())
                    .ipAddress(getCurrentIpAddress())
                    .build();
            auditLogRepository.save(auditLog);
        } catch (Exception e) {
            // A failed audit write must not break the main flow
            log.error("Failed to write retrieval audit log: {}", e.getMessage());
        }
    }

    @Async("auditExecutor")
    public void logAnswer(UserContext user, String query, String answer,
                          List<Document> docs, long latencyMs) {
        try {
            AuditLog auditLog = AuditLog.builder()
                    .eventType("RAG_ANSWER")
                    .userId(user.getUserId())
                    .username(user.getUsername())
                    .department(user.getDepartment())
                    .queryText(truncate(query, 500))
                    .answerSummary(truncate(answer, 200))
                    .retrievedDocCount(docs.size())
                    .latencyMs(latencyMs)
                    .timestamp(LocalDateTime.now())
                    .build();
            auditLogRepository.save(auditLog);
        } catch (Exception e) {
            log.error("Failed to write answer audit log: {}", e.getMessage());
        }
    }

    @Async("auditExecutor")
    public void logError(UserContext user, String query, String errorMessage) {
        try {
            AuditLog auditLog = AuditLog.builder()
                    .eventType("RAG_ERROR")
                    .userId(user.getUserId())
                    .username(user.getUsername())
                    .queryText(truncate(query, 500))
                    .errorMessage(errorMessage)
                    .timestamp(LocalDateTime.now())
                    .build();
            auditLogRepository.save(auditLog);
        } catch (Exception e) {
            log.error("Failed to write error audit log: {}", e.getMessage());
        }
    }

    private static String truncate(String text, int maxLength) {
        if (text == null) return null;
        return text.length() <= maxLength ? text : text.substring(0, maxLength);
    }

    private String getCurrentIpAddress() {
        // Caveat: this runs on the auditExecutor thread, where RequestContextHolder is normally
        // empty, so it usually returns "unknown". Capture the IP on the request thread and pass
        // it in (for example as a field of UserContext) if you need it.
        try {
            if (RequestContextHolder.getRequestAttributes() instanceof ServletRequestAttributes attrs) {
                return attrs.getRequest().getRemoteAddr();
            }
        } catch (Exception ignored) {
            // fall through to "unknown"
        }
        return "unknown";
    }
}
```
```java
// AuditLog.java
package com.example.rag.audit;

import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Entity
@Table(name = "rag_audit_log", indexes = {
        @Index(name = "idx_user_id", columnList = "user_id"),
        @Index(name = "idx_timestamp", columnList = "timestamp"),
        @Index(name = "idx_event_type", columnList = "event_type")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AuditLog {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "event_type", nullable = false, length = 50)
    private String eventType;

    @Column(name = "user_id", nullable = false, length = 64)
    private String userId;

    @Column(name = "username", length = 128)
    private String username;

    @Column(name = "department", length = 64)
    private String department;

    /** Who performed the action (e.g. the admin who changed a user's roles) */
    @Column(name = "operator_id", length = 64)
    private String operatorId;

    /** Free-text details for non-query events such as PERMISSION_CHANGED */
    @Column(name = "detail", columnDefinition = "TEXT")
    private String detail;

    @Column(name = "query_text", columnDefinition = "TEXT")
    private String queryText;

    @Column(name = "answer_summary", columnDefinition = "TEXT")
    private String answerSummary;

    @Column(name = "retrieved_doc_count")
    private Integer retrievedDocCount;

    @Column(name = "retrieved_doc_titles", columnDefinition = "TEXT")
    private String retrievedDocTitles;

    @Column(name = "latency_ms")
    private Long latencyMs;

    @Column(name = "error_message", columnDefinition = "TEXT")
    private String errorMessage;

    @Column(name = "ip_address", length = 64)
    private String ipAddress;

    @Column(name = "timestamp")
    private LocalDateTime timestamp;
}
```
Audit query API
```java
// AuditController.java
package com.example.rag.audit;

import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

/**
 * Audit log query API.
 * Administrators only. AuditLogRepository is a Spring Data JPA repository: the findBy...
 * methods are derived queries, while findSuspiciousActivity needs a custom @Query (not shown).
 */
@RestController
@RequestMapping("/api/admin/audit")
@RequiredArgsConstructor
public class AuditController {

    private final AuditLogRepository auditLogRepository;

    /**
     * All activity of one user (supports the GDPR right of access).
     */
    @GetMapping("/user/{userId}")
    @PreAuthorize("hasRole('ADMIN')")
    public Page<AuditLog> getUserAuditLogs(
            @PathVariable String userId,
            @RequestParam(required = false)
            @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from,
            @RequestParam(required = false)
            @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to,
            Pageable pageable) {
        if (from != null && to != null) {
            return auditLogRepository.findByUserIdAndTimestampBetween(userId, from, to, pageable);
        }
        return auditLogRepository.findByUserId(userId, pageable);
    }

    /**
     * Suspicious activity (a large number of queries in a short time).
     */
    @GetMapping("/suspicious")
    @PreAuthorize("hasRole('ADMIN')")
    public Page<AuditLog> getSuspiciousActivity(Pageable pageable) {
        // Users with more than 100 queries in the past hour
        LocalDateTime oneHourAgo = LocalDateTime.now().minusHours(1);
        return auditLogRepository.findSuspiciousActivity(oneHourAgo, 100, pageable);
    }

    /**
     * Access history of a specific document.
     * Note: SecurityConfig already restricts /api/admin/** to ADMIN at the URL level, so the
     * HR branch of this expression only takes effect if HR is also let through that rule.
     */
    @GetMapping("/document/{documentTitle}")
    @PreAuthorize("hasRole('ADMIN') or hasRole('HR')")
    public Page<AuditLog> getDocumentAccessLogs(
            @PathVariable String documentTitle,
            Pageable pageable) {
        return auditLogRepository.findByRetrievedDocTitlesContaining(documentTitle, pageable);
    }
}
```
Compliance: GDPR and data classification
Implementing GDPR compliance
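```java
// GdprComplianceService.java
package com.example.rag.compliance;

import com.example.rag.audit.AuditLogRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * GDPR compliance service.
 *
 * Intended to cover these GDPR rights (only erasure is shown below):
 * - Right of access: users can find out what data the system holds about them
 * - Right to erasure (right to be forgotten): clean up a user's data when they leave or ask for deletion
 * - Data portability: export the user's data
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class GdprComplianceService {

    private final AuditLogRepository auditLogRepository;
    private final VectorStore vectorStore; // used by the (commented-out) document cleanup below

    /**
     * Right to erasure: remove a user's data when they leave or ask for deletion.
     */
    @Transactional
    public void deleteUserData(String userId, String requestedBy) {
        log.info("Processing GDPR erasure request: userId={}, requestedBy={}", userId, requestedBy);

        // 1. Anonymize personal data in the audit logs (anonymize instead of hard-deleting,
        //    so the compliance record survives)
        auditLogRepository.anonymizeUserData(userId);

        // 2. Delete knowledge-base documents created by this user (if there is a per-user knowledge base)
        // vectorStore.delete(List.of("owner:" + userId)); // adapt to your actual implementation

        // 3. Clear the user's caches. Redis DEL does not expand patterns: collect the keys
        //    matching rag:*:<userId>:* first (SCAN), then delete them.

        // 4. Record the erasure itself (this audit entry must never be deleted)
        log.info("Data of user {} cleaned up per GDPR, requested by: {}", userId, requestedBy);
    }

    /**
     * Data classification policy.
     *
     * Example scheme for an environment that must meet MLPS Level 3:
     * - PUBLIC: no restriction
     * - INTERNAL: visible to employees
     * - CONFIDENTIAL: designated staff only
     * - SECRET: executives plus special approval
     */
    public static class DataClassificationPolicy {
        public static final String PUBLIC = "PUBLIC";
        public static final String INTERNAL = "INTERNAL";
        public static final String CONFIDENTIAL = "CONFIDENTIAL";
        public static final String SECRET = "SECRET";

        /**
         * Infers the default classification from the document type.
         */
        public static String inferClassification(String documentType) {
            if (documentType == null) return INTERNAL;
            return switch (documentType.toLowerCase()) {
                case "employee handbook", "policy document", "announcement" -> PUBLIC;
                case "technical document", "project document", "meeting minutes" -> INTERNAL;
                case "financial statement", "compensation structure", "performance plan" -> CONFIDENTIAL;
                case "board resolution", "strategic plan", "m&a plan" -> SECRET;
                default -> INTERNAL; // default to internal
            };
        }
    }
}
```
Production considerations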
Performance optimization for permission control
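Key performance optimizations:
1. Cache user permissions. Load permission data from the database and cache it in Redis (TTL = 5 minutes). Invalidate the cache proactively when permissions change, so the database is not queried on every request.
2. Reuse the filter expression. A user's filter expression only changes when their permissions change, so build it once per request and reuse it for every retrieval in that request. Keep it request-scoped rather than in a ThreadLocal, because on pooled threads a ThreadLocal outlives the request unless it is cleared explicitly.
3. Tune Milvus indexes. Build a BITMAP index on the allowed_roles field and an INVERTED (or Trie) index on the classification VARCHAR field to speed up metadata filtering. The available index types depend on your Milvus version.
4. Write audit logs asynchronously. Audit writes should not block the main flow, and they should run on a dedicated thread pool so they do not tie up business threads.
FAQ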
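Q1: Can the metadata filtering approach be bypassed?
If you rely only on Milvus metadata filtering, a bypass is possible in principle, for example through expression injection. We recommend a second, application-level permission check after the vector search returns (the isDocumentAccessible method in the code). Filtering in the database plus checking in the application gives stronger security than either layer alone.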
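Q2: What if document permissions are very fine-grained, down to individual people?
For cases like "only Zhang San can see document A", store an owner_id field in the metadata and add the condition owner_id == "{userId}" at search time. If several people can access a document (for example "members of the project team"), store a list of allowed user IDs (allowed_users) and filter it with array_contains. Keep allowed_users small, because lists of more than about 100 users hurt performance. For large-scale sharing, use role groups instead.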
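A sketch of that condition follows. It assumes owner_id is a VARCHAR field and allowed_users an ARRAY<VARCHAR> field, and the class name is hypothetical. The 100-user limit is the rule of thumb from the answer above. User IDs are validated rather than escaped because they are spliced into the expression:

```java
import java.util.List;
import java.util.regex.Pattern;

final class UserScopedFilter {
    private static final Pattern SAFE_ID = Pattern.compile("[A-Za-z0-9_-]+");
    static final int MAX_ALLOWED_USERS = 100; // rule of thumb from the text

    // Query time: the user owns the document or is explicitly listed on it.
    static String forUser(String userId) {
        if (!SAFE_ID.matcher(userId).matches()) {
            throw new IllegalArgumentException("Unexpected user id: " + userId);
        }
        String q = "\"" + userId + "\"";
        return "(owner_id == " + q + " || array_contains(allowed_users, " + q + "))";
    }

    // Write time: large audiences should be modelled as a role or group instead.
    static void checkAudienceSize(List<String> allowedUsers) {
        if (allowedUsers.size() > MAX_ALLOWED_USERS) {
            throw new IllegalArgumentException(
                    "Too many users (" + allowedUsers.size() + "); use a role or group");
        }
    }
}
```

Enforcing the size limit when documents are written keeps oversized lists out of the index entirely, so the query-time filter never has to deal with them.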
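Q3: How should permission inheritance be handled?
For example, a folder's permissions should automatically apply to every file inside it. Compute the final merged permission in the business layer when the document is written (folder permission ∩ the file's own permission), and store that result in the document metadata. Do not compute inheritance at query time, because that is complex and slow.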
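The merge step has a trap when an empty role set means "no restriction", which is the convention this article's filters use. A plain set intersection turns "unrestricted" into "nobody". Two disjoint non-empty sets also intersect to an empty set, which would then read as "everyone". The sketch below handles both cases explicitly. The class name is illustrative, and treating a disjoint result as an error is only one possible policy:

```java
import java.util.HashSet;
import java.util.Set;

// Write-time permission inheritance, where an empty set means "no restriction".
final class PermissionInheritance {
    static Set<String> effectiveRoles(Set<String> folderRoles, Set<String> fileRoles) {
        if (folderRoles.isEmpty()) return Set.copyOf(fileRoles);   // folder adds no restriction
        if (fileRoles.isEmpty()) return Set.copyOf(folderRoles);   // file adds no restriction
        Set<String> result = new HashSet<>(folderRoles);
        result.retainAll(fileRoles);
        if (result.isEmpty()) {
            // An empty result would be stored as "visible to everyone", so refuse instead.
            throw new IllegalStateException(
                    "No role satisfies both folder " + folderRoles + " and file " + fileRoles);
        }
        return Set.copyOf(result);
    }
}
```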
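Q4: How does Spring Security's @PreAuthorize relate to RAG permission control?
@PreAuthorize is coarse-grained, endpoint-level control ("which users may call this API"). RAG permission control is fine-grained, data-level control ("users who can call this API see only the data they are authorized for"). The two complement each other, and you need both.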
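Q5: Where should audit logs be stored?
A relational database (MySQL/PostgreSQL) is the first choice, because it is easy to query and supports complex statistics. As data volume grows, add partitioned tables or hot/cold tiering (hot data in RDS, cold data archived to object storage). We do not recommend Elasticsearch here. Audit logs need transactional guarantees so that no record is lost, and ES is not designed for that.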
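Q6: How do you handle the "insider" risk of staff circumventing permissions?
Technical controls only go so far, so the main defense is audit logs plus alerting. Set anomaly-detection rules in the audit system. For example, raise an automatic alert when a single user queries confidential documents more than 50 times within an hour, and log an alert when someone queries confidential documents outside working hours (22:00-08:00). In addition, review the access logs for sensitive data every quarter so that every access can be traced after the fact.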
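The two rules above can be sketched as a small in-memory monitor that receives each confidential-document query. The thresholds come from the text. The class name, the return codes, and the assumption of a single-threaded, time-ordered event feed are illustrative:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Not thread-safe: assumes one consumer processing events in time order.
final class ConfidentialAccessMonitor {
    static final int MAX_PER_HOUR = 50;
    static final LocalTime OFF_HOURS_START = LocalTime.of(22, 0);
    static final LocalTime OFF_HOURS_END = LocalTime.of(8, 0);

    private final Map<String, Deque<LocalDateTime>> recent = new HashMap<>();

    // The window wraps midnight: 22:00-23:59 or 00:00-07:59.
    static boolean isOffHours(LocalDateTime t) {
        LocalTime time = t.toLocalTime();
        return !time.isBefore(OFF_HOURS_START) || time.isBefore(OFF_HOURS_END);
    }

    /** Records one confidential query; returns "RATE", "OFF_HOURS", or null if no alert. */
    String onQuery(String userId, LocalDateTime at) {
        Deque<LocalDateTime> window = recent.computeIfAbsent(userId, k -> new ArrayDeque<>());
        window.addLast(at);
        // Keep only events from the last hour (sliding window).
        while (!window.isEmpty()
                && Duration.between(window.peekFirst(), at).compareTo(Duration.ofHours(1)) >= 0) {
            window.removeFirst();
        }
        if (window.size() > MAX_PER_HOUR) return "RATE";
        if (isOffHours(at)) return "OFF_HOURS";
        return null;
    }
}
```

The rate rule is checked first, so a burst at night is reported as "RATE". A real deployment would feed this from the audit log stream and persist the window state.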
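Summary
RAG permission management is a classic case of "ship the feature first, add security later" going wrong. Design the permission model before launch. Retrofitting it later is very expensive, because every document has to be re-imported with permission metadata.
Action checklist:
Data security is never a small matter. A single salary leak can cause company-wide resentment and even regulatory penalties.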
