Go 实现分布式任务调度——简化版 Cron + 分布式锁 + 任务状态管理
Go 实现分布式任务调度——简化版 Cron + 分布式锁 + 任务状态管理
适读人群:Go 后端工程师、需要自建轻量级分布式任务调度的团队 | 阅读时长:约 17 分钟 | 核心价值:不依赖 XXL-Job 等重型框架,用 Go + Redis 实现生产可用的分布式 Cron
前年在一家做跨境电商的公司,他们有个痛点:几十个定时任务分散在各个服务里,谁跑了谁没跑,完全不知道。某个月底,财务结算的定时任务在多个节点都触发了,同一批订单被结算了两次,追回款项搞了好几天。
他们的技术负责人想上 XXL-Job,但 XXL-Job 是 Java 的,他们的服务是 Go 的,还要搞 Java Agent,运维成本太高。我建议用 Go 自己实现一个轻量级的分布式调度,核心就是:分布式锁保证唯一执行 + Redis 存储任务状态。
这篇文章把那个调度器的核心实现写出来。
核心设计
调度器节点1 调度器节点2 调度器节点3
| | |
+------+-------+ |
| |
Redis 分布式锁(抢锁者执行) |
| |
任务执行器(本地执行)
|
Redis(任务状态记录)关键原则:
- Cron 触发是每个节点各自计算的(无主节点)
- 同一时刻只有一个节点能抢到某个任务的执行锁
- 任务开始、结束、失败都记录到 Redis
- 支持任务超时强制释放锁
完整实现
package scheduler
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/robfig/cron/v3"
)
// JobStatus 任务状态
type JobStatus string
const (
JobStatusIdle JobStatus = "idle"
JobStatusRunning JobStatus = "running"
JobStatusSuccess JobStatus = "success"
JobStatusFailed JobStatus = "failed"
)
// JobRecord 任务执行记录
type JobRecord struct {
JobID string
NodeID string // 执行的节点标识
StartAt time.Time
EndAt time.Time
Status JobStatus
Error string
Duration time.Duration
}
// Job 任务定义
type Job struct {
ID string
Name string
CronExpr string // 标准 Cron 表达式
Timeout time.Duration // 最长执行时间
Handler func(ctx context.Context) error
}
// Scheduler 分布式任务调度器
type Scheduler struct {
nodeID string // 当前节点唯一标识
redis *redis.Client
cron *cron.Cron
jobs map[string]*Job
mu sync.RWMutex
}
func NewScheduler(nodeID string, rdb *redis.Client) *Scheduler {
return &Scheduler{
nodeID: nodeID,
redis: rdb,
cron: cron.New(cron.WithSeconds()), // 支持秒级精度
jobs: make(map[string]*Job),
}
}
// RegisterJob 注册定时任务
func (s *Scheduler) RegisterJob(job *Job) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.jobs[job.ID]; exists {
return fmt.Errorf("job %q already registered", job.ID)
}
_, err := s.cron.AddFunc(job.CronExpr, func() {
s.runJob(job)
})
if err != nil {
return fmt.Errorf("invalid cron expr %q: %w", job.CronExpr, err)
}
s.jobs[job.ID] = job
log.Printf("[Scheduler] Registered job: %s (%s) on node %s", job.Name, job.CronExpr, s.nodeID)
return nil
}
// runJob 执行单个任务(带分布式锁)
func (s *Scheduler) runJob(job *Job) {
lockKey := fmt.Sprintf("scheduler:lock:%s", job.ID)
lockValue := fmt.Sprintf("%s:%d", s.nodeID, time.Now().UnixNano())
lockTTL := job.Timeout + 30*time.Second // 锁的有效期比超时长一些
ctx := context.Background()
// 尝试获取分布式锁(SET NX PX)
ok, err := s.redis.SetNX(ctx, lockKey, lockValue, lockTTL).Result()
if err != nil {
log.Printf("[Scheduler] Lock error for job %s: %v", job.ID, err)
return
}
if !ok {
// 抢锁失败,说明其他节点已经在执行
log.Printf("[Scheduler] Job %s is running on another node, skipped", job.ID)
return
}
// 抢锁成功,开始执行
defer s.releaseLock(ctx, lockKey, lockValue)
record := &JobRecord{
JobID: job.ID,
NodeID: s.nodeID,
StartAt: time.Now(),
Status: JobStatusRunning,
}
s.saveRecord(ctx, record)
log.Printf("[Scheduler] Job %s started on node %s", job.Name, s.nodeID)
// 设置超时上下文
jobCtx, cancel := context.WithTimeout(ctx, job.Timeout)
defer cancel()
// 执行任务
execErr := job.Handler(jobCtx)
record.EndAt = time.Now()
record.Duration = record.EndAt.Sub(record.StartAt)
if execErr != nil {
record.Status = JobStatusFailed
record.Error = execErr.Error()
log.Printf("[Scheduler] Job %s failed: %v (duration: %v)", job.Name, execErr, record.Duration)
} else {
record.Status = JobStatusSuccess
log.Printf("[Scheduler] Job %s succeeded (duration: %v)", job.Name, record.Duration)
}
s.saveRecord(ctx, record)
}
// releaseLock 安全释放锁(确保只释放自己的锁)
func (s *Scheduler) releaseLock(ctx context.Context, key, value string) {
// 使用 Lua 脚本确保原子性:只有值匹配时才删除
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
s.redis.Eval(ctx, script, []string{key}, value)
}
// saveRecord 保存任务执行记录
func (s *Scheduler) saveRecord(ctx context.Context, record *JobRecord) {
key := fmt.Sprintf("scheduler:record:%s", record.JobID)
val := fmt.Sprintf(`{"job_id":"%s","node":"%s","start":"%s","status":"%s","error":"%s","duration_ms":%d}`,
record.JobID,
record.NodeID,
record.StartAt.Format(time.RFC3339),
record.Status,
record.Error,
record.Duration.Milliseconds(),
)
// 保存最近 100 条记录
pipe := s.redis.Pipeline()
pipe.LPush(ctx, key, val)
pipe.LTrim(ctx, key, 0, 99)
pipe.Expire(ctx, key, 7*24*time.Hour) // 保留 7 天
pipe.Exec(ctx)
}
// GetJobRecords 查询任务历史
func (s *Scheduler) GetJobRecords(ctx context.Context, jobID string, limit int) ([]string, error) {
key := fmt.Sprintf("scheduler:record:%s", jobID)
return s.redis.LRange(ctx, key, 0, int64(limit-1)).Result()
}
// Start 启动调度器
func (s *Scheduler) Start() {
s.cron.Start()
log.Printf("[Scheduler] Started on node %s", s.nodeID)
}
// Stop 优雅停止
func (s *Scheduler) Stop() context.Context {
return s.cron.Stop()
}使用示例
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 用机器的 hostname + 随机后缀作为节点 ID
hostname, _ := os.Hostname()
nodeID := fmt.Sprintf("%s-%d", hostname, rand.Intn(10000))
sched := scheduler.NewScheduler(nodeID, rdb)
// 注册:每天凌晨2点跑月度结算
sched.RegisterJob(&scheduler.Job{
ID: "monthly-settlement",
Name: "月度结算",
CronExpr: "0 0 2 1 * *", // 每月1日凌晨2点
Timeout: 30 * time.Minute,
Handler: func(ctx context.Context) error {
log.Println("Running monthly settlement...")
// 实际业务逻辑
return runMonthlySettlement(ctx)
},
})
// 注册:每5分钟同步一次汇率
sched.RegisterJob(&scheduler.Job{
ID: "exchange-rate-sync",
Name: "汇率同步",
CronExpr: "0 */5 * * * *", // 每5分钟
Timeout: 2 * time.Minute,
Handler: func(ctx context.Context) error {
return syncExchangeRates(ctx)
},
})
sched.Start()
// 等待退出信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
ctx := sched.Stop()
<-ctx.Done() // 等待正在执行的任务完成
log.Println("Scheduler stopped")
}踩坑实录
踩坑 1:锁过期时任务还没执行完,导致两个节点同时执行
现象:某个月底,结算任务跑了特别久(业务量暴增),锁到期后另一个节点抢到锁并开始执行,出现了重复执行。
原因:锁的 TTL 设置得太短,任务执行时间超过了 TTL。
解法:两种方案:
- 看门狗模式:任务执行过程中,每隔 lockTTL/3 时间就续期锁,如果任务节点宕机,锁自然过期
- 保守TTL:设置一个绝对足够的 TTL(比如任务 timeout 的 3 倍),接受极端情况下的超时
// 看门狗:定期续期
go func() {
ticker := time.NewTicker(lockTTL / 3)
defer ticker.Stop()
for {
select {
case <-jobCtx.Done():
return
case <-ticker.C:
// 续期:用 Lua 确保原子性
s.redis.Eval(ctx, renewScript, []string{lockKey}, lockValue, lockTTL.Milliseconds())
}
}
}()踩坑 2:时钟不同步导致 Cron 触发时间有偏差
现象:三个节点部署在不同机器,时钟差了几秒,同一个任务有时候同时有两个节点去抢锁。
原因:这不是 bug,Cron 计算本来就在各个节点独立进行,时钟差导致触发时间稍微错开,分布式锁保证了最终只有一个执行,但多余的竞争会在日志里产生很多 "Job is running on another node, skipped" 的日志。
解法:确保机器时钟同步(NTP),误差控制在 1 秒以内。这是分布式系统的基础要求,不是代码问题。
踩坑 3:任务 panic 导致锁没有释放
现象:某个任务 Handler 里有 nil pointer panic,锁没有被 defer 释放(因为 panic 在 goroutine 里没有被 recover)。
解法:在 runJob 里加 recover:
defer func() {
if r := recover(); r != nil {
record.Status = JobStatusFailed
record.Error = fmt.Sprintf("panic: %v", r)
s.saveRecord(ctx, record)
log.Printf("[Scheduler] Job %s panicked: %v", job.Name, r)
}
}()与 XXL-Job 对比
| 特性 | 本方案 | XXL-Job |
|---|---|---|
| 语言依赖 | Go | Java(Agent) |
| 部署复杂度 | 低(嵌入服务) | 中(需要 Admin 服务) |
| 可视化管理 | 无(需要自建) | 完善的 Web UI |
| 任务分片 | 不支持 | 支持 |
| 适合场景 | 中小规模 Go 服务 | 大规模 Java 生态 |
