第1687篇:零信任架构在AI系统中的实践——从边界防御到身份验证
第1687篇:零信任架构在AI系统中的实践——从边界防御到身份验证
传统安全模型有一个核心假设:内网是可信的,外网是不可信的。只要守住边界(防火墙),内部的请求就默认安全。
这个假设在今天已经彻底失效了。云计算让边界消融,混合办公让"内网"无处不在,微服务让内部流量比外部流量还复杂。而 AI 系统带来了新的挑战:当 AI Agent 可以自主调用内部 API、访问数据库、触发业务流程,"谁发起了这个请求"变得前所未有地重要。
零信任(Zero Trust)的核心思想就一句话:永不信任,始终验证(Never trust, always verify)。
一、零信任的三个核心原则
原则一:假设已被攻破(Assume Breach) 系统设计时要假设边界已经被攻破,攻击者可能已经在内网里了。这意味着内部流量同样需要验证,不能因为是"内部请求"就放宽安全要求。
原则二:最小权限(Least Privilege) 任何用户、服务、设备都只拥有完成当前任务所需的最小权限。这不是一次性配置,而是动态的——权限随上下文而变化。
原则三:明确验证(Verify Explicitly) 每次访问请求都要做完整的身份验证和授权,不依赖网络位置、IP 地址等隐式信任。
这三个原则放到 AI 系统里,翻译成工程实践就是:
- AI 服务调用其他服务时,必须携带验证凭证
- AI Agent 的权限随任务动态调整,而不是一开始就给最大权限
- 每个 API 调用都要记录和审计,不能因为是 AI 发起的就绕过审计
二、AI 服务的身份验证
2.1 服务间认证:给 AI 服务颁发身份证
微服务体系里,每个服务都应该有自己的身份(Service Identity)。AI 服务调用数据库、调用其他业务服务时,要用这个身份去认证,而不是依赖 IP 白名单。
推荐的方案是 mTLS(双向 TLS)配合 SPIFFE/SPIRE 做服务身份管理。
// 使用 mTLS 的 AI 服务客户端配置
@Configuration
public class SecureAIServiceClientConfig {
@Bean
public RestTemplate secureRestTemplate(
@Value("${security.mtls.keystore-path}") String keystorePath,
@Value("${security.mtls.keystore-password}") String keystorePassword,
@Value("${security.mtls.truststore-path}") String truststorePath,
@Value("${security.mtls.truststore-password}") String truststorePassword) throws Exception {
// 加载客户端证书(AI 服务的身份)
KeyStore keyStore = KeyStore.getInstance("PKCS12");
try (InputStream ks = new FileInputStream(keystorePath)) {
keyStore.load(ks, keystorePassword.toCharArray());
}
// 加载受信任的 CA 证书
KeyStore trustStore = KeyStore.getInstance("JKS");
try (InputStream ts = new FileInputStream(truststorePath)) {
trustStore.load(ts, truststorePassword.toCharArray());
}
SSLContext sslContext = SSLContextBuilder.create()
.loadKeyMaterial(keyStore, keystorePassword.toCharArray())
.loadTrustMaterial(trustStore, null)
.build();
HttpClient httpClient = HttpClients.custom()
.setSSLContext(sslContext)
.setSSLHostnameVerifier(SSLConnectionSocketFactory.getDefaultHostnameVerifier())
.build();
ClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(httpClient);
return new RestTemplate(factory);
}
}2.2 JWT 令牌:携带调用上下文
除了服务间认证,每个 AI 请求还需要携带调用上下文——是哪个用户触发的、什么业务场景、有哪些权限。用 JWT 来传递这些信息。
@Service
public class AIRequestContextService {
@Autowired
private JwtTokenProvider jwtProvider;
// 为每个 AI 请求生成上下文令牌
public String generateAIRequestToken(
String userId,
String sessionId,
String scenario,
List<String> grantedPermissions) {
Map<String, Object> claims = new HashMap<>();
claims.put("sub", userId);
claims.put("session_id", sessionId);
claims.put("scenario", scenario);
claims.put("permissions", grantedPermissions);
claims.put("ai_request", true); // 标记这是 AI 系统的请求
claims.put("max_resource_level", determineMaxResourceLevel(grantedPermissions));
// 短有效期,AI 请求令牌不需要长时间有效
return jwtProvider.generateToken(claims, Duration.ofMinutes(5));
}
// 下游服务验证 AI 请求令牌
public AIRequestContext validateAndExtract(String token) {
try {
Claims claims = jwtProvider.parseToken(token);
// 验证是 AI 请求令牌
if (!Boolean.TRUE.equals(claims.get("ai_request", Boolean.class))) {
throw new SecurityException("非 AI 请求令牌");
}
// 验证场景是否匹配当前服务
String scenario = (String) claims.get("scenario");
if (!isScenarioAllowed(scenario)) {
throw new SecurityException("场景不匹配:" + scenario);
}
return AIRequestContext.builder()
.userId(claims.getSubject())
.sessionId((String) claims.get("session_id"))
.scenario(scenario)
.permissions((List<String>) claims.get("permissions"))
.build();
} catch (ExpiredJwtException e) {
throw new SecurityException("AI 请求令牌已过期");
} catch (JwtException e) {
throw new SecurityException("AI 请求令牌无效: " + e.getMessage());
}
}
}三、动态权限控制:基于上下文的访问决策
零信任不是一次性配置权限,而是每次请求都动态决策。
3.1 基于属性的访问控制(ABAC)
传统 RBAC(基于角色)的权限模型不够灵活,ABAC 允许基于任意属性组合来做访问决策。
@Service
public class AIAccessDecisionService {
// 访问决策点(PDP:Policy Decision Point)
public AccessDecision evaluate(
AIRequestContext requestContext,
Resource resource,
Action action) {
AccessDecisionRequest decisionRequest = AccessDecisionRequest.builder()
.subject(buildSubjectAttributes(requestContext))
.resource(buildResourceAttributes(resource))
.action(action)
.environment(buildEnvironmentAttributes())
.build();
// 评估所有相关策略
List<Policy> applicablePolicies = policyRepository.findApplicable(decisionRequest);
PolicyEvaluationResult result = evaluatePolicies(applicablePolicies, decisionRequest);
// 记录决策(零信任必须有完整的决策审计)
auditService.recordDecision(decisionRequest, result);
if (result.isPermit()) {
return AccessDecision.permit(result.getObligations());
} else {
return AccessDecision.deny(result.getDenyReason());
}
}
private Map<String, Object> buildSubjectAttributes(AIRequestContext ctx) {
Map<String, Object> attrs = new HashMap<>();
attrs.put("user_id", ctx.getUserId());
attrs.put("user_role", userService.getUserRole(ctx.getUserId()));
attrs.put("department", userService.getUserDepartment(ctx.getUserId()));
attrs.put("permissions", ctx.getPermissions());
attrs.put("scenario", ctx.getScenario());
attrs.put("is_ai_agent", true);
return attrs;
}
private Map<String, Object> buildEnvironmentAttributes() {
Map<String, Object> attrs = new HashMap<>();
attrs.put("current_time", LocalDateTime.now());
attrs.put("day_of_week", LocalDate.now().getDayOfWeek());
attrs.put("is_business_hours", isBusinessHours());
attrs.put("threat_level", threatIntelService.getCurrentThreatLevel());
return attrs;
}
private boolean isBusinessHours() {
LocalTime now = LocalTime.now(ZoneId.of("Asia/Shanghai"));
return now.isAfter(LocalTime.of(9, 0)) && now.isBefore(LocalTime.of(18, 0));
}
}
// 策略示例:XACML 风格的策略定义
@Component
public class AIDataAccessPolicy implements Policy {
@Override
public PolicyEvaluationResult evaluate(AccessDecisionRequest request) {
String userId = (String) request.getSubject().get("user_id");
String scenario = (String) request.getSubject().get("scenario");
String resourceSensitivity = (String) request.getResource().get("sensitivity");
boolean isBusinessHours = (boolean) request.getEnvironment().get("is_business_hours");
// 规则1:高敏感数据只在工作时间允许 AI 访问
if ("HIGH".equals(resourceSensitivity) && !isBusinessHours) {
return PolicyEvaluationResult.deny("高敏感数据仅工作时间可访问");
}
// 规则2:AI 代理访问用户数据时,必须有该用户授权的会话
if (request.getSubject().containsKey("is_ai_agent")) {
String sessionUserId = (String) request.getSubject().get("user_id");
String resourceOwnerId = (String) request.getResource().get("owner_id");
if (resourceOwnerId != null && !resourceOwnerId.equals(sessionUserId)) {
return PolicyEvaluationResult.deny("AI 代理只能访问当前会话用户的数据");
}
}
// 规则3:客服场景只允许只读访问
if ("customer_service".equals(scenario)) {
if (!"READ".equals(request.getAction().getType())) {
return PolicyEvaluationResult.deny("客服场景仅允许只读操作");
}
}
return PolicyEvaluationResult.permit();
}
}3.2 风险自适应访问控制
零信任的更高阶实践:根据实时风险评分动态调整访问策略。
@Service
public class RiskAdaptiveAccessController {
@Autowired
private RiskScoringService riskScoringService;
public AdaptiveAccessResult evaluate(String userId, String sessionId, ResourceAccess access) {
// 计算当前请求的风险分数
RiskScore riskScore = riskScoringService.calculate(RiskContext.builder()
.userId(userId)
.sessionId(sessionId)
.requestedResource(access.getResourceId())
.requestedAction(access.getAction())
.sourceIp(access.getSourceIp())
.userAgent(access.getUserAgent())
.recentAnomalyCount(getRecentAnomalyCount(userId))
.build());
// 根据风险分数决定访问策略
if (riskScore.getScore() < 30) {
// 低风险:正常访问
return AdaptiveAccessResult.allow();
} else if (riskScore.getScore() < 60) {
// 中等风险:允许但降低权限
return AdaptiveAccessResult.allowWithConstraints(
AccessConstraints.builder()
.maxDataRows(100) // 限制返回数据量
.excludeSensitiveFields(true) // 排除敏感字段
.auditLevel(AuditLevel.DETAILED) // 详细审计
.build()
);
} else if (riskScore.getScore() < 80) {
// 高风险:需要 MFA 验证
return AdaptiveAccessResult.requireMFA(riskScore.getRiskFactors());
} else {
// 极高风险:拒绝并告警
alertService.sendAlert(AlertType.HIGH_RISK_ACCESS_ATTEMPT,
String.format("用户 %s 高风险访问请求被拒绝,风险分: %d,因素: %s",
userId, riskScore.getScore(), riskScore.getRiskFactors()));
return AdaptiveAccessResult.deny("风险评分过高,访问被拒绝");
}
}
}
@Service
public class RiskScoringService {
public RiskScore calculate(RiskContext context) {
int score = 0;
List<String> riskFactors = new ArrayList<>();
// 因子1:IP 地址异常
if (!isKnownIP(context.getUserId(), context.getSourceIp())) {
score += 20;
riskFactors.add("来自未知IP: " + maskIP(context.getSourceIp()));
}
// 因子2:近期有异常行为记录
if (context.getRecentAnomalyCount() > 3) {
score += 30;
riskFactors.add("近期异常行为: " + context.getRecentAnomalyCount() + "次");
}
// 因子3:访问高敏感资源
ResourceSensitivity sensitivity = resourceRegistry.getSensitivity(context.getRequestedResource());
if (sensitivity == ResourceSensitivity.HIGH) {
score += 15;
riskFactors.add("高敏感资源访问");
}
// 因子4:非常规时间
if (!isBusinessHours()) {
score += 10;
riskFactors.add("非工作时间访问");
}
// 因子5:高频访问
if (isHighFrequencyAccess(context.getUserId(), context.getRequestedResource())) {
score += 25;
riskFactors.add("异常高频访问");
}
return new RiskScore(Math.min(score, 100), riskFactors);
}
}四、AI Agent 的身份绑定
在 AI Agent 场景里,有一个特殊问题:Agent 会代替用户执行一系列操作,这些操作必须和触发它的用户身份绑定,不能让 Agent 用"系统权限"去做超越用户权限的事。
@Service
public class AgentIdentityBindingService {
// 为 Agent 任务创建委托令牌
public DelegationToken createDelegationToken(
String userId,
String agentId,
List<String> delegatedPermissions,
Duration maxDuration) {
// 关键:delegatedPermissions 是用户权限的子集,不能超越用户权限
List<String> userPermissions = permissionService.getUserPermissions(userId);
List<String> actualPermissions = delegatedPermissions.stream()
.filter(userPermissions::contains)
.collect(Collectors.toList());
if (actualPermissions.isEmpty()) {
throw new SecurityException("没有可委托的权限");
}
DelegationToken token = DelegationToken.builder()
.tokenId(UUID.randomUUID().toString())
.principalUserId(userId) // 人类委托人
.agentId(agentId) // 被委托的 Agent
.delegatedPermissions(actualPermissions)
.issuedAt(Instant.now())
.expiresAt(Instant.now().plus(maxDuration))
.taskScope(buildTaskScope(userId)) // 限定任务范围
.build();
// 存储令牌,用于验证和撤销
delegationTokenStore.save(token);
log.info("Agent {} 获得用户 {} 的委托,权限: {}", agentId, userId, actualPermissions);
return token;
}
// Agent 执行操作前验证委托令牌
public DelegationContext validateDelegation(String tokenId, String requestedPermission) {
DelegationToken token = delegationTokenStore.findById(tokenId)
.orElseThrow(() -> new SecurityException("委托令牌不存在"));
if (token.isExpired()) {
throw new SecurityException("委托令牌已过期");
}
if (token.isRevoked()) {
throw new SecurityException("委托令牌已被撤销");
}
if (!token.getDelegatedPermissions().contains(requestedPermission)) {
throw new SecurityException(String.format(
"Agent %s 无权限执行操作 %s(委托权限: %s)",
token.getAgentId(), requestedPermission, token.getDelegatedPermissions()
));
}
return DelegationContext.from(token);
}
// 用户可以随时撤销委托
public void revoke(String userId, String tokenId) {
DelegationToken token = delegationTokenStore.findById(tokenId)
.orElseThrow(() -> new NotFoundException("委托令牌不存在"));
if (!token.getPrincipalUserId().equals(userId)) {
throw new SecurityException("无权撤销他人的委托令牌");
}
token.setRevoked(true);
token.setRevokedAt(Instant.now());
delegationTokenStore.save(token);
log.info("用户 {} 撤销了 Agent {} 的委托令牌 {}", userId, token.getAgentId(), tokenId);
}
}五、网络微分段:从网络层收窄攻击面
除了身份验证,网络层的微分段也是零信任的重要组成部分。
# Kubernetes NetworkPolicy 示例:限制 AI 服务的网络访问
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: ai-service-network-policy
namespace: ai-production
spec:
podSelector:
matchLabels:
app: ai-service
policyTypes:
- Ingress
- Egress
ingress:
# 只允许来自 API Gateway 的入站流量
- from:
- podSelector:
matchLabels:
app: api-gateway
ports:
- protocol: TCP
port: 8080
egress:
# 只允许调用特定的下游服务
- to:
- podSelector:
matchLabels:
app: knowledge-base-service
ports:
- protocol: TCP
port: 8081
# 允许调用外部 LLM API
- to:
- ipBlock:
cidr: 0.0.0.0/0
except:
- 10.0.0.0/8
- 172.16.0.0/12
- 192.168.0.0/16
ports:
- protocol: TCP
port: 443
# 允许 DNS
- to: []
ports:
- protocol: UDP
port: 53六、审计与可观测性
零信任架构下,所有的访问决策都要留下痕迹。这不只是合规要求,也是威胁检测的基础。
@Component
public class ZeroTrustAuditLogger {
// 结构化的审计日志格式
@Data
@Builder
public static class ZeroTrustAuditEvent {
private String eventId;
private Instant timestamp;
private String userId;
private String agentId; // 如果是 Agent 触发
private String sessionId;
private String resourceId;
private String action;
private String decision; // PERMIT / DENY
private String denyReason;
private List<String> appliedPolicies;
private int riskScore;
private String sourceIp;
private String correlationId; // 关联同一个业务请求的所有操作
}
public void log(ZeroTrustAuditEvent event) {
// 写入审计日志(使用专用的、不可篡改的日志系统)
auditLogWriter.write(event);
// 实时流到 SIEM(安全信息和事件管理系统)
siemConnector.stream(event);
// 触发实时告警检测
if ("DENY".equals(event.getDecision()) && event.getRiskScore() > 70) {
alertService.sendAlert(
AlertType.HIGH_RISK_ACCESS_DENIED,
String.format("高风险访问被拒绝 - 用户: %s, 资源: %s, 风险分: %d",
event.getUserId(), event.getResourceId(), event.getRiskScore())
);
}
}
}七、从边界防御迁移到零信任的路径
理想很丰满,现实很骨感。很少有团队能从头开始搭一套零信任架构,更多是在现有系统上逐步改造。我建议按这个顺序推进:
第一阶段(1-3个月):对 AI 服务的所有入站和出站流量做统一的日志记录,摸清楚现有的流量模式,找到高风险访问路径。
第二阶段(3-6个月):在 AI 服务和敏感数据服务之间引入服务身份认证(mTLS 或 JWT)。把 IP 白名单改造成身份验证。
第三阶段(6-12个月):引入 AI Agent 的委托令牌机制,把 Agent 的权限和用户身份绑定。实施 ABAC 策略引擎。
第四阶段(持续优化):引入风险自适应控制,根据实时风险评分动态调整策略。建立完整的安全可观测性体系。
零信任不是一个产品,是一种架构思想。在 AI 系统这个特定领域,它的核心价值是:让 AI 的每一个动作都有明确的授权来源,让每一次异常行为都可以被追溯。
