日志智能分析——从 TB 级日志里找信号
日志智能分析——从 TB 级日志里找信号
适读人群:需要做故障排查或日志分析的后端工程师 | 阅读时长:约 13 分钟 | 核心价值:掌握用 AI 处理大规模日志的工程方案,而不是天真地"把日志全扔给 AI"
三个月前,我们有个服务在凌晨 3 点发生了一次故障,影响了核心交易链路,持续了大约 40 分钟。
复盘的时候,技术总监问:"为什么排查花了这么长时间?"
答案很简单:日志太多了。那个服务一天的日志量大概是 200GB,出问题的那段时间前后 2 小时,日志压缩包就有 8GB 解压出来 40GB。
怎么从 40GB 日志里找到那根"信号",而不是被噪音淹死?
这篇文章写的就是我在那次故障之后,搭建的一套用 AI 辅助做日志分析的工程方案。
天真方案为什么不行
很多人第一个想法是:把日志直接丢给 AI 分析。
不行。原因有两个:
原因 1:Token 限制
哪怕是 200K context window 的模型,200K token 大概是 15 万个英文单词,换成日志大概是 1.5MB 左右的文本。而我们需要分析的日志是 40GB。这中间差了 4 个数量级。
原因 2:信噪比问题
即使你能塞进去,大量重复的正常日志会稀释掉真正有价值的异常信号。AI 会被正常日志的模式主导,很可能漏掉那几行关键的错误。
正确的工程思路是:用程序处理量,用 AI 处理质。让程序先做量的工作(过滤、采样、聚合),再把压缩后的高密度信息给 AI 做质的分析。
工程方案:三层漏斗
我设计了一个三层的日志处理漏斗:
原始日志 (40GB)
|
v
[第一层:快速过滤] -- 正则匹配 ERROR/WARN/Exception -- 提取 ~500MB
|
v
[第二层:模式聚合] -- 相似日志合并,统计频率 -- 压缩到 ~50KB
|
v
[第三层:AI 分析] -- 输入摘要,输出假设和排查方向每一层都有具体的实现。
第一层:快速过滤
这层用 grep 加一些简单的规则,不需要 AI 参与:
import subprocess
import re
from datetime import datetime, timedelta
def extract_anomalies(log_path: str, start_time: str, end_time: str) -> list[str]:
"""
从日志文件中提取指定时间段内的异常日志
"""
# 构建时间范围过滤
start = datetime.fromisoformat(start_time)
end = datetime.fromisoformat(end_time)
# 异常关键词
error_patterns = [
r'ERROR',
r'WARN',
r'Exception',
r'Error:',
r'FATAL',
r'timeout',
r'refused',
r'failed',
]
pattern = '|'.join(error_patterns)
results = []
with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
# 提取日志时间戳(根据你的日志格式调整)
time_match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line)
if time_match:
log_time = datetime.strptime(time_match.group(), '%Y-%m-%d %H:%M:%S')
if not (start <= log_time <= end):
continue
# 匹配异常模式
if re.search(pattern, line, re.IGNORECASE):
results.append(line.strip())
return results
# 使用示例
anomalies = extract_anomalies(
'/logs/app.log',
'2024-01-15 02:50:00',
'2024-01-15 03:30:00'
)
print(f"提取到 {len(anomalies)} 条异常日志")这一步通常能把日志量缩减 90% 以上。40GB 的日志,异常日志大概是 500MB 左右。
第二层:模式聚合
500MB 还是太多。这层的核心逻辑是把相似的日志合并,统计出每种模式出现的频率。
import re
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class LogPattern:
template: str # 模板(去掉变量部分)
count: int # 出现次数
first_occurrence: str # 第一次出现时间
last_occurrence: str # 最后一次出现时间
sample: str # 一条完整样例
def normalize_log(log_line: str) -> str:
"""
把日志里的变量替换成占位符,提取模板
"""
# 替换 UUID
line = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
'<UUID>', line, flags=re.IGNORECASE)
# 替换数字(保留异常行号,因为有意义)
line = re.sub(r'\b\d+\.\d+\.\d+\.\d+\b', '<IP>', line) # IP地址
line = re.sub(r'\b\d{10,}\b', '<ID>', line) # 长数字(ID类)
# 替换引号里的内容(通常是变量值)
line = re.sub(r'"[^"]{20,}"', '"<VALUE>"', line)
line = re.sub(r"'[^']{20,}'", "'<VALUE>'", line)
return line
def aggregate_patterns(log_lines: list[str]) -> list[LogPattern]:
"""
对日志列表做模式聚合
"""
pattern_map = defaultdict(list)
for line in log_lines:
template = normalize_log(line)
pattern_map[template].append(line)
patterns = []
for template, lines in pattern_map.items():
# 提取时间戳
times = []
for line in lines:
t = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line)
if t:
times.append(t.group())
patterns.append(LogPattern(
template=template,
count=len(lines),
first_occurrence=min(times) if times else '',
last_occurrence=max(times) if times else '',
sample=lines[0]
))
# 按出现频率排序
patterns.sort(key=lambda x: x.count, reverse=True)
return patterns
def format_for_ai(patterns: list[LogPattern], top_n: int = 30) -> str:
"""
把聚合结果格式化成适合 AI 分析的文本
"""
lines = [
"=== 日志模式聚合报告 ===",
f"总计 {len(patterns)} 种不同的日志模式",
f"展示 Top {top_n} 高频模式",
"",
]
for i, p in enumerate(patterns[:top_n], 1):
lines.append(f"【模式 {i}】出现 {p.count} 次")
lines.append(f"时间范围: {p.first_occurrence} ~ {p.last_occurrence}")
lines.append(f"样例: {p.sample[:200]}")
lines.append("")
return '\n'.join(lines)这一步通常能把 500MB 的异常日志压缩成几十 KB 的摘要文本。关键是:相似的日志不管出现多少次,在摘要里只占一行,但会标注出现频率。频率本身就是重要的信号。
第三层:AI 分析
现在我有了一份几十 KB 的日志摘要,可以喂给 AI 了。
from openai import OpenAI
def analyze_with_ai(log_summary: str, incident_description: str) -> str:
client = OpenAI()
prompt = f"""你是一位有经验的 SRE 工程师,正在分析一次生产故障。
故障描述:
{incident_description}
以下是故障时间段内的日志模式聚合报告(已按出现频率排序):
{log_summary}
请帮我:
1. 识别最可能的根因(列举 2-3 个候选,按可能性排序)
2. 对于每个候选根因,说明支撑证据来自日志的哪些模式
3. 给出下一步验证步骤(具体的命令或查询方法)
4. 识别任何值得关注但可能是次要问题的日志模式
请基于日志证据推理,不要做无法从日志里得出的假设。"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.3 # 分析类任务用低温度
)
return response.choices[0].message.content实际案例:那次凌晨故障
用这个方案重新分析那次故障,整个流程耗时大概 15 分钟:
- 第一层过滤:3 分钟(脚本跑完)
- 第二层聚合:2 分钟
- 整理摘要 + AI 分析:5 分钟
- 阅读分析结果:5 分钟
AI 的分析输出(关键部分):
根因候选 1(可能性:高):数据库连接池耗尽
支撑证据:
- 模式 3:HikariPool-1 - Connection is not available, request timed out after 30000ms
出现 847 次,集中在 03:02 ~ 03:18
- 模式 7:Unable to acquire JDBC Connection
出现 312 次,时间与模式 3 高度重叠
- 模式 12:Transaction silently rolled back because it has been marked as rollback-only
出现 445 次,时间在上述两个模式之后,可能是级联影响
根因候选 2(可能性:中):上游服务超时导致级联故障
支撑证据:
- 模式 2:Read timed out after 5000ms calling inventory-service
出现 1203 次,最早出现在 02:58,早于数据库连接问题
注意:这个模式在 02:58 就开始出现,可能是触发器
下一步验证:
1. 查看 02:58 左右 inventory-service 的状态(该服务的监控/日志)
2. 检查数据库连接池配置:maximumPoolSize 当前设置是多少?
3. 查看 03:02 之前的业务流量是否有突然增大这个分析直接指向了正确方向:inventory-service 在 02:58 开始超时,导致调用线程被占用,连接池被耗尽,进而引发级联故障。
我后来复盘确认,inventory-service 在那个时间点做了一次数据库备份,导致负载飙升,响应时间超出了我们设置的 5s 超时。
这个根因如果靠人工翻日志,在凌晨 3 点的状态下,很难在 15 分钟内找到。
几个工程细节
关于采样策略
如果连异常日志都太多(比如 ERROR 本身就有几 GB),可以做时间窗口采样:每分钟随机取 100 条,而不是全量处理。故障期间的模式分布在时间维度上通常是稳定的,采样不会漏掉重要模式。
关于 Stack Trace 的处理
Stack Trace 在聚合时要特殊处理——它本身就是一个很好的分组 key。可以用 at com.example.XXX.method 的第一行作为 key,把完整 trace 作为 sample 保存。
def extract_stack_trace_key(lines: list[str], start_idx: int) -> tuple[str, int]:
"""
提取 stack trace 的 key(第一个 at 行)和结束行号
"""
key_lines = []
i = start_idx
while i < len(lines) and (lines[i].strip().startswith('at ') or
lines[i].strip().startswith('Caused by:')):
if lines[i].strip().startswith('at ') and len(key_lines) < 3:
key_lines.append(lines[i].strip())
i += 1
return ' | '.join(key_lines), i关于时间序列重建
日志分析里很重要的一件事是时间序列。AI 最后给出的分析里,它正确识别出 inventory-service 超时在数据库连接池耗尽之前,这是因为我在摘要里保留了每个模式的最早出现时间。如果只给 AI 模式和计数,它很难推断因果关系。
这套方案现在已经在我的日常工作里成为标配。每次出故障,不管日志多大,15-20 分钟内必然能给出可验证的假设。
比凌晨盯着屏幕翻日志好多了。
