第2054篇:AI应用安全——数据脱敏和隐私保护的工程实践
2026/4/30大约 6 分钟
第2054篇:AI应用安全——数据脱敏和隐私保护的工程实践
适读人群:处理敏感数据的AI应用工程师 | 阅读时长:约19分钟 | 核心价值:建立AI应用的数据隐私保护体系,防止敏感信息泄漏给LLM
有一次做代码审查,发现同事直接把用户的完整手机号、身份证号发给GPT做信息提取。
我问他:"你知道OpenAI的隐私政策吗?默认情况下,你发给API的数据可能被用于模型训练。"
他愣了一下,说没想到这个。
AI应用的数据安全比传统应用更复杂,因为你要发数据给第三方LLM,这条链路上有太多潜在的风险。
AI应用的数据隐私风险
第一道防线:数据脱敏
/**
* 数据脱敏工具
* 在发送给LLM之前,对敏感信息进行脱敏
*/
@Service
@Slf4j
public class DataMaskingService {
// 脱敏规则:正则 -> 替换方式
private static final List<MaskingRule> MASKING_RULES = List.of(
// 手机号:保留前3位和后4位
new MaskingRule(
Pattern.compile("1[3-9]\\d{9}"),
m -> m.group().substring(0, 3) + "****" + m.group().substring(7)
),
// 身份证:保留前6位(地区)和后4位
new MaskingRule(
Pattern.compile("\\d{17}[\\dX]"),
m -> m.group().substring(0, 6) + "********" + m.group().substring(14)
),
// 银行卡号:只显示后4位
new MaskingRule(
Pattern.compile("\\d{16,19}"),
m -> "*".repeat(m.group().length() - 4) + m.group().substring(m.group().length() - 4)
),
// 邮箱:脱敏用户名部分
new MaskingRule(
Pattern.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}"),
m -> {
String email = m.group();
int atIndex = email.indexOf('@');
String username = email.substring(0, atIndex);
String domain = email.substring(atIndex);
if (username.length() <= 2) return username + domain;
return username.charAt(0) + "*".repeat(username.length() - 2)
+ username.charAt(username.length() - 1) + domain;
}
),
// IP地址:脱敏最后一段
new MaskingRule(
Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}"),
m -> {
String[] parts = m.group().split("\\.");
return parts[0] + "." + parts[1] + "." + parts[2] + ".*";
}
)
);
/**
* 对文本进行脱敏处理
* 返回脱敏后的文本和映射表(用于后续恢复)
*/
public MaskingResult mask(String text) {
if (text == null || text.isEmpty()) {
return new MaskingResult(text, Map.of());
}
String maskedText = text;
Map<String, String> reverseMappings = new LinkedHashMap<>();
for (MaskingRule rule : MASKING_RULES) {
Matcher matcher = rule.pattern().matcher(maskedText);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
String original = matcher.group();
String masked = rule.masker().apply(matcher);
// 如果脱敏后不同,才记录映射
if (!original.equals(masked)) {
reverseMappings.put(masked, original);
matcher.appendReplacement(sb, Matcher.quoteReplacement(masked));
}
}
matcher.appendTail(sb);
maskedText = sb.toString();
}
if (!reverseMappings.isEmpty()) {
log.debug("数据脱敏完成,脱敏{}处", reverseMappings.size());
}
return new MaskingResult(maskedText, reverseMappings);
}
/**
* 还原脱敏(如果需要把LLM的输出中的脱敏词替换回原始值)
*/
public String unmask(String maskedText, Map<String, String> reverseMappings) {
if (reverseMappings.isEmpty()) return maskedText;
String result = maskedText;
for (Map.Entry<String, String> entry : reverseMappings.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());
}
return result;
}
public record MaskingResult(String maskedText, Map<String, String> reverseMappings) {}
private record MaskingRule(
Pattern pattern,
java.util.function.Function<Matcher, String> masker
) {}
}第二道防线:字段级别的脱敏策略
不同的字段有不同的脱敏需求。建立字段级别的脱敏配置:
/**
* 字段级别脱敏注解和处理
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SensitiveField {
MaskType type() default MaskType.PARTIAL;
enum MaskType {
FULL, // 完全隐藏:***
PARTIAL, // 部分显示:138****5678
HASH, // 哈希替换:不可逆
ENCRYPT // 加密替换:可解密
}
}
/**
* 使用注解的数据对象
*/
@Data
public class CustomerInfo {
private String customerId;
private String customerName;
@SensitiveField(type = SensitiveField.MaskType.PARTIAL)
private String phoneNumber;
@SensitiveField(type = SensitiveField.MaskType.PARTIAL)
private String idCardNumber;
@SensitiveField(type = SensitiveField.MaskType.FULL)
private String bankCardNumber;
@SensitiveField(type = SensitiveField.MaskType.PARTIAL)
private String email;
// 普通字段,不脱敏
private String orderHistory;
private String productPreferences;
}
/**
* 自动处理敏感字段的序列化器
*/
@Service
@RequiredArgsConstructor
public class SensitiveDataSerializer {
private final DataMaskingService maskingService;
private final ObjectMapper objectMapper;
/**
* 把包含敏感字段的对象序列化为发给LLM的安全字符串
*/
public String serializeForLlm(Object obj) throws JsonProcessingException {
// 通过反射找到所有带@SensitiveField的字段,进行脱敏
Map<String, Object> safeMap = new LinkedHashMap<>();
for (Field field : obj.getClass().getDeclaredFields()) {
field.setAccessible(true);
try {
Object value = field.get(obj);
if (value == null) continue;
SensitiveField annotation = field.getAnnotation(SensitiveField.class);
if (annotation != null) {
safeMap.put(field.getName(), maskValue(value.toString(), annotation.type()));
} else {
safeMap.put(field.getName(), value);
}
} catch (IllegalAccessException e) {
log.warn("无法访问字段: {}", field.getName());
}
}
return objectMapper.writeValueAsString(safeMap);
}
private String maskValue(String value, SensitiveField.MaskType type) {
return switch (type) {
case FULL -> "*".repeat(Math.min(value.length(), 8));
case PARTIAL -> maskingService.mask(value).maskedText();
case HASH -> DigestUtils.sha256Hex(value).substring(0, 8) + "...";
case ENCRYPT -> "[ENCRYPTED:" + value.length() + "chars]";
};
}
}第三道防线:RAG文档的数据分级
知识库里的文档也可能包含敏感信息,需要分级管理:
/**
* 文档安全级别管理
* 不同安全级别的文档只能发送给相应权限的用户和模型
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SecureDocumentManager {
private final EmbeddingStore<TextSegment> vectorStore;
private final EmbeddingModel embeddingModel;
public enum SecurityLevel {
PUBLIC, // 公开信息,可以发给任何LLM
INTERNAL, // 内部信息,只能发给内部LLM(不能发给OpenAI等)
CONFIDENTIAL, // 机密信息,只能在特定条件下使用
SECRET // 不应该进入RAG系统
}
/**
* 存储文档时记录安全级别
*/
public void ingestDocument(String content, SecurityLevel level, Map<String, String> meta) {
if (level == SecurityLevel.SECRET) {
log.warn("尝试存储SECRET级别文档,已拒绝");
return;
}
TextSegment segment = TextSegment.from(content,
Metadata.from(Map.of(
"security_level", level.name(),
"ingested_at", LocalDateTime.now().toString()
)));
Embedding embedding = Embedding.from(embeddingModel.embed(content));
vectorStore.add(embedding, segment);
}
/**
* 检索时根据用户权限和模型类型过滤
*/
public List<TextSegment> secureSearch(
String query,
UserContext userContext,
LlmType targetLlmType,
int topK) {
float[] queryEmb = embeddingModel.embed(query);
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(queryEmb))
.maxResults(topK * 2) // 多取一些,过滤后可能减少
.minScore(0.6)
.build();
List<EmbeddingMatch<TextSegment>> matches = vectorStore.search(request).matches();
return matches.stream()
.map(EmbeddingMatch::embedded)
.filter(segment -> canAccess(segment, userContext, targetLlmType))
.limit(topK)
.collect(Collectors.toList());
}
private boolean canAccess(TextSegment segment, UserContext user, LlmType llmType) {
String levelStr = segment.metadata().getString("security_level");
if (levelStr == null) return true; // 无标记,默认可访问
SecurityLevel level = SecurityLevel.valueOf(levelStr);
return switch (level) {
case PUBLIC -> true;
case INTERNAL -> {
// INTERNAL文档不能发给外部LLM
yield llmType == LlmType.INTERNAL_ONLY;
}
case CONFIDENTIAL -> {
// CONFIDENTIAL需要特殊权限
yield user.hasConfidentialAccess() && llmType == LlmType.INTERNAL_ONLY;
}
case SECRET -> false; // 不应该存在,拒绝
};
}
public enum LlmType {
EXTERNAL_CLOUD, // OpenAI、Anthropic等外部服务
INTERNAL_ONLY // 内部部署的模型
}
}数据传输的合规检查
/**
* LLM请求的合规检查Advisor(Spring AI)
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class ComplianceCheckAdvisor implements CallAroundAdvisor {
private final DataMaskingService maskingService;
private final ComplianceAuditLogger auditLogger;
@Override
public AdvisedResponse aroundCall(AdvisedRequest advisedRequest,
CallAroundAdvisorChain chain) {
String userInput = advisedRequest.userText();
// 1. 检测是否包含未脱敏的敏感信息
MaskingDetectionResult detection = detectSensitiveData(userInput);
if (detection.hasSensitiveData()) {
// 记录审计日志(监管要求)
auditLogger.log(AuditEvent.SENSITIVE_DATA_DETECTED,
Map.of(
"types", detection.detectedTypes().toString(),
"masked", "true"
));
// 自动脱敏
DataMaskingService.MaskingResult masked = maskingService.mask(userInput);
// 修改请求(Spring AI的AdvisedRequest是不可变的,需要重建)
advisedRequest = AdvisedRequest.from(advisedRequest)
.userText(masked.maskedText())
.build();
log.info("自动脱敏: 检测到{}处敏感信息", detection.count());
}
return chain.nextAroundCall(advisedRequest);
}
private MaskingDetectionResult detectSensitiveData(String text) {
if (text == null) return new MaskingDetectionResult(false, List.of(), 0);
List<String> detectedTypes = new ArrayList<>();
int count = 0;
// 手机号检测
Matcher phoneMatcher = Pattern.compile("1[3-9]\\d{9}").matcher(text);
while (phoneMatcher.find()) {
if (!detectedTypes.contains("手机号")) detectedTypes.add("手机号");
count++;
}
// 身份证检测
Matcher idMatcher = Pattern.compile("\\d{17}[\\dX]").matcher(text);
while (idMatcher.find()) {
if (!detectedTypes.contains("身份证")) detectedTypes.add("身份证");
count++;
}
return new MaskingDetectionResult(!detectedTypes.isEmpty(), detectedTypes, count);
}
public record MaskingDetectionResult(
boolean hasSensitiveData,
List<String> detectedTypes,
int count
) {}
@Override
public String getName() { return "ComplianceCheckAdvisor"; }
@Override
public int getOrder() { return 5; } // 在日志之后、缓存之前
}数据隐私保护不只是技术问题,还涉及到法律合规。GDPR、个人信息保护法等法规对AI应用的数据处理都有明确要求。建立一套完整的数据流追踪和审计机制,是企业级AI应用不可缺少的基础设施。
