第2415篇:AI系统的公平性测试——如何检测和缓解模型偏见
第2415篇:AI系统的公平性测试——如何检测和缓解模型偏见
适读人群:机器学习工程师、算法工程师 | 阅读时长:约14分钟 | 核心价值:从理论到代码,系统化检测和缓解模型偏见的完整方案
"我们的模型是用数据训练的,数据不会说谎。"
我以前也这么想。直到有一次,我在做一个贷款风控模型的复盘,发现模型对来自某几个城市的申请人拒绝率异常高。深挖下去,发现历史数据里这些城市的违约率确实更高——但那是十年前的数据,当地经济条件早就变了。
模型忠实地学习了历史,却把过时的歧视固化成了算法。
这篇文章讲两件事:怎么找到偏见,以及找到了怎么修。
一、先弄清楚你要对抗哪种偏见
"偏见"这个词被滥用了。在机器学习里,偏见有几种不同来源,修法不同:
历史偏见(Historical Bias):训练数据反映了现实世界的不平等。比如历史招聘数据中女性比例低,不是因为女性能力差,而是因为历史上就有性别歧视。模型学到了这个模式。
表示偏见(Representation Bias):数据集中某些群体的样本数量严重不足,导致模型对这些群体的预测能力弱。
测量偏见(Measurement Bias):不同群体使用了不同的测量方式。比如有些群体有完整的银行信用记录,有些群体没有,但这不代表后者信用差,只是数据覆盖不同。
聚合偏见(Aggregation Bias):用一个统一的模型去处理不同特征的群体,忽略了群体间的本质差异。比如用同一个糖尿病风险模型对待不同族裔,但糖化血红蛋白的基线值在不同族裔间本来就有差异。
理清楚是哪类偏见,才能选对缓解方法。
二、公平性指标:没有完美的标准
先说一个让很多人不舒服的事实:没有一个单一的"公平"定义能同时满足所有合理的公平标准。
这是数学上已经证明的结论(Chouldechova 2017, Kleinberg et al. 2016)。
常用的公平性指标:
| 指标名称 | 数学定义 | 适用场景 |
|---|---|---|
| 人口统计均等(Demographic Parity) | P(Ŷ=1|A=0) = P(Ŷ=1|A=1) | 资源分配、招聘初筛 |
| 机会均等(Equal Opportunity) | P(Ŷ=1|Y=1,A=0) = P(Ŷ=1|Y=1,A=1) | 贷款审批、保释决定 |
| 均等赔率(Equalized Odds) | 同时满足TPR和FPR均等 | 医疗诊断 |
| 个体公平(Individual Fairness) | 相似的个体获得相似的预测 | 推荐系统 |
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from dataclasses import dataclass, field
@dataclass
class FairnessReport:
"""公平性评测报告"""
sensitive_attribute: str
metrics: Dict[str, Dict] = field(default_factory=dict)
violations: List[str] = field(default_factory=list)
recommendations: List[str] = field(default_factory=list)
class FairnessEvaluator:
"""全面的公平性评测工具"""
def __init__(self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_prob: np.ndarray,
sensitive_features: pd.DataFrame):
self.y_true = y_true
self.y_pred = y_pred
self.y_prob = y_prob
self.sensitive_features = sensitive_features
def evaluate(self,
threshold_demographic_parity: float = 0.1,
threshold_equal_opportunity: float = 0.1) -> List[FairnessReport]:
"""对每个敏感特征生成公平性报告"""
reports = []
for col in self.sensitive_features.columns:
report = FairnessReport(sensitive_attribute=col)
groups = self.sensitive_features[col].unique()
# 计算各群体指标
group_metrics = {}
for group in groups:
mask = self.sensitive_features[col] == group
group_metrics[str(group)] = self._compute_group_metrics(mask)
report.metrics = group_metrics
# 检查人口统计均等
pos_rates = [m["positive_prediction_rate"] for m in group_metrics.values()]
dp_disparity = max(pos_rates) - min(pos_rates)
if dp_disparity > threshold_demographic_parity:
report.violations.append(
f"人口统计均等违反:最大差异 {dp_disparity:.3f} > {threshold_demographic_parity}"
)
report.recommendations.append("考虑使用重采样或后处理阈值调整")
# 检查机会均等
tprs = [m["true_positive_rate"] for m in group_metrics.values()
if m["true_positive_rate"] is not None]
if len(tprs) >= 2:
eo_disparity = max(tprs) - min(tprs)
if eo_disparity > threshold_equal_opportunity:
report.violations.append(
f"机会均等违反:TPR最大差异 {eo_disparity:.3f} > {threshold_equal_opportunity}"
)
report.recommendations.append("考虑使用约束优化或后处理方法")
reports.append(report)
return reports
def _compute_group_metrics(self, mask: np.ndarray) -> Dict:
"""计算单个群体的指标"""
y_true_g = self.y_true[mask]
y_pred_g = self.y_pred[mask]
n = len(y_true_g)
if n == 0:
return {"sample_size": 0}
tp = ((y_true_g == 1) & (y_pred_g == 1)).sum()
fp = ((y_true_g == 0) & (y_pred_g == 1)).sum()
tn = ((y_true_g == 0) & (y_pred_g == 0)).sum()
fn = ((y_true_g == 1) & (y_pred_g == 0)).sum()
n_pos = tp + fn
n_neg = fp + tn
return {
"sample_size": n,
"positive_rate_in_data": n_pos / n if n > 0 else None,
"positive_prediction_rate": (tp + fp) / n if n > 0 else None,
"true_positive_rate": tp / n_pos if n_pos > 0 else None,
"false_positive_rate": fp / n_neg if n_neg > 0 else None,
"accuracy": (tp + tn) / n if n > 0 else None,
"precision": tp / (tp + fp) if (tp + fp) > 0 else None,
}三、三类偏见缓解方法
3.1 预处理(Pre-processing):在训练前解决
方法一:重采样
当某群体样本严重不足时,对少数群体过采样或对多数群体欠采样:
from imblearn.over_sampling import SMOTE
from collections import Counter
def fair_resampling(X: pd.DataFrame,
y: pd.Series,
sensitive_col: str,
strategy: str = "oversample") -> Tuple[pd.DataFrame, pd.Series]:
"""
基于敏感属性的公平重采样
目标:使每个群体的正例比例更加均等
"""
groups = X[sensitive_col].unique()
# 计算各群体的正例率
group_stats = {}
for g in groups:
mask = X[sensitive_col] == g
group_stats[g] = {
"n_total": mask.sum(),
"n_positive": y[mask].sum(),
"positive_rate": y[mask].mean()
}
print("重采样前各群体正例率:")
for g, stats in group_stats.items():
print(f" {g}: {stats['positive_rate']:.3f} (n={stats['n_total']})")
if strategy == "oversample":
# 对正例率低的群体进行正例过采样
max_positive_rate = max(s["positive_rate"] for s in group_stats.values())
X_resampled_parts = []
y_resampled_parts = []
for g in groups:
mask = X[sensitive_col] == g
X_g = X[mask]
y_g = y[mask]
target_rate = max_positive_rate
current_rate = group_stats[g]["positive_rate"]
if current_rate < target_rate - 0.05:
# 需要过采样正例
n_needed = int(
(target_rate * len(X_g) - y_g.sum()) / (1 - target_rate)
)
positive_indices = y_g[y_g == 1].index
extra_indices = np.random.choice(
positive_indices, size=n_needed, replace=True
)
X_g = pd.concat([X_g, X_g.loc[extra_indices]])
y_g = pd.concat([y_g, y_g.loc[extra_indices]])
X_resampled_parts.append(X_g)
y_resampled_parts.append(y_g)
X_resampled = pd.concat(X_resampled_parts).sample(frac=1, random_state=42)
y_resampled = pd.concat(y_resampled_parts).loc[X_resampled.index]
return X_resampled, y_resampled
return X, y方法二:去偏特征(Adversarial Debiasing)
让模型学习与敏感属性无关的表示:
import torch
import torch.nn as nn
class AdversarialDebiasingModel(nn.Module):
"""
对抗去偏模型:
主分类器尽量准确预测目标标签
同时,对抗分类器尽量无法从主分类器的表示中预测敏感属性
两个损失对抗,迫使主模型学习与敏感属性无关的特征
"""
def __init__(self, input_dim: int, hidden_dim: int, n_sensitive_classes: int):
super().__init__()
# 主特征提取器
self.encoder = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU()
)
# 主分类器(预测目标标签)
self.main_classifier = nn.Linear(hidden_dim // 2, 1)
# 对抗分类器(尝试预测敏感属性)
self.adversary = nn.Sequential(
nn.Linear(hidden_dim // 2, 32),
nn.ReLU(),
nn.Linear(32, n_sensitive_classes)
)
def forward(self, x):
representation = self.encoder(x)
main_output = self.main_classifier(representation)
adversary_output = self.adversary(representation)
return main_output, adversary_output, representation
def train_adversarial(model, optimizer, X, y, sensitive_attrs,
adversary_weight: float = 1.0):
"""
对抗训练步骤
adversary_weight: 对抗损失的权重,越大越强调去偏
"""
model.train()
optimizer.zero_grad()
main_out, adversary_out, _ = model(X)
# 主任务损失(预测准确性)
main_loss = nn.BCEWithLogitsLoss()(main_out.squeeze(), y.float())
# 对抗损失(让表示无法预测敏感属性)
# 注意:这里取负值,因为我们要最小化对抗分类器的准确性
adversary_loss = nn.CrossEntropyLoss()(adversary_out, sensitive_attrs.long())
# 总损失:主任务好 - 对抗任务好(对抗目标冲突)
total_loss = main_loss - adversary_weight * adversary_loss
total_loss.backward()
optimizer.step()
return main_loss.item(), adversary_loss.item()3.2 训练中约束(In-processing)
在损失函数里直接加入公平性约束:
class FairnessConstrainedLoss(nn.Module):
"""
带公平性约束的损失函数
在交叉熵损失基础上,加入人口统计均等惩罚项
"""
def __init__(self, fairness_weight: float = 0.5):
super().__init__()
self.fairness_weight = fairness_weight
self.base_loss = nn.BCEWithLogitsLoss()
def forward(self, logits: torch.Tensor,
labels: torch.Tensor,
sensitive: torch.Tensor) -> torch.Tensor:
# 基础分类损失
base = self.base_loss(logits, labels.float())
# 公平性惩罚:不同群体预测概率的差异
probs = torch.sigmoid(logits)
groups = sensitive.unique()
group_means = []
for g in groups:
mask = sensitive == g
if mask.sum() > 0:
group_means.append(probs[mask].mean())
if len(group_means) < 2:
return base
# 最大化群体间预测率的均等(最小化差异)
group_means_tensor = torch.stack(group_means)
fairness_penalty = group_means_tensor.var()
total_loss = base + self.fairness_weight * fairness_penalty
return total_loss3.3 后处理(Post-processing):调整决策阈值
最简单直接的方法:针对不同群体设置不同的决策阈值,使各群体的预测率更均衡:
from scipy.optimize import minimize_scalar
from sklearn.metrics import roc_curve
def calibrate_thresholds_for_fairness(
y_true: np.ndarray,
y_prob: np.ndarray,
sensitive: np.ndarray,
fairness_criterion: str = "equal_opportunity") -> Dict[str, float]:
"""
为不同群体寻找最优决策阈值
使得各群体的指标满足公平性约束,同时尽量保持总体准确率
"""
groups = np.unique(sensitive)
thresholds = {}
if fairness_criterion == "equal_opportunity":
# 目标:各群体的TPR(真正率)相等
# 策略:以最低TPR群体为基准,调整其他群体的阈值
# 先找所有群体在默认阈值0.5下的TPR
tprs_at_default = {}
for g in groups:
mask = sensitive == g
y_g = y_true[mask]
p_g = y_prob[mask]
pred_g = (p_g >= 0.5).astype(int)
n_pos = y_g.sum()
tprs_at_default[g] = (pred_g[y_g == 1] == 1).mean() if n_pos > 0 else 0
# 目标TPR:取所有群体TPR的中位数
target_tpr = np.median(list(tprs_at_default.values()))
for g in groups:
mask = sensitive == g
y_g = y_true[mask]
p_g = y_prob[mask]
# 找到使该群体TPR最接近target_tpr的阈值
fprs, tprs, thresh_vals = roc_curve(y_g, p_g)
# 找最接近目标TPR的点
idx = np.argmin(np.abs(tprs - target_tpr))
optimal_threshold = thresh_vals[idx]
thresholds[str(g)] = float(optimal_threshold)
return thresholds
def apply_group_thresholds(y_prob: np.ndarray,
sensitive: np.ndarray,
thresholds: Dict[str, float]) -> np.ndarray:
"""应用群体特定的决策阈值"""
y_pred = np.zeros(len(y_prob), dtype=int)
for group_str, threshold in thresholds.items():
mask = sensitive == type(sensitive[0])(group_str)
y_pred[mask] = (y_prob[mask] >= threshold).astype(int)
return y_pred四、缓解方法的选择框架
graph TD
A["发现公平性问题"] --> B{"问题根源"}
B -->|"数据分布不均"| C["预处理:重采样"]
B -->|"历史标签偏见"| D["预处理:标签重加权或去偏"]
B -->|"特征包含敏感信息"| E["训练中:对抗去偏 或 约束损失"]
B -->|"上线时间紧迫"| F["后处理:阈值校准"]
C --> G["重新评估公平性"]
D --> G
E --> G
F --> G
G -->|"仍不满足"| H["组合多种方法"]
G -->|"满足"| I["记录并上线监控"]没有万能的方法。预处理方法改变了数据分布,可能影响模型整体性能;约束训练需要在性能和公平性间权衡;后处理最快但治标不治本。
实践中,我通常先用后处理快速验证问题是否可解,再评估是否需要从数据层面根治。
