第1640篇:生产环境AI请求的全链路日志——从Prompt到Response的完整审计
2026/4/30大约 10 分钟
第1640篇:生产环境AI请求的全链路日志——从Prompt到Response的完整审计
线上出了问题,第一反应是看日志。但如果AI应用的日志设计不好,你拿到的可能只是"模型调用失败"这种没有任何信息量的日志,完全不知道是Prompt有问题、模型返回了什么、还是下游解析出错。
这个话题我有深刻的教训。我们曾经有个AI应用在生产上偶发性地返回"对不起我无法回答"这种回复,用户投诉了两周,但日志里根本看不到Prompt内容。那次耗了整整三天排查,最终发现是System Prompt里有一条指令的措辞触发了模型的安全策略。如果一开始日志就完整,半小时就能解决。
今天把AI应用全链路日志的设计思路讲清楚,希望大家不用重蹈这个覆辙。
AI日志的特殊性
传统应用的日志通常关注:请求URL、参数、响应状态码、耗时、异常堆栈。
AI应用在这个基础上还需要关注:
- Prompt内容:发给模型的完整上下文是什么
- 模型响应原文:模型说了什么,是否被后处理过
- Token消耗:每次调用消耗多少Token,成本如何
- 模型元信息:用的哪个模型、哪个版本、finish_reason是什么
- 工具调用详情:Function Calling时调了哪些工具、入参和出参
- RAG检索过程:检索了什么内容、相似度分数
- Advisor链执行情况:每个Advisor做了什么处理
这些信息组合起来,才能真正还原一次AI调用的完整过程。
日志数据模型设计
先设计清楚要收集哪些数据:
/**
 * Full audit record for one AI request: tracing ids, the prompt, the model's
 * response, token usage/cost, RAG retrieval results, tool calls, and the
 * advisor-chain execution trail.
 */
@Data
@Builder
public class AIRequestLog {

    // --- Tracing ---
    private String requestId;   // unique per-request id
    private String traceId;     // distributed trace id (from Sleuth/SkyWalking)
    private String spanId;
    private String sessionId;   // conversation id (multi-turn dialogue)
    private String userId;
    private String tenantId;    // multi-tenant scenarios

    // --- Request ---
    private LocalDateTime requestTime;
    private String clientIp;
    private String endpoint;    // API endpoint that was called

    // --- Model call ---
    private String modelName;   // e.g. gpt-4, qwen-7b
    private String modelVersion;
    private List<PromptMessage> promptMessages;   // full prompt message list
    private Map<String, Object> modelOptions;     // temperature, max_tokens, ...

    // --- RAG info (if any) ---
    private List<RetrievedDocument> retrievedDocuments;
    private String retrievalQuery;

    // --- Tool-call info (if any) ---
    private List<ToolCall> toolCalls;

    // --- Response ---
    private String responseContent;    // raw model response
    private String processedContent;   // post-processed content (if any)
    private String finishReason;       // stop, length, tool_calls, ...
    private TokenUsage tokenUsage;
    private long latencyMs;

    // --- Status ---
    private boolean success;
    private String errorType;
    private String errorMessage;

    // --- Advisor chain execution ---
    private List<AdvisorLog> advisorLogs;

    /** One message of the prompt, possibly truncated and/or masked before logging. */
    @Data
    @Builder
    public static class PromptMessage {
        private String role;
        private String content;
        private boolean isTruncated;   // whether content was cut (masking/compression)
        private int originalLength;
    }

    /** One document recalled by RAG retrieval, with its similarity score. */
    @Data
    @Builder
    public static class RetrievedDocument {
        private String documentId;
        private String content;
        private double similarityScore;
        private Map<String, Object> metadata;
    }

    /** One Function-Calling invocation: tool name, arguments, result, timing. */
    @Data
    @Builder
    public static class ToolCall {
        private String toolName;
        private String toolCallId;
        private Map<String, Object> arguments;
        private Object result;
        private boolean success;
        private long latencyMs;
    }

    /** Token counts plus an estimated cost for the call. */
    @Data
    @Builder
    public static class TokenUsage {
        private Long promptTokens;
        private Long completionTokens;
        private Long totalTokens;
        private Double estimatedCostUsd;   // estimated cost in USD
    }

    /** What a single advisor in the chain did, and how long it took. */
    @Data
    @Builder
    public static class AdvisorLog {
        private String advisorName;
        private String action;   // what processing it performed
        private Map<String, Object> details;
        private long latencyMs;
    }
}
核心日志收集Advisor
把日志收集逻辑封装成Advisor,自动嵌入到每次AI调用中:
@Component
@Slf4j
public class FullAuditAdvisor implements AroundAdvisor {

    private final AIAuditLogService auditLogService;
    private final SensitiveDataMasker dataMasker;
    private final CostCalculator costCalculator;

    // Whether to record the full prompt (production may disable this for privacy)
    @Value("${ai.audit.log-full-prompt:true}")
    private boolean logFullPrompt;

    // Maximum recorded prompt length; longer content is truncated
    @Value("${ai.audit.max-prompt-length:2000}")
    private int maxPromptLength;

    public FullAuditAdvisor(AIAuditLogService auditLogService,
                            SensitiveDataMasker dataMasker,
                            CostCalculator costCalculator) {
        this.auditLogService = auditLogService;
        this.dataMasker = dataMasker;
        this.costCalculator = costCalculator;
    }

    /**
     * Wraps every AI call: collects request/prompt info before the call,
     * response/token/latency info after, and persists the audit record
     * asynchronously so auditing never slows the main call path.
     */
    @Override
    public AdvisedResponse aroundCall(AdvisedRequest request,
                                      CallAroundAdvisorChain chain) {
        String requestId = generateRequestId();
        long startTime = System.currentTimeMillis();

        // Collect request-side information.
        // NOTE(review): extractModelOptions(...) is not defined in this snippet —
        // it must be provided elsewhere in this class.
        AIRequestLog.AIRequestLogBuilder logBuilder = AIRequestLog.builder()
                .requestId(requestId)
                .traceId(MDC.get("traceId"))
                .spanId(MDC.get("spanId"))
                .sessionId((String) request.adviseContext().get("sessionId"))
                .userId((String) request.adviseContext().get("userId"))
                .tenantId((String) request.adviseContext().get("tenantId"))
                .requestTime(LocalDateTime.now())
                .modelOptions(extractModelOptions(request));

        // Capture prompt messages (truncated and masked)
        if (logFullPrompt) {
            List<AIRequestLog.PromptMessage> promptMessages = request.messages().stream()
                    .map(this::convertMessage)
                    .collect(Collectors.toList());
            logBuilder.promptMessages(promptMessages);
        }

        // Put requestId into MDC so unrelated log lines can be correlated
        MDC.put("aiRequestId", requestId);

        // FIX: must be initialized — if chain.nextAroundCall throws, the finally
        // block would otherwise read a definitely-unassigned variable (compile error).
        AdvisedResponse response = null;
        boolean success = true;
        String errorType = null;
        String errorMessage = null;
        try {
            response = chain.nextAroundCall(request);
        } catch (Exception e) {
            success = false;
            errorType = e.getClass().getSimpleName();
            errorMessage = e.getMessage();
            throw e;
        } finally {
            long latency = System.currentTimeMillis() - startTime;
            // Collect response-side information only on success
            if (success && response != null) {
                enrichWithResponse(logBuilder, response);
            }
            logBuilder.success(success)
                    .errorType(errorType)
                    .errorMessage(errorMessage)
                    .latencyMs(latency);
            // Persist asynchronously — must not affect main-path latency
            AIRequestLog auditLog = logBuilder.build();
            auditLogService.saveAsync(auditLog);
            // Emit key metrics as a structured log line for real-time monitoring
            logKeyMetrics(auditLog);
            MDC.remove("aiRequestId");
        }
        return response;
    }

    /** Converts a framework message into a loggable prompt message: truncate, then mask PII. */
    private AIRequestLog.PromptMessage convertMessage(Message message) {
        String content = message.getContent();
        boolean isTruncated = false;
        int originalLength = content != null ? content.length() : 0;
        if (content != null && content.length() > maxPromptLength) {
            content = content.substring(0, maxPromptLength) + "...[已截断]";
            isTruncated = true;
        }
        // Mask sensitive data before the content is ever persisted
        if (content != null) {
            content = dataMasker.mask(content);
        }
        return AIRequestLog.PromptMessage.builder()
                .role(message.getMessageType().getValue())
                .content(content)
                .isTruncated(isTruncated)
                .originalLength(originalLength)
                .build();
    }

    /** Copies response content, finish reason, model name and token/cost data onto the builder. */
    private void enrichWithResponse(AIRequestLog.AIRequestLogBuilder builder,
                                    AdvisedResponse response) {
        if (response.response() == null) return;
        ChatResponse chatResponse = response.response();
        // Response content and finish reason
        if (!chatResponse.getResults().isEmpty()) {
            String content = chatResponse.getResults().get(0).getOutput().getContent();
            builder.responseContent(content);
            String finishReason = chatResponse.getResults().get(0)
                    .getMetadata().getFinishReason();
            builder.finishReason(finishReason);
        }
        // Token usage and estimated cost
        if (chatResponse.getMetadata() != null &&
                chatResponse.getMetadata().getUsage() != null) {
            var usage = chatResponse.getMetadata().getUsage();
            Long promptTokens = usage.getPromptTokens();
            Long completionTokens = usage.getGenerationTokens();
            Long totalTokens = usage.getTotalTokens();
            String modelName = (String) response.adviseContext().get("modelName");
            // FIX: also record the model name — it was computed but never set,
            // so logKeyMetrics always printed model=null
            builder.modelName(modelName);
            Double estimatedCost = costCalculator.estimate(modelName,
                    promptTokens, completionTokens);
            builder.tokenUsage(AIRequestLog.TokenUsage.builder()
                    .promptTokens(promptTokens)
                    .completionTokens(completionTokens)
                    .totalTokens(totalTokens)
                    .estimatedCostUsd(estimatedCost)
                    .build());
        }
    }

    /**
     * Emits one structured log line per request, easy to parse with ELK/Loki.
     * FIX: parameter renamed from "log" — the original name shadowed the @Slf4j
     * logger field, so log.info(...) did not compile.
     */
    private void logKeyMetrics(AIRequestLog auditLog) {
        AIRequestLog.TokenUsage usage = auditLog.getTokenUsage();
        // FIX: estimatedCostUsd may be null even when usage exists —
        // String.format("%.6f", null) would throw
        String cost = (usage != null && usage.getEstimatedCostUsd() != null)
                ? String.format("%.6f", usage.getEstimatedCostUsd()) : "0";
        log.info("[AI_AUDIT] requestId={} tenantId={} userId={} model={} " +
                        "latency={}ms promptTokens={} completionTokens={} " +
                        "cost=${} success={} finishReason={}",
                auditLog.getRequestId(), auditLog.getTenantId(), auditLog.getUserId(),
                auditLog.getModelName(), auditLog.getLatencyMs(),
                usage != null ? usage.getPromptTokens() : 0,
                usage != null ? usage.getCompletionTokens() : 0,
                cost,
                auditLog.isSuccess(), auditLog.getFinishReason());
    }

    /** Generates a short unique id of the form "ai-" + 12 hex chars. */
    private String generateRequestId() {
        return "ai-" + UUID.randomUUID().toString().replace("-", "").substring(0, 12);
    }

    @Override
    public int getOrder() {
        // Outermost advisor, so it observes the entire chain
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
专门的RAG检索日志
RAG的检索过程需要单独记录,帮助排查"为什么没召回到相关内容"的问题:
@Component
@Slf4j
public class RAGAuditAdvisor implements AroundAdvisor {
    // FIX: removed the unused "private final AIAuditLogService auditLogService" —
    // it had no constructor and no @RequiredArgsConstructor, so the uninitialized
    // final field made the class fail to compile; it was also never used.

    /**
     * Records the RAG retrieval step: which documents were recalled, their
     * similarity scores, and how long retrieval took. Helps answer the question
     * "why wasn't the relevant content recalled?".
     */
    @Override
    public AdvisedResponse aroundCall(AdvisedRequest request,
                                      CallAroundAdvisorChain chain) {
        // Retrieval itself happens inside QuestionAnswerAdvisor further down the
        // chain; stamp a start time here and read the results back out afterwards.
        Map<String, Object> context = new HashMap<>(request.adviseContext());
        context.put("ragStartTime", System.currentTimeMillis());
        AdvisedRequest requestWithTime = AdvisedRequest.from(request)
                .withAdviseContext(context)
                .build();

        AdvisedResponse response = chain.nextAroundCall(requestWithTime);

        // Requires a QuestionAnswerAdvisor that publishes its retrieval results
        // into the advise context under "qa_retrieved_documents"
        Object retrievedDocs = response.adviseContext().get("qa_retrieved_documents");
        if (retrievedDocs instanceof List<?> docs) {
            long ragStartTime = (long) response.adviseContext()
                    .getOrDefault("ragStartTime", System.currentTimeMillis());
            long ragLatency = System.currentTimeMillis() - ragStartTime;
            // FIX: the audit requestId lives in MDC (FullAuditAdvisor puts it
            // there) — reading it from the advise context always yielded null
            String requestId = MDC.get("aiRequestId");
            String query = (String) response.adviseContext().get("qa_query");
            // Summary line for the retrieval step
            log.info("[RAG_AUDIT] requestId={} query=\"{}\" retrievedCount={} latency={}ms",
                    requestId, query, docs.size(), ragLatency);
            // One debug line per recalled document
            for (int i = 0; i < docs.size(); i++) {
                if (docs.get(i) instanceof Document doc) {
                    log.debug("[RAG_DOC] requestId={} rank={} docId={} " +
                                    "similarity={} contentLength={} source={}",
                            requestId, i + 1,
                            doc.getId(),
                            doc.getMetadata().get("distance"),
                            doc.getContent().length(),
                            doc.getMetadata().get("source"));
                }
            }
        }
        return response;
    }

    @Override
    public int getOrder() {
        // Runs just inside the global audit advisor
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }
}
敏感数据脱敏
记录Prompt内容时,必须做敏感数据脱敏,否则可能违反数据保护法规:
@Component
public class SensitiveDataMasker {

    /**
     * Masking rules, applied in declaration order. Order matters: the 18-digit
     * national-ID rule must run before the 16-19 digit bank-card rule so IDs
     * are matched as IDs rather than partially consumed as card numbers.
     */
    private static final List<MaskRule> MASK_RULES = List.of(
            // Lookarounds stop the phone rule from firing inside longer digit runs
            MaskRule.of("手机号", "(?<!\\d)1[3-9]\\d{9}(?!\\d)", "***"),
            MaskRule.of("身份证", "\\d{17}[\\dXx]", "***"),
            MaskRule.of("银行卡", "\\d{16,19}", "***"),
            // [A-Za-z] so upper-case TLDs (e.g. ".COM") are also caught
            MaskRule.of("邮箱", "[\\w.-]+@[\\w.-]+\\.[A-Za-z]{2,}", "***@***"),
            MaskRule.of("IPv4", "\\b(?:\\d{1,3}\\.){3}\\d{1,3}\\b", "x.x.x.x"),
            // FIX: the original replacement was "$0[已脱敏]" — $0 is the WHOLE
            // match, so the secret value was kept verbatim with a marker merely
            // appended. Group 1 now captures only the field name + separator and
            // the value itself is dropped. (Still a coarse heuristic — refine
            // per deployment.)
            MaskRule.of("密码字段", "(?i)((?:password|passwd|secret)[\"':\\s]+)[^\\s,]+",
                    "$1[已脱敏]")
    );

    /**
     * Applies every rule in order and returns the masked copy.
     * Null or empty input is returned unchanged.
     */
    public String mask(String text) {
        if (text == null || text.isEmpty()) return text;
        String masked = text;
        for (MaskRule rule : MASK_RULES) {
            masked = masked.replaceAll(rule.getPattern(), rule.getReplacement());
        }
        return masked;
    }

    /** Simple value holder for one masking rule. */
    @Data
    @RequiredArgsConstructor(staticName = "of")
    private static class MaskRule {
        private final String name;         // human-readable rule label
        private final String pattern;      // regex matched against the text
        private final String replacement;  // replacement (may use $n group refs)
    }
}
日志存储和查询
AI请求日志的量可能非常大,存储方案要考虑:
@Service
@Slf4j
// FIX: the three final fields had no constructor at all — the class did not
// compile; @RequiredArgsConstructor generates the injection constructor
@RequiredArgsConstructor
public class AIAuditLogService {

    private final KafkaTemplate<String, AIRequestLog> kafkaTemplate;
    private final ElasticsearchOperations elasticsearchOperations;
    private final AIAuditLogRepository mysqlRepository;

    /**
     * Three-tier storage strategy:
     * 1. Real-time: publish to Kafka (live analysis and alerting)
     * 2. Hot data: Elasticsearch (fast queries over ~the last 7 days)
     * 3. Cold data: MySQL summary archive (compliance / long-term retention)
     *
     * FIX: parameter renamed from "log" to "auditLog" — the original name
     * shadowed the @Slf4j logger field, so the log.error(...) calls below
     * did not compile.
     */
    @Async
    public void saveAsync(AIRequestLog auditLog) {
        // Level 1: Kafka, keyed by requestId for per-request partition ordering
        kafkaTemplate.send("ai-request-logs", auditLog.getRequestId(), auditLog);
        // Level 2: full document into ES; a storage failure must not break the pipeline.
        // NOTE(review): convertToESDoc(...) is not shown in this snippet — it must
        // exist elsewhere in this class.
        try {
            elasticsearchOperations.save(convertToESDoc(auditLog));
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is kept
            log.error("写入ES失败: {}", e.getMessage(), e);
        }
        // Level 3: key fields only into MySQL (full content lives in ES)
        try {
            AIAuditLogSummary summary = AIAuditLogSummary.builder()
                    .requestId(auditLog.getRequestId())
                    .tenantId(auditLog.getTenantId())
                    .userId(auditLog.getUserId())
                    .requestTime(auditLog.getRequestTime())
                    .modelName(auditLog.getModelName())
                    .totalTokens(auditLog.getTokenUsage() != null ?
                            auditLog.getTokenUsage().getTotalTokens() : null)
                    .latencyMs(auditLog.getLatencyMs())
                    .success(auditLog.isSuccess())
                    .build();
            mysqlRepository.save(summary);
        } catch (Exception e) {
            log.error("写入MySQL失败: {}", e.getMessage(), e);
        }
    }
}
日志查询API
日志光收集还不够,要能方便地查询:
@RestController
@RequestMapping("/admin/ai/audit")
@Slf4j
// FIX: queryService is final with no constructor — the class did not compile;
// @RequiredArgsConstructor generates the injection constructor
@RequiredArgsConstructor
public class AIAuditLogController {

    private final AIAuditQueryService queryService;

    /** Returns the complete end-to-end audit record for one request. */
    @GetMapping("/requests/{requestId}")
    public AIRequestLog getByRequestId(@PathVariable String requestId) {
        return queryService.findByRequestId(requestId);
    }

    /** Searches log summaries by filter criteria; every filter is optional. */
    @GetMapping("/requests")
    public Page<AIRequestLogSummary> search(
            @RequestParam(required = false) String tenantId,
            @RequestParam(required = false) String userId,
            @RequestParam(required = false) String modelName,
            @RequestParam(required = false) Boolean success,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime fromTime,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime toTime,
            Pageable pageable) {
        AIAuditSearchCriteria criteria = AIAuditSearchCriteria.builder()
                .tenantId(tenantId)
                .userId(userId)
                .modelName(modelName)
                .success(success)
                .fromTime(fromTime)
                .toTime(toTime)
                .build();
        return queryService.search(criteria, pageable);
    }

    /** Full-text search over prompt or response content — useful for incident digs. */
    @GetMapping("/search")
    public Page<AIRequestLogSummary> fullTextSearch(
            @RequestParam String query,
            @RequestParam(defaultValue = "prompt") String searchIn, // "prompt" or "response"
            Pageable pageable) {
        return queryService.fullTextSearch(query, searchIn, pageable);
    }

    /** Usage statistics grouped by time window, model, or tenant. */
    @GetMapping("/stats")
    public AIUsageStats getStats(
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime fromTime,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime toTime,
            @RequestParam(defaultValue = "tenant") String groupBy) {
        return queryService.getUsageStats(fromTime, toTime, groupBy);
    }

    /** Slow-request analysis: requests whose latency exceeds the threshold. */
    @GetMapping("/slow-requests")
    public List<AIRequestLogSummary> getSlowRequests(
            @RequestParam(defaultValue = "3000") long latencyThresholdMs,
            @RequestParam(defaultValue = "100") int limit) {
        return queryService.findSlowRequests(latencyThresholdMs, limit);
    }

    /** Failed-request analysis, optionally filtered by error type. */
    @GetMapping("/failed-requests")
    public Page<AIRequestLogSummary> getFailedRequests(
            @RequestParam(required = false) String errorType,
            Pageable pageable) {
        return queryService.findFailedRequests(errorType, pageable);
    }
}
实时告警
有了完整日志,可以基于日志做实时告警:
@Component
@Slf4j
// FIX: alertService/metricsService are final with no constructor — the class
// did not compile; @RequiredArgsConstructor generates the injection constructor
// (already-initialized finals like the counter below are excluded by Lombok)
@RequiredArgsConstructor
public class AIAuditAlertProcessor {

    private final AlertService alertService;
    private final MetricsService metricsService;

    // Consecutive-failure counter shared across listener threads;
    // N failures in a row trigger a CRITICAL alert
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private static final int FAILURE_ALERT_THRESHOLD = 10;

    /**
     * Consumes every audit record from Kafka and applies the alerting rules.
     * Parameter renamed from "log" to "auditLog" to avoid shadowing the
     * @Slf4j logger field (consistent with the other audit classes).
     */
    @KafkaListener(topics = "ai-request-logs", groupId = "ai-audit-alert")
    public void process(AIRequestLog auditLog) {
        // 1. Single-request high latency (> 30 seconds)
        if (auditLog.getLatencyMs() > 30000) {
            alertService.sendAlert(AlertLevel.WARNING,
                    String.format("AI请求超长延迟: requestId=%s, latency=%dms, model=%s",
                            auditLog.getRequestId(), auditLog.getLatencyMs(),
                            auditLog.getModelName()));
        }
        // 2. Consecutive-failure alert
        if (!auditLog.isSuccess()) {
            int failures = consecutiveFailures.incrementAndGet();
            if (failures >= FAILURE_ALERT_THRESHOLD) {
                alertService.sendAlert(AlertLevel.CRITICAL,
                        String.format("AI服务连续失败 %d 次,最近错误: %s",
                                failures, auditLog.getErrorMessage()));
                consecutiveFailures.set(0); // reset to avoid repeated alerts
            }
        } else {
            consecutiveFailures.set(0);
        }
        // 3. Abnormal finish_reason ("content_filter" means the model's safety filter fired)
        if ("content_filter".equals(auditLog.getFinishReason())) {
            alertService.sendAlert(AlertLevel.WARNING,
                    String.format("AI响应被内容过滤: requestId=%s, tenantId=%s",
                            auditLog.getRequestId(), auditLog.getTenantId()));
        }
        // 4. Excessive single-request token consumption (> 50k tokens)
        if (auditLog.getTokenUsage() != null &&
                auditLog.getTokenUsage().getTotalTokens() != null &&
                auditLog.getTokenUsage().getTotalTokens() > 50000) {
            alertService.sendAlert(AlertLevel.WARNING,
                    String.format("单次AI请求Token消耗过高: requestId=%s, tokens=%d",
                            auditLog.getRequestId(), auditLog.getTokenUsage().getTotalTokens()));
        }
        // 5. Feed real-time metrics
        metricsService.record(auditLog);
    }
}
日志的数据治理
最后一个容易忽视的问题:日志要有生命周期管理。
@Component
@Slf4j
// FIX: the two final dependencies had no constructor — the class did not
// compile; @RequiredArgsConstructor generates the injection constructor
@RequiredArgsConstructor
public class AIAuditLogRetentionPolicy {

    private final ElasticsearchOperations elasticsearchOperations;
    private final AIAuditLogArchiveService archiveService;

    /**
     * Runs every day at 02:00: purges full logs from ES after 7 days (they may
     * contain private prompt content) and archives MySQL summaries after 90 days.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void cleanupOldLogs() {
        // ES keeps only 7 days of full logs — prompt content is privacy-sensitive
        LocalDateTime sevenDaysAgo = LocalDateTime.now().minusDays(7);
        deleteFromElasticsearch(sevenDaysAgo);
        // MySQL keeps 90 days of summaries (no prompt content)
        LocalDateTime ninetyDaysAgo = LocalDateTime.now().minusDays(90);
        archiveService.archiveOlderThan(ninetyDaysAgo);
        log.info("AI审计日志清理完成,ES清理7天前数据,MySQL归档90天前数据");
    }

    /** Deletes all audit documents with requestTime strictly before the cutoff. */
    private void deleteFromElasticsearch(LocalDateTime before) {
        // Check compliance requirements first — some domains require longer retention.
        // NOTE(review): LocalDateTime.toString() yields ISO-8601 without a zone —
        // confirm it matches the "requestTime" date format mapped in the ES index.
        Query query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.rangeQuery("requestTime")
                        .lt(before.toString()))
                .build();
        elasticsearchOperations.delete(query, AIRequestLogDocument.class);
    }
}
整体架构总结
这套日志体系建起来之后,我们团队处理AI相关问题的效率大幅提升。以前一个问题要查几天,现在一般30分钟内就能定位到根因。这是真正意义上的"可观测AI系统"。
最后强调一点:日志收集了,但如果没有人看、也没有系统分析,就等于没收集。建议把关键指标接入你们的监控大盘,设置合理的告警阈值,让日志真正产生价值。
