第1748篇:数据治理框架在AI项目中的落地——元数据管理与数据资产目录
第1748篇:数据治理框架在AI项目中的落地——元数据管理与数据资产目录
前些天参加了一个 AI 工程师交流会,有人提了一个问题:"你们公司的 AI 团队,有几个人知道公司里总共有多少张数据库表、每张表是干什么的?"
全场沉默了一会儿,然后大家都笑了,因为每个人心里都知道答案:没几个人。
这不是笑话,这是大多数公司数据现状的真实写照。数据分散在各个系统里,没有人统一管理,要用某份数据的时候靠口口相传找到负责人问,数据质量靠运气,数据格式靠猜,数据是否还在用靠经验。
在传统 BI 时代,这种状况也许还能凑合。但到了 AI 时代,这种混乱是致命的——训练数据找不到来源、特征工程重复造轮子、新来的工程师完全摸不清数据脉络。数据治理不再是"有时间再搞的可选项",而是 AI 项目的基础设施之一。
一、数据治理的核心组成
先定义一下范围,这篇文章聚焦的是 AI 项目里最需要的几块:
元数据管理:描述数据的数据。包括技术元数据(字段名、类型、大小)、业务元数据(字段含义、业务规则、负责人)、操作元数据(创建时间、最后更新、访问频率)。
数据资产目录:把企业里所有的数据资产(表、文件、API、特征、模型)整理成一个可检索的目录,让人能快速发现需要的数据。
数据质量管理:定义数据质量规则,持续监控数据是否符合规则,有问题及时告警。
数据访问控制:谁能访问哪些数据,访问记录审计。
二、元数据管理的实现
2.1 技术元数据的自动采集
手工录入元数据是死路一条——太耗人力,而且很快就会过时。技术元数据必须自动采集。
@Service
public class MetadataCollectorService {
@Autowired
private DataSourceRegistry dataSourceRegistry;
@Autowired
private MetadataRepository metadataRepository;
/**
* 自动扫描 MySQL 数据库,采集表和字段的技术元数据
*/
public void collectMysqlMetadata(String dataSourceName) {
DataSource ds = dataSourceRegistry.getDataSource(dataSourceName);
try (Connection conn = ds.getConnection()) {
DatabaseMetaData dbMeta = conn.getMetaData();
// 采集所有表
ResultSet tables = dbMeta.getTables(null, null, "%",
new String[]{"TABLE"});
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
TableMetadata tableMeta = new TableMetadata();
tableMeta.setDataSourceName(dataSourceName);
tableMeta.setTableName(tableName);
tableMeta.setComment(tableComment);
tableMeta.setCollectedAt(LocalDateTime.now());
// 采集表统计信息(行数、大小)
enrichTableStats(conn, dataSourceName, tableName, tableMeta);
// 采集字段信息
List<ColumnMetadata> columns = collectColumns(
dbMeta, tableName);
tableMeta.setColumns(columns);
metadataRepository.saveTableMetadata(tableMeta);
}
} catch (SQLException e) {
log.error("采集元数据失败: dataSource={}", dataSourceName, e);
}
}
private void enrichTableStats(Connection conn, String dataSource,
String tableName, TableMetadata meta) {
try {
// 查询行数(通过 information_schema,避免 count(*) 的开销)
String rowCountQuery = "SELECT TABLE_ROWS, DATA_LENGTH " +
"FROM information_schema.TABLES " +
"WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?";
try (PreparedStatement ps = conn.prepareStatement(rowCountQuery)) {
ps.setString(1, tableName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
meta.setEstimatedRowCount(rs.getLong("TABLE_ROWS"));
meta.setDataSizeBytes(rs.getLong("DATA_LENGTH"));
}
}
} catch (SQLException e) {
log.warn("获取表统计信息失败: {}", tableName, e);
}
}
private List<ColumnMetadata> collectColumns(
DatabaseMetaData dbMeta, String tableName) throws SQLException {
List<ColumnMetadata> columns = new ArrayList<>();
ResultSet cols = dbMeta.getColumns(null, null, tableName, null);
while (cols.next()) {
ColumnMetadata col = new ColumnMetadata();
col.setColumnName(cols.getString("COLUMN_NAME"));
col.setDataType(cols.getString("TYPE_NAME"));
col.setColumnSize(cols.getInt("COLUMN_SIZE"));
col.setNullable("YES".equals(cols.getString("IS_NULLABLE")));
col.setComment(cols.getString("REMARKS"));
col.setOrdinalPosition(cols.getInt("ORDINAL_POSITION"));
columns.add(col);
}
return columns;
}
/**
* 采集 Kafka Topic 的元数据
*/
public void collectKafkaMetadata(String bootstrapServers) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
try (AdminClient adminClient = AdminClient.create(props)) {
ListTopicsResult topicList = adminClient.listTopics();
Set<String> topicNames = topicList.names().get();
for (String topicName : topicNames) {
TopicMetadata topicMeta = new TopicMetadata();
topicMeta.setTopicName(topicName);
topicMeta.setAssetType("KAFKA_TOPIC");
// 获取 Topic 描述(分区数、副本因子)
DescribeTopicsResult desc = adminClient.describeTopics(
Collections.singleton(topicName));
TopicDescription topicDesc = desc.values().get(topicName).get();
topicMeta.setPartitionCount(topicDesc.partitions().size());
metadataRepository.saveTopicMetadata(topicMeta);
}
} catch (Exception e) {
log.error("采集 Kafka 元数据失败", e);
}
}
}2.2 业务元数据的结构化存储
技术元数据是自动的,但业务含义(这个字段是什么意思、有什么业务规则)必须由人来补充。关键是要设计一个让业务人员容易填写的界面和数据结构:
@Entity
@Table(name = "business_metadata")
public class BusinessMetadata {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
// 关联到技术元数据
private String assetType; // TABLE, COLUMN, KAFKA_TOPIC, FEATURE, MODEL
private String assetId; // 技术元数据的唯一 ID
// 业务属性
private String businessName; // 业务名称(中文)
private String businessDesc; // 业务描述
private String businessOwner; // 数据负责人(邮件/工号)
private String ownerTeam; // 所属团队
// 数据分类分级(用于访问控制)
private String dataClassification; // PUBLIC, INTERNAL, CONFIDENTIAL, SECRET
private String dataCategory; // 个人信息, 财务数据, 运营数据, etc.
// 业务规则
@Column(columnDefinition = "TEXT")
private String businessRules; // JSON 格式的规则列表
// 使用场景
@Column(columnDefinition = "TEXT")
private String useCases; // 这份数据用在哪些业务场景
// 关联的 AI 特征(如果这个字段被用于特征工程)
private String relatedFeatures; // 逗号分隔的特征名
private LocalDateTime lastUpdatedAt;
private String lastUpdatedBy;
private Integer completenessScore; // 元数据完整度(0-100),自动计算
}2.3 元数据完整度评分
鼓励团队填写元数据,要有可量化的指标:
@Component
public class MetadataCompletenessEvaluator {
private static final Map<String, Integer> FIELD_WEIGHTS = Map.of(
"businessName", 20,
"businessDesc", 20,
"businessOwner", 15,
"dataClassification", 15,
"businessRules", 15,
"useCases", 15
);
public int computeScore(BusinessMetadata meta) {
int score = 0;
if (StringUtils.hasText(meta.getBusinessName()))
score += FIELD_WEIGHTS.get("businessName");
if (StringUtils.hasText(meta.getBusinessDesc())
&& meta.getBusinessDesc().length() >= 20)
score += FIELD_WEIGHTS.get("businessDesc");
if (StringUtils.hasText(meta.getBusinessOwner()))
score += FIELD_WEIGHTS.get("businessOwner");
if (StringUtils.hasText(meta.getDataClassification()))
score += FIELD_WEIGHTS.get("dataClassification");
if (StringUtils.hasText(meta.getBusinessRules()))
score += FIELD_WEIGHTS.get("businessRules");
if (StringUtils.hasText(meta.getUseCases()))
score += FIELD_WEIGHTS.get("useCases");
return score;
}
/**
* 每周生成元数据覆盖度报告
*/
@Scheduled(cron = "0 0 9 * * MON")
public void generateWeeklyCoverageReport() {
List<TableMetadata> allTables = metadataRepository.findAllTables();
long totalTables = allTables.size();
long tablesWithBusiness = allTables.stream()
.filter(t -> metadataRepository.hasBusinessMetadata(t.getId()))
.count();
double coverageRate = totalTables > 0 ?
(double) tablesWithBusiness / totalTables : 0;
// 找出低质量元数据(得分 < 60 的)
List<String> lowQualityAssets = allTables.stream()
.map(t -> metadataRepository.findBusinessMetadata(t.getId()))
.filter(Optional::isPresent)
.map(Optional::get)
.filter(m -> m.getCompletenessScore() < 60)
.map(m -> m.getAssetId())
.collect(Collectors.toList());
log.info("元数据覆盖率: {}/{} ({:.1f}%), 低质量: {}",
tablesWithBusiness, totalTables, coverageRate * 100,
lowQualityAssets.size());
// 推送报告给数据负责人
notifyDataOwners(coverageRate, lowQualityAssets);
}
}三、数据资产目录的实现
3.1 数据资产的统一模型
AI 项目涉及的数据资产类型很多:数据库表、Kafka 消息、特征、模型、向量库集合、API 接口……要把它们统一纳入一个目录,需要一个通用的资产模型。
@Entity
@Table(name = "data_asset")
public class DataAsset {
@Id
private String assetId; // 全局唯一(UUID)
private String assetType; // TABLE, KAFKA_TOPIC, FEATURE, MODEL, VECTOR_COLLECTION, API
// 基础信息
private String assetName; // 资产名称
private String displayName; // 友好展示名(中文)
private String description; // 简短描述
private String systemName; // 所属系统(如 ORDER_DB, RECOMMEND_SERVICE)
// 所有权
private String ownerEmail;
private String ownerTeam;
// 分类标签(多个,逗号分隔)
private String tags;
// 数据敏感级别
private String sensitivityLevel; // L1(公开)到 L4(机密)
// 状态
private String status; // ACTIVE, DEPRECATED, ARCHIVED
// 使用统计
private Long viewCount; // 被查看次数
private Long referencedByCount; // 被多少个下游任务引用
// 技术属性(JSON 格式,根据类型不同)
@Column(columnDefinition = "TEXT")
private String technicalAttributes;
private LocalDateTime createdAt;
private LocalDateTime lastModifiedAt;
private LocalDateTime lastAccessedAt;
}3.2 全文检索能力
数据目录的核心价值是"可发现性"——工程师能快速找到需要的数据。全文搜索是基础能力,这里用 Elasticsearch 来实现:
@Service
public class DataAssetSearchService {
@Autowired
private ElasticsearchRestTemplate esTemplate;
private static final String INDEX_NAME = "data_assets";
/**
* 搜索数据资产(支持名称、描述、标签的全文搜索)
*/
public SearchResult<DataAsset> search(DataAssetSearchRequest request) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 主搜索条件:在名称、描述、标签中搜索
if (StringUtils.hasText(request.getKeyword())) {
boolQuery.must(QueryBuilders.multiMatchQuery(request.getKeyword())
.field("displayName", 3.0f) // 名称权重最高
.field("description", 2.0f)
.field("tags", 1.5f)
.field("assetName", 1.0f)
.type(MultiMatchQueryBuilder.Type.BEST_FIELDS)
.fuzziness(Fuzziness.AUTO)); // 支持模糊匹配
}
// 过滤条件
if (request.getAssetType() != null) {
boolQuery.filter(QueryBuilders.termQuery(
"assetType", request.getAssetType()));
}
if (request.getOwnerTeam() != null) {
boolQuery.filter(QueryBuilders.termQuery(
"ownerTeam", request.getOwnerTeam()));
}
if (!request.getStatus().isEmpty()) {
boolQuery.filter(QueryBuilders.termsQuery(
"status", request.getStatus()));
}
// 构建查询
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(boolQuery)
.withSort(SortBuilders
.fieldSort("referencedByCount")
.order(SortOrder.DESC)) // 按引用数排序(热门优先)
.withPageable(PageRequest.of(
request.getPage(), request.getPageSize()))
.withHighlightFields(
new HighlightBuilder.Field("description"),
new HighlightBuilder.Field("tags"))
.build();
SearchHits<DataAsset> hits = esTemplate.search(
searchQuery, DataAsset.class, IndexCoordinates.of(INDEX_NAME));
return SearchResult.from(hits);
}
/**
* 获取数据资产的相关推荐(相似资产)
*/
public List<DataAsset> getRelatedAssets(String assetId) {
DataAsset asset = findById(assetId);
// 使用 MLT(More Like This)查询找相似资产
MoreLikeThisQueryBuilder mltQuery = QueryBuilders
.moreLikeThisQuery(
new String[]{"description", "tags"},
null,
new MoreLikeThisQueryBuilder.Item[]{
new MoreLikeThisQueryBuilder.Item(INDEX_NAME, assetId)
})
.minTermFreq(1)
.maxQueryTerms(12);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(mltQuery)
.withPageable(PageRequest.of(0, 5))
.build();
return esTemplate.search(query, DataAsset.class,
IndexCoordinates.of(INDEX_NAME))
.map(SearchHit::getContent)
.toList();
}
}3.3 数据资产的访问记录与热度统计
@Service
public class DataAssetUsageTracker {
@Autowired
private DataAssetRepository assetRepository;
/**
* 记录资产访问(异步,不影响主链路)
*/
@Async
public void recordAccess(String assetId, String accessType,
String accessorId) {
// 更新访问统计
assetRepository.incrementViewCount(assetId);
assetRepository.updateLastAccessedAt(assetId, LocalDateTime.now());
// 记录详细访问日志(用于审计)
DataAccessLog log = new DataAccessLog();
log.setAssetId(assetId);
log.setAccessType(accessType); // VIEW, DOWNLOAD, QUERY, TRAIN
log.setAccessorId(accessorId);
log.setAccessedAt(LocalDateTime.now());
log.setClientIp(getCurrentClientIp());
accessLogRepository.save(log);
}
/**
* 统计资产的引用关系(有多少上下游任务用到了这个资产)
*/
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨 3 点更新
public void updateReferenceCount() {
List<DataAsset> allAssets = assetRepository.findAll();
for (DataAsset asset : allAssets) {
// 从血缘图中查询引用数
long downstreamCount = lineageService
.countDownstreamNodes(asset.getAssetId());
assetRepository.updateReferencedByCount(
asset.getAssetId(), downstreamCount);
}
}
}四、数据质量规则引擎
数据治理少不了数据质量的持续监控:
@Service
public class DataQualityRuleEngine {
@Autowired
private DataQualityRuleRepository ruleRepository;
/**
* 执行数据质量检查
*/
public DataQualityReport runQualityCheck(String datasetId,
List<Map<String, Object>> data) {
List<DataQualityRule> rules = ruleRepository
.findByDatasetId(datasetId);
DataQualityReport report = new DataQualityReport();
report.setDatasetId(datasetId);
report.setCheckedAt(LocalDateTime.now());
report.setTotalRecords(data.size());
List<RuleCheckResult> results = new ArrayList<>();
for (DataQualityRule rule : rules) {
RuleCheckResult result = executeRule(rule, data);
results.add(result);
}
report.setRuleResults(results);
// 计算综合质量分
double avgScore = results.stream()
.mapToDouble(RuleCheckResult::getPassRate)
.average()
.orElse(1.0);
report.setOverallQualityScore(avgScore);
return report;
}
private RuleCheckResult executeRule(DataQualityRule rule,
List<Map<String, Object>> data) {
int passCount = 0;
List<String> failedRecordIds = new ArrayList<>();
for (Map<String, Object> record : data) {
boolean pass = evaluateRule(rule, record);
if (pass) {
passCount++;
} else {
failedRecordIds.add(String.valueOf(record.get("id")));
}
}
double passRate = (double) passCount / data.size();
boolean passed = passRate >= rule.getThreshold();
RuleCheckResult result = new RuleCheckResult();
result.setRuleId(rule.getId());
result.setRuleName(rule.getName());
result.setPassRate(passRate);
result.setPassed(passed);
result.setFailedCount(data.size() - passCount);
result.setFailedRecordSample(failedRecordIds.subList(
0, Math.min(10, failedRecordIds.size())));
return result;
}
private boolean evaluateRule(DataQualityRule rule,
Map<String, Object> record) {
String fieldName = rule.getFieldName();
Object value = record.get(fieldName);
switch (rule.getRuleType()) {
case "NOT_NULL":
return value != null && !value.toString().isEmpty();
case "VALUE_RANGE":
if (value == null) return false;
double numVal = Double.parseDouble(value.toString());
return numVal >= rule.getMinValue() &&
numVal <= rule.getMaxValue();
case "REGEX_MATCH":
return value != null &&
value.toString().matches(rule.getRegexPattern());
case "VALUE_IN_SET":
Set<String> allowedValues = new HashSet<>(rule.getAllowedValues());
return value != null &&
allowedValues.contains(value.toString());
case "UNIQUE":
// 跨记录的唯一性检查需要在外层统计,这里简化处理
return true;
default:
log.warn("未知规则类型: {}", rule.getRuleType());
return true;
}
}
}五、Apache Atlas 的集成
对于规模较大的团队,自建元数据管理系统成本高。Apache Atlas 是 Hadoop 生态里成熟的数据治理工具,可以直接集成。
@Service
public class ApacheAtlasClient {
private final AtlasClientV2 atlasClient;
public ApacheAtlasClient(String atlasUrl, String username, String password) {
String[] urls = {atlasUrl};
atlasClient = new AtlasClientV2(urls, new String[]{username, password});
}
/**
* 注册 AI 模型到 Atlas
*/
public AtlasEntity.AtlasEntityWithExtInfo registerModel(
String modelName, String modelVersion,
String trainingDatasetId, Map<String, Object> attrs) {
AtlasEntity modelEntity = new AtlasEntity("ml_model");
modelEntity.setAttribute("name", modelName);
modelEntity.setAttribute("version", modelVersion);
modelEntity.setAttribute("modelType", attrs.get("modelType"));
modelEntity.setAttribute("accuracy", attrs.get("accuracy"));
modelEntity.setAttribute("trainingDataset",
new AtlasObjectId("dataset", "qualifiedName", trainingDatasetId));
modelEntity.setAttribute("qualifiedName",
modelName + "@" + modelVersion);
try {
EntityMutationResponse response = atlasClient.createEntity(
new AtlasEntity.AtlasEntitiesWithExtInfo(modelEntity));
log.info("模型注册到 Atlas: {}", response.getCreatedEntities());
} catch (AtlasServiceException e) {
log.error("Atlas 注册失败", e);
}
return new AtlasEntity.AtlasEntityWithExtInfo(modelEntity);
}
}六、落地时的建议和踩坑
坑一:一开始就想建完整的治理平台,结果什么都没建起来
数据治理是持续工程,不是一个项目。建议从最小可行版本开始:先把所有数据资产名称和负责人记录到一张表里,这本身就是巨大进步。然后逐步加字段、加规则、加自动化。
坑二:元数据没人维护,很快就失效
建了目录,没人填,填了没人更新,两三个月后目录和现实完全脱节,比没有更糟糕。
解决方案:把元数据完整度和团队 OKR 绑定,在代码 Review 流程里要求新增数据资产必须更新目录,技术元数据全部自动化采集。
坑三:数据分级没有做,所有人能访问所有数据
没有分级控制的数据目录,把敏感数据暴露给了不应该访问的人。数据治理的第一天就应该建立数据分级标准,哪怕就简单的三级:公开、内部、敏感。
七、小结
数据治理不是 AI 团队的"额外工作",它是让 AI 项目可持续运作的基础设施。没有治理框架,每次新项目都要重新挖掘数据资产,每次排查问题都不知道从哪下手,每次数据问题都要拉一堆人对齐。
建立了治理框架之后,这些摩擦会大幅减少,团队的有效产出才能真正提升。
