第1957篇:日志结构化在AI系统中的实践——让Prompt和Response可分析
第1957篇:日志结构化在AI系统中的实践——让Prompt和Response可分析
有一次我去看一个团队的AI系统日志,那些日志长这样:
2024-01-15 14:23:45 INFO LLM调用成功,耗时3421ms
2024-01-15 14:23:46 INFO 返回响应给用户
2024-01-15 14:24:02 ERROR LLM调用失败:timeout我问他们:能不能告诉我,这三个月里,用户问了哪类问题模型答得最差?
他们沉默了。
这不是能力问题,是日志压根就没存这些信息。日志里没有Prompt,没有Response,没有会话ID,没有用户ID,没有Token数,没有检索到的文档……有的只是"调用成功"和"耗时多少毫秒"。
这类日志对AI系统来说几乎没有价值。你无法回答任何关于"AI在干什么、干得好不好"的问题。
日志结构化,在AI系统里不是锦上添花,是基础设施。
为什么AI系统的日志需要特殊设计
普通Web服务的日志,记录HTTP请求和响应就够了。但AI系统的日志要记录的东西本质上更复杂:
第一,Prompt是核心数据。 同一个功能,Prompt改了一个字,效果可能天壤之别。如果不记录Prompt的版本和内容,你根本无法分析"上周的效果为什么比这周好"。
第二,中间步骤有价值。 RAG系统里,"检索了哪些文档"是重要信息。如果答案错了,你需要知道是检索阶段出的问题(检索到了不相关的文档)还是生成阶段出的问题(有相关文档但模型没用上)。
第三,Token是成本维度。 AI系统的直接运行成本和Token消耗正相关。不记录Token数量,你就对成本没有洞察,也无法发现"某个Prompt突然变长了把成本打高了"这类问题。
第四,会话上下文需要可重放。 AI系统出了问题,运维人员需要能"重放"那个出问题的会话——同样的用户输入、同样的历史记录、同样的Prompt,再跑一遍看问题能不能复现。没有结构化的会话日志,这根本做不到。
日志的层次设计
这三层日志各有用途,写入位置和保留时间也不同。
请求级日志的结构设计
这是最细粒度的日志,每次LLM调用写一条。
@Data
@Builder
public class AIRequestLog {
// === 标识符 ===
private String traceId; // 分布式追踪ID
private String requestId; // 本次请求唯一ID
private String sessionId; // 会话ID(多轮对话同一个)
private String userId; // 用户ID(脱敏后)
private String applicationId; // 哪个业务应用
// === 时间 ===
private Instant requestTime;
private Long latencyMs;
private Long ttftMs; // Time To First Token(流式接口)
// === Prompt信息 ===
private String promptKey; // Prompt模板的业务键(如"customer_service_v2")
private String promptVersion; // Prompt版本号
private String systemPrompt; // 系统提示词(按需脱敏)
private String userMessage; // 用户消息(按需脱敏)
// 渲染后的完整Prompt(某些场景需要,注意数据量)
@JsonInclude(JsonInclude.Include.NON_NULL)
private String fullPrompt;
// === 模型配置 ===
private String modelId;
private Double temperature;
private Integer maxTokens;
private Boolean streaming;
// === RAG信息 ===
private List<RetrievedDocInfo> retrievedDocs; // 检索到的文档列表
private Double retrievalTopScore; // 最高检索相关性分
private String retrievalStrategy; // 使用的检索策略
// === 输出 ===
private String response; // 模型回答(按需截断)
private String finishReason; // stop/length/content_filter
private Integer promptTokens;
private Integer completionTokens;
private Integer totalTokens;
// === 质量评估 ===
private Double retrievalQualityScore;
private Double hallucinationRiskScore;
private Double instructionFollowingScore;
private String qualityFlags; // JSON数组,触发的质量警告
// === 状态 ===
private String status; // SUCCESS/TIMEOUT/RATE_LIMITED/ERROR
private String errorCode;
private String errorMessage; // 脱敏后的错误信息
// === 上下文 ===
private Integer conversationTurn; // 当前是第几轮对话
private Integer historyTokens; // 历史消息占用的Token数
private String degradationLevel; // 如果发生了降级,是哪个级别
// === 用户反馈(异步更新)===
private Integer userFeedbackScore; // 1-5分,用户手动评分
private String userFeedbackText; // 用户的文字评价
private Instant feedbackTime;
}@Data
@Builder
public class RetrievedDocInfo {
private String docId;
private String docTitle;
private String docChunkId;
private Double score; // 语义相似度分
private Integer rankPosition; // 排名位置
private Boolean usedInContext; // 是否被纳入了最终Prompt
private Integer tokenCount; // 这段文档占多少Token
}日志写入的工程实现
@Service
public class AIRequestLogger {
private final ObjectMapper objectMapper;
private final LogRepository logRepository;
private final DataAnonymizer anonymizer;
private final BlockingQueue<AIRequestLog> asyncQueue;
public AIRequestLogger(ObjectMapper objectMapper,
LogRepository logRepository,
DataAnonymizer anonymizer) {
this.objectMapper = objectMapper;
this.logRepository = logRepository;
this.anonymizer = anonymizer;
// 异步写入队列,避免日志写入影响主路径延迟
this.asyncQueue = new LinkedBlockingQueue<>(50000);
startAsyncWriter();
}
/**
* 在AI调用完成后,构建并提交日志
* 这个方法要非常快,不能有阻塞操作
*/
public void logRequest(AICallContext ctx, AICallResult result) {
try {
AIRequestLog log = buildLog(ctx, result);
// 非阻塞提交,队列满了就丢弃(宁可丢日志也不影响用户)
boolean offered = asyncQueue.offer(log);
if (!offered) {
log.warn("日志队列已满,本次日志被丢弃: requestId={}", ctx.getRequestId());
}
} catch (Exception e) {
// 日志写入失败不应该影响主流程,只记录到系统日志
log.error("构建AI请求日志失败", e);
}
}
private AIRequestLog buildLog(AICallContext ctx, AICallResult result) {
return AIRequestLog.builder()
.traceId(ctx.getTraceId())
.requestId(ctx.getRequestId())
.sessionId(ctx.getSessionId())
.userId(anonymizer.anonymizeUserId(ctx.getUserId()))
.applicationId(ctx.getApplicationId())
.requestTime(ctx.getRequestTime())
.latencyMs(result.getLatencyMs())
.ttftMs(result.getTtftMs())
.promptKey(ctx.getPromptKey())
.promptVersion(ctx.getPromptVersion())
// 注意:要按数据合规要求决定是否记录完整Prompt
.systemPrompt(shouldLogFullPrompt(ctx) ? ctx.getSystemPrompt() :
truncate(ctx.getSystemPrompt(), 200))
.userMessage(anonymizer.anonymizeUserMessage(ctx.getUserMessage()))
.modelId(ctx.getModelId())
.temperature(ctx.getTemperature())
.maxTokens(ctx.getMaxTokens())
.streaming(ctx.isStreaming())
.retrievedDocs(buildDocInfoList(ctx.getRetrievedDocs()))
.retrievalTopScore(ctx.getRetrievalTopScore())
.response(truncate(result.getResponse(), 2000)) // 截断超长的回答
.finishReason(result.getFinishReason())
.promptTokens(result.getPromptTokens())
.completionTokens(result.getCompletionTokens())
.totalTokens(result.getTotalTokens())
.status(result.getStatus().name())
.errorCode(result.getErrorCode())
.conversationTurn(ctx.getConversationTurn())
.historyTokens(ctx.getHistoryTokens())
.degradationLevel(ctx.getDegradationLevel())
.build();
}
private void startAsyncWriter() {
Thread writer = new Thread(() -> {
List<AIRequestLog> batch = new ArrayList<>(100);
while (!Thread.currentThread().isInterrupted()) {
try {
// 批量写入,减少数据库压力
AIRequestLog first = asyncQueue.poll(1, TimeUnit.SECONDS);
if (first == null) continue;
batch.add(first);
asyncQueue.drainTo(batch, 99); // 最多凑100条
logRepository.batchInsert(batch);
batch.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("批量写入日志失败,{}条日志丢失", batch.size(), e);
batch.clear(); // 清空,避免反复失败
}
}
});
writer.setDaemon(true);
writer.setName("ai-log-writer");
writer.start();
}
}结构化日志的查询分析
有了结构化日志,就可以做各种有价值的分析了。
@Repository
public class AIRequestLogRepository {
private final JdbcTemplate jdbcTemplate;
/**
* 分析指定时间段内的Prompt版本效果对比
* 这是最常用的分析:新Prompt和旧Prompt哪个更好?
*/
public List<PromptVersionStats> analyzePromptVersionPerformance(
String promptKey, LocalDateTime from, LocalDateTime to) {
return jdbcTemplate.query("""
SELECT
prompt_version,
COUNT(*) as request_count,
AVG(total_tokens) as avg_tokens,
AVG(latency_ms) as avg_latency_ms,
AVG(retrieval_top_score) as avg_retrieval_score,
AVG(CASE WHEN status = 'SUCCESS' THEN 1.0 ELSE 0.0 END) as success_rate,
AVG(user_feedback_score) as avg_user_score,
COUNT(CASE WHEN user_feedback_score IS NOT NULL THEN 1 END) as feedback_count
FROM ai_request_logs
WHERE prompt_key = ?
AND request_time BETWEEN ? AND ?
GROUP BY prompt_version
ORDER BY avg_user_score DESC NULLS LAST
""",
new PromptVersionStatsMapper(),
promptKey, from, to
);
}
/**
* 找出高幻觉风险的会话,供人工审核
*/
public List<SessionSummary> findHighRiskSessions(double hallucinationThreshold,
int minTurns) {
return jdbcTemplate.query("""
SELECT
session_id,
user_id,
COUNT(*) as turn_count,
AVG(hallucination_risk_score) as avg_hallucination_risk,
MAX(hallucination_risk_score) as max_hallucination_risk,
MIN(request_time) as session_start,
MAX(request_time) as session_end,
SUM(total_tokens) as total_tokens_used
FROM ai_request_logs
WHERE status = 'SUCCESS'
AND request_time > NOW() - INTERVAL '7 days'
GROUP BY session_id, user_id
HAVING COUNT(*) >= ?
AND AVG(hallucination_risk_score) > ?
ORDER BY avg_hallucination_risk DESC
LIMIT 50
""",
new SessionSummaryMapper(),
minTurns, hallucinationThreshold
);
}
/**
* Token成本分析:找出成本最高的业务场景
*/
public List<ApplicationCostStats> analyzeTokenCostByApplication(
LocalDateTime from, LocalDateTime to) {
return jdbcTemplate.query("""
SELECT
application_id,
SUM(total_tokens) as total_tokens,
SUM(prompt_tokens) as total_prompt_tokens,
SUM(completion_tokens) as total_completion_tokens,
COUNT(*) as request_count,
AVG(total_tokens) as avg_tokens_per_request,
-- 按当前定价估算成本(按实际情况调整)
SUM(prompt_tokens) * 0.000003 +
SUM(completion_tokens) * 0.000015 as estimated_cost_usd
FROM ai_request_logs
WHERE request_time BETWEEN ? AND ?
AND status = 'SUCCESS'
GROUP BY application_id
ORDER BY total_tokens DESC
""",
new ApplicationCostStatsMapper(),
from, to
);
}
/**
* 检索质量分析:哪些查询类型检索效果最差?
*/
public List<RetrievalQualityByCategory> analyzeRetrievalQuality() {
return jdbcTemplate.query("""
SELECT
LEFT(prompt_key, CHARINDEX('_', prompt_key + '_') - 1) as category,
AVG(retrieval_top_score) as avg_top_score,
COUNT(CASE WHEN retrieval_top_score < 0.6 THEN 1 END) as low_quality_count,
COUNT(*) as total_count,
COUNT(CASE WHEN retrieval_top_score < 0.6 THEN 1 END) * 100.0 /
COUNT(*) as low_quality_rate
FROM ai_request_logs
WHERE retrieval_top_score IS NOT NULL
AND request_time > NOW() - INTERVAL '30 days'
GROUP BY category
ORDER BY avg_top_score ASC
""",
new RetrievalQualityMapper()
);
}
}会话级日志的重放能力
这是个很有用但很多团队忽略的功能:当一个会话出现了问题,能够完整重放。
@Service
public class SessionReplayService {
private final AIRequestLogRepository logRepo;
private final LLMService llmService;
/**
* 重放一个历史会话
* 用于问题复现和效果对比
*/
public SessionReplayResult replaySession(String sessionId,
ReplayConfig config) {
// 按时间顺序拉取会话的所有轮次
List<AIRequestLog> sessionLogs = logRepo.findBySessionId(sessionId);
if (sessionLogs.isEmpty()) {
throw new SessionNotFoundException(sessionId);
}
List<TurnReplayResult> turnResults = new ArrayList<>();
List<ChatMessage> simulatedHistory = new ArrayList<>();
for (AIRequestLog originalTurn : sessionLogs) {
TurnReplayResult turnResult = replayTurn(
originalTurn,
simulatedHistory,
config
);
turnResults.add(turnResult);
// 把这一轮加入历史,用于下一轮的上下文
simulatedHistory.add(ChatMessage.user(originalTurn.getUserMessage()));
simulatedHistory.add(ChatMessage.assistant(turnResult.getReplayedResponse()));
}
return SessionReplayResult.builder()
.sessionId(sessionId)
.originalLogs(sessionLogs)
.turnResults(turnResults)
.differenceReport(buildDifferenceReport(sessionLogs, turnResults))
.build();
}
private TurnReplayResult replayTurn(AIRequestLog original,
List<ChatMessage> history,
ReplayConfig config) {
// 使用原始Prompt版本还是当前最新版本?
String promptVersion = config.isUseOriginalPrompt() ?
original.getPromptVersion() : "current";
AICallContext ctx = AICallContext.builder()
.sessionId("replay-" + original.getSessionId())
.promptKey(original.getPromptKey())
.promptVersion(promptVersion)
.userMessage(original.getUserMessage())
.conversationHistory(history)
.build();
AICallResult result = llmService.call(ctx);
// 对比原始回答和重放回答的差异
double similarity = computeTextSimilarity(
original.getResponse(),
result.getResponse()
);
return TurnReplayResult.builder()
.originalResponse(original.getResponse())
.replayedResponse(result.getResponse())
.similarity(similarity)
.significant_difference(similarity < 0.7)
.originalLatencyMs(original.getLatencyMs())
.replayedLatencyMs(result.getLatencyMs())
.build();
}
}数据合规:什么能记,什么不能记
这是工程上必须认真对待的问题。
@Component
public class DataAnonymizer {
/**
* 用户消息脱敏
* 要根据你的业务和数据合规要求来决定规则
*/
public String anonymizeUserMessage(String message) {
if (message == null) return null;
String result = message;
// 手机号脱敏
result = result.replaceAll("1[3-9]\\d{9}", "1**********");
// 身份证号脱敏
result = result.replaceAll("\\d{17}[\\dXx]", "***身份证号***");
// 邮箱脱敏(保留域名,隐藏账号)
result = result.replaceAll("[a-zA-Z0-9._%+\\-]+@", "***@");
// 银行卡号脱敏
result = result.replaceAll("\\d{16,19}", "****");
return result;
}
/**
* 用户ID的不可逆哈希
* 保证同一用户在日志里对应同一个匿名ID,但无法反推真实ID
*/
public String anonymizeUserId(String userId) {
if (userId == null) return null;
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
// 加盐,避免彩虹表攻击
byte[] saltedUserId = (SALT + userId).getBytes(StandardCharsets.UTF_8);
byte[] hash = md.digest(saltedUserId);
// 取前16字节作为匿名ID
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 8; i++) {
sb.append(String.format("%02x", hash[i]));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
return "anonymized";
}
}
/**
* 判断用户消息是否包含敏感内容
* 如果包含,可以选择不记录完整内容
*/
public boolean containsSensitiveContent(String message) {
if (message == null) return false;
// 检查是否包含明显的个人信息模式
return message.matches(".*1[3-9]\\d{9}.*") ||
message.matches(".*\\d{17}[\\dXx].*") ||
message.matches(".*[a-zA-Z0-9._%+\\-]+@[a-zA-Z0-9.\\-]+\\.[a-zA-Z]{2,}.*");
}
}日志的生命周期管理
@Service
public class LogLifecycleManager {
/**
* 分层存储策略
* 热数据(近7天):PostgreSQL,支持复杂查询
* 温数据(7-90天):ClickHouse,支持大数据量分析
* 冷数据(90天+):S3对象存储,只用于审计和合规
*/
@Scheduled(cron = "0 0 3 * * *") // 每天凌晨3点执行
public void archiveLogs() {
LocalDate cutoffWarm = LocalDate.now().minusDays(7);
LocalDate cutoffCold = LocalDate.now().minusDays(90);
// 将7天前的日志迁移到ClickHouse
int movedToWarm = migrateToClickHouse(cutoffWarm);
log.info("迁移到温存储: {} 条日志", movedToWarm);
// 将90天前的日志迁移到S3,并从ClickHouse删除
int movedToCold = migrateToS3(cutoffCold);
log.info("迁移到冷存储: {} 条日志", movedToCold);
// 合规要求:保留至少2年的审计日志
// 超过2年的日志才可以真正删除
int deleted = deleteExpiredLogs(LocalDate.now().minusYears(2));
log.info("删除过期日志: {} 条", deleted);
}
}小结
日志结构化是AI系统工程化的基础设施投入。它本身不会让你的AI变得更聪明,但它会让你有能力知道"AI在哪里不聪明",然后去改进。
我见过很多团队在AI系统上线后,因为日志不完善,优化工作完全靠猜——猜用户在问什么,猜为什么效果差,猜Prompt改成什么样更好。
这种猜法,有时候猜对了,更多时候猜错了还不知道。
