Part 2076 - Data Governance for AI Applications: Full-Lifecycle Management of Training Data and Knowledge Base Data
Audience: engineers and data owners responsible for AI application data management | Reading time: about 18 minutes | Core value: building an AI data governance system, with end-to-end engineering practice covering data collection, cleaning, version management, and quality monitoring
The quality of an AI application depends half on the model and half on the data.
Yet I have seen plenty of teams pour effort into model selection while investing almost nothing in data governance: files get dumped straight into the RAG knowledge base with no quality control, and fine-tuning data is collected ad hoc with no standardized processing.
The result: switching to a better model brings no improvement, because data quality has already capped the ceiling.
The scope of data governance
Knowledge base data management
/**
 * Full-lifecycle management of knowledge base documents:
 * the complete flow from ingestion through updates to retirement.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class KnowledgeBaseDataManager {
private final DocumentQualityChecker qualityChecker;
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> vectorStore;
private final DocumentRepository documentRepo;
/**
 * Document ingestion flow:
 * the full chain of quality checks, processing, and indexing.
 */
public DocumentIngestResult ingestDocument(DocumentIngestRequest request) {
String documentId = UUID.randomUUID().toString();
log.info("Processing document: name={}, source={}", request.name(), request.source());
// 1. Quality pre-check
QualityCheckResult qualityCheck = qualityChecker.check(request.content(), request.name());
if (qualityCheck.hasBlockingIssues()) {
return DocumentIngestResult.rejected(documentId, qualityCheck.issues());
}
// 2. Content cleaning
String cleanedContent = cleanContent(request.content());
// 3. Chunking
List<String> chunks = chunkDocument(cleanedContent, request.documentType());
// 4. Embed each chunk and store the vectors
List<String> vectorIds = new ArrayList<>();
for (int i = 0; i < chunks.size(); i++) {
String chunk = chunks.get(i);
TextSegment segment = TextSegment.from(chunk, Metadata.from(Map.of(
"documentId", documentId,
"documentName", request.name(),
"source", request.source(),
"chunkIndex", String.valueOf(i),
"ingestTime", LocalDateTime.now().toString(),
"version", "1"
)));
Embedding embedding = embeddingModel.embed(chunk).content(); // embed(...) returns Response<Embedding> in LangChain4j
String vectorId = vectorStore.add(embedding, segment);
vectorIds.add(vectorId);
}
// 5. Persist document metadata
DocumentRecord record = DocumentRecord.builder()
.documentId(documentId)
.name(request.name())
.source(request.source())
.documentType(request.documentType())
.contentHash(hashContent(cleanedContent))
.chunkCount(chunks.size())
.vectorIds(vectorIds)
.status(DocumentStatus.ACTIVE)
.ingestTime(LocalDateTime.now())
.qualityScore(qualityCheck.score())
.build();
documentRepo.save(record);
log.info("文档入库成功: id={}, chunks={}, warnings={}",
documentId, chunks.size(), qualityCheck.warnings().size());
return DocumentIngestResult.success(documentId, chunks.size(), qualityCheck.warnings());
}
/**
 * Document update:
 * mark the old version as superseded, then ingest the new version.
 */
public DocumentIngestResult updateDocument(String documentId, DocumentIngestRequest request) {
DocumentRecord existing = documentRepo.findById(documentId)
.orElseThrow(() -> new DocumentNotFoundException(documentId));
// Check whether the content actually changed
String newHash = hashContent(request.content());
if (newHash.equals(existing.getContentHash())) {
log.info("Document content unchanged, skipping update: id={}", documentId);
return DocumentIngestResult.skipped(documentId, "content unchanged");
}
// Mark the old version as superseded
existing.setStatus(DocumentStatus.SUPERSEDED);
existing.setSupersededAt(LocalDateTime.now());
documentRepo.save(existing);
// Remove the old vectors
for (String vectorId : existing.getVectorIds()) {
vectorStore.remove(vectorId);
}
// Re-ingest the new content. Note: ingestDocument assigns a fresh documentId,
// so true in-place versioning would require threading the existing id and a
// version counter through the ingest path.
DocumentIngestResult result = ingestDocument(request);
log.info("文档更新完成: id={}, old_chunks={}, new_chunks={}",
documentId, existing.getChunkCount(), result.chunkCount());
return result;
}
/**
 * Expired-document cleanup:
 * periodically purge documents past their retention window.
 */
@Scheduled(cron = "0 0 2 * * ?") // run daily at 2:00 AM
public void cleanupExpiredDocuments() {
LocalDateTime expireBefore = LocalDateTime.now().minusDays(90);
List<DocumentRecord> expiredDocs = documentRepo
.findByStatusAndSupersededAtBefore(DocumentStatus.SUPERSEDED, expireBefore);
for (DocumentRecord doc : expiredDocs) {
try {
// Hard delete: remove the vectors, then the metadata record
for (String vectorId : doc.getVectorIds()) {
vectorStore.remove(vectorId);
}
documentRepo.delete(doc);
log.info("清理过期文档: id={}, name={}", doc.getDocumentId(), doc.getName());
} catch (Exception e) {
log.error("清理文档失败: id={}, error={}", doc.getDocumentId(), e.getMessage());
}
}
log.info("过期文档清理完成,共清理{}条", expiredDocs.size());
}
private String cleanContent(String content) {
// Strip HTML tags first, then normalize the whitespace left behind
String cleaned = content.replaceAll("<[^>]+>", "");
// Collapse long whitespace runs into paragraph breaks
cleaned = cleaned.replaceAll("\\s{3,}", "\n\n");
// Remove control characters (keeping \t, \n, \r)
cleaned = cleaned.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", "");
return cleaned.trim();
}
private List<String> chunkDocument(String content, String documentType) {
// Pick a chunk size by document type (simplified strategy)
int chunkSize = "technical_doc".equals(documentType) ? 500 : 300;
List<String> chunks = new ArrayList<>();
String[] paragraphs = content.split("\n\n+");
StringBuilder current = new StringBuilder();
for (String para : paragraphs) {
if (current.length() + para.length() > chunkSize && current.length() > 0) {
chunks.add(current.toString().trim());
current = new StringBuilder();
}
current.append(para).append("\n\n");
}
if (current.length() > 0) chunks.add(current.toString().trim());
return chunks;
}
private String hashContent(String content) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] hash = md.digest(content.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hash).substring(0, 16);
} catch (Exception e) {
return UUID.randomUUID().toString().substring(0, 16);
}
}
}
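A note on the supporting types: DocumentIngestRequest, DocumentIngestResult, and DocumentRecord are referenced above but never shown. Reconstructed from the call sites, the request and result could look roughly like this; treat it as an assumed sketch rather than canonical definitions:
import java.util.List;
// Assumed shapes, reconstructed from the call sites above
public record DocumentIngestRequest(String name, String source, String content, String documentType) {}
public record DocumentIngestResult(String documentId, Status status, int chunkCount,
List<String> issues, List<String> warnings) {
public enum Status { SUCCESS, REJECTED, SKIPPED }
public static DocumentIngestResult success(String documentId, int chunkCount, List<String> warnings) {
return new DocumentIngestResult(documentId, Status.SUCCESS, chunkCount, List.of(), warnings);
}
public static DocumentIngestResult rejected(String documentId, List<String> issues) {
return new DocumentIngestResult(documentId, Status.REJECTED, 0, issues, List.of());
}
public static DocumentIngestResult skipped(String documentId, String reason) {
return new DocumentIngestResult(documentId, Status.SKIPPED, 0, List.of(), List.of(reason));
}
}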
Knowledge base quality checks
/**
 * Document quality checker:
 * gates quality before a document enters the knowledge base.
 */
@Component
@Slf4j
public class DocumentQualityChecker {
private static final int MIN_CONTENT_LENGTH = 100;
private static final int MAX_CONTENT_LENGTH = 500_000;
private static final double MIN_CHINESE_RATIO = 0.1; // at least 10% Chinese characters (this is a Chinese-language knowledge base)
public QualityCheckResult check(String content, String documentName) {
List<String> issues = new ArrayList<>();
List<String> warnings = new ArrayList<>();
double score = 100.0;
// 1. Length checks
if (content == null || content.trim().isEmpty()) {
issues.add("Document content is empty");
return new QualityCheckResult(issues, warnings, 0);
}
int length = content.trim().length();
if (length < MIN_CONTENT_LENGTH) {
issues.add(String.format("Document too short (%d characters; at least %d required)", length, MIN_CONTENT_LENGTH));
score -= 30;
}
if (length > MAX_CONTENT_LENGTH) {
warnings.add(String.format("Document very long (%d characters); consider splitting it", length));
score -= 5;
}
// 2. Repetition detection
double repetitionRate = calculateRepetitionRate(content);
if (repetitionRate > 0.5) {
issues.add(String.format("Document is heavily repetitive (%.0f%%); likely a formatting error", repetitionRate * 100));
score -= 40;
} else if (repetitionRate > 0.3) {
warnings.add(String.format("Document contains substantial repetition (%.0f%%)", repetitionRate * 100));
score -= 10;
}
// 3. Garbled-text detection (large numbers of meaningless characters)
double garbledRate = calculateGarbledRate(content);
if (garbledRate > 0.3) {
issues.add(String.format("Document may be garbled (%.0f%% invalid characters)", garbledRate * 100));
score -= 30;
}
// 4. Freshness check (infer from the content whether it may be stale)
if (containsObsoleteContent(content)) {
warnings.add("Document may contain outdated information (old year references detected); manual review recommended");
score -= 10;
}
// 5. Structure check
if (!hasReasonableStructure(content)) {
warnings.add("Document structure may be incomplete; paragraph breaks are missing");
score -= 5;
}
// 6. Language check: enforce the Chinese-content floor declared above
// (CJK Unified Ideographs as a proxy for Chinese text)
double chineseRatio = content.codePoints()
.filter(c -> c >= 0x4E00 && c <= 0x9FFF)
.count() / (double) content.length();
if (chineseRatio < MIN_CHINESE_RATIO) {
warnings.add(String.format("Low Chinese-text ratio (%.0f%%); the document may not belong in this knowledge base", chineseRatio * 100));
score -= 5;
}
return new QualityCheckResult(issues, warnings, Math.max(0, score));
}
private double calculateRepetitionRate(String content) {
// Split on sentence delimiters, dropping blanks to avoid skew and division by zero
List<String> sentences = Arrays.stream(content.split("[。!?\n]"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.toList();
if (sentences.isEmpty()) return 0;
return 1.0 - (double) new HashSet<>(sentences).size() / sentences.size();
}
private double calculateGarbledRate(String content) {
long invalidChars = content.chars()
.filter(c -> c < 0x20 && c != '\n' && c != '\r' && c != '\t')
.count();
return (double) invalidChars / content.length();
}
private boolean containsObsoleteContent(String content) {
// Flag documents that repeatedly cite years more than 3 years in the past
int currentYear = LocalDate.now().getYear();
Pattern yearPattern = Pattern.compile("(20\\d{2})年");
Matcher matcher = yearPattern.matcher(content);
int obsoleteCount = 0;
while (matcher.find()) {
int year = Integer.parseInt(matcher.group(1));
if (currentYear - year > 3) obsoleteCount++;
}
return obsoleteCount > 5;
}
private boolean hasReasonableStructure(String content) {
return content.contains("\n") && content.length() > 200;
}
public record QualityCheckResult(
List<String> issues,
List<String> warnings,
double score
) {
public boolean hasBlockingIssues() { return !issues.isEmpty(); }
}
}
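Before moving on, here is how the checker is meant to be driven. This caller is hypothetical (the file path and logger are stand-ins); a minimal sketch rather than production wiring:
// Hypothetical caller: gate a document before it reaches the ingest pipeline
String content = Files.readString(Path.of("/docs/api-guide.md")); // hypothetical source file
DocumentQualityChecker.QualityCheckResult result = qualityChecker.check(content, "api-guide.md");
if (result.hasBlockingIssues()) {
// Blocking issues (empty, heavily repetitive, or garbled content) stop ingestion entirely
result.issues().forEach(issue -> log.warn("Ingestion blocked: {}", issue));
} else {
// Warnings lower the score but do not block; surface them for human review
log.info("Quality score: {}, warnings: {}", result.score(), result.warnings());
}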
Training data management
/**
 * Version management for fine-tuning training data:
 * tracks each training set's provenance, quality, and usage history.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class TrainingDataManager {
private final TrainingDataRepository dataRepo;
private final ObjectMapper objectMapper;
/**
 * Create a new version of a training dataset
 */
public TrainingDataset createDataset(
String name,
String description,
List<TrainingSample> samples,
String createdBy) {
// Compute dataset statistics
DatasetStats stats = calculateStats(samples);
// Quality checks
List<String> qualityIssues = checkDataQuality(samples);
TrainingDataset dataset = TrainingDataset.builder()
.datasetId(UUID.randomUUID().toString())
.name(name)
.description(description)
.version(getNextVersion(name))
.sampleCount(samples.size())
.stats(stats)
.qualityIssues(qualityIssues)
.createdBy(createdBy)
.createdAt(LocalDateTime.now())
.status(qualityIssues.isEmpty() ?
DatasetStatus.READY : DatasetStatus.NEEDS_REVIEW)
.build();
// Persist the samples to a file (deduplicated by content)
String dataPath = saveToFile(dataset.getDatasetId(), samples);
dataset.setDataPath(dataPath);
dataRepo.save(dataset);
log.info("训练数据集创建: name={}, version={}, samples={}, issues={}",
name, dataset.getVersion(), samples.size(), qualityIssues.size());
return dataset;
}
/**
 * Data quality checks
 */
private List<String> checkDataQuality(List<TrainingSample> samples) {
List<String> issues = new ArrayList<>();
// 1. Sample count check
if (samples.size() < 100) {
issues.add("Not enough samples (" + samples.size() + "; at least 100 recommended)");
}
// 2. Output length distribution check
IntSummaryStatistics lengthStats = samples.stream()
.mapToInt(s -> s.output().length())
.summaryStatistics();
if (lengthStats.getAverage() < 50) {
issues.add("Average output length too short (" + String.format("%.0f", lengthStats.getAverage()) + " characters); answers may be low quality");
}
// 3. Duplicate sample detection
Set<String> inputs = new HashSet<>();
long duplicates = samples.stream()
.filter(s -> !inputs.add(s.input()))
.count();
if (duplicates > samples.size() * 0.05) {
issues.add(String.format("%.0f%% of inputs are duplicates, which may hurt training quality",
(double) duplicates / samples.size() * 100));
}
// 4. Sensitive information check
long sensitiveSamples = samples.stream()
.filter(s -> containsSensitiveInfo(s.input() + s.output()))
.count();
if (sensitiveSamples > 0) {
issues.add("Found " + sensitiveSamples + " samples that may contain sensitive information; confirm they are de-identified");
}
return issues;
}
private boolean containsSensitiveInfo(String text) {
// Naive detection: Chinese mobile numbers, national ID numbers, bank card numbers
Pattern sensitive = Pattern.compile(
"1[3-9]\\d{9}|\\d{17}[\\dXx]|\\d{16,19}"
);
return sensitive.matcher(text).find();
}
private DatasetStats calculateStats(List<TrainingSample> samples) {
IntSummaryStatistics inputStats = samples.stream()
.mapToInt(s -> s.input().length()).summaryStatistics();
IntSummaryStatistics outputStats = samples.stream()
.mapToInt(s -> s.output().length()).summaryStatistics();
return new DatasetStats(
(int) inputStats.getAverage(), inputStats.getMax(),
(int) outputStats.getAverage(), outputStats.getMax()
);
}
private int getNextVersion(String name) {
return dataRepo.findMaxVersionByName(name)
.map(v -> v + 1)
.orElse(1);
}
private String saveToFile(String datasetId, List<TrainingSample> samples) {
String path = "/data/training/" + datasetId + ".jsonl";
try (BufferedWriter writer = Files.newBufferedWriter(Path.of(path))) {
// Deduplicate by input+output content, then write one JSON object per line (JSONL)
Set<String> seen = new HashSet<>();
for (TrainingSample sample : samples) {
if (seen.add(sample.input() + "\u0000" + sample.output())) {
writer.write(objectMapper.writeValueAsString(sample));
writer.newLine();
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to write dataset " + datasetId, e);
}
return path;
}
public record TrainingSample(String input, String output, String category) {}
public record DatasetStats(
int avgInputLength, int maxInputLength,
int avgOutputLength, int maxOutputLength
) {}
@Data @Builder
public static class TrainingDataset {
private String datasetId;
private String name;
private String description;
private int version;
private int sampleCount;
private DatasetStats stats;
private List<String> qualityIssues;
private String dataPath;
private String createdBy;
private LocalDateTime createdAt;
private DatasetStatus status;
}
public enum DatasetStatus { READY, NEEDS_REVIEW, APPROVED, REJECTED }
}
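A usage sketch for the dataset manager; the sample content, the injected trainingDataManager, and the caller context are hypothetical:
// Hypothetical caller: assemble a dataset version from reviewed Q&A pairs
List<TrainingDataManager.TrainingSample> samples = List.of(
new TrainingDataManager.TrainingSample(
"How do I reset my password?",
"Open Settings > Security > Reset Password and follow the emailed link.",
"account"),
new TrainingDataManager.TrainingSample(
"Which payment methods are supported?",
"Credit cards, bank transfers, and major e-wallets are currently supported.",
"billing"));
TrainingDataManager.TrainingDataset dataset = trainingDataManager.createDataset(
"customer-support-qa", // versions auto-increment per dataset name
"Reviewed support Q&A pairs for fine-tuning",
samples,
"alice");
// With only 2 samples the sample-count check fails, so the dataset
// lands in NEEDS_REVIEW rather than READY
assert dataset.getStatus() == TrainingDataManager.DatasetStatus.NEEDS_REVIEW;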
Compliant use of user interaction data
/**
 * Compliant collection and use of user interaction data:
 * follows the data-minimization principle and respects user consent.
 */
@Service
@RequiredArgsConstructor
public class UserInteractionDataService {
private final UserConsentRepository consentRepo;
private final InteractionLogRepository logRepo;
/**
 * Log a user interaction (only for users who have granted consent)
 */
public void logInteraction(String userId, String question, String answer) {
// Check whether the user has consented to data collection
UserConsent consent = consentRepo.findByUserId(userId)
.orElse(null);
if (consent == null || !consent.isAllowDataCollection()) {
// No consent: never store the raw content;
// record only anonymous statistics (no user information)
logAnonymousStats(question.length(), answer.length());
return;
}
// Full content is recorded only with consent
InteractionLog log = InteractionLog.builder()
.logId(UUID.randomUUID().toString())
.userId(userId)
.questionHash(hashContent(question)) // store a hash for deduplication
.questionPreview(question.substring(0, Math.min(50, question.length())))
.answerLength(answer.length())
.logTime(LocalDateTime.now())
.consentVersion(consent.getVersion()) // record which consent version applied
.retentionExpiry(LocalDateTime.now().plusDays(365)) // retain for 1 year
.build();
logRepo.save(log);
}
/**
 * Export data for training (de-identified and quality-filtered)
 */
public List<TrainingDataManager.TrainingSample> exportForTraining(
LocalDate from, LocalDate to) {
// Export only data with explicit consent
List<InteractionLog> logs = logRepo.findByDateRangeWithConsent(from, to);
return logs.stream()
.filter(l -> l.isConsented())
.map(l -> new TrainingDataManager.TrainingSample(
l.getQuestionPreview(), // preview only, not the full question
"", // answers must be labeled manually
l.getCategory()
))
.toList();
}
private void logAnonymousStats(int questionLength, int answerLength) {
// Record only length statistics; no content is stored
}
private String hashContent(String content) {
try {
// Format the digest as lowercase hex
byte[] hash = java.security.MessageDigest.getInstance("SHA-256")
.digest(content.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hash);
} catch (Exception e) {
return "";
}
}
}
Data governance is not a one-off project; it is infrastructure that has to be operated continuously. Once the mechanism is in place, what matters most is enforcing it consistently: every new document entering the knowledge base must pass the quality check, and every batch of collected training data must go through de-identification. Only then can an AI application sustain high quality.
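To make that ingestion gate concrete, a caller wired to KnowledgeBaseDataManager might look like the sketch below. fileContent and notifyOwner are hypothetical, and DocumentIngestResult follows the assumed shape sketched earlier:
// Hypothetical wiring: every document passes through ingestDocument;
// nothing is written straight into the vector store
DocumentIngestRequest request = new DocumentIngestRequest(
"pricing-faq.md", "wiki-export", fileContent, "technical_doc");
DocumentIngestResult result = knowledgeBaseDataManager.ingestDocument(request);
if (result.status() == DocumentIngestResult.Status.REJECTED) {
// Quality gate failed: route back to the document owner instead of indexing
notifyOwner(request.name(), result.issues());
} else {
log.info("Ingested {}: {} chunks, {} warnings",
request.name(), result.chunkCount(), result.warnings().size());
}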
