第1682篇:大模型输出的内容安全过滤——敏感词、有害信息检测与拦截链路
第1682篇:大模型输出的内容安全过滤——敏感词、有害信息检测与拦截链路
有一段时间我特别困惑:明明我们用的是一线大厂的大模型API,理论上模型本身做过对齐训练,不会乱说话。那为什么我们还需要在应用层自己做内容安全过滤?
直到我们的一个智能客服上线之后,陆续遇到几个状况:有用户用一些迂回表达方式让模型输出了竞争对手的产品对比信息,有用户让模型帮他写了措辞相当激烈的投诉信,还有一次模型在一段回答末尾鬼使神差地夹了一句"建议您去查阅相关法律……",被客户截图说我们暗示他打官司。
这些都不是大问题,但每一件都让业务侧神经紧绷。模型对齐是模型自己的事,应用层的内容安全是我们工程师的责任——这两件事没有替代关系。
一、内容安全过滤需要解决什么问题
我先把问题范围梳理清楚,这个领域容易搞混的概念很多。
有害内容:包括但不限于暴力、色情、歧视、仇恨言论、自我伤害诱导等。这类是道德和法律层面的硬底线。
违规内容:跟平台规则相关。比如在医疗问诊场景里,模型不能给出明确的诊断结论;在金融咨询场景里,不能出现"保证收益"这类诱导性表述。
业务敏感内容:竞争对手品牌名称、内部数据、未公开信息等,不符合公司利益的表述。
合规内容:不同行业有不同监管要求,比如"请咨询专业医生"这类免责声明是否出现在恰当的地方。
这四类问题的处理方式不一样,不能用同一套方案大包大揽。
二、基础层:敏感词库的建设与维护
很多团队一上来就想用大模型做内容安全检测,觉得既高大上又省事。我的建议是:先把敏感词库这个"便宜货"做扎实,再考虑模型辅助。敏感词匹配的延迟是微秒级,大模型分类最快也要几十毫秒。两者结合才是合理的架构。
2.1 敏感词库分层设计
@Component
public class SensitiveWordLibrary {
// 分级敏感词
public enum SensitivityLevel {
BLOCK, // 直接拦截,零容忍
WARN, // 警告,记录日志
REVIEW, // 需要人工审核
REPLACE // 可以脱敏替换
}
@Data
public static class SensitiveWord {
private String word;
private SensitivityLevel level;
private String category; // 类别:政治/色情/竞品/医疗等
private String replacement; // 脱敏替换词
private boolean fuzzyMatch; // 是否启用模糊匹配
}
// 使用DFA(确定有限自动机)算法实现高效多模式匹配
// 生产环境推荐用 Aho-Corasick 算法,这里简化展示
private Map<SensitivityLevel, AhoCorasickAutomaton> automatonMap;
@PostConstruct
public void init() {
loadSensitiveWords();
}
private void loadSensitiveWords() {
// 从数据库或配置文件加载
// 实际项目中词库应该支持热更新,不能重启服务才生效
List<SensitiveWord> words = sensitiveWordRepository.findAll();
Map<SensitivityLevel, List<String>> grouped = words.stream()
.collect(Collectors.groupingBy(
SensitiveWord::getLevel,
Collectors.mapping(SensitiveWord::getWord, Collectors.toList())
));
automatonMap = new EnumMap<>(SensitivityLevel.class);
for (Map.Entry<SensitivityLevel, List<String>> entry : grouped.entrySet()) {
automatonMap.put(entry.getKey(), AhoCorasickAutomaton.build(entry.getValue()));
}
log.info("敏感词库加载完成,共 {} 条", words.size());
}
public List<MatchResult> match(String text) {
List<MatchResult> results = new ArrayList<>();
for (Map.Entry<SensitivityLevel, AhoCorasickAutomaton> entry : automatonMap.entrySet()) {
List<String> matched = entry.getValue().findAll(text);
for (String word : matched) {
results.add(new MatchResult(word, entry.getKey()));
}
}
return results;
}
// 支持热更新,不用重启服务
@Scheduled(fixedDelay = 60000)
public void refreshWords() {
// 检查词库版本,如有更新则重新加载
String latestVersion = sensitiveWordRepository.getLatestVersion();
if (!latestVersion.equals(currentVersion)) {
loadSensitiveWords();
currentVersion = latestVersion;
log.info("敏感词库已热更新到版本: {}", latestVersion);
}
}
}2.2 Aho-Corasick 算法的简洁实现
标准库里没有现成的 AC 算法,这里给一个生产可用的简版:
public class AhoCorasickAutomaton {
private static final int CHARSET_SIZE = 65536; // Unicode字符集
private static class Node {
Map<Character, Node> children = new HashMap<>();
Node fail;
String pattern; // 非null表示该节点是某个模式串的终止节点
}
private final Node root = new Node();
public static AhoCorasickAutomaton build(List<String> patterns) {
AhoCorasickAutomaton automaton = new AhoCorasickAutomaton();
for (String pattern : patterns) {
automaton.insert(pattern);
}
automaton.buildFailLinks();
return automaton;
}
private void insert(String pattern) {
Node curr = root;
for (char c : pattern.toCharArray()) {
curr = curr.children.computeIfAbsent(c, k -> new Node());
}
curr.pattern = pattern;
}
private void buildFailLinks() {
Queue<Node> queue = new LinkedList<>();
root.fail = root;
for (Node child : root.children.values()) {
child.fail = root;
queue.add(child);
}
while (!queue.isEmpty()) {
Node curr = queue.poll();
for (Map.Entry<Character, Node> entry : curr.children.entrySet()) {
char c = entry.getKey();
Node child = entry.getValue();
Node fail = curr.fail;
while (fail != root && !fail.children.containsKey(c)) {
fail = fail.fail;
}
child.fail = fail.children.getOrDefault(c, root);
if (child.fail == child) child.fail = root;
queue.add(child);
}
}
}
public List<String> findAll(String text) {
List<String> results = new ArrayList<>();
Node curr = root;
for (char c : text.toCharArray()) {
while (curr != root && !curr.children.containsKey(c)) {
curr = curr.fail;
}
curr = curr.children.getOrDefault(c, root);
Node temp = curr;
while (temp != root) {
if (temp.pattern != null) {
results.add(temp.pattern);
}
temp = temp.fail;
}
}
return results;
}
}三、语义层:用模型做分类检测
纯关键词匹配的硬伤是无法处理语义。"我今天心情很差,想消失"和"我的应用服务器消失了",关键词匹配可能都会触发"消失"这个词,但语义完全不同。语义检测需要模型能力。
3.1 专用安全分类器
不要用主业务模型做内容安全分类,这会带来延迟叠加和成本浪费。推荐用专用的安全分类模型,OpenAI 有 Moderation API,国内各大平台也都有对应的内容安全接口。
@Service
public class ContentModerationService {
// OpenAI Moderation API集成
@Autowired
private OpenAiModerationClient moderationClient;
// 国内平台安全检测
@Autowired
private TencentCloudModerationClient tencentModerationClient;
public ModerationResult moderate(String content, ContentType contentType) {
// 并行调用多个检测服务,取最严格的结果
CompletableFuture<ModerationResult> openaiResult = CompletableFuture.supplyAsync(
() -> callOpenAIModeration(content),
securityExecutor
);
CompletableFuture<ModerationResult> tencentResult = CompletableFuture.supplyAsync(
() -> callTencentModeration(content),
securityExecutor
);
try {
ModerationResult r1 = openaiResult.get(2, TimeUnit.SECONDS);
ModerationResult r2 = tencentResult.get(2, TimeUnit.SECONDS);
return mergeResults(r1, r2);
} catch (TimeoutException e) {
log.warn("内容安全检测超时,降级处理");
return ModerationResult.timeout();
}
}
private ModerationResult callOpenAIModeration(String content) {
try {
OpenAIModerationResponse response = moderationClient.create(content);
ModerationResult result = new ModerationResult();
result.setFlagged(response.getResults().get(0).isFlagged());
Map<String, Double> scores = response.getResults().get(0).getCategoryScores();
result.addCategory("hate", scores.getOrDefault("hate", 0.0));
result.addCategory("harassment", scores.getOrDefault("harassment", 0.0));
result.addCategory("self-harm", scores.getOrDefault("self-harm", 0.0));
result.addCategory("sexual", scores.getOrDefault("sexual", 0.0));
result.addCategory("violence", scores.getOrDefault("violence", 0.0));
return result;
} catch (Exception e) {
log.error("OpenAI Moderation API调用失败", e);
return ModerationResult.error("openai");
}
}
private ModerationResult mergeResults(ModerationResult r1, ModerationResult r2) {
// 任一检测结果为flagged,最终结果为flagged(取严格策略)
boolean flagged = r1.isFlagged() || r2.isFlagged();
ModerationResult merged = new ModerationResult();
merged.setFlagged(flagged);
// 合并各类别的最高分
Set<String> allCategories = new HashSet<>();
allCategories.addAll(r1.getCategories().keySet());
allCategories.addAll(r2.getCategories().keySet());
for (String category : allCategories) {
double maxScore = Math.max(
r1.getCategories().getOrDefault(category, 0.0),
r2.getCategories().getOrDefault(category, 0.0)
);
merged.addCategory(category, maxScore);
}
return merged;
}
}3.2 自定义业务安全分类器
平台级的通用 Moderation API 解决不了业务特定的违规问题。比如金融产品不能承诺收益,医疗问诊不能给确定性诊断,这些需要自己写分类器。
@Service
public class BusinessComplianceChecker {
@Autowired
private LLMClient llmClient;
// 按业务场景定义合规规则
private static final Map<BusinessScenario, List<ComplianceRule>> SCENARIO_RULES = Map.of(
BusinessScenario.FINANCIAL_ADVISORY, Arrays.asList(
new ComplianceRule("NO_GUARANTEED_RETURN",
"不得包含任何形式的收益保证表述,包括'保证收益'、'稳赚'、'保本'等"),
new ComplianceRule("RISK_DISCLOSURE_REQUIRED",
"涉及投资建议时,必须包含风险提示"),
new ComplianceRule("NO_UNLICENSED_ADVICE",
"不得给出具体的证券买卖建议")
),
BusinessScenario.MEDICAL_CONSULTATION, Arrays.asList(
new ComplianceRule("NO_DEFINITIVE_DIAGNOSIS",
"不得给出确定性诊断结论"),
new ComplianceRule("PROFESSIONAL_REFERRAL",
"症状描述后必须建议就医"),
new ComplianceRule("NO_PRESCRIPTION",
"不得推荐具体药物和剂量")
)
);
public ComplianceResult check(String output, BusinessScenario scenario) {
List<ComplianceRule> rules = SCENARIO_RULES.getOrDefault(scenario, Collections.emptyList());
if (rules.isEmpty()) {
return ComplianceResult.passed();
}
// 用LLM做批量规则检查
String checkPrompt = buildComplianceCheckPrompt(output, rules);
try {
String llmResponse = llmClient.complete(checkPrompt);
return parseComplianceResult(llmResponse, rules);
} catch (Exception e) {
log.error("合规检查失败", e);
// 检查失败时,根据业务重要性决定是否放行
return scenario.isHighRisk()
? ComplianceResult.blocked("检查服务异常,高风险场景默认拦截")
: ComplianceResult.passed();
}
}
private String buildComplianceCheckPrompt(String output, List<ComplianceRule> rules) {
StringBuilder sb = new StringBuilder();
sb.append("请检查以下文本是否违反指定的合规规则。\n\n");
sb.append("待检查文本:\n").append(output).append("\n\n");
sb.append("合规规则:\n");
for (int i = 0; i < rules.size(); i++) {
ComplianceRule rule = rules.get(i);
sb.append(String.format("%d. [%s] %s\n", i + 1, rule.getId(), rule.getDescription()));
}
sb.append("\n请以JSON格式返回检查结果,格式如下:\n");
sb.append("{\"violations\": [{\"rule_id\": \"规则ID\", \"violated\": true/false, \"reason\": \"原因\"}]}\n");
sb.append("只返回JSON,不要其他内容。");
return sb.toString();
}
}四、拦截链路的完整流水线设计
把上面各个组件串起来,形成一条完整的过滤流水线。
@Service
public class ContentFilterPipeline {
@Autowired
private SensitiveWordLibrary sensitiveWordLibrary;
@Autowired
private ContentModerationService moderationService;
@Autowired
private BusinessComplianceChecker complianceChecker;
@Autowired
private ContentReplacementService replacementService;
public FilterResult filter(
String rawOutput,
FilterContext context) {
long startTime = System.currentTimeMillis();
FilterResult result = FilterResult.builder()
.originalContent(rawOutput)
.build();
// Step 1: 敏感词匹配(同步,微秒级)
List<MatchResult> sensitiveMatches = sensitiveWordLibrary.match(rawOutput);
// 有BLOCK级敏感词,直接拦截
boolean hasBlockLevel = sensitiveMatches.stream()
.anyMatch(m -> m.getLevel() == SensitivityLevel.BLOCK);
if (hasBlockLevel) {
return result.toBuilder()
.blocked(true)
.blockReason("触发高危敏感词")
.filteredContent(generateBlockedMessage(context))
.processingTimeMs(System.currentTimeMillis() - startTime)
.build();
}
// 有REPLACE级敏感词,做脱敏
String processedContent = rawOutput;
if (sensitiveMatches.stream().anyMatch(m -> m.getLevel() == SensitivityLevel.REPLACE)) {
processedContent = replacementService.replace(rawOutput, sensitiveMatches);
}
// Step 2: 通用内容安全检测(异步,50-200ms)
ModerationResult moderationResult = moderationService.moderate(
processedContent, context.getContentType());
if (moderationResult.isFlagged()) {
String topCategory = moderationResult.getTopCategory();
return result.toBuilder()
.blocked(true)
.blockReason("内容安全检测未通过: " + topCategory)
.filteredContent(generateBlockedMessage(context))
.processingTimeMs(System.currentTimeMillis() - startTime)
.build();
}
// Step 3: 业务合规检查(仅高风险场景,50-150ms)
if (context.getBusinessScenario() != null && context.getBusinessScenario().isHighRisk()) {
ComplianceResult complianceResult = complianceChecker.check(
processedContent, context.getBusinessScenario());
if (!complianceResult.isPassed()) {
// 合规违规通常不直接拦截,而是追加免责声明或修改表述
processedContent = complianceResult.getRemediatedContent() != null
? complianceResult.getRemediatedContent()
: appendDisclaimers(processedContent, context.getBusinessScenario());
}
}
return result.toBuilder()
.blocked(false)
.filteredContent(processedContent)
.processingTimeMs(System.currentTimeMillis() - startTime)
.build();
}
private String appendDisclaimers(String content, BusinessScenario scenario) {
String disclaimer = switch (scenario) {
case FINANCIAL_ADVISORY -> "\n\n⚠️ 以上内容仅供参考,不构成投资建议。投资有风险,决策需谨慎。";
case MEDICAL_CONSULTATION -> "\n\n⚠️ 以上内容仅供参考,不能替代专业医疗诊断。如有不适,请及时就医。";
default -> "";
};
return content + disclaimer;
}
private String generateBlockedMessage(FilterContext context) {
return "抱歉,该回答涉及不适当内容,已被过滤。如有问题,请换个方式提问。";
}
}五、流水线性能优化
内容安全过滤是在用户等待响应的关键路径上,性能很重要。我们线上的经验是:
敏感词匹配控制在 1ms 以内,AC 自动机完全够用。
Moderation API 的延迟主要取决于网络,一般 50-200ms。可以和模型推理并发进行——模型还在推理的时候,把已生成的部分 token 先送去检测,等模型输出完毕,检测也差不多跑完了。
@Service
public class StreamingFilterService {
// 流式输出场景下的检测策略
public void filterStreamingOutput(
Flux<String> tokenStream,
FilterContext context,
Consumer<String> outputConsumer) {
StringBuilder buffer = new StringBuilder();
AtomicBoolean blocked = new AtomicBoolean(false);
tokenStream.subscribe(token -> {
if (blocked.get()) return;
buffer.append(token);
// 每积累一定数量的token才做检测,平衡延迟和准确性
if (buffer.length() >= 200 || token.contains("。") || token.contains("\n")) {
String chunk = buffer.toString();
// 快速敏感词检测
List<MatchResult> matches = sensitiveWordLibrary.match(chunk);
if (matches.stream().anyMatch(m -> m.getLevel() == SensitivityLevel.BLOCK)) {
blocked.set(true);
outputConsumer.accept("\n\n[部分内容因安全策略已被过滤]");
return;
}
outputConsumer.accept(chunk);
buffer.setLength(0); // 清空buffer
}
}, error -> {
log.error("流式输出异常", error);
}, () -> {
// 流结束,对完整输出做一次全量检测
if (!blocked.get() && buffer.length() > 0) {
FilterResult finalResult = filterPipeline.filter(buffer.toString(), context);
if (!finalResult.isBlocked()) {
outputConsumer.accept(buffer.toString());
} else {
// 全量检测发现问题,此时只能追加说明,已输出的收不回来
outputConsumer.accept("\n\n[部分内容因安全策略已被截断]");
}
}
});
}
}六、踩过的坑
坑一:过度依赖通用 Moderation API。这些API对英文语料优化很好,对中文特别是一些隐晦表达的检测能力有限。我们后来补充了中文专用的敏感词库和规则,效果才上来。
坑二:敏感词库不分级。最初我们所有敏感词都是一个处理策略:拒绝。结果用户问"我的代码出现了致命错误怎么处理","致命"这个词触发了暴力类敏感词检测。后来按级别分开处理,BLOCK 级才直接拒绝,WARN 级只记录不阻断。
坑三:检测不做缓存。同样的内容被反复检测,浪费算力。对于相同的输入哈希,可以缓存检测结果(设置适当的TTL,比如1小时)。
坑四:流式输出的最终检测太晚。流式输出场景下,如果只在最后做全量检测,有害内容可能已经显示给用户了。要在流式输出过程中就做增量检测,虽然会增加一点实现复杂度,但不能省。
坑五:拦截消息太统一。所有拦截都返回同样一句话,用户不知道自己哪里触发了限制,体验很差。可以根据触发类别给出大致方向的提示,比如"您的问题涉及医疗诊断,建议咨询专业医生"。
七、监控指标
内容安全过滤需要持续监控,几个核心指标:
过滤率:被拦截请求 / 总请求,如果突然升高,可能是攻击行为或词库配置问题。
误报率:通过人工审核发现的"被错误拦截"请求占比,这个要定期抽样检查。
检测延迟P99:过滤流水线的 P99 延迟,目标控制在 300ms 以内。
分类分布:被拦截的内容按类别统计,持续出现某类说明可能需要调整业务逻辑。
内容安全过滤不是一锤子买卖,词库需要持续维护,规则需要根据业务变化调整。这个体系建立起来之后,最重要的工作反而是运营——定期回顾拦截日志,调整误报/漏报的阈值,响应新出现的攻击模式。
