第2123篇:企业内部LLM权限与审计系统——谁用了AI,用来做什么
2026/4/30大约 9 分钟
第2123篇:企业内部LLM权限与审计系统——谁用了AI,用来做什么
适读人群:负责企业AI平台的工程师和安全团队 | 阅读时长:约19分钟 | 核心价值:建立企业AI使用的权限管控和完整审计体系,满足合规要求,防止数据泄露
"我们公司员工用AI把竞争对手分析报告发给AI了,泄露了内部信息。"
这不是假设场景。我认识的一个公司就发生过类似的事。员工用ChatGPT分析内部财务数据,把数字粘贴进去问"这份财报说明了什么问题"。信息出去了,没有任何记录,事后根本没法追查。
企业部署内部AI平台,权限控制和审计往往被当成"后期功能"。实际上这应该是Day 1就要做的基础设施。一旦有监管要求或安全事件,没有审计日志是最被动的处境。
这篇文章讲如何给企业内部AI平台建立权限和审计体系。
企业AI审计的需求分析
/**
* 企业AI审计的三类需求
*
* ===== 安全合规需求 =====
*
* 数据安全:
* - 哪些员工把什么数据发给了AI?
* - 是否有敏感信息(PII、商业机密)被发送?
* - AI的回答是否包含不当内容?
*
* 访问控制:
* - 不同部门/岗位应该能访问哪些AI功能?
* - 财务数据只有财务部能问
* - 代码库只有研发能访问
*
* 合规记录:
* - 金融、医疗行业有明确的AI使用记录要求
* - 出了问题要能提供完整证据链
*
* ===== 成本管控需求 =====
*
* - 哪个部门、哪个用户花了多少token?
* - 有没有人在大量调用(超出业务需要)?
* - 各部门的AI使用预算分配
*
* ===== 质量改进需求 =====
*
* - 员工在问什么问题?(了解真实需求)
* - 哪些问题回答质量不好?(持续优化)
* - 使用率分布(谁在用,谁没用)
*
* 设计原则:
* - 不是监控员工,是保护公司
* - 员工应该知道有审计(透明)
* - 最小权限原则:默认最小访问权限
*/权限模型设计
/**
* 企业AI权限模型
*
* 三层权限结构:
* 功能权限(能用哪些AI功能)
* 数据权限(能访问哪些知识库/数据源)
* 配额限制(能用多少)
*/
@Entity
@Table(name = "ai_permission_policies")
@Data
@Builder
public class AiPermissionPolicy {
@Id
private String policyId;
private String policyName;
private String description;
// 适用范围(Department/Role/User)
private String scopeType;
private String scopeValue; // 部门ID、角色名、用户ID
// 功能权限
@Column(name = "allowed_features", columnDefinition = "TEXT")
private String allowedFeaturesJson; // ["chat", "code_review", "document_qa"]
@Column(name = "denied_features", columnDefinition = "TEXT")
private String deniedFeaturesJson; // 明确禁止的功能
// 数据权限
@Column(name = "accessible_knowledge_bases", columnDefinition = "TEXT")
private String accessibleKnowledgeBasesJson; // 可访问的知识库列表
// 模型权限
@Column(name = "allowed_models", columnDefinition = "TEXT")
private String allowedModelsJson; // 允许使用的模型(节省成本)
// 配额限制
private Integer dailyTokenLimit; // 每日token上限
private Integer monthlyTokenLimit; // 每月token上限
private Integer maxConcurrentSessions; // 最大并发会话数
// 数据安全配置
private Boolean sensitiveDataMaskingEnabled; // 是否启用敏感数据脱敏
private Boolean outputFilterEnabled; // 是否过滤输出中的敏感信息
// 审计级别
@Enumerated(EnumType.STRING)
private AuditLevel auditLevel;
private LocalDateTime createdAt;
private String createdBy;
private LocalDateTime updatedAt;
public enum AuditLevel {
MINIMAL, // 只记录基本元数据
STANDARD, // 记录请求摘要(不记录完整内容)
FULL // 记录完整请求和响应
}
public List<String> getAllowedFeatures() {
return parseJsonList(allowedFeaturesJson);
}
public List<String> getAllowedModels() {
return parseJsonList(allowedModelsJson);
}
public List<String> getAccessibleKnowledgeBases() {
return parseJsonList(accessibleKnowledgeBasesJson);
}
private List<String> parseJsonList(String json) {
if (json == null || json.isBlank()) return List.of();
try {
return new ObjectMapper().readValue(json, new TypeReference<List<String>>() {});
} catch (Exception e) {
return List.of();
}
}
}权限检查服务
/**
* AI请求权限检查
*
* 在每次AI请求前,检查用户是否有权限
* 这是整个审计系统的入口
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AiPermissionService {
private final AiPermissionPolicyRepository policyRepo;
private final UserTokenUsageRepository tokenUsageRepo;
private final RedisTemplate<String, Object> redisTemplate;
/**
* 检查请求权限
*
* @return 权限检查结果(允许/拒绝及原因)
*/
public PermissionCheckResult checkPermission(AiRequest request) {
String userId = request.getUserId();
String feature = request.getFeature();
// 1. 加载用户的有效权限策略(可能有多个,取最高权限)
AiPermissionPolicy policy = resolveEffectivePolicy(userId, request.getDepartmentId());
if (policy == null) {
return PermissionCheckResult.denied("未配置AI使用权限,请联系管理员");
}
// 2. 功能权限检查
if (!policy.getAllowedFeatures().contains(feature) &&
!policy.getAllowedFeatures().contains("*")) {
return PermissionCheckResult.denied(
String.format("您的账号没有使用 [%s] 功能的权限", feature));
}
if (policy.getDeniedFeaturesJson() != null) {
List<String> denied = policy.getDeniedFeatures();
if (denied.contains(feature)) {
return PermissionCheckResult.denied(
String.format("功能 [%s] 已被您的权限策略禁用", feature));
}
}
// 3. 数据访问权限检查(RAG场景)
if (request.getKnowledgeBaseId() != null) {
if (!policy.getAccessibleKnowledgeBases().contains(request.getKnowledgeBaseId()) &&
!policy.getAccessibleKnowledgeBases().contains("*")) {
return PermissionCheckResult.denied(
String.format("您没有访问知识库 [%s] 的权限", request.getKnowledgeBaseId()));
}
}
// 4. 配额检查(Token用量)
if (policy.getDailyTokenLimit() != null) {
long todayUsage = getTodayTokenUsage(userId);
if (todayUsage >= policy.getDailyTokenLimit()) {
return PermissionCheckResult.denied(
String.format("今日Token用量已达上限(%d),请明日再试",
policy.getDailyTokenLimit()));
}
}
// 5. 并发会话检查
if (policy.getMaxConcurrentSessions() != null) {
int activeSessions = getActiveSessions(userId);
if (activeSessions >= policy.getMaxConcurrentSessions()) {
return PermissionCheckResult.denied("并发会话数已达上限,请关闭其他会话");
}
}
return PermissionCheckResult.allowed(policy);
}
/**
* 解析用户的有效权限策略
*
* 优先级:用户级 > 角色级 > 部门级 > 全局默认
*/
private AiPermissionPolicy resolveEffectivePolicy(String userId, String departmentId) {
// 用户专属策略(最高优先级)
AiPermissionPolicy userPolicy = policyRepo.findByScope("USER", userId);
if (userPolicy != null) return userPolicy;
// 部门策略
if (departmentId != null) {
AiPermissionPolicy deptPolicy = policyRepo.findByScope("DEPARTMENT", departmentId);
if (deptPolicy != null) return deptPolicy;
}
// 全局默认策略
return policyRepo.findByScope("GLOBAL", "default");
}
private long getTodayTokenUsage(String userId) {
String key = "token_usage:" + userId + ":" + LocalDate.now();
Object usage = redisTemplate.opsForValue().get(key);
return usage != null ? Long.parseLong(usage.toString()) : 0;
}
private int getActiveSessions(String userId) {
String pattern = "session:" + userId + ":*";
Set<String> keys = redisTemplate.keys(pattern);
return keys != null ? keys.size() : 0;
}
@Data
@Builder
public static class PermissionCheckResult {
private boolean allowed;
private String denyReason;
private AiPermissionPolicy policy;
public static PermissionCheckResult allowed(AiPermissionPolicy policy) {
return PermissionCheckResult.builder().allowed(true).policy(policy).build();
}
public static PermissionCheckResult denied(String reason) {
return PermissionCheckResult.builder().allowed(false).denyReason(reason).build();
}
}
}审计日志记录
/**
* AI操作审计日志
*
* 完整记录每次AI交互:
* - 谁、什么时间、用什么功能
* - 发送了什么(摘要或完整内容)
* - 收到了什么(摘要或完整内容)
* - 消耗了多少token
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AiAuditService {
private final AiAuditLogRepository auditRepo;
private final SensitiveDataDetector sensitiveDetector;
/**
* 记录AI请求审计日志
*
* 根据策略决定记录详细程度
*/
@Async
public void recordRequest(
AiRequest request,
AiResponse response,
AiPermissionPolicy policy,
long latencyMs) {
try {
AiAuditLog log = buildAuditLog(request, response, policy, latencyMs);
auditRepo.save(log);
} catch (Exception e) {
log.error("审计日志记录失败: userId={}", request.getUserId(), e);
// 审计失败不影响主流程,但要记录
}
}
private AiAuditLog buildAuditLog(
AiRequest request,
AiResponse response,
AiPermissionPolicy policy,
long latencyMs) {
AiPermissionPolicy.AuditLevel auditLevel = policy.getAuditLevel();
AiAuditLog.AiAuditLogBuilder builder = AiAuditLog.builder()
.logId(UUID.randomUUID().toString())
.userId(request.getUserId())
.departmentId(request.getDepartmentId())
.sessionId(request.getSessionId())
.feature(request.getFeature())
.modelId(request.getModelId())
.knowledgeBaseId(request.getKnowledgeBaseId())
.latencyMs(latencyMs)
.inputTokens(response.getInputTokens())
.outputTokens(response.getOutputTokens())
.isError(response.isError())
.errorMessage(response.getErrorMessage())
.timestamp(LocalDateTime.now())
.clientIp(request.getClientIp());
// 根据审计级别决定记录内容
switch (auditLevel) {
case MINIMAL:
// 只记录元数据,不记录内容
break;
case STANDARD:
// 记录摘要(截断到100字)
builder.inputSummary(truncate(request.getUserMessage(), 100));
builder.outputSummary(truncate(response.getContent(), 100));
// 检测是否包含敏感信息
builder.hasSensitiveInput(sensitiveDetector.hasSensitiveData(request.getUserMessage()));
break;
case FULL:
// 记录完整内容
// 注意:FULL级别可能涉及大量存储,谨慎使用
builder.inputContent(request.getUserMessage());
builder.outputContent(response.getContent());
// 敏感信息检测
SensitiveDataDetector.DetectionResult detection =
sensitiveDetector.detect(request.getUserMessage());
builder.hasSensitiveInput(detection.hasIssues());
builder.sensitiveDataTypes(String.join(",", detection.getFoundTypes()));
break;
}
return builder.build();
}
/**
* 查询审计日志(给管理员用)
*/
public Page<AiAuditLog> queryLogs(AuditQueryParams params, Pageable pageable) {
return auditRepo.findByParams(
params.getUserId(),
params.getDepartmentId(),
params.getFeature(),
params.getStartTime(),
params.getEndTime(),
params.isOnlySensitive(),
pageable
);
}
/**
* 生成使用量报告
*/
public UsageReport generateUsageReport(
LocalDate startDate, LocalDate endDate, String groupBy) {
List<UsageStats> stats = auditRepo.aggregateUsage(
startDate.atStartOfDay(),
endDate.plusDays(1).atStartOfDay(),
groupBy // "department" / "user" / "feature"
);
long totalInputTokens = stats.stream()
.mapToLong(UsageStats::inputTokens).sum();
long totalOutputTokens = stats.stream()
.mapToLong(UsageStats::outputTokens).sum();
// 粗略估算成本(按GPT-4o价格)
double estimatedCost =
totalInputTokens * 0.000005 + totalOutputTokens * 0.000015;
return new UsageReport(
startDate, endDate, groupBy,
totalInputTokens, totalOutputTokens, estimatedCost,
stats
);
}
private String truncate(String text, int maxLength) {
if (text == null) return null;
return text.length() > maxLength ? text.substring(0, maxLength) + "..." : text;
}
record UsageStats(String groupKey, long requestCount,
long inputTokens, long outputTokens) {}
record UsageReport(LocalDate startDate, LocalDate endDate, String groupBy,
long totalInputTokens, long totalOutputTokens, double estimatedCost,
List<UsageStats> breakdown) {}
@Data
@Builder
public static class AuditQueryParams {
private String userId;
private String departmentId;
private String feature;
private LocalDateTime startTime;
private LocalDateTime endTime;
private boolean onlySensitive;
}
}敏感数据检测
/**
* 敏感数据检测器
*
* 在记录审计日志和可选的实时拦截时使用
* 检测用户输入中是否包含不应该发给AI的信息
*/
@Service
@Slf4j
public class SensitiveDataDetector {
// 各类敏感数据的正则表达式
private static final Map<String, Pattern> SENSITIVE_PATTERNS = Map.of(
"PHONE_NUMBER", Pattern.compile("1[3-9]\\d{9}"),
"ID_CARD", Pattern.compile("\\d{17}[0-9Xx]"),
"BANK_CARD", Pattern.compile("\\d{16,19}"),
"EMAIL", Pattern.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}"),
"INTERNAL_IP", Pattern.compile("192\\.168\\.\\d{1,3}\\.\\d{1,3}|10\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}"),
"API_KEY", Pattern.compile("sk-[A-Za-z0-9]{32,}|Bearer [A-Za-z0-9\\-._~+/]+={0,2}")
);
// 高风险关键词(可能涉及商业机密)
private static final List<String> HIGH_RISK_KEYWORDS = List.of(
"季度营收", "年度目标", "竞争对手", "未发布", "保密", "内部文件",
"收购", "并购", "IPO", "股权", "薪酬体系"
);
public boolean hasSensitiveData(String text) {
return detect(text).hasIssues();
}
/**
* 检测文本中的敏感信息
*/
public DetectionResult detect(String text) {
if (text == null || text.isBlank()) {
return DetectionResult.clean();
}
List<String> foundTypes = new ArrayList<>();
List<String> findings = new ArrayList<>();
// 检测正则模式
for (Map.Entry<String, Pattern> entry : SENSITIVE_PATTERNS.entrySet()) {
Matcher matcher = entry.getValue().matcher(text);
if (matcher.find()) {
foundTypes.add(entry.getKey());
findings.add(entry.getKey() + ": " + maskSensitiveValue(matcher.group()));
}
}
// 检测高风险关键词
List<String> matchedKeywords = HIGH_RISK_KEYWORDS.stream()
.filter(text::contains)
.toList();
if (!matchedKeywords.isEmpty()) {
foundTypes.add("SENSITIVE_KEYWORD");
findings.add("敏感关键词: " + String.join(", ", matchedKeywords));
}
return new DetectionResult(!foundTypes.isEmpty(), foundTypes, findings);
}
/**
* 对敏感值进行脱敏(用于日志)
*/
private String maskSensitiveValue(String value) {
if (value.length() <= 4) return "****";
return value.substring(0, 2) + "*".repeat(value.length() - 4) + value.substring(value.length() - 2);
}
public record DetectionResult(
boolean hasIssues,
List<String> foundTypes,
List<String> findings) {
public static DetectionResult clean() {
return new DetectionResult(false, List.of(), List.of());
}
}
}实践建议
审计日志是基础设施,不是功能——Day 1就要建
我见过太多团队说"先把功能做出来,审计以后再加"。等到真的需要审计日志(安全事件、合规检查、成本分析),往往已经有了几个月的无法追溯的历史。审计日志对性能影响极小(异步写入),但在需要的时候价值无法替代。把审计当成基础设施,和数据库一样,是不可省略的。
权限设计要从部门维度入手,而不是用户维度
逐一给每个用户配置权限是不可维护的。正确做法是:以部门为单位定义权限策略,用户自动继承部门权限,特殊需求才配置用户级覆盖。新员工入职时,只需要加入对应部门,权限自动生效。离职时,从部门移除,权限自动撤销。这样的权限管理才能在公司规模增大时保持可维护性。
明确告诉员工有审计,这不是监视而是保护
员工如果不知道有审计,一旦发现会有被监视的不安全感。正确的做法是:在AI产品首次使用时,明确说明"您的AI使用会被记录,用于安全审计和服务改进"。透明的审计反而能提升合规意识,让员工更谨慎地对待敏感信息。不透明的监控才会破坏信任。
