Go 开发日志分析工具——实时 tail、正则解析、异常告警的 Go 实现
Go 开发日志分析工具——实时 tail、正则解析、异常告警的 Go 实现
适读人群:Go 开发者、需要处理大量日志文件的运维和后端工程师 | 阅读时长:约 16 分钟 | 核心价值:用 Go 实现日志实时监控、结构化解析和规则告警,替代简单 grep 脚本
有一次,一个做在线直播平台的朋友半夜打电话给我,他们的推流服务偶发性卡顿,已经发生三次了,但每次等运维去看日志时卡顿已经结束,什么都没发现。
问题出在没有实时的日志分析能力。他们的告警系统是基于业务指标的,但这个 bug 的症状只在应用日志里能看到——一串特定的错误模式出现后,几秒钟之内就会有用户投诉。
我帮他们花了两天写了一个日志分析工具。这篇文章把核心实现整理出来。
工具功能设计
- 实时 tail:监控日志文件,新内容立即处理(处理日志滚动)
- 正则解析:把非结构化日志解析成结构化字段
- 规则匹配:支持多条规则,每条规则有独立的告警策略
- 告警发送:支持 Webhook(钉钉、飞书、自定义)
实时 Tail 实现
Go 里实现 tail 的最佳选择是 github.com/nxadm/tail,它处理好了文件滚动(logrotate)的情况。
go get github.com/nxadm/tailpackage tailer
import (
"context"
"fmt"
"time"
"github.com/nxadm/tail"
)
// LogTailer 日志实时读取器
type LogTailer struct {
filePath string
lineCh chan string
}
func NewLogTailer(filePath string) *LogTailer {
return &LogTailer{
filePath: filePath,
lineCh: make(chan string, 1000), // 缓冲,防止处理跟不上
}
}
// Start 开始 tail,新日志行会发送到 Lines() channel
func (t *LogTailer) Start(ctx context.Context) error {
config := tail.Config{
Follow: true, // 持续监听
ReOpen: true, // 文件滚动后重新打开
MustExist: false, // 文件不存在时等待
Poll: false, // 用 inotify,不用轮询
Location: &tail.SeekInfo{ // 从文件末尾开始
Offset: 0,
Whence: 2,
},
Logger: tail.DiscardingLogger,
}
t2, err := tail.TailFile(t.filePath, config)
if err != nil {
return fmt.Errorf("tail %s: %w", t.filePath, err)
}
go func() {
defer close(t.lineCh)
defer t2.Cleanup()
for {
select {
case <-ctx.Done():
t2.Stop()
return
case line, ok := <-t2.Lines:
if !ok {
return
}
if line.Err != nil {
fmt.Printf("tail error: %v\n", line.Err)
continue
}
select {
case t.lineCh <- line.Text:
case <-ctx.Done():
return
}
}
}
}()
return nil
}
func (t *LogTailer) Lines() <-chan string {
return t.lineCh
}日志解析器
package parser
import (
"regexp"
"time"
)
// LogEntry 解析后的日志条目
type LogEntry struct {
Raw string
Timestamp time.Time
Level string
Service string
TraceID string
Message string
Fields map[string]string
}
// LogParser 日志解析器
type LogParser struct {
patterns []*regexp.Regexp
}
// 常见日志格式的正则模式
var (
// 格式:2024-01-15 14:30:25.123 ERROR [user-service] traceId=xxx msg=...
standardLogRE = regexp.MustCompile(
`^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)\s+(\w+)\s+\[([^\]]+)\](?:\s+traceId=(\S+))?\s+(.+)$`,
)
// JSON 格式日志
jsonLogRE = regexp.MustCompile(`^\{.*\}$`)
// 自定义 KV 字段解析
kvRE = regexp.MustCompile(`(\w+)=("[^"]*"|\S+)`)
)
func NewLogParser() *LogParser {
return &LogParser{}
}
// Parse 解析一行日志
func (p *LogParser) Parse(line string) *LogEntry {
entry := &LogEntry{
Raw: line,
Fields: make(map[string]string),
}
// 尝试标准格式
if m := standardLogRE.FindStringSubmatch(line); m != nil {
t, err := time.ParseInLocation("2006-01-02 15:04:05", m[1], time.Local)
if err == nil {
entry.Timestamp = t
}
entry.Level = m[2]
entry.Service = m[3]
entry.TraceID = m[4]
entry.Message = m[5]
// 从 message 里提取 KV 字段
for _, kv := range kvRE.FindAllStringSubmatch(m[5], -1) {
entry.Fields[kv[1]] = kv[2]
}
return entry
}
// 如果解析失败,只设置时间戳
entry.Timestamp = time.Now()
entry.Message = line
return entry
}告警规则引擎
package rules
import (
"context"
"fmt"
"regexp"
"sync"
"time"
"your-project/parser"
)
// Rule 告警规则
type Rule struct {
Name string
Pattern *regexp.Regexp // 匹配的正则
Level string // 触发级别过滤(为空则不过滤)
Threshold int // 在 Window 时间内出现 Threshold 次则告警
Window time.Duration
Cooldown time.Duration // 告警冷却时间,避免告警轰炸
Description string
}
// Alert 告警信息
type Alert struct {
Rule string
Count int
Sample string // 触发告警的样本日志
Timestamp time.Time
}
// RuleEngine 规则引擎
type RuleEngine struct {
rules []*Rule
alertCh chan Alert
counters map[string]*windowCounter
mu sync.Mutex
}
// windowCounter 滑动窗口计数器
type windowCounter struct {
rule *Rule
events []time.Time
lastAlert time.Time
}
func (wc *windowCounter) add(t time.Time) int {
// 清除过期事件
cutoff := t.Add(-wc.rule.Window)
start := 0
for start < len(wc.events) && wc.events[start].Before(cutoff) {
start++
}
wc.events = append(wc.events[start:], t)
return len(wc.events)
}
func (wc *windowCounter) canAlert(now time.Time) bool {
return now.Sub(wc.lastAlert) >= wc.rule.Cooldown
}
func NewRuleEngine() *RuleEngine {
return &RuleEngine{
alertCh: make(chan Alert, 100),
counters: make(map[string]*windowCounter),
}
}
func (e *RuleEngine) AddRule(r *Rule) {
e.mu.Lock()
defer e.mu.Unlock()
e.rules = append(e.rules, r)
e.counters[r.Name] = &windowCounter{rule: r}
}
func (e *RuleEngine) Alerts() <-chan Alert {
return e.alertCh
}
// Process 处理一条日志
func (e *RuleEngine) Process(entry *parser.LogEntry) {
e.mu.Lock()
defer e.mu.Unlock()
now := entry.Timestamp
if now.IsZero() {
now = time.Now()
}
for _, rule := range e.rules {
// 级别过滤
if rule.Level != "" && entry.Level != rule.Level {
continue
}
// 正则匹配
if !rule.Pattern.MatchString(entry.Message) {
continue
}
// 更新计数器
counter := e.counters[rule.Name]
count := counter.add(now)
// 检查是否达到阈值且不在冷却期
if count >= rule.Threshold && counter.canAlert(now) {
counter.lastAlert = now
select {
case e.alertCh <- Alert{
Rule: rule.Name,
Count: count,
Sample: entry.Raw,
Timestamp: now,
}:
default:
// channel 满了,丢弃
}
}
}
}告警发送:Webhook
package notifier
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"your-project/rules"
)
// WebhookNotifier Webhook 告警发送器(兼容钉钉/飞书格式)
type WebhookNotifier struct {
url string
client *http.Client
}
func NewWebhookNotifier(url string) *WebhookNotifier {
return &WebhookNotifier{
url: url,
client: &http.Client{Timeout: 5 * time.Second},
}
}
// Send 发送告警(钉钉 Webhook 格式)
func (n *WebhookNotifier) Send(alert rules.Alert) error {
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"title": fmt.Sprintf("日志告警: %s", alert.Rule),
"text": fmt.Sprintf(
"## 日志告警\n\n**规则**: %s\n\n**触发次数**: %d\n\n**时间**: %s\n\n**样本**:\n```\n%s\n```",
alert.Rule,
alert.Count,
alert.Timestamp.Format("2006-01-02 15:04:05"),
truncate(alert.Sample, 200),
),
},
}
body, _ := json.Marshal(payload)
resp, err := n.client.Post(n.url, "application/json", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("webhook post: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("webhook returned %d", resp.StatusCode)
}
return nil
}
func truncate(s string, maxLen int) string {
runes := []rune(s)
if len(runes) <= maxLen {
return s
}
return string(runes[:maxLen]) + "..."
}完整主程序
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 初始化各组件
tailer := tailer.NewLogTailer("/var/log/app/application.log")
p := parser.NewLogParser()
engine := rules.NewRuleEngine()
notifier := notifier.NewWebhookNotifier(os.Getenv("WEBHOOK_URL"))
// 添加规则:推流错误在1分钟内出现3次就告警
engine.AddRule(&rules.Rule{
Name: "streaming-error",
Pattern: regexp.MustCompile(`(?i)(stream.*error|push.*failed|rtmp.*disconnect)`),
Level: "ERROR",
Threshold: 3,
Window: time.Minute,
Cooldown: 5 * time.Minute,
Description: "推流异常告警",
})
// 添加规则:OOM 关键字立即告警
engine.AddRule(&rules.Rule{
Name: "oom-warning",
Pattern: regexp.MustCompile(`(?i)(out of memory|OOM|MemoryError)`),
Threshold: 1,
Window: time.Minute,
Cooldown: 10 * time.Minute,
Description: "内存不足告警",
})
// 启动 tail
if err := tailer.Start(ctx); err != nil {
log.Fatal(err)
}
// 处理告警
go func() {
for alert := range engine.Alerts() {
log.Printf("ALERT [%s]: count=%d", alert.Rule, alert.Count)
if err := notifier.Send(alert); err != nil {
log.Printf("Failed to send alert: %v", err)
}
}
}()
// 主循环:处理日志行
for line := range tailer.Lines() {
entry := p.Parse(line)
engine.Process(entry)
}
}踩坑实录
踩坑 1:logrotate 后 tail 停止读取新内容
现象:配置了 logrotate 的机器上,凌晨日志滚动后,tailer 读到文件末尾就停止了,不再读新写入的日志。
原因:inotify 监听的是 inode,日志文件被 rotate 后新文件是新 inode,旧的 inotify 监听失效了。
解法:tail.Config 里设置 ReOpen: true,这会让 nxadm/tail 在检测到文件被截断或更名时自动重新打开。
踩坑 2:高速日志导致 channel 满了丢消息
现象:某次压测时,日志写入速度极快,lineCh 缓冲区满了,新日志被丢弃,告警没有触发。
原因:解析和规则匹配比日志写入慢,缓冲区撑不住。
解法:lineCh buffer 调大(从 1000 到 10000),同时用 goroutine 池并行处理解析(解析是无状态的,可以并行)。规则引擎保持单 goroutine,用 mutex 保护状态。
踩坑 3:规则 Pattern 写的太宽泛,误报太多
现象:规则里写了 error,结果把 "error handling in progress"(这是正常的 INFO 日志)也匹配到了,一直在告警。
解法:
- 规则要加级别过滤(
Level: "ERROR") - Pattern 用更精确的正则,加词边界
\berror\b而不是error - 加排除列表:如果 Message 匹配了某个 excludePattern,跳过
