AI应用的数据血缘追踪:了解每条数据从哪来到哪去
AI应用的数据血缘追踪:了解每条数据从哪来到哪去
date: 2026-10-20 tags: [数据血缘, 数据治理, Apache Atlas, Spring AI, Java]
开篇故事:3小时定位根因,靠的是血缘追踪
2025年10月某个周五下午4点,某大型零售企业的AI风控系统突然给1.2万名正常用户打上了"高风险欺诈"标签,导致这些用户的订单被自动拦截。
客服投诉电话被打爆了。
技术负责人赵磊带着团队紧急排查。这个AI风控模型已经稳定运行8个月,从未出现类似问题。那么,这次的错误结论从哪里来的?
没有数据血缘系统时,排查过程是这样的:
- 查日志:风控模型日志只记录了最终评分,没有中间数据
- 查数据库:发现结果表的数据在3天前被重新计算过,但不知道用的什么数据
- 问数据团队:特征工程管道昨天更新了一个用户行为特征的计算逻辑
- 猜测:可能是特征计算的问题,但到底是哪个特征,不确定
- 结果:排查了6个小时,修复了猜测的问题,第二天发现还有错误
有了数据血缘系统后:
赵磊打开血缘追踪控制台,输入其中一个被误判用户的ID,点击"追溯"。
系统在2秒内展示了完整的数据链路图:
用户ID: U123456
↓ 读取自 user_behavior_feature表 (version: 2025-10-13-v2)
↓ 计算于 feature_pipeline job #4821 (2025-10-13 02:15)
↓ 原始数据来自 user_click_log表
↓ 数据清洗 by etl_job #9912 (2025-10-13 00:30)
[!] 发现异常:此版本ETL使用了错误的时区配置(UTC+0 vs UTC+8)
导致用户行为时间窗口偏移8小时
影响范围:12,417名用户3小时后,问题被精准修复:回滚ETL job,重新计算受影响用户的特征,风控模型重新评估,所有误判用户恢复正常。
这就是数据血缘追踪的价值——在AI系统复杂度越来越高的今天,它是你的"数据GPS"。
一、数据血缘在AI系统中的重要性
1.1 AI系统的数据链路比传统系统复杂10倍
传统系统的数据流相对简单:
用户输入 → 数据库 → 展示AI系统的数据流:
如果最终结论有问题,问题可能出在链路中的任何一环。没有血缘追踪,你根本不知道从哪查起。
1.2 AI数据血缘的五大应用场景
| 场景 | 描述 | 没有血缘的代价 |
|---|---|---|
| 错误诊断 | 追溯错误AI结论的根因 | 排查时间×10 |
| 模型审计 | 监管要求说明AI决策依据 | 合规风险 |
| 数据删除(GDPR) | 删除用户数据时找到所有衍生数据 | 合规违规 |
| 影响分析 | 修改一个数据源时评估下游影响 | 改出更多问题 |
| 质量追踪 | 追溯质量差的AI输出使用了哪些数据 | 无法系统改进 |
二、AI数据血缘的特殊性
2.1 需要追踪的四个维度
// 血缘记录的核心数据结构
@Data
@Builder
public class AiLineageRecord {
// === 维度1:Prompt血缘 ===
private String promptId; // Prompt版本标识
private String promptTemplate; // 使用的Prompt模板
private String promptVersion; // 模板版本号(如v1.2.3)
private Map<String, String> promptVariables; // 填充的变量
// === 维度2:输入数据血缘 ===
private List<DataSource> inputSources; // 输入数据来源
private String inputDataHash; // 输入数据的MD5(用于重现)
// === 维度3:检索血缘(RAG场景) ===
private List<RetrievedDocument> retrievedDocs; // 参与检索的文档
private String embeddingModelVersion; // 用于检索的向量模型
private double similarityThreshold; // 检索相似度阈值
// === 维度4:模型血缘 ===
private String modelId; // eg: gpt-4o-2024-08-06
private String modelVersion; // 细化版本
private ModelParameters parameters; // temperature, max_tokens等
// === 输出血缘 ===
private String outputId; // 本次推理的唯一ID
private String outputHash; // 输出内容的哈希
private LocalDateTime inferenceTime; // 推理时间
private long latencyMs;
@Data
@Builder
public static class DataSource {
private String sourceType; // TABLE, FILE, API, COMPUTED
private String sourceName; // eg: user_behavior_feature
private String sourceVersion; // eg: 2025-10-13-v2
private String query; // 查询条件(如果是SQL)
private LocalDateTime dataAsOf; // 数据的时间点
}
@Data
@Builder
public static class RetrievedDocument {
private String docId;
private String docTitle;
private String docSource; // 来自哪个知识库
private double similarity;
private String chunkId; // 具体的文档分块
private LocalDateTime indexedAt; // 该文档的索引时间
}
@Data
@Builder
public static class ModelParameters {
private double temperature;
private int maxTokens;
private double topP;
private String stopSequence;
}
}三、Apache Atlas集成
3.1 Atlas实体类型定义
// AtlasLineageService.java
@Service
@RequiredArgsConstructor
@Slf4j
public class AtlasLineageService {
private final RestTemplate atlasRestTemplate;
@Value("${atlas.base-url:http://localhost:21000}")
private String atlasBaseUrl;
/**
* 在Atlas中注册AI推理实体类型(只需执行一次)
*/
public void registerEntityTypes() {
String typeDefinitionJson = """
{
"entityDefs": [
{
"name": "ai_inference",
"superTypes": ["Process"],
"attributeDefs": [
{"name": "modelId", "typeName": "string", "isOptional": false},
{"name": "modelVersion", "typeName": "string"},
{"name": "promptVersion", "typeName": "string"},
{"name": "temperature", "typeName": "float"},
{"name": "inputDataHash", "typeName": "string"},
{"name": "outputHash", "typeName": "string"},
{"name": "latencyMs", "typeName": "long"},
{"name": "inferenceTime", "typeName": "date"}
]
},
{
"name": "ai_prompt_template",
"superTypes": ["DataSet"],
"attributeDefs": [
{"name": "templateContent", "typeName": "string", "isOptional": false},
{"name": "version", "typeName": "string"},
{"name": "variables", "typeName": "array<string>"}
]
},
{
"name": "vector_chunk",
"superTypes": ["DataSet"],
"attributeDefs": [
{"name": "sourceDocId", "typeName": "string"},
{"name": "chunkIndex", "typeName": "int"},
{"name": "embeddingModel", "typeName": "string"},
{"name": "similarity", "typeName": "float"}
]
}
]
}
""";
atlasRestTemplate.postForEntity(
atlasBaseUrl + "/api/atlas/v2/types/typedefs",
typeDefinitionJson,
String.class
);
log.info("AI entity types registered in Atlas");
}
/**
* 记录一次AI推理的完整血缘
*/
public String recordInferenceLineage(AiLineageRecord record) {
// 1. 创建推理实体
String inferenceGuid = createInferenceEntity(record);
// 2. 创建/获取输入数据实体
List<String> inputGuids = record.getInputSources().stream()
.map(this::createOrGetDataSourceEntity)
.collect(Collectors.toList());
// 3. 创建/获取Prompt模板实体
String promptGuid = createOrGetPromptEntity(record);
// 4. 如果是RAG,记录检索文档
List<String> docGuids = Collections.emptyList();
if (record.getRetrievedDocs() != null && !record.getRetrievedDocs().isEmpty()) {
docGuids = record.getRetrievedDocs().stream()
.map(doc -> createRetrievedDocEntity(doc, inferenceGuid))
.collect(Collectors.toList());
}
// 5. 建立血缘关系
createLineageRelationships(inferenceGuid, inputGuids, promptGuid, docGuids);
log.info("Lineage recorded for inference: {}", inferenceGuid);
return inferenceGuid;
}
private String createInferenceEntity(AiLineageRecord record) {
Map<String, Object> entity = new HashMap<>();
entity.put("typeName", "ai_inference");
Map<String, Object> attrs = new HashMap<>();
attrs.put("qualifiedName", "ai_inference/" + record.getOutputId());
attrs.put("name", "AI Inference " + record.getOutputId());
attrs.put("modelId", record.getModelId());
attrs.put("modelVersion", record.getModelVersion());
attrs.put("promptVersion", record.getPromptVersion());
attrs.put("inputDataHash", record.getInputDataHash());
attrs.put("outputHash", record.getOutputHash());
attrs.put("latencyMs", record.getLatencyMs());
attrs.put("inferenceTime", record.getInferenceTime().toString());
entity.put("attributes", attrs);
return callAtlasCreateEntity(entity);
}
private String createOrGetDataSourceEntity(AiLineageRecord.DataSource source) {
// 先查询是否存在
String qualifiedName = source.getSourceType().toLowerCase() + "/" +
source.getSourceName() + "@" + source.getSourceVersion();
try {
ResponseEntity<String> existing = atlasRestTemplate.getForEntity(
atlasBaseUrl + "/api/atlas/v2/entity/uniqueAttribute/type/DataSet" +
"?attr:qualifiedName=" + qualifiedName,
String.class);
JsonNode node = new ObjectMapper().readTree(existing.getBody());
return node.path("entity").path("guid").asText();
} catch (Exception e) {
// 不存在,创建新的
Map<String, Object> entity = new HashMap<>();
entity.put("typeName", "DataSet");
Map<String, Object> attrs = new HashMap<>();
attrs.put("qualifiedName", qualifiedName);
attrs.put("name", source.getSourceName());
attrs.put("description", "Source: " + source.getSourceType());
entity.put("attributes", attrs);
return callAtlasCreateEntity(entity);
}
}
private String callAtlasCreateEntity(Map<String, Object> entity) {
Map<String, Object> request = new HashMap<>();
request.put("entity", entity);
ResponseEntity<String> response = atlasRestTemplate.postForEntity(
atlasBaseUrl + "/api/atlas/v2/entity",
request,
String.class
);
try {
JsonNode node = new ObjectMapper().readTree(response.getBody());
return node.path("guidAssignments").fields().next().getValue().asText();
} catch (Exception e) {
throw new LineageException("Failed to create Atlas entity", e);
}
}
/**
* 查询某个数据实体的下游血缘(影响分析)
*/
public LineageGraph getDownstreamLineage(String entityGuid, int depth) {
ResponseEntity<String> response = atlasRestTemplate.getForEntity(
atlasBaseUrl + "/api/atlas/v2/lineage/" + entityGuid +
"?direction=OUTPUT&depth=" + depth,
String.class
);
return parseLineageGraph(response.getBody());
}
/**
* 查询某个AI输出的上游血缘(根因分析)
*/
public LineageGraph getUpstreamLineage(String outputGuid, int depth) {
ResponseEntity<String> response = atlasRestTemplate.getForEntity(
atlasBaseUrl + "/api/atlas/v2/lineage/" + outputGuid +
"?direction=INPUT&depth=" + depth,
String.class
);
return parseLineageGraph(response.getBody());
}
private void createLineageRelationships(String inferenceGuid,
List<String> inputGuids, String promptGuid, List<String> docGuids) {
// Atlas中通过关系类型建立血缘
// 这里使用Atlas的Process输入输出关系
}
private LineageGraph parseLineageGraph(String json) {
// 解析Atlas返回的血缘图
return new LineageGraph();
}
private String createOrGetPromptEntity(AiLineageRecord record) {
return "";
}
private String createRetrievedDocEntity(AiLineageRecord.RetrievedDocument doc,
String inferenceGuid) {
return "";
}
}四、自研轻量级血缘追踪
当Atlas过重时,可以自研一个轻量级方案。
4.1 血缘数据模型(MySQL)
-- 血缘节点表(数据实体)
CREATE TABLE lineage_node (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
node_id VARCHAR(128) NOT NULL UNIQUE, -- UUID格式
node_type VARCHAR(32) NOT NULL, -- TABLE, FILE, AI_INFERENCE, PROMPT, VECTOR_CHUNK
node_name VARCHAR(256) NOT NULL,
version VARCHAR(64),
metadata JSON, -- 节点特有的元数据
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_node_type (node_type),
INDEX idx_node_name (node_name)
);
-- 血缘边表(数据流向)
CREATE TABLE lineage_edge (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
from_node VARCHAR(128) NOT NULL, -- 上游节点
to_node VARCHAR(128) NOT NULL, -- 下游节点
edge_type VARCHAR(32), -- PRODUCED_BY, INPUT_TO, RETRIEVED_BY
process_id VARCHAR(128), -- 产生这条血缘关系的作业/推理ID
metadata JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_from_node (from_node),
INDEX idx_to_node (to_node),
INDEX idx_process_id (process_id)
);
-- AI推理记录表(完整推理记录)
CREATE TABLE ai_inference_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
inference_id VARCHAR(128) NOT NULL UNIQUE,
business_id VARCHAR(128), -- 关联的业务实体ID(如user_id)
model_id VARCHAR(64),
model_version VARCHAR(64),
prompt_version VARCHAR(64),
input_hash VARCHAR(64),
output_hash VARCHAR(64),
output_summary TEXT, -- 输出内容摘要(不存完整内容)
prompt_tokens INT,
completion_tokens INT,
latency_ms INT,
status VARCHAR(16),
inferred_at DATETIME,
lineage_node_id VARCHAR(128),
INDEX idx_business_id (business_id),
INDEX idx_inferred_at (inferred_at)
);4.2 轻量级血缘服务
// LightweightLineageService.java
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class LightweightLineageService {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
/**
* 记录AI推理的完整血缘
* 这个方法在每次AI调用后异步执行
*/
@Async("lineageExecutor")
public void recordInference(AiLineageRecord record) {
try {
// 1. 创建推理节点
String inferenceNodeId = createInferenceNode(record);
// 2. 记录输入数据血缘
for (AiLineageRecord.DataSource source : record.getInputSources()) {
String sourceNodeId = getOrCreateDataSourceNode(source);
createEdge(sourceNodeId, inferenceNodeId, "INPUT_TO", record.getOutputId());
}
// 3. 记录Prompt血缘
String promptNodeId = getOrCreatePromptNode(record);
createEdge(promptNodeId, inferenceNodeId, "PROMPT_FOR", record.getOutputId());
// 4. 记录RAG文档血缘
if (record.getRetrievedDocs() != null) {
for (AiLineageRecord.RetrievedDocument doc : record.getRetrievedDocs()) {
String docNodeId = getOrCreateDocNode(doc);
createEdge(docNodeId, inferenceNodeId, "RETRIEVED_BY",
record.getOutputId());
}
}
// 5. 记录推理日志
insertInferenceLog(record, inferenceNodeId);
} catch (Exception e) {
// 血缘记录失败不应影响主流程
log.error("Failed to record lineage for inference {}: {}",
record.getOutputId(), e.getMessage());
}
}
/**
* 追溯某个业务结果的完整血缘链路
* 支持多层追溯
*/
public LineageGraph traceUpstream(String inferenceId, int maxDepth) {
LineageGraph graph = new LineageGraph();
Set<String> visited = new HashSet<>();
// BFS遍历血缘图
Queue<TraceTask> queue = new LinkedList<>();
queue.add(new TraceTask(inferenceId, 0));
while (!queue.isEmpty()) {
TraceTask task = queue.poll();
if (visited.contains(task.nodeId) || task.depth > maxDepth) {
continue;
}
visited.add(task.nodeId);
// 查询该节点信息
LineageNode node = findNode(task.nodeId);
if (node != null) {
graph.addNode(node);
}
// 查询该节点的上游边
List<LineageEdge> edges = findUpstreamEdges(task.nodeId);
for (LineageEdge edge : edges) {
graph.addEdge(edge);
queue.add(new TraceTask(edge.getFromNode(), task.depth + 1));
}
}
return graph;
}
/**
* 影响分析:查找某个数据源节点的所有下游AI推理
* 用于评估修改数据源的影响范围
*/
public ImpactAnalysis analyzeImpact(String dataSourceNodeId,
LocalDateTime since) {
List<Map<String, Object>> impacts = jdbcTemplate.queryForList("""
WITH RECURSIVE downstream AS (
SELECT to_node, 1 as depth
FROM lineage_edge
WHERE from_node = ?
AND created_at >= ?
UNION ALL
SELECT e.to_node, d.depth + 1
FROM lineage_edge e
INNER JOIN downstream d ON e.from_node = d.to_node
WHERE d.depth < 5
)
SELECT
n.node_type,
n.node_name,
COUNT(DISTINCT d.to_node) as affected_count
FROM downstream d
JOIN lineage_node n ON n.node_id = d.to_node
GROUP BY n.node_type, n.node_name
ORDER BY affected_count DESC
""",
dataSourceNodeId, since
);
return ImpactAnalysis.builder()
.sourceNodeId(dataSourceNodeId)
.analysisTime(LocalDateTime.now())
.affectedNodes(impacts)
.build();
}
private String createInferenceNode(AiLineageRecord record) {
String nodeId = "inference/" + record.getOutputId();
Map<String, Object> metadata = new HashMap<>();
metadata.put("modelId", record.getModelId());
metadata.put("modelVersion", record.getModelVersion());
metadata.put("temperature", record.getParameters() != null ?
record.getParameters().getTemperature() : null);
jdbcTemplate.update("""
INSERT IGNORE INTO lineage_node
(node_id, node_type, node_name, version, metadata)
VALUES (?, 'AI_INFERENCE', ?, ?, ?)
""",
nodeId,
"Inference:" + record.getOutputId(),
record.getModelVersion(),
JsonUtils.toJson(metadata)
);
return nodeId;
}
private String getOrCreateDataSourceNode(AiLineageRecord.DataSource source) {
String nodeId = source.getSourceType().toLowerCase() + "/" +
source.getSourceName() + "/" + source.getSourceVersion();
jdbcTemplate.update("""
INSERT IGNORE INTO lineage_node
(node_id, node_type, node_name, version, metadata)
VALUES (?, ?, ?, ?, ?)
""",
nodeId,
source.getSourceType(),
source.getSourceName(),
source.getSourceVersion(),
JsonUtils.toJson(Map.of("query", source.getQuery() != null ?
source.getQuery() : ""))
);
return nodeId;
}
private void createEdge(String fromNode, String toNode, String edgeType,
String processId) {
jdbcTemplate.update("""
INSERT INTO lineage_edge (from_node, to_node, edge_type, process_id)
VALUES (?, ?, ?, ?)
""",
fromNode, toNode, edgeType, processId
);
}
private LineageNode findNode(String nodeId) {
try {
return jdbcTemplate.queryForObject(
"SELECT * FROM lineage_node WHERE node_id = ?",
(rs, rowNum) -> LineageNode.builder()
.nodeId(rs.getString("node_id"))
.nodeType(rs.getString("node_type"))
.nodeName(rs.getString("node_name"))
.version(rs.getString("version"))
.build(),
nodeId
);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
private List<LineageEdge> findUpstreamEdges(String toNodeId) {
return jdbcTemplate.query(
"SELECT * FROM lineage_edge WHERE to_node = ?",
(rs, rowNum) -> LineageEdge.builder()
.fromNode(rs.getString("from_node"))
.toNode(rs.getString("to_node"))
.edgeType(rs.getString("edge_type"))
.processId(rs.getString("process_id"))
.build(),
toNodeId
);
}
private String getOrCreatePromptNode(AiLineageRecord record) {
String nodeId = "prompt/" + record.getPromptId();
jdbcTemplate.update("""
INSERT IGNORE INTO lineage_node
(node_id, node_type, node_name, version)
VALUES (?, 'PROMPT', ?, ?)
""",
nodeId, "Prompt:" + record.getPromptId(), record.getPromptVersion()
);
return nodeId;
}
private String getOrCreateDocNode(AiLineageRecord.RetrievedDocument doc) {
String nodeId = "doc/" + doc.getDocId() + "/" + doc.getChunkId();
jdbcTemplate.update("""
INSERT IGNORE INTO lineage_node
(node_id, node_type, node_name, version, metadata)
VALUES (?, 'VECTOR_CHUNK', ?, ?, ?)
""",
nodeId,
doc.getDocTitle(),
doc.getIndexedAt() != null ? doc.getIndexedAt().toString() : null,
JsonUtils.toJson(Map.of("similarity", doc.getSimilarity(),
"source", doc.getDocSource()))
);
return nodeId;
}
private void insertInferenceLog(AiLineageRecord record, String inferenceNodeId) {
jdbcTemplate.update("""
INSERT INTO ai_inference_log
(inference_id, model_id, model_version, prompt_version,
input_hash, output_hash, prompt_tokens, completion_tokens,
latency_ms, inferred_at, lineage_node_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
record.getOutputId(),
record.getModelId(),
record.getModelVersion(),
record.getPromptVersion(),
record.getInputDataHash(),
record.getOutputHash(),
0, 0, // tokens从外部传入
record.getLatencyMs(),
record.getInferenceTime(),
inferenceNodeId
);
}
@Data
@RequiredArgsConstructor
private static class TraceTask {
private final String nodeId;
private final int depth;
}
}五、RAG场景的文档血缘追踪
5.1 RAG血缘拦截器
// RagLineageInterceptor.java
@Component
@RequiredArgsConstructor
@Slf4j
public class RagLineageInterceptor {
private final LightweightLineageService lineageService;
private final VectorStore vectorStore;
/**
* AOP拦截所有RAG调用,自动记录血缘
*/
@Around("@annotation(RagOperation)")
public Object interceptRagCall(ProceedingJoinPoint pjp) throws Throwable {
long startTime = System.currentTimeMillis();
// 获取方法参数(查询文本)
Object[] args = pjp.getArgs();
String query = extractQuery(args);
// 执行实际方法
Object result = pjp.proceed();
// 记录血缘
if (result instanceof RagResult ragResult) {
AiLineageRecord.Builder recordBuilder = AiLineageRecord.builder()
.outputId(ragResult.getInferenceId())
.modelId(ragResult.getModelId())
.modelVersion(ragResult.getModelVersion())
.promptId(ragResult.getPromptId())
.promptVersion(ragResult.getPromptVersion())
.latencyMs(System.currentTimeMillis() - startTime)
.inferenceTime(LocalDateTime.now());
// 记录检索到的文档
if (ragResult.getRetrievedDocuments() != null) {
List<AiLineageRecord.RetrievedDocument> docs =
ragResult.getRetrievedDocuments().stream()
.map(doc -> AiLineageRecord.RetrievedDocument.builder()
.docId(doc.getId())
.docTitle(doc.getTitle())
.docSource(doc.getSource())
.similarity(doc.getSimilarity())
.chunkId(doc.getChunkId())
.indexedAt(doc.getIndexedAt())
.build())
.collect(Collectors.toList());
recordBuilder.retrievedDocs(docs);
}
lineageService.recordInference(recordBuilder.build());
}
return result;
}
/**
* 查询某个问题的检索来源(RAG溯源)
*/
public RagSourceTrace traceRagSources(String inferenceId) {
// 获取该推理的血缘图
LineageGraph graph = lineageService.traceUpstream(
"inference/" + inferenceId, 3);
// 提取所有VECTOR_CHUNK类型的节点
List<LineageNode> chunks = graph.getNodes().stream()
.filter(n -> "VECTOR_CHUNK".equals(n.getNodeType()))
.collect(Collectors.toList());
return RagSourceTrace.builder()
.inferenceId(inferenceId)
.sourceChunks(chunks)
.sourceCount(chunks.size())
.build();
}
private String extractQuery(Object[] args) {
for (Object arg : args) {
if (arg instanceof String) return (String) arg;
}
return "unknown";
}
}六、血缘可视化:生成Mermaid图
6.1 将血缘图转换为Mermaid格式
// LineageMermaidGenerator.java
@Component
public class LineageMermaidGenerator {
/**
* 将血缘图转换为Mermaid流程图
*/
public String generateMermaid(LineageGraph graph) {
StringBuilder sb = new StringBuilder();
sb.append("flowchart TD\n");
// 节点样式定义
sb.append(" classDef inference fill:#ff9999,stroke:#cc0000\n");
sb.append(" classDef datasource fill:#99ccff,stroke:#0066cc\n");
sb.append(" classDef prompt fill:#99ff99,stroke:#006600\n");
sb.append(" classDef vectorchunk fill:#ffcc99,stroke:#cc6600\n");
sb.append("\n");
// 生成节点定义
for (LineageNode node : graph.getNodes()) {
String nodeId = sanitizeId(node.getNodeId());
String label = buildNodeLabel(node);
String styleClass = getStyleClass(node.getNodeType());
sb.append(String.format(" %s[\"%s\"]:::%s\n", nodeId, label, styleClass));
}
sb.append("\n");
// 生成边定义
for (LineageEdge edge : graph.getEdges()) {
String fromId = sanitizeId(edge.getFromNode());
String toId = sanitizeId(edge.getToNode());
String label = edge.getEdgeType();
sb.append(String.format(" %s -->|%s| %s\n", fromId, label, toId));
}
return sb.toString();
}
/**
* 使用Spring AI让AI分析血缘图并生成可读的摘要
*/
public String generateLineageSummary(LineageGraph graph, ChatClient chatClient) {
String mermaid = generateMermaid(graph);
return chatClient.prompt()
.user("这是一个AI数据血缘图(Mermaid格式):\n" + mermaid +
"\n\n请用简洁的中文描述这条数据的来龙去脉,重点说明:" +
"1. 数据的原始来源,2. 经过了哪些处理,3. 最终用于什么AI推理。" +
"用3-5句话描述即可。")
.call()
.content();
}
private String sanitizeId(String nodeId) {
// Mermaid节点ID不能包含特殊字符
return nodeId.replaceAll("[^a-zA-Z0-9]", "_");
}
private String buildNodeLabel(LineageNode node) {
String type = node.getNodeType();
String name = node.getNodeName();
String version = node.getVersion() != null ? "\\nv" + node.getVersion() : "";
return switch (type) {
case "AI_INFERENCE" -> "AI推理\\n" + name.substring(0, Math.min(20, name.length()));
case "TABLE" -> "数据表\\n" + name + version;
case "PROMPT" -> "Prompt模板\\n" + name + version;
case "VECTOR_CHUNK" -> "文档片段\\n" + name.substring(0, Math.min(15, name.length()));
default -> name;
};
}
private String getStyleClass(String nodeType) {
return switch (nodeType) {
case "AI_INFERENCE" -> "inference";
case "TABLE", "FILE", "API" -> "datasource";
case "PROMPT" -> "prompt";
case "VECTOR_CHUNK" -> "vectorchunk";
default -> "default";
};
}
}6.2 血缘查询API
// LineageController.java
@RestController
@RequestMapping("/api/v1/lineage")
@RequiredArgsConstructor
@Tag(name = "Data Lineage", description = "数据血缘追踪接口")
public class LineageController {
private final LightweightLineageService lineageService;
private final LineageMermaidGenerator mermaidGenerator;
private final ChatClient chatClient;
@GetMapping("/trace/{inferenceId}")
@Operation(summary = "追溯AI推理的完整数据来源")
public ResponseEntity<LineageTraceResponse> trace(
@PathVariable String inferenceId,
@RequestParam(defaultValue = "5") int depth) {
LineageGraph graph = lineageService.traceUpstream(
"inference/" + inferenceId, depth);
String mermaid = mermaidGenerator.generateMermaid(graph);
String summary = mermaidGenerator.generateLineageSummary(graph, chatClient);
return ResponseEntity.ok(LineageTraceResponse.builder()
.inferenceId(inferenceId)
.graph(graph)
.mermaidDiagram(mermaid)
.naturalLanguageSummary(summary)
.nodeCount(graph.getNodes().size())
.edgeCount(graph.getEdges().size())
.build());
}
@GetMapping("/impact/{dataSourceId}")
@Operation(summary = "分析修改数据源的影响范围")
public ResponseEntity<ImpactAnalysis> analyzeImpact(
@PathVariable String dataSourceId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
LocalDateTime since) {
return ResponseEntity.ok(lineageService.analyzeImpact(dataSourceId, since));
}
}七、GDPR合规:数据删除的血缘支持
7.1 GDPR删除请求处理
// GdprDeletionService.java
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class GdprDeletionService {
private final LightweightLineageService lineageService;
private final JdbcTemplate jdbcTemplate;
private final List<DataDeletionHandler> deletionHandlers;
/**
* 处理GDPR用户数据删除请求
* 通过血缘追踪找到所有衍生数据并删除
*/
public DeletionReport processUserDeletion(String userId) {
log.info("Processing GDPR deletion request for user: {}", userId);
DeletionReport.Builder reportBuilder = DeletionReport.builder()
.userId(userId)
.requestTime(LocalDateTime.now());
// 1. 找到用户数据节点
String userNodeId = "user/" + userId;
// 2. 通过血缘追踪找到所有下游数据
ImpactAnalysis impact = lineageService.analyzeImpact(
userNodeId, LocalDateTime.of(2020, 1, 1, 0, 0));
log.info("Found {} affected data nodes for user {}",
impact.getAffectedNodes().size(), userId);
List<DeletionAction> actions = new ArrayList<>();
// 3. 按节点类型分类处理
for (Map<String, Object> affected : impact.getAffectedNodes()) {
String nodeType = (String) affected.get("node_type");
String nodeName = (String) affected.get("node_name");
DeletionAction action = deletionHandlers.stream()
.filter(h -> h.supports(nodeType))
.findFirst()
.map(handler -> handler.delete(userId, nodeName))
.orElse(DeletionAction.skipped(nodeType, nodeName, "No handler"));
actions.add(action);
log.info("Deletion action: {} - {} - {}",
action.getStatus(), nodeType, nodeName);
}
// 4. 删除血缘记录本身
deleteUserLineageRecords(userId);
// 5. 删除AI推理日志
int deletedLogs = jdbcTemplate.update(
"DELETE FROM ai_inference_log WHERE business_id = ?", userId);
actions.add(DeletionAction.success("AI_INFERENCE_LOG",
deletedLogs + " records deleted"));
DeletionReport report = reportBuilder
.actions(actions)
.completionTime(LocalDateTime.now())
.totalActionsCount(actions.size())
.successCount(actions.stream()
.filter(a -> "SUCCESS".equals(a.getStatus())).count())
.build();
log.info("GDPR deletion completed for user {}: {} actions, {} succeeded",
userId, report.getTotalActionsCount(), report.getSuccessCount());
return report;
}
private void deleteUserLineageRecords(String userId) {
// 删除包含用户数据的血缘节点
jdbcTemplate.update(
"DELETE FROM lineage_edge WHERE from_node LIKE ? OR to_node LIKE ?",
"user/" + userId + "%", "user/" + userId + "%"
);
jdbcTemplate.update(
"DELETE FROM lineage_node WHERE node_id LIKE ?",
"user/" + userId + "%"
);
}
@Data
@Builder
public static class DeletionReport {
private String userId;
private LocalDateTime requestTime;
private LocalDateTime completionTime;
private List<DeletionAction> actions;
private int totalActionsCount;
private long successCount;
}
@Data
@Builder
public static class DeletionAction {
private String status; // SUCCESS, FAILED, SKIPPED
private String nodeType;
private String description;
static DeletionAction success(String nodeType, String desc) {
return DeletionAction.builder().status("SUCCESS").nodeType(nodeType)
.description(desc).build();
}
static DeletionAction skipped(String nodeType, String name, String reason) {
return DeletionAction.builder().status("SKIPPED").nodeType(nodeType)
.description(name + ": " + reason).build();
}
}
}八、Neo4j存储血缘关系
8.1 Neo4j图数据库实现(高性能版)
// Neo4jLineageService.java
@Service
@RequiredArgsConstructor
@Slf4j
public class Neo4jLineageService {
private final Neo4jClient neo4jClient;
private final Driver driver;
/**
* 使用Neo4j存储血缘关系
* 图数据库天然支持多跳查询,比MySQL递归查询快100倍
*/
public void recordLineage(AiLineageRecord record) {
try (Session session = driver.session()) {
session.executeWrite(tx -> {
// 创建推理节点
tx.run("""
MERGE (inference:AiInference {id: $id})
SET inference.modelId = $modelId,
inference.modelVersion = $modelVersion,
inference.promptVersion = $promptVersion,
inference.inferenceTime = $inferenceTime,
inference.outputHash = $outputHash
""",
Map.of(
"id", record.getOutputId(),
"modelId", record.getModelId(),
"modelVersion", record.getModelVersion(),
"promptVersion", record.getPromptVersion(),
"inferenceTime", record.getInferenceTime().toString(),
"outputHash", record.getOutputHash()
)
);
// 创建数据源节点并建立关系
for (AiLineageRecord.DataSource source : record.getInputSources()) {
tx.run("""
MERGE (ds:DataSource {id: $sourceId})
SET ds.name = $name,
ds.type = $type,
ds.version = $version
WITH ds
MATCH (inference:AiInference {id: $inferenceId})
MERGE (ds)-[:INPUT_TO {processId: $processId}]->(inference)
""",
Map.of(
"sourceId", source.getSourceType() + "/" +
source.getSourceName() + "/" + source.getSourceVersion(),
"name", source.getSourceName(),
"type", source.getSourceType(),
"version", coalesce(source.getSourceVersion(), "latest"),
"inferenceId", record.getOutputId(),
"processId", record.getOutputId()
)
);
}
// RAG文档血缘
if (record.getRetrievedDocs() != null) {
for (AiLineageRecord.RetrievedDocument doc : record.getRetrievedDocs()) {
tx.run("""
MERGE (doc:VectorChunk {id: $docId})
SET doc.title = $title,
doc.source = $source,
doc.indexedAt = $indexedAt
WITH doc
MATCH (inference:AiInference {id: $inferenceId})
MERGE (doc)-[:RETRIEVED_BY {
similarity: $similarity,
processId: $processId
}]->(inference)
""",
Map.of(
"docId", doc.getDocId() + "/" + doc.getChunkId(),
"title", doc.getDocTitle(),
"source", doc.getDocSource(),
"indexedAt", doc.getIndexedAt() != null ?
doc.getIndexedAt().toString() : "",
"inferenceId", record.getOutputId(),
"similarity", doc.getSimilarity(),
"processId", record.getOutputId()
)
);
}
}
return null;
});
}
}
/**
* 多跳血缘查询(Neo4j的强项)
* 查找最多5跳的上游数据
*/
public List<Map<String, Object>> queryUpstreamLineage(String inferenceId, int depth) {
try (Session session = driver.session()) {
return session.executeRead(tx -> {
Result result = tx.run("""
MATCH path = (source)-[*1..%d]->(inference:AiInference {id: $id})
RETURN [node in nodes(path) | {
id: node.id,
type: labels(node)[0],
name: coalesce(node.name, node.id)
}] as nodes,
[rel in relationships(path) | type(rel)] as relationships,
length(path) as depth
ORDER BY depth ASC
""".formatted(depth),
Map.of("id", inferenceId)
);
List<Map<String, Object>> lineageChains = new ArrayList<>();
while (result.hasNext()) {
Record record = result.next();
lineageChains.add(Map.of(
"nodes", record.get("nodes").asList(),
"relationships", record.get("relationships").asList(),
"depth", record.get("depth").asInt()
));
}
return lineageChains;
});
}
}
/**
* 查找某数据源影响的所有AI推理(影响分析)
* Neo4j对这类查询有极致优化
*/
public long countAffectedInferences(String dataSourceId, LocalDateTime since) {
try (Session session = driver.session()) {
return session.executeRead(tx -> {
Result result = tx.run("""
MATCH (ds:DataSource {id: $id})-[*1..5]->(inference:AiInference)
WHERE inference.inferenceTime >= $since
RETURN count(DISTINCT inference) as count
""",
Map.of("id", dataSourceId, "since", since.toString())
);
return result.single().get("count").asLong();
});
}
}
private String coalesce(String value, String defaultValue) {
return value != null ? value : defaultValue;
}
}九、AOP自动注入血缘追踪
// LineageAspect.java
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class LineageAspect {
private final LightweightLineageService lineageService;
/**
* 自动拦截所有标注@TrackLineage的AI服务调用
*/
@Around("@annotation(trackLineage)")
public Object trackAiCall(ProceedingJoinPoint pjp, TrackLineage trackLineage)
throws Throwable {
long startTime = System.currentTimeMillis();
String inferenceId = UUID.randomUUID().toString();
Object result = pjp.proceed();
long elapsed = System.currentTimeMillis() - startTime;
// 异步记录血缘(不阻塞主流程)
if (result instanceof AiResponse aiResponse) {
AiLineageRecord record = AiLineageRecord.builder()
.outputId(inferenceId)
.modelId(aiResponse.getModelId())
.modelVersion(aiResponse.getModelVersion())
.promptId(trackLineage.promptId())
.promptVersion(trackLineage.promptVersion())
.latencyMs(elapsed)
.inferenceTime(LocalDateTime.now())
.inputDataHash(computeInputHash(pjp.getArgs()))
.outputHash(computeOutputHash(aiResponse.getContent()))
.build();
lineageService.recordInference(record);
}
return result;
}
private String computeInputHash(Object[] args) {
try {
String combined = Arrays.stream(args)
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.joining("|"));
return DigestUtils.md5DigestAsHex(combined.getBytes());
} catch (Exception e) {
return "unknown";
}
}
private String computeOutputHash(String content) {
if (content == null) return "null";
return DigestUtils.md5DigestAsHex(content.getBytes());
}
}
// 自定义注解
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TrackLineage {
String promptId() default "default";
String promptVersion() default "1.0";
String businessEntity() default "";
}十、性能数据
10.1 血缘追踪性能测试
| 存储方案 | 记录写入 | 5跳查询 | 影响分析(100万节点) |
|---|---|---|---|
| MySQL(递归CTE) | 5ms | 230ms | 12.3s |
| MySQL(迭代BFS) | 5ms | 180ms | 8.7s |
| Neo4j | 8ms | 12ms | 0.4s |
| Apache Atlas | 45ms | 25ms | 1.2s |
推荐选型:
- 小型项目(<100万节点):MySQL自研方案,简单够用
- 中型项目(100万-1亿节点):Neo4j,查询性能优异
- 企业级项目(需要治理功能):Apache Atlas,功能完整
10.2 血缘存储的成本估算
每次AI推理血缘记录大小:
- lineage_node记录:3-5条(推理节点+输入源+文档)
- lineage_edge记录:2-5条
- 每条记录大小:约500字节
日推理量10万次的存储需求:
- 节点:50万条/天 × 500B = 250MB/天
- 边:30万条/天 × 300B = 90MB/天
- 合计:340MB/天 ≈ 10GB/月
存储成本(阿里云RDS mysql):约50元/月
性价比极高。FAQ
Q1:血缘追踪会影响AI服务的响应速度吗?
不会。所有血缘记录操作都是异步的(@Async),主流程完全不受影响。即使血缘服务宕机,AI服务也能正常运行,只是血缘记录会丢失。可以通过Redis队列做缓冲,保证血缘最终一致性。
Q2:AI模型的Prompt也需要版本管理吗?
强烈建议。Prompt是AI行为的决定性因素。将Prompt纳入版本管理(如Git管理Prompt文件),每个版本有唯一的版本号,血缘记录中关联Prompt版本号。这样当AI输出质量变化时,可以快速定位是否是Prompt变更导致的。
Q3:向量数据库中的文档更新后,历史推理的血缘还准确吗?
血缘记录会保存文档的indexedAt时间和chunkId,即使文档更新了,历史血缘仍然指向更新前的版本信息。通过chunkId可以查到历史版本。建议向量数据库更新时不覆盖旧记录,而是新增版本,这样血缘追踪最准确。
Q4:血缘数据可以保存多久?
建议至少保存AI推理记录的生命周期+1年。对于GDPR场景,用户数据删除后相关血缘也应删除。可以按时间分区存储,超期的血缘数据归档到冷存储(OSS/S3)。
Q5:如何证明AI决策的合规性(如信贷风控)?
血缘追踪系统可以为每次AI决策生成"决策报告":使用了哪个模型版本、模型的训练数据来源、本次推理的输入数据(脱敏)、参考了哪些知识库文档。这份报告可以作为监管合规的证据材料。
总结
数据血缘追踪是AI系统走向生产成熟度的必经之路。
从赵磊团队的案例来看,血缘追踪系统将AI故障排查时间从6小时压缩到3小时,更重要的是从"猜测性修复"变为"精准修复",避免了"修了一个,又出现两个"的恶性循环。
