Go 定时任务高可用——单点失效处理、分布式去重、任务状态追踪
Go 定时任务高可用——单点失效处理、分布式去重、任务状态追踪
适读人群:需要在 Go 分布式服务中运行可靠定时任务的工程师 | 阅读时长:约16分钟 | 核心价值:让定时任务在多实例环境下既不漏跑也不重跑的完整方案
一个在生产上重复扣费的 bug
那是我负责过最严重的一次线上 bug。
那天早上 9 点,客服那边突然涌来了大量投诉,说账户被重复扣款了。我一看,几乎所有该扣的款都扣了两次。
排查了大约 35 分钟,终于找到了原因:前天晚上我们做了服务扩容,把负责跑定时任务的服务实例从1个扩到了3个,但我没有给定时任务加分布式锁。结果每天早上 9 点的扣费任务,在3台机器上同时跑了,每个账户被扣了 3 次。
那次事故导致我们紧急处理了一整天的退款,损失了大量人力,也让我对"定时任务高可用"这个话题有了刻骨铭心的认识。
定时任务的几个核心问题
在分布式环境下,定时任务面临以下挑战:
- 重复执行:多实例同时运行同一个任务
- 任务丢失:执行中的实例崩溃,任务未完成
- 执行时间过长:任务没执行完,下一次触发时间到了
- 状态不可见:任务跑了什么,跑到哪了,不清楚
踩坑实录
坑一:用 Redis SETNX 做分布式锁,但没有设过期时间
现象: 某次任务执行实例崩溃,Redis 里的锁没有释放,之后任务再也跑不起来了——永远锁住。
原因: SETNX key value 不带 TTL,实例崩溃后锁永久存在。
解法: 用 SET key value NX EX ttl 同时设置 NX 和 TTL,确保锁有过期时间。
坑二:任务超时但锁没超时,导致任务被重跑
现象: 某个任务正常跑需要 5 分钟,但有一次数据库慢,跑了 8 分钟。第 5 分 01 秒时,锁过期,另一个实例抢到了锁,两个实例同时处理剩余的数据。
原因: 锁的 TTL(5分钟)设得比任务最长执行时间还短。
解法: 锁续期(Watchdog 机制):任务运行期间定期续期锁,只要任务还在跑,锁就不过期。
坑三:任务状态没有持久化,任务失败后不知道处理到哪了
现象: 一个每天凌晨跑的账单生成任务,因为磁盘满了跑到一半失败了。但因为没有记录处理进度,第二天重跑时从头开始,一部分账单被重复生成。
解法: 任务状态和处理进度持久化到数据库,支持断点续跑。
完整实现方案
package scheduler
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
// JobStatus 任务执行状态
type JobStatus string
const (
JobStatusPending JobStatus = "pending"
JobStatusRunning JobStatus = "running"
JobStatusSucceeded JobStatus = "succeeded"
JobStatusFailed JobStatus = "failed"
)
// JobRecord 任务执行记录
type JobRecord struct {
ID string `db:"id"`
JobName string `db:"job_name"`
Status JobStatus `db:"status"`
StartedAt time.Time `db:"started_at"`
FinishedAt *time.Time `db:"finished_at"`
Error string `db:"error"`
NodeID string `db:"node_id"` // 哪台机器在跑
}
// DistributedScheduler 分布式定时任务调度器
type DistributedScheduler struct {
cron *cron.Cron
rdb *redis.Client
repo JobRepository
nodeID string // 当前节点唯一标识(一般用 hostname + PID)
logger *slog.Logger
mu sync.Mutex
jobs map[string]*JobSpec
}
// JobSpec 任务规格
type JobSpec struct {
Name string
CronExpr string
LockTTL time.Duration // 分布式锁 TTL
Timeout time.Duration // 任务超时时间
Handler func(ctx context.Context) error
}
func NewDistributedScheduler(rdb *redis.Client, repo JobRepository, nodeID string, logger *slog.Logger) *DistributedScheduler {
return &DistributedScheduler{
cron: cron.New(cron.WithSeconds()), // 支持秒级调度
rdb: rdb,
repo: repo,
nodeID: nodeID,
logger: logger,
jobs: make(map[string]*JobSpec),
}
}
// Register 注册定时任务
func (s *DistributedScheduler) Register(spec JobSpec) error {
s.mu.Lock()
s.jobs[spec.Name] = &spec
s.mu.Unlock()
_, err := s.cron.AddFunc(spec.CronExpr, func() {
s.runJob(context.Background(), &spec)
})
return err
}
// runJob 执行一个任务(带分布式锁)
func (s *DistributedScheduler) runJob(ctx context.Context, spec *JobSpec) {
lockKey := fmt.Sprintf("scheduler:lock:%s", spec.Name)
// 尝试获取分布式锁
acquired, err := s.acquireLock(ctx, lockKey, spec.LockTTL)
if err != nil {
s.logger.Error("failed to acquire lock",
slog.String("job", spec.Name),
slog.String("error", err.Error()),
)
return
}
if !acquired {
s.logger.Debug("job already running on another node",
slog.String("job", spec.Name),
)
return
}
// 创建任务执行记录
record := &JobRecord{
ID: generateJobID(),
JobName: spec.Name,
Status: JobStatusRunning,
StartedAt: time.Now(),
NodeID: s.nodeID,
}
if err := s.repo.CreateRecord(ctx, record); err != nil {
s.logger.Error("failed to create job record", "error", err)
s.releaseLock(ctx, lockKey)
return
}
s.logger.Info("job started",
slog.String("job", spec.Name),
slog.String("record_id", record.ID),
slog.String("node", s.nodeID),
)
// 启动锁续期 goroutine(Watchdog)
stopWatchdog := s.startWatchdog(ctx, lockKey, spec.LockTTL)
defer stopWatchdog()
defer s.releaseLock(ctx, lockKey)
// 创建带超时的 context 执行任务
jobCtx, cancel := context.WithTimeout(ctx, spec.Timeout)
defer cancel()
startTime := time.Now()
jobErr := spec.Handler(jobCtx)
duration := time.Since(startTime)
// 更新任务记录
now := time.Now()
record.FinishedAt = &now
if jobErr != nil {
record.Status = JobStatusFailed
record.Error = jobErr.Error()
s.logger.Error("job failed",
slog.String("job", spec.Name),
slog.String("error", jobErr.Error()),
slog.Int64("duration_ms", duration.Milliseconds()),
)
} else {
record.Status = JobStatusSucceeded
s.logger.Info("job succeeded",
slog.String("job", spec.Name),
slog.Int64("duration_ms", duration.Milliseconds()),
)
}
if err := s.repo.UpdateRecord(ctx, record); err != nil {
s.logger.Error("failed to update job record", "error", err)
}
}
// acquireLock 获取分布式锁
func (s *DistributedScheduler) acquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {
// SET key nodeID NX EX ttl
result, err := s.rdb.SetNX(ctx, key, s.nodeID, ttl).Result()
if err != nil {
return false, err
}
return result, nil
}
// releaseLock 释放锁(只释放自己持有的锁)
var releaseLockScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
func (s *DistributedScheduler) releaseLock(ctx context.Context, key string) {
releaseLockScript.Run(ctx, s.rdb, []string{key}, s.nodeID)
}
// startWatchdog 启动锁续期协程
func (s *DistributedScheduler) startWatchdog(ctx context.Context, key string, ttl time.Duration) func() {
stopCh := make(chan struct{})
go func() {
// 每 TTL/3 续期一次,保证在锁过期前续期
ticker := time.NewTicker(ttl / 3)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ctx.Done():
return
case <-ticker.C:
// 续期锁(只有当前持有者才能续期)
renewScript := redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2]))
return 1
end
return 0
`)
result, err := renewScript.Run(ctx, s.rdb, []string{key}, s.nodeID, int(ttl.Seconds())).Int()
if err != nil || result == 0 {
s.logger.Warn("failed to renew lock, job may have lost lock",
slog.String("key", key),
)
}
}
}
}()
return func() {
close(stopCh)
}
}
// Start 启动调度器
func (s *DistributedScheduler) Start() {
s.cron.Start()
s.logger.Info("scheduler started", slog.String("node", s.nodeID))
}
// Stop 停止调度器
func (s *DistributedScheduler) Stop() {
ctx := s.cron.Stop()
<-ctx.Done()
s.logger.Info("scheduler stopped")
}
type JobRepository interface {
CreateRecord(ctx context.Context, record *JobRecord) error
UpdateRecord(ctx context.Context, record *JobRecord) error
GetLastRecord(ctx context.Context, jobName string) (*JobRecord, error)
}
func generateJobID() string {
return fmt.Sprintf("job_%d", time.Now().UnixNano())
}使用示例
func main() {
nodeID := fmt.Sprintf("%s_%d", hostname(), os.Getpid())
scheduler := scheduler.NewDistributedScheduler(rdb, jobRepo, nodeID, logger)
// 注册每天9点的扣费任务
scheduler.Register(scheduler.JobSpec{
Name: "daily_billing",
CronExpr: "0 0 9 * * *", // 每天9点
LockTTL: 30 * time.Minute,
Timeout: 25 * time.Minute,
Handler: billingService.ProcessDailyBilling,
})
scheduler.Start()
}任务状态追踪的价值
有了任务执行记录,你可以做很多事:
- 监控任务的成功率和平均执行时间
- 告警:任务连续失败 N 次
- 审计:某个时间点任务是否执行了
- 调试:某次失败的具体错误是什么
这些在 Java 的 Quartz 框架里都是内置的,Go 生态里需要自己搭建,但好处是完全可控,可以根据业务需求定制。
