AI 应用的 ELK 接入——从日志到洞察的完整链路
AI 应用的 ELK 接入——从日志到洞察的完整链路
我们的 AI 应用上线初期,日志是这样的:
2024-01-15 10:23:45 INFO ChatService - User request received
2024-01-15 10:23:46 INFO ChatService - LLM response generated
2024-01-15 10:23:46 INFO ChatService - Response sent to user出了问题,你能从这三行日志里得到什么信息?什么都没有。用户输入是什么?模型返回了什么?花了多少 Token?用了哪个模型?全部都没有。
然后我们走向了另一个极端,把所有东西都打进日志:
2024-01-15 10:23:45 INFO ChatService - User message: [用户完整的输入,可能有 2000 字]
2024-01-15 10:23:46 INFO ChatService - System prompt: [完整的 System Prompt,800 字]
2024-01-15 10:23:46 INFO ChatService - LLM response: [完整的模型输出,1500 字]存储爆炸了,而且大量的非结构化文本根本没办法做统计分析。
真正有用的 AI 日志应该在两个极端之间:结构化、有度量意义、能做聚合分析,同时不存储没必要存的原始内容。
AI 应用日志的结构化规范
先定义什么是"AI 日志的结构化规范"。
核心原则:每条 AI 调用日志必须包含可量化的度量字段,而不只是人类可读的文本。
{
"timestamp": "2024-01-15T10:23:46.523Z",
"level": "INFO",
"service": "ai-chat-service",
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"span_id": "00f067aa0ba902b7",
"event_type": "ai_call_completed",
"request": {
"feature": "customer-service",
"model": "gpt-4o",
"temperature": 0.3,
"user_tier": "standard",
"session_id": "sess_abc123",
"prompt_tokens": 856,
"prompt_length_chars": 3424
},
"response": {
"completion_tokens": 234,
"total_tokens": 1090,
"finish_reason": "stop",
"latency_ms": 1823,
"cost_cents": 12
},
"quality": {
"output_format_valid": true,
"fallback_triggered": false,
"retry_count": 0
},
"user_context": {
"user_id_hash": "a7f3c2",
"session_turn": 3
}
}注意几个设计决策:
不存原始 Prompt 和 Response 内容,只存元数据(长度、Token 数)。原始内容如果需要,单独存到对象存储(S3/OSS),通过 trace_id 关联。这样日志系统存储量可控,又不丢失可追溯性。
user_id 做 hash 处理,不存原始 ID,保护隐私。
event_type 字段区分不同类型的 AI 事件,便于在 Kibana 里过滤。
quality 字段记录输出质量相关的信号,比如是否触发了降级逻辑,是否发生了重试。
Java 端的结构化日志实现
// AI 日志事件对象
@Data
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AICallLogEvent {
private String eventType;
private String service;
private String traceId;
private String spanId;
private RequestInfo request;
private ResponseInfo response;
private QualityInfo quality;
private UserContext userContext;
@Data
@Builder
public static class RequestInfo {
private String feature;
private String model;
private Double temperature;
private String userTier;
private String sessionId;
private Integer promptTokens;
private Integer promptLengthChars;
}
@Data
@Builder
public static class ResponseInfo {
private Integer completionTokens;
private Integer totalTokens;
private String finishReason;
private Long latencyMs;
private Long costCents;
}
@Data
@Builder
public static class QualityInfo {
private Boolean outputFormatValid;
private Boolean fallbackTriggered;
private Integer retryCount;
private String errorType; // 错误类型(如果有)
}
@Data
@Builder
public static class UserContext {
private String userIdHash;
private Integer sessionTurn;
}
}// AI 日志记录器
@Component
@Slf4j
public class AICallLogger {
private final ObjectMapper objectMapper;
private final Tracer tracer;
// 使用 Logback 的结构化日志能力,输出 JSON 格式
private static final Logger AI_LOGGER = LoggerFactory.getLogger("ai.calls");
public void logAICall(AICallContext ctx, ChatResponse response, long latencyMs) {
try {
// 从当前 trace 上下文获取 trace_id 和 span_id
String traceId = Optional.ofNullable(tracer.currentSpan())
.map(span -> span.context().traceId())
.orElse("unknown");
String spanId = Optional.ofNullable(tracer.currentSpan())
.map(span -> span.context().spanId())
.orElse("unknown");
Usage usage = response.getMetadata().getUsage();
int promptTokens = usage != null ? usage.getPromptTokens().intValue() : 0;
int completionTokens = usage != null ? usage.getGenerationTokens().intValue() : 0;
long costCents = calculateCostCents(ctx.getModel(), promptTokens, completionTokens);
AICallLogEvent event = AICallLogEvent.builder()
.eventType("ai_call_completed")
.service("ai-chat-service")
.traceId(traceId)
.spanId(spanId)
.request(AICallLogEvent.RequestInfo.builder()
.feature(ctx.getFeature())
.model(ctx.getModel())
.temperature(ctx.getTemperature())
.userTier(ctx.getUserTier())
.sessionId(ctx.getSessionId())
.promptTokens(promptTokens)
.promptLengthChars(ctx.getPromptText().length())
.build())
.response(AICallLogEvent.ResponseInfo.builder()
.completionTokens(completionTokens)
.totalTokens(promptTokens + completionTokens)
.finishReason(extractFinishReason(response))
.latencyMs(latencyMs)
.costCents(costCents)
.build())
.quality(AICallLogEvent.QualityInfo.builder()
.outputFormatValid(ctx.isOutputFormatValid())
.fallbackTriggered(ctx.isFallbackTriggered())
.retryCount(ctx.getRetryCount())
.build())
.userContext(AICallLogEvent.UserContext.builder()
.userIdHash(hashUserId(ctx.getUserId()))
.sessionTurn(ctx.getSessionTurn())
.build())
.build();
// 用 JSON 序列化,利用 Logback 的 JSON 布局输出
AI_LOGGER.info(objectMapper.writeValueAsString(event));
} catch (Exception e) {
log.error("Failed to write AI call log", e);
}
}
public void logAICallError(AICallContext ctx, Exception error, long latencyMs) {
// 错误日志使用相同的结构,但 quality.errorType 非空
// ... 类似逻辑
}
private String hashUserId(String userId) {
if (userId == null) return null;
return DigestUtils.md5DigestAsHex(userId.getBytes()).substring(0, 8);
}
private long calculateCostCents(String model, int promptTokens, int completionTokens) {
record ModelPrice(double inputPer1M, double outputPer1M) {}
Map<String, ModelPrice> prices = Map.of(
"gpt-4o", new ModelPrice(5.0, 15.0),
"gpt-4o-mini", new ModelPrice(0.15, 0.6)
);
ModelPrice price = prices.getOrDefault(model, new ModelPrice(5.0, 15.0));
return Math.round(
(promptTokens * price.inputPer1M() + completionTokens * price.outputPer1M())
/ 1_000_000 * 100
);
}
}Logstash Pipeline 处理 AI 日志
日志收集到 Elasticsearch 之前,需要经过 Logstash 做解析和转换:
# logstash/pipelines/ai-logs.conf
input {
beats {
port => 5044
# 只接收 AI 相关日志流
add_field => { "[@metadata][pipeline]" => "ai-logs" }
}
}
filter {
# 解析 JSON 格式的 AI 日志
if [message] =~ /^\{.*\}$/ {
json {
source => "message"
target => "ai_data"
remove_field => ["message"]
}
# 展开嵌套字段,方便 Kibana 里直接过滤
mutate {
rename => {
"[ai_data][event_type]" => "event_type"
"[ai_data][trace_id]" => "trace_id"
"[ai_data][request][feature]" => "ai.feature"
"[ai_data][request][model]" => "ai.model"
"[ai_data][request][user_tier]" => "ai.user_tier"
"[ai_data][request][prompt_tokens]" => "ai.prompt_tokens"
"[ai_data][response][completion_tokens]" => "ai.completion_tokens"
"[ai_data][response][total_tokens]" => "ai.total_tokens"
"[ai_data][response][latency_ms]" => "ai.latency_ms"
"[ai_data][response][cost_cents]" => "ai.cost_cents"
"[ai_data][response][finish_reason]" => "ai.finish_reason"
"[ai_data][quality][output_format_valid]" => "ai.output_format_valid"
"[ai_data][quality][fallback_triggered]" => "ai.fallback_triggered"
"[ai_data][quality][retry_count]" => "ai.retry_count"
}
}
# 添加派生字段
# 计算 completion/prompt 比率(用于检测异常输出)
if [ai.prompt_tokens] and [ai.completion_tokens] and [ai.prompt_tokens] > 0 {
ruby {
code => "
ratio = event.get('ai.completion_tokens').to_f / event.get('ai.prompt_tokens').to_f
event.set('ai.completion_prompt_ratio', ratio.round(3))
"
}
}
# 按延迟分档:fast (<1s) / normal (1-5s) / slow (5-15s) / very_slow (>15s)
if [ai.latency_ms] {
ruby {
code => "
latency = event.get('ai.latency_ms').to_i
bucket = if latency < 1000 then 'fast'
elsif latency < 5000 then 'normal'
elsif latency < 15000 then 'slow'
else 'very_slow' end
event.set('ai.latency_bucket', bucket)
"
}
}
# 按成本分档
if [ai.cost_cents] {
ruby {
code => "
cost = event.get('ai.cost_cents').to_i
bucket = if cost < 5 then 'cheap'
elsif cost < 20 then 'normal'
elsif cost < 100 then 'expensive'
else 'very_expensive' end
event.set('ai.cost_bucket', bucket)
"
}
}
}
# 非 AI 结构化日志,走普通解析
else {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" }
}
}
# 去掉不需要的字段,节省存储
mutate {
remove_field => ["ai_data", "agent", "ecs", "input", "log"]
}
}
output {
if [event_type] == "ai_call_completed" or [event_type] == "ai_call_error" {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "ai-calls-%{+YYYY.MM.dd}"
# 使用 ILM(Index Lifecycle Management)自动管理索引
ilm_enabled => true
ilm_policy => "ai-logs-policy"
# 使用 trace_id 做去重,防止重复写入
document_id => "%{trace_id}-%{span_id}"
action => "create"
}
} else {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}
}Kibana 看板设计
有了结构化数据,可以在 Kibana 里构建几个非常有价值的看板:
看板 1:AI 服务健康概览
- 实时调用量(每分钟)
- 成功率(1 - (fallback_triggered=true 的比例))
- P50/P95/P99 延迟(按 feature 分组)
- 总成本趋势(按小时)
看板 2:Prompt 效果分析 这是我最喜欢的一个看板,把 AI 的日志用起来了:
在 Kibana 里做了几个聚合:
- 按
ai.feature分组,统计ai.completion_prompt_ratio的均值和分位数 - 按
ai.finish_reason分组,看有多少请求因为 length 截断(说明 max_tokens 设太小) - 按
ai.latency_bucket分组,看慢请求的特征(哪些 feature 慢请求最多) ai.output_format_valid = false的请求比例,看 Prompt 的格式化指令是否稳定
看板 3:成本分析
- 各 feature 的 Token 消耗分布
ai.cost_bucket = very_expensive的请求特征分析(排查 Token 爆炸问题)- 成本趋势 vs 请求量趋势的对比(成本涨幅超过请求量涨幅说明有 Prompt 膨胀)
ELK 日志处理链路架构
基于日志的告警规则
Kibana 的 Watcher 或者 Elastic Alerts 可以基于日志数据设置告警:
{
"trigger": {
"schedule": { "interval": "5m" }
},
"input": {
"search": {
"request": {
"indices": ["ai-calls-*"],
"body": {
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-5m" } } },
{ "term": { "ai.feature": "customer-service" } }
]
}
},
"aggs": {
"error_rate": {
"filter": { "term": { "ai.fallback_triggered": true } }
},
"total": { "value_count": { "field": "trace_id" } }
}
}
}
}
},
"condition": {
"script": {
"source": "ctx.payload.aggregations.error_rate.doc_count / ctx.payload.aggregations.total.value > 0.05"
}
},
"actions": {
"notify_slack": {
"webhook": {
"method": "POST",
"url": "{{SLACK_WEBHOOK_URL}}",
"body": "{\"text\": \"AI 服务告警: customer-service 错误率超过 5%,过去 5 分钟错误 {{ctx.payload.aggregations.error_rate.doc_count}} 次\"}"
}
}
}
}实际发现的问题案例
做完这套体系后,我们通过日志分析发现了几个之前完全不知道的问题:
问题一:某个功能的 completion_prompt_ratio 异常高。我们的"内容摘要"功能,completion/prompt 比率居然是 2.3(输出比输入还长)。排查后发现,摘要的 Prompt 里写的是"请详细总结",模型理解为要输出详细内容。改成"请用 200 字以内总结",比率降到了 0.15,成本降低 80%。
问题二:finish_reason = length 的请求占比 15%。说明我们的 max_tokens 设置不够,有 15% 的响应被截断了,用户收到的是不完整的回答。把 max_tokens 从 1024 调到 2048,这个问题解决了。
问题三:深夜的成本是白天的 30%,但请求量只有白天的 3%。意味着深夜每个请求的平均 Token 消耗是白天的 10 倍。追踪下去发现,有个定时任务在深夜批量处理长文档,消耗了大量 Token,但这个任务的任务类型没有区分标记,被混入了普通用户的成本统计。加了 task_type: batch_job 的区分后,看清楚了。
总结
AI 应用的 ELK 接入,关键在于三件事:
日志结构化:每条 AI 调用日志必须包含 model、token 数、延迟、成本、输出质量等可量化字段,而不是自由文本。
Logstash 做语义解析:字段展开、派生字段(延迟分档、成本分档、比率计算),让 Kibana 里的聚合分析更直观。
基于日志的洞察:Prompt 效果分析、成本异常检测、格式合规率监控,这些都是 AI 应用特有的运营视角,通过结构化日志才能实现。
把日志从"记录发生了什么"升级到"帮助决策",这是 AI 应用日志体系的核心价值。
