AI应用的日志分析:用AI分析AI的日志
AI应用的日志分析:用AI分析AI的日志
那4个小时
2025年1月的一个凌晨两点,王磊盯着满屏的日志,眼睛发红。
他们公司的AI智能问答系统从晚上10点开始出现异常,大约15%的用户反馈答案质量突然变差,部分用户收到了无意义的重复内容。客服工单量在90分钟内涌入了342条投诉。
王磊用ELK打开日志面板,眼前出现的是——10万4千条日志记录。
他开始手动过滤。先按错误级别筛ERROR……还有3200条。再按服务名筛ai-chat-service……还有1800条。再按时间段……
"这个ERROR是正常的,这个是偶发的,这个……"
两小时后,他找到了一批"似乎相关"的日志,但不确定是不是根因。又花了一小时找到了问题所在:Prompt模板在某个边界条件下生成了超长输入,触发了模型的Context Window限制,模型的返回从正常降级到了截断内容,而截断内容恰好是重复文本。
根因找到了,凌晨2点。修复5分钟,恢复正常。
4个小时,找到5分钟能修复的问题。
这就是没有建立AI日志体系的代价。
AI应用日志的特殊性
传统Java应用的日志记录通常关注:请求/响应、异常堆栈、数据库查询、服务耗时。
AI应用的日志需要额外记录:
AI应用的独特错误模式
传统应用:NullPointerException, SQLException, TimeoutException
AI应用:RateLimitException, ContextLengthExceededException,
ContentFilterException, ModelNotAvailableException,
InvalidRequestError, InsufficientQuotaException每种AI错误都有不同的处理策略,没有结构化日志,根本无法区分。
结构化日志设计:AI调用的完整Schema
核心日志实体定义
package com.laozhang.ai.logging;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.List;
import java.util.Map;
/**
* AI调用日志结构
*
* 设计原则:
* 1. 可查询性:每个字段都应该是可以过滤/聚合的维度
* 2. 完整性:能够重现问题场景的所有信息
* 3. 合规性:敏感信息截断/脱敏
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public record AiCallLog(
// === 基础标识 ===
@JsonProperty("trace_id")
String traceId, // 分布式追踪ID,关联整个请求链路
@JsonProperty("span_id")
String spanId, // 当前AI调用的Span ID
@JsonProperty("request_id")
String requestId, // AI提供商返回的请求ID(用于官方支持联系)
// === 时间信息 ===
@JsonProperty("timestamp")
Instant timestamp, // 调用开始时间(UTC)
@JsonProperty("duration_ms")
Long durationMs, // 总耗时(毫秒)
@JsonProperty("ttft_ms")
Long ttftMs, // 首Token时间(Time To First Token,流式响应)
// === 模型信息 ===
@JsonProperty("provider")
String provider, // 模型提供商:openai/anthropic/azure/qianfan
@JsonProperty("model")
String model, // 具体模型:gpt-4o-mini/claude-3-5-haiku等
@JsonProperty("call_type")
String callType, // 调用类型:chat/embedding/completion/image
// === Token消耗 ===
@JsonProperty("tokens")
TokenInfo tokens, // Token详情
// === 调用上下文 ===
@JsonProperty("service_name")
String serviceName, // 调用方服务名
@JsonProperty("feature_name")
String featureName, // 业务功能名:customer_service_chat/product_search等
@JsonProperty("user_id")
String userId, // 用户ID(脱敏后)
@JsonProperty("session_id")
String sessionId, // 会话ID
// === 请求内容(脱敏摘要)===
@JsonProperty("prompt_summary")
PromptSummary promptSummary, // Prompt摘要,不存完整内容
// === 响应信息 ===
@JsonProperty("response_summary")
ResponseSummary responseSummary,
// === 状态和错误 ===
@JsonProperty("status")
String status, // success/error/timeout/rate_limited
@JsonProperty("error")
ErrorInfo error, // 错误详情(仅在失败时)
// === RAG相关(可选)===
@JsonProperty("rag_context")
RagContext ragContext, // 检索到的文档信息
// === 成本 ===
@JsonProperty("cost")
CostInfo cost, // 本次调用的费用估算
// === 安全 ===
@JsonProperty("security")
SecurityInfo security // 安全事件标记
) {
@JsonInclude(JsonInclude.Include.NON_NULL)
public record TokenInfo(
@JsonProperty("input_tokens") int inputTokens,
@JsonProperty("output_tokens") int outputTokens,
@JsonProperty("total_tokens") int totalTokens,
@JsonProperty("cached_tokens") Integer cachedTokens // OpenAI的prompt缓存
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record PromptSummary(
@JsonProperty("message_count") int messageCount, // 消息条数
@JsonProperty("system_prompt_hash") String systemPromptHash, // 系统提示的哈希
@JsonProperty("user_input_length") int userInputLength, // 用户输入长度
@JsonProperty("has_tools") boolean hasTools, // 是否启用了函数调用
@JsonProperty("context_docs_count") Integer contextDocsCount // RAG注入文档数
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ResponseSummary(
@JsonProperty("output_length") int outputLength, // 输出字符长度
@JsonProperty("finish_reason") String finishReason, // stop/length/tool_calls/content_filter
@JsonProperty("tool_calls_count") Integer toolCallsCount,
@JsonProperty("response_quality_score") Double qualityScore // 可选:自动评分
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ErrorInfo(
@JsonProperty("error_type") String errorType, // 错误类型枚举
@JsonProperty("error_code") String errorCode, // 原始错误码
@JsonProperty("error_message") String errorMessage, // 脱敏后的错误信息
@JsonProperty("is_retryable") boolean isRetryable, // 是否可重试
@JsonProperty("retry_count") int retryCount // 重试次数
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record RagContext(
@JsonProperty("query_vector_ms") Long queryVectorMs, // 向量化耗时
@JsonProperty("search_ms") Long searchMs, // 向量搜索耗时
@JsonProperty("retrieved_count") int retrievedCount, // 检索到文档数
@JsonProperty("used_count") int usedCount, // 实际使用文档数
@JsonProperty("max_similarity") Double maxSimilarity, // 最高相似度
@JsonProperty("min_similarity") Double minSimilarity // 最低相似度
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record CostInfo(
@JsonProperty("input_cost_usd") Double inputCostUsd,
@JsonProperty("output_cost_usd") Double outputCostUsd,
@JsonProperty("total_cost_usd") Double totalCostUsd,
@JsonProperty("total_cost_cny") Double totalCostCny // 人民币换算
) {}
@JsonInclude(JsonInclude.Include.NON_NULL)
public record SecurityInfo(
@JsonProperty("pii_detected") boolean piiDetected, // 是否检测到PII
@JsonProperty("pii_types") List<String> piiTypes, // PII类型列表
@JsonProperty("prompt_injection_score") Double promptInjectionScore, // 注入风险
@JsonProperty("content_policy_flags") List<String> contentPolicyFlags
) {}
}AiCallLogger:核心日志记录器
package com.laozhang.ai.logging;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.metadata.Usage;
import org.springframework.stereotype.Component;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* AI调用日志记录器
*
* 使用方式:
* 1. 在AI调用前创建Builder
* 2. AI调用完成后调用complete()或fail()
* 3. 自动写入结构化JSON日志
*/
@Component
public class AiCallLogger {
private static final Logger log = LoggerFactory.getLogger("ai.calls");
private static final Logger auditLog = LoggerFactory.getLogger("ai.audit");
private final ObjectMapper objectMapper;
private final MeterRegistry meterRegistry;
private final PiiDetector piiDetector;
private final CostCalculator costCalculator;
public AiCallLogger(ObjectMapper objectMapper,
MeterRegistry meterRegistry,
PiiDetector piiDetector,
CostCalculator costCalculator) {
this.objectMapper = objectMapper;
this.meterRegistry = meterRegistry;
this.piiDetector = piiDetector;
this.costCalculator = costCalculator;
}
/**
* 开始记录一次AI调用
*/
public AiCallContext startCall(String provider, String model,
String featureName, String userId) {
return new AiCallContext(provider, model, featureName,
hashUserId(userId), Instant.now());
}
/**
* AI调用成功完成
*/
public void logSuccess(AiCallContext ctx, ChatResponse response,
String userInput) {
Usage usage = response.getMetadata().getUsage();
// 计算成本
AiCallLog.CostInfo cost = costCalculator.calculate(
ctx.model(), usage.getPromptTokens(), usage.getGenerationTokens());
// 检测PII(异步,不阻塞主流程)
AiCallLog.SecurityInfo security = piiDetector.scanAsync(userInput);
AiCallLog logEntry = new AiCallLog(
ctx.traceId(),
ctx.spanId(),
extractRequestId(response),
ctx.startTime(),
System.currentTimeMillis() - ctx.startTimeMs(),
null, // TTFT仅流式场景有
ctx.provider(),
ctx.model(),
"chat",
new AiCallLog.TokenInfo(
usage.getPromptTokens(),
usage.getGenerationTokens(),
usage.getTotalTokens(),
null
),
"ai-service",
ctx.featureName(),
ctx.userId(),
ctx.sessionId(),
buildPromptSummary(userInput),
buildResponseSummary(response),
"success",
null,
null,
cost,
security
);
writeLog(logEntry);
recordMetrics(logEntry);
}
/**
* AI调用失败
*/
public void logFailure(AiCallContext ctx, Exception e,
String userInput, int retryCount) {
AiCallLog.ErrorInfo error = classifyError(e, retryCount);
AiCallLog logEntry = new AiCallLog(
ctx.traceId(), ctx.spanId(), null,
ctx.startTime(),
System.currentTimeMillis() - ctx.startTimeMs(),
null, ctx.provider(), ctx.model(), "chat",
null, // 失败时可能没有token信息
"ai-service", ctx.featureName(), ctx.userId(), ctx.sessionId(),
buildPromptSummary(userInput),
null,
"error",
error,
null, null, null
);
writeLog(logEntry);
recordErrorMetrics(logEntry);
}
private void writeLog(AiCallLog entry) {
try {
String json = objectMapper.writeValueAsString(entry);
log.info(json);
// 安全事件写入审计日志
if (entry.security() != null &&
(entry.security().piiDetected() ||
(entry.security().promptInjectionScore() != null &&
entry.security().promptInjectionScore() > 0.7))) {
auditLog.warn("SECURITY_EVENT: " + json);
}
} catch (Exception e) {
log.error("日志序列化失败", e);
}
}
private void recordMetrics(AiCallLog entry) {
// 请求计数
Counter.builder("ai.calls.total")
.tag("provider", entry.provider())
.tag("model", entry.model())
.tag("feature", entry.featureName())
.tag("status", entry.status())
.register(meterRegistry).increment();
// 延迟分布
Timer.builder("ai.calls.duration")
.tag("model", entry.model())
.tag("feature", entry.featureName())
.register(meterRegistry)
.record(entry.durationMs(), TimeUnit.MILLISECONDS);
// Token消耗
if (entry.tokens() != null) {
meterRegistry.counter("ai.tokens.total",
"type", "input",
"model", entry.model()
).increment(entry.tokens().inputTokens());
meterRegistry.counter("ai.tokens.total",
"type", "output",
"model", entry.model()
).increment(entry.tokens().outputTokens());
}
// 成本统计
if (entry.cost() != null && entry.cost().totalCostUsd() != null) {
meterRegistry.counter("ai.cost.usd",
"model", entry.model(),
"feature", entry.featureName()
).increment(entry.cost().totalCostUsd());
}
}
private AiCallLog.PromptSummary buildPromptSummary(String userInput) {
return new AiCallLog.PromptSummary(
1, // 消息条数
null, // 不记录系统提示哈希(减少信息量)
userInput != null ? userInput.length() : 0,
false,
null
);
}
private AiCallLog.ResponseSummary buildResponseSummary(ChatResponse response) {
return new AiCallLog.ResponseSummary(
response.getResult().getOutput().getContent() != null
? response.getResult().getOutput().getContent().length() : 0,
response.getResult().getMetadata().getFinishReason(),
null,
null
);
}
private AiCallLog.ErrorInfo classifyError(Exception e, int retryCount) {
String errorType;
boolean isRetryable;
String message = e.getMessage() != null ? e.getMessage() : "";
if (message.contains("rate limit") || message.contains("429")) {
errorType = "RATE_LIMIT";
isRetryable = true;
} else if (message.contains("context length") || message.contains("maximum context")) {
errorType = "CONTEXT_LENGTH_EXCEEDED";
isRetryable = false;
} else if (message.contains("content policy") || message.contains("content filter")) {
errorType = "CONTENT_POLICY_VIOLATION";
isRetryable = false;
} else if (message.contains("timeout") || message.contains("timed out")) {
errorType = "TIMEOUT";
isRetryable = true;
} else if (message.contains("503") || message.contains("unavailable")) {
errorType = "MODEL_UNAVAILABLE";
isRetryable = true;
} else if (message.contains("401") || message.contains("API key")) {
errorType = "AUTHENTICATION_ERROR";
isRetryable = false;
} else {
errorType = "UNKNOWN_ERROR";
isRetryable = true;
}
// 脱敏:只记录错误类型,不记录可能包含敏感内容的完整错误信息
String sanitizedMessage = message.length() > 200
? message.substring(0, 200) + "..." : message;
return new AiCallLog.ErrorInfo(errorType, null, sanitizedMessage,
isRetryable, retryCount);
}
private String hashUserId(String userId) {
if (userId == null) return "anonymous";
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] hash = md.digest(userId.getBytes());
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 8; i++) {
sb.append(String.format("%02x", hash[i]));
}
return sb.toString(); // 前16字符的哈希,不可逆
} catch (Exception e) {
return "hashed_" + userId.hashCode();
}
}
private String extractRequestId(ChatResponse response) {
try {
return (String) response.getMetadata().get("id");
} catch (Exception e) {
return null;
}
}
private void recordErrorMetrics(AiCallLog entry) {
Counter.builder("ai.errors.total")
.tag("provider", entry.provider())
.tag("model", entry.model())
.tag("error_type", entry.error() != null ? entry.error().errorType() : "unknown")
.register(meterRegistry).increment();
}
/**
* AI调用上下文(不可变)
*/
public record AiCallContext(
String provider,
String model,
String featureName,
String userId,
Instant startTime,
long startTimeMs,
String traceId,
String spanId,
String sessionId
) {
public AiCallContext(String provider, String model,
String featureName, String userId,
Instant startTime) {
this(provider, model, featureName, userId, startTime,
System.currentTimeMillis(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString().substring(0, 8),
null);
}
}
}ELK Stack集成:Logstash解析AI日志
Logstash Pipeline配置
# /etc/logstash/conf.d/ai-logs.conf
input {
# 从Filebeat收集日志
beats {
port => 5044
tags => ["ai-service"]
}
}
filter {
# 只处理AI调用日志
if "ai.calls" in [logger_name] or [log][logger] == "ai.calls" {
# 解析JSON格式的AI日志
json {
source => "message"
target => "ai"
remove_field => ["message"]
}
# 处理时间戳
date {
match => ["[ai][timestamp]", "ISO8601"]
target => "@timestamp"
}
# 计算延迟分级
ruby {
code => '
duration = event.get("[ai][duration_ms]")
if duration
level = if duration < 500 then "fast"
elsif duration < 2000 then "normal"
elsif duration < 5000 then "slow"
else "very_slow"
end
event.set("[ai][latency_level]", level)
end
'
}
# 计算Token效率(输出Token/输入Token)
ruby {
code => '
tokens = event.get("[ai][tokens]")
if tokens
input = tokens["input_tokens"].to_f
output = tokens["output_tokens"].to_f
if input > 0
efficiency = (output / input).round(3)
event.set("[ai][token_efficiency]", efficiency)
end
end
'
}
# 标记高成本调用(超过$0.01)
ruby {
code => '
cost = event.get("[ai][cost][total_cost_usd]")
if cost && cost.to_f > 0.01
event.set("[ai][is_high_cost]", true)
event.tag("high_cost_call")
else
event.set("[ai][is_high_cost]", false)
end
'
}
# 处理错误分类
if [ai][status] == "error" {
mutate {
add_tag => ["ai_error"]
add_field => { "[ai][is_error]" => true }
}
# 标记严重错误
if [ai][error][error_type] in ["AUTHENTICATION_ERROR", "INSUFFICIENT_QUOTA"] {
mutate {
add_tag => ["critical_ai_error"]
}
}
}
# 提取向量检索质量指标
if [ai][rag_context] {
ruby {
code => '
rag = event.get("[ai][rag_context]")
if rag
max_sim = rag["max_similarity"]
if max_sim
quality = if max_sim > 0.9 then "excellent"
elsif max_sim > 0.75 then "good"
elsif max_sim > 0.6 then "fair"
else "poor"
end
event.set("[ai][rag_quality]", quality)
end
end
'
}
}
# 地理位置(如果有IP)
if [client_ip] {
geoip {
source => "client_ip"
target => "geoip"
}
}
}
# 审计日志特殊处理
if "ai.audit" in [logger_name] {
mutate {
add_tag => ["security_event"]
}
# 立即发送告警到Elasticsearch的告警索引
# 实际用Kibana Watcher或ElastAlert处理
}
}
output {
# 普通AI日志
if "ai-service" in [tags] and "security_event" not in [tags] {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "ai-logs-%{+YYYY.MM.dd}"
user => "logstash_writer"
password => "${LOGSTASH_PASSWORD}"
# ILM策略:7天热存储,30天温存储,90天冷存储
ilm_rollover_alias => "ai-logs"
ilm_pattern => "{now/d}-000001"
ilm_policy => "ai-logs-policy"
}
}
# 安全事件写入专用索引
if "security_event" in [tags] {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "ai-security-events-%{+YYYY.MM.dd}"
user => "logstash_writer"
password => "${LOGSTASH_PASSWORD}"
}
# 同时发钉钉告警
http {
url => "${DINGTALK_WEBHOOK}"
http_method => "post"
content_type => "application/json"
format => "message"
message => '{"msgtype":"text","text":{"content":"AI安全告警:检测到异常事件\n日志ID:%{[ai][trace_id]}"}}'
}
}
# 调试环境输出到控制台
if "debug" in [tags] {
stdout { codec => rubydebug }
}
}用AI分析AI日志:LLM分析异常
这是本文最有意思的部分:让LLM来分析AI应用的日志,找出问题根因。
package com.laozhang.ai.analysis;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.prompt.SystemPromptTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* AI驱动的日志分析服务
*
* 核心思想:当系统出现异常时,自动收集相关日志片段,
* 调用LLM分析根因,生成可执行的修复建议
*/
@Service
public class AiLogAnalysisService {
private final ChatClient chatClient;
private final ElasticsearchLogFetcher logFetcher;
private static final String ANALYSIS_SYSTEM_PROMPT = """
你是一个专业的AI应用运维专家,擅长分析AI系统的异常日志。
你的分析能力:
1. 识别AI调用的错误模式(速率限制、上下文超长、模型不可用等)
2. 区分临时性故障和持续性问题
3. 理解AI延迟异常的可能原因
4. 发现Token消耗异常模式
5. 识别可能的成本泄露
输出格式要求:
- 根因分析(1-3个最可能的原因,按可能性排序)
- 影响范围评估(影响了多少用户/请求)
- 立即行动建议(可以在5分钟内执行的操作)
- 长期优化建议(系统性改进方向)
- 严重程度评级(P0/P1/P2/P3)
注意:日志中的用户数据已脱敏,分析时请关注技术指标而非内容。
""";
public AiLogAnalysisService(ChatClient.Builder chatClientBuilder,
ElasticsearchLogFetcher logFetcher) {
this.chatClient = chatClientBuilder
.defaultSystem(ANALYSIS_SYSTEM_PROMPT)
.build();
this.logFetcher = logFetcher;
}
/**
* 分析指定时间窗口内的异常
*
* @param alertContext 告警上下文(从监控系统触发)
* @return 结构化的根因分析报告
*/
public IncidentAnalysisReport analyzeIncident(AlertContext alertContext) {
// 1. 收集相关日志
LogBundle logBundle = collectRelevantLogs(alertContext);
// 2. 预处理:聚合统计信息
LogStatistics stats = aggregateStatistics(logBundle);
// 3. 构建分析请求
String analysisRequest = buildAnalysisRequest(alertContext, logBundle, stats);
// 4. 调用LLM分析
String rawAnalysis = chatClient.prompt()
.user(analysisRequest)
.call()
.content();
// 5. 解析分析结果
return parseAnalysisReport(rawAnalysis, alertContext, stats);
}
/**
* 收集与异常相关的日志
* 策略:告警前后10分钟的日志,优先收集错误和慢请求
*/
private LogBundle collectRelevantLogs(AlertContext alertContext) {
long startTime = alertContext.alertTime().toEpochMilli() - 10 * 60 * 1000;
long endTime = alertContext.alertTime().toEpochMilli() + 10 * 60 * 1000;
// 收集错误日志(最多100条,按时间排序)
List<AiCallLog> errorLogs = logFetcher.fetchErrorLogs(
startTime, endTime, 100, alertContext.affectedService());
// 收集慢请求日志(P99超过阈值的,最多50条)
List<AiCallLog> slowLogs = logFetcher.fetchSlowLogs(
startTime, endTime, 50, 5000); // 超过5秒
// 收集告警触发前5分钟的"正常"日志作为对比基线
List<AiCallLog> baselineLogs = logFetcher.fetchSuccessLogs(
startTime - 5 * 60 * 1000, startTime, 20);
return new LogBundle(errorLogs, slowLogs, baselineLogs);
}
/**
* 聚合统计:生成紧凑的统计摘要,节省Token
*/
private LogStatistics aggregateStatistics(LogBundle bundle) {
List<AiCallLog> allLogs = bundle.allLogs();
// 错误类型分布
Map<String, Long> errorDistribution = bundle.errorLogs().stream()
.filter(l -> l.error() != null)
.collect(Collectors.groupingBy(
l -> l.error().errorType(),
Collectors.counting()
));
// 延迟分布
double avgDuration = allLogs.stream()
.filter(l -> l.durationMs() != null)
.mapToLong(AiCallLog::durationMs)
.average().orElse(0);
long p99Duration = allLogs.stream()
.filter(l -> l.durationMs() != null)
.mapToLong(AiCallLog::durationMs)
.sorted()
.skip((long) (allLogs.size() * 0.99))
.findFirst().orElse(0);
// Token消耗趋势
double avgTokens = allLogs.stream()
.filter(l -> l.tokens() != null)
.mapToInt(l -> l.tokens().totalTokens())
.average().orElse(0);
// 错误率
long totalRequests = allLogs.size();
long errorCount = bundle.errorLogs().size();
double errorRate = totalRequests > 0 ? (double) errorCount / totalRequests : 0;
return new LogStatistics(
totalRequests, errorCount, errorRate,
avgDuration, p99Duration, avgTokens,
errorDistribution
);
}
/**
* 构建发送给LLM的分析请求
*
* 关键:不发送完整日志(太多Token),而是发送统计摘要+代表性样本
*/
private String buildAnalysisRequest(AlertContext alertContext,
LogBundle bundle,
LogStatistics stats) {
StringBuilder sb = new StringBuilder();
sb.append("## 告警概况\n");
sb.append("- 告警类型:").append(alertContext.alertType()).append("\n");
sb.append("- 告警时间:").append(alertContext.alertTime()).append("\n");
sb.append("- 影响服务:").append(alertContext.affectedService()).append("\n");
sb.append("- 告警描述:").append(alertContext.description()).append("\n\n");
sb.append("## 统计摘要(告警前后10分钟)\n");
sb.append("- 总请求数:").append(stats.totalRequests()).append("\n");
sb.append("- 错误数:").append(stats.errorCount()).append("\n");
sb.append("- 错误率:").append(String.format("%.1f%%", stats.errorRate() * 100)).append("\n");
sb.append("- 平均延迟:").append(String.format("%.0fms", stats.avgDuration())).append("\n");
sb.append("- P99延迟:").append(stats.p99Duration()).append("ms\n");
sb.append("- 平均Token消耗:").append(String.format("%.0f", stats.avgTokens())).append("\n\n");
sb.append("## 错误类型分布\n");
stats.errorDistribution().forEach((type, count) ->
sb.append("- ").append(type).append(": ").append(count).append("次\n"));
sb.append("\n");
// 发送5条代表性错误日志(不是全部,节省Token)
sb.append("## 代表性错误日志样本(前5条)\n");
bundle.errorLogs().stream().limit(5).forEach(log -> {
sb.append("```json\n");
sb.append(formatLogForLLM(log));
sb.append("\n```\n");
});
// 发送2条慢请求日志
if (!bundle.slowLogs().isEmpty()) {
sb.append("\n## 慢请求样本(前2条)\n");
bundle.slowLogs().stream().limit(2).forEach(log -> {
sb.append("```json\n");
sb.append(formatLogForLLM(log));
sb.append("\n```\n");
});
}
sb.append("\n请基于以上信息进行根因分析。");
return sb.toString();
}
/**
* 格式化日志用于LLM分析(只保留关键字段,减少Token消耗)
*/
private String formatLogForLLM(AiCallLog log) {
Map<String, Object> compact = new java.util.LinkedHashMap<>();
compact.put("time", log.timestamp());
compact.put("model", log.model());
compact.put("feature", log.featureName());
compact.put("duration_ms", log.durationMs());
compact.put("status", log.status());
if (log.tokens() != null) {
compact.put("tokens", Map.of(
"in", log.tokens().inputTokens(),
"out", log.tokens().outputTokens()
));
}
if (log.error() != null) {
compact.put("error", Map.of(
"type", log.error().errorType(),
"message", log.error().errorMessage(),
"retryable", log.error().isRetryable()
));
}
if (log.responseSummary() != null) {
compact.put("finish_reason", log.responseSummary().finishReason());
}
try {
return new com.fasterxml.jackson.databind.ObjectMapper()
.writerWithDefaultPrettyPrinter()
.writeValueAsString(compact);
} catch (Exception e) {
return compact.toString();
}
}
private IncidentAnalysisReport parseAnalysisReport(String rawAnalysis,
AlertContext ctx,
LogStatistics stats) {
return new IncidentAnalysisReport(
ctx.alertTime(),
rawAnalysis, // 实际可以用结构化输出解析
stats.errorRate() > 0.3 ? "P1" : stats.errorRate() > 0.1 ? "P2" : "P3",
stats
);
}
// 数据类
public record AlertContext(
java.time.Instant alertTime,
String alertType,
String affectedService,
String description
) {}
public record LogBundle(
List<AiCallLog> errorLogs,
List<AiCallLog> slowLogs,
List<AiCallLog> baselineLogs
) {
public List<AiCallLog> allLogs() {
List<AiCallLog> all = new java.util.ArrayList<>();
all.addAll(errorLogs);
all.addAll(slowLogs);
all.addAll(baselineLogs);
return all;
}
}
public record LogStatistics(
long totalRequests,
long errorCount,
double errorRate,
double avgDuration,
long p99Duration,
double avgTokens,
Map<String, Long> errorDistribution
) {}
public record IncidentAnalysisReport(
java.time.Instant analyzedAt,
String analysis,
String severity,
LogStatistics statistics
) {}
}向量化日志:用聚类发现相似错误
有些错误不是单次出现,而是一个"错误家族"。向量化日志可以自动把相似错误聚在一起。
package com.laozhang.ai.analysis;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* 日志向量化和相似错误聚类
*
* 应用场景:
* 1. 新错误出现时,自动找"过去发生过类似错误吗?怎么解决的?"
* 2. 每天对错误日志聚类,生成"错误摘要报告"
* 3. 告警降噪:同一类错误只告警一次
*/
@Service
public class LogVectorizationService {
private final EmbeddingModel embeddingModel;
private final VectorStore errorLogStore; // 专门存储错误日志向量的向量库
public LogVectorizationService(EmbeddingModel embeddingModel,
VectorStore errorLogStore) {
this.embeddingModel = embeddingModel;
this.errorLogStore = errorLogStore;
}
/**
* 将错误日志向量化并存入向量库
*
* @param errorLog 错误日志
*/
public void indexErrorLog(AiCallLog errorLog) {
if (errorLog.error() == null) return;
// 构建用于向量化的文本(结合错误信息的关键字段)
String logText = buildLogTextForEmbedding(errorLog);
// 创建文档,附带元数据
Map<String, Object> metadata = new HashMap<>();
metadata.put("error_type", errorLog.error().errorType());
metadata.put("model", errorLog.model());
metadata.put("feature", errorLog.featureName());
metadata.put("trace_id", errorLog.traceId());
metadata.put("timestamp", errorLog.timestamp().toString());
metadata.put("duration_ms", errorLog.durationMs());
Document doc = new Document(logText, metadata);
errorLogStore.add(List.of(doc));
}
/**
* 找出与当前错误最相似的历史错误
* 用于快速定位:"这个问题之前见过吗?怎么解决的?"
*/
public List<SimilarErrorResult> findSimilarErrors(AiCallLog currentError,
int topK) {
String logText = buildLogTextForEmbedding(currentError);
List<Document> similarDocs = errorLogStore.similaritySearch(
SearchRequest.query(logText)
.withTopK(topK)
.withSimilarityThreshold(0.75) // 相似度门槛
);
return similarDocs.stream()
.map(doc -> new SimilarErrorResult(
doc.getId(),
(String) doc.getMetadata().get("trace_id"),
(String) doc.getMetadata().get("error_type"),
(String) doc.getMetadata().get("timestamp"),
doc.getScore() // 相似度分数
))
.toList();
}
/**
* 对一批错误日志进行聚类
* 找出"错误家族"——多条报错其实来自同一根因
*/
public List<ErrorCluster> clusterErrors(List<AiCallLog> errorLogs) {
// 1. 向量化所有错误
List<float[]> vectors = errorLogs.stream()
.map(log -> vectorizeLog(buildLogTextForEmbedding(log)))
.toList();
// 2. K-Means聚类(简化版:基于相似度的贪心聚类)
List<ErrorCluster> clusters = new ArrayList<>();
boolean[] assigned = new boolean[errorLogs.size()];
for (int i = 0; i < errorLogs.size(); i++) {
if (assigned[i]) continue;
// 新簇的中心
List<Integer> clusterMembers = new ArrayList<>();
clusterMembers.add(i);
assigned[i] = true;
// 找相似错误加入同一簇
for (int j = i + 1; j < errorLogs.size(); j++) {
if (assigned[j]) continue;
float similarity = cosineSimilarity(vectors.get(i), vectors.get(j));
if (similarity > 0.85) { // 85%相似度认为是同一类问题
clusterMembers.add(j);
assigned[j] = true;
}
}
// 生成簇摘要
List<AiCallLog> clusterLogs = clusterMembers.stream()
.map(errorLogs::get).toList();
clusters.add(new ErrorCluster(
"cluster_" + clusters.size(),
clusterLogs,
identifyClusterPattern(clusterLogs)
));
}
// 按簇大小排序(大簇优先)
clusters.sort((a, b) -> b.logs().size() - a.logs().size());
return clusters;
}
private String buildLogTextForEmbedding(AiCallLog log) {
StringBuilder sb = new StringBuilder();
sb.append("模型:").append(log.model()).append(" ");
sb.append("功能:").append(log.featureName()).append(" ");
if (log.error() != null) {
sb.append("错误类型:").append(log.error().errorType()).append(" ");
sb.append("错误信息:").append(log.error().errorMessage()).append(" ");
}
if (log.responseSummary() != null && log.responseSummary().finishReason() != null) {
sb.append("结束原因:").append(log.responseSummary().finishReason()).append(" ");
}
if (log.durationMs() != null) {
String latencyDesc = log.durationMs() > 10000 ? "极慢" :
log.durationMs() > 5000 ? "很慢" :
log.durationMs() > 2000 ? "慢" : "正常";
sb.append("延迟:").append(latencyDesc).append(" ");
}
return sb.toString().trim();
}
private float[] vectorizeLog(String text) {
return embeddingModel.embed(text);
}
private float cosineSimilarity(float[] a, float[] b) {
float dot = 0, normA = 0, normB = 0;
for (int i = 0; i < a.length; i++) {
dot += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
return (float) (dot / (Math.sqrt(normA) * Math.sqrt(normB)));
}
private String identifyClusterPattern(List<AiCallLog> logs) {
// 找最常见的错误类型
Map<String, Long> errorTypes = logs.stream()
.filter(l -> l.error() != null)
.collect(Collectors.groupingBy(
l -> l.error().errorType(), Collectors.counting()));
return errorTypes.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(e -> e.getKey() + "(×" + e.getValue() + ")")
.orElse("UNKNOWN");
}
public record SimilarErrorResult(
String docId,
String traceId,
String errorType,
String timestamp,
Double similarity
) {}
public record ErrorCluster(
String clusterId,
List<AiCallLog> logs,
String dominantPattern
) {
public int size() { return logs.size(); }
}
import java.util.stream.Collectors;
}日志脱敏:处理AI日志中的PII数据
AI应用日志面临独特的PII风险:用户可能在对话中输入姓名、手机号、身份证号等敏感信息。
package com.laozhang.ai.logging;
import org.springframework.stereotype.Component;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
/**
* AI日志PII脱敏处理器
*
* 保护范围:手机号、身份证号、银行卡号、邮箱、姓名模式
*
* 设计原则:
* 1. 宁可误报不漏报(false positive比false negative安全)
* 2. 脱敏后保留格式特征(如手机号变成138****8888,可知类型)
* 3. 异步执行,不阻塞主流程
*/
@Component
public class PiiDetector {
// 中国手机号:1开头,第二位3-9
private static final Pattern PHONE_PATTERN =
Pattern.compile("(?<![\\d])(1[3-9]\\d{9})(?![\\d])");
// 中国身份证号:18位,最后一位可能是X
private static final Pattern ID_CARD_PATTERN =
Pattern.compile("([1-9]\\d{5})(18|19|20)(\\d{2})(0[1-9]|1[0-2])(0[1-9]|[12]\\d|3[01])(\\d{3})(\\d|X|x)");
// 银行卡号:16-19位数字
private static final Pattern BANK_CARD_PATTERN =
Pattern.compile("(?<![\\d])(\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4,7})(?![\\d])");
// 邮箱地址
private static final Pattern EMAIL_PATTERN =
Pattern.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}");
// 姓名模式(2-4个汉字,前后有特定关键词)
private static final Pattern NAME_PATTERN =
Pattern.compile("(?:姓名|我叫|叫做|称呼为)[:: ]?([\\u4e00-\\u9fa5]{2,4})");
/**
* 检测并脱敏文本中的PII
*
* @param text 原始文本
* @return 脱敏后的文本
*/
public SanitizationResult sanitize(String text) {
if (text == null || text.isBlank()) {
return new SanitizationResult(text, false, java.util.List.of());
}
String sanitized = text;
java.util.List<String> detectedTypes = new java.util.ArrayList<>();
// 按敏感程度从高到低处理
// 1. 脱敏身份证号(最敏感)
if (ID_CARD_PATTERN.matcher(sanitized).find()) {
sanitized = ID_CARD_PATTERN.matcher(sanitized)
.replaceAll(m -> maskIdCard(m.group()));
detectedTypes.add("ID_CARD");
}
// 2. 脱敏银行卡号
if (BANK_CARD_PATTERN.matcher(sanitized).find()) {
sanitized = BANK_CARD_PATTERN.matcher(sanitized)
.replaceAll(m -> maskBankCard(m.group()));
detectedTypes.add("BANK_CARD");
}
// 3. 脱敏手机号
if (PHONE_PATTERN.matcher(sanitized).find()) {
sanitized = PHONE_PATTERN.matcher(sanitized)
.replaceAll(m -> maskPhone(m.group()));
detectedTypes.add("PHONE");
}
// 4. 脱敏邮箱
if (EMAIL_PATTERN.matcher(sanitized).find()) {
sanitized = EMAIL_PATTERN.matcher(sanitized)
.replaceAll(m -> maskEmail(m.group()));
detectedTypes.add("EMAIL");
}
// 5. 脱敏姓名
if (NAME_PATTERN.matcher(sanitized).find()) {
sanitized = NAME_PATTERN.matcher(sanitized)
.replaceAll(m -> m.group(0).replace(m.group(1), "**"));
detectedTypes.add("NAME");
}
boolean piiDetected = !detectedTypes.isEmpty();
return new SanitizationResult(sanitized, piiDetected, detectedTypes);
}
/**
* 异步扫描(用于日志记录,不阻塞业务)
*/
public AiCallLog.SecurityInfo scanAsync(String text) {
try {
SanitizationResult result = sanitize(text);
return new AiCallLog.SecurityInfo(
result.piiDetected(),
result.piiTypes(),
computeInjectionRisk(text),
java.util.List.of()
);
} catch (Exception e) {
return new AiCallLog.SecurityInfo(false, java.util.List.of(), null, java.util.List.of());
}
}
/**
* 简单的提示词注入风险评分
* 基于关键词模式,非严格检测
*/
private Double computeInjectionRisk(String text) {
if (text == null) return 0.0;
String lower = text.toLowerCase();
double risk = 0.0;
String[] injectionPatterns = {
"ignore previous", "ignore all", "forget everything",
"you are now", "act as", "pretend you are",
"system:", "[[", "]]", "<|im_start|>",
"忽略之前", "忘记所有", "你现在是", "扮演"
};
for (String pattern : injectionPatterns) {
if (lower.contains(pattern)) {
risk += 0.15;
}
}
return Math.min(1.0, risk);
}
// 脱敏方法
private String maskPhone(String phone) {
// 138****8888
return phone.substring(0, 3) + "****" + phone.substring(7);
}
private String maskIdCard(String idCard) {
// 110101********1234
if (idCard.length() < 14) return "***身份证***";
return idCard.substring(0, 6) + "********" + idCard.substring(idCard.length() - 4);
}
private String maskBankCard(String card) {
// 6222 **** **** 1234
String digits = card.replaceAll("[\\s-]", "");
if (digits.length() < 8) return "***银行卡***";
return digits.substring(0, 4) + " **** **** " + digits.substring(digits.length() - 4);
}
private String maskEmail(String email) {
// u***@domain.com
int atIndex = email.indexOf('@');
if (atIndex <= 1) return "***@***";
return email.substring(0, 1) + "***" + email.substring(atIndex);
}
public record SanitizationResult(
String sanitizedText,
boolean piiDetected,
java.util.List<String> piiTypes
) {}
}成本日志分析:Token消耗趋势
package com.laozhang.ai.cost;
import org.springframework.stereotype.Service;
import java.time.*;
import java.util.*;
/**
* AI成本分析服务
*
* 功能:
* 1. 按日/周/月统计Token消耗和成本
* 2. 检测成本异常(突增/持续增长)
* 3. 按功能/用户分组的成本归因
* 4. 成本趋势预测(简单线性预测)
*/
@Service
public class CostAnalysisService {
private final CostRepository costRepository;
// 各模型的每百万Token价格(美元)
private static final Map<String, ModelPricing> MODEL_PRICING = Map.of(
"gpt-4o-mini", new ModelPricing(0.15, 0.60),
"gpt-4o", new ModelPricing(5.0, 15.0),
"gpt-4o-2024-08-06", new ModelPricing(2.5, 10.0),
"claude-3-5-haiku-20241022", new ModelPricing(1.0, 5.0),
"claude-3-5-sonnet-20241022", new ModelPricing(3.0, 15.0),
"text-embedding-3-small", new ModelPricing(0.02, 0.0)
);
public CostAnalysisService(CostRepository costRepository) {
this.costRepository = costRepository;
}
/**
* 计算单次调用成本
*/
public static double calculateCostUsd(String model, int inputTokens, int outputTokens) {
ModelPricing pricing = MODEL_PRICING.getOrDefault(model,
new ModelPricing(5.0, 15.0)); // 未知模型用保守估算
return (inputTokens * pricing.inputPricePer1M() / 1_000_000.0) +
(outputTokens * pricing.outputPricePer1M() / 1_000_000.0);
}
/**
* 获取每日成本摘要
*/
public DailyCostReport getDailyCostReport(LocalDate date) {
Map<String, Double> costByModel = costRepository.getCostByModel(date);
Map<String, Double> costByFeature = costRepository.getCostByFeature(date);
double totalCost = costByModel.values().stream().mapToDouble(Double::doubleValue).sum();
// 检测异常(与过去7天均值对比)
double avgDailyCost = costRepository.getAverageDailyCost(date.minusDays(7), date.minusDays(1));
boolean isAbnormal = totalCost > avgDailyCost * 1.5; // 超过均值50%告警
return new DailyCostReport(date, totalCost, costByModel, costByFeature,
isAbnormal, avgDailyCost);
}
/**
* 月度成本预测(基于已用天数的线性外推)
*/
public CostForecast forecastMonthlySpend() {
LocalDate today = LocalDate.now();
LocalDate monthStart = today.withDayOfMonth(1);
double spentSoFar = costRepository.getTotalCost(monthStart, today);
int daysElapsed = today.getDayOfMonth();
int totalDaysInMonth = today.lengthOfMonth();
double dailyAvg = spentSoFar / daysElapsed;
double projectedTotal = dailyAvg * totalDaysInMonth;
return new CostForecast(
today.getMonth(),
spentSoFar,
projectedTotal,
dailyAvg,
daysElapsed,
totalDaysInMonth
);
}
// 数据类
public record ModelPricing(double inputPricePer1M, double outputPricePer1M) {}
public record DailyCostReport(
LocalDate date,
double totalCostUsd,
Map<String, Double> costByModel,
Map<String, Double> costByFeature,
boolean isAbnormal,
double sevenDayAvg
) {}
public record CostForecast(
Month month,
double spentSoFar,
double projectedTotal,
double dailyAverage,
int daysElapsed,
int totalDays
) {}
}日志保留策略:AI应用的存储规划
实际存储估算(以每日10万次AI调用为例):
| 日志类型 | 每条大小 | 每日条数 | 每日大小 | 30天 |
|---|---|---|---|---|
| AI调用日志 | ~2KB | 100,000 | ~200MB | ~6GB |
| 错误日志 | ~3KB | ~5,000 | ~15MB | ~450MB |
| 安全审计 | ~1KB | ~100 | ~100KB | ~3MB |
| 合计 | ~215MB/天 | ~6.5GB |
ILM(Index Lifecycle Management)配置建议:
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "10gb",
"max_age": "7d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 }
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}FAQ
Q:AI日志太大了,存Elasticsearch成本很高,有什么优化方法?
A:三个优化方向。一是只索引关键字段(不存完整Prompt),控制每条日志大小在500字节以内。二是错误日志全量存,成功日志按10%采样存(成本降低90%,仍有统计代表性)。三是使用时序数据库(如InfluxDB或ClickHouse)存储数值型的Token消耗和延迟数据,比Elasticsearch便宜5-10倍。
Q:用AI分析日志,会不会把用户数据泄露给LLM?
A:必须做好脱敏。两层防护:第一层,在写日志时就做PII脱敏(本文的PiiDetector);第二层,发给LLM分析的是统计摘要和代表性样本,不是全量日志。同时建议用企业版API(有数据处理协议),或者使用本地部署的模型来分析日志。
Q:Kibana告警总是误报,怎么减少?
A:告警配置的关键是设置合理的时间窗口和阈值。不要用单点触发,而是用滑动窗口均值:过去5分钟错误率 > 5% 且持续时间 > 2分钟,才触发告警。同时为不同功能设置不同阈值(核心功能更严格)。
Q:如何区分是模型API不稳定,还是我们代码的问题?
A:记录几个关键字段帮助区分:request_id(AI提供商返回的),如果相同request_id失败,是提供商问题;如果不同request_id都失败,先看是否所有模型都失败(提供商级别问题),还是特定功能失败(可能是Prompt问题或代码bug)。同时监控AI提供商的status page。
总结
王磊那4个小时的痛苦经历,本质是缺少了三样东西:
- 结构化日志 → 无法快速过滤和查询
- AI特定字段 → 不知道是哪种AI错误、什么原因
- 自动分析工具 → 只能靠人眼扫描
建立完善的AI日志体系后,同样的问题:
- 自动聚类发现"Context超长"错误家族 → 30秒定位问题类型
- LLM自动分析根因报告 → 3分钟得到修复建议
- PII脱敏保护 → 合规安全
- 成本监控 → 提前预警异常消耗
用AI分析AI的日志,是AI时代运维工程师最有价值的武器之一。
