Go 定时任务实战——cron 库、分布式定时任务、任务调度与去重
Go 定时任务实战——cron 库、分布式定时任务、任务调度与去重
适读人群:需要在 Go 服务中实现定时任务的工程师、从 Java Quartz/Spring Scheduler 迁移的开发者 | 阅读时长:约18分钟 | 核心价值:从单机 cron 到分布式任务调度,彻底搞懂 Go 定时任务的正确实现方式
从一个"账单对账任务"跑了三次的事故说起
2023年6月,我们的账单对账定时任务出了一个诡异的问题:每次执行后,数据库里的对账记录数量是预期的3倍。
排查了两个小时,才找到原因:服务部署了3个实例,三台机器上的定时任务同时执行了,同一笔账单被对账了三次。
这是多实例部署下定时任务的经典陷阱——单机定时任务在分布式环境里天然会重复执行。
更气的是,这个问题不是第一次发生。我们之前就有一个发送营销邮件的任务,出现过同一个用户收到3封完全一样邮件的投诉。
那次账单事故之后,我把团队里所有的定时任务做了一次全面梳理,建立了一套分布式任务调度规范。今天把这套方案完整写出来。
方案一:单机 cron(go-co-op/gocron)
Go 里最好用的 cron 库是 robfig/cron 或 go-co-op/gocron。gocron 是对 robfig/cron 的封装,API 更友好。
go get github.com/go-co-op/gocron/v2基础用法
package main
import (
"fmt"
"log"
"time"
"github.com/go-co-op/gocron/v2"
)
func main() {
// 创建调度器(默认使用本地时区,生产中推荐明确指定时区)
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local))
if err != nil {
log.Fatal(err)
}
// 每5分钟执行一次
_, err = s.NewJob(
gocron.DurationJob(5*time.Minute),
gocron.NewTask(func() {
fmt.Println("每5分钟: ", time.Now().Format("15:04:05"))
}),
)
// Cron 表达式(每天0点执行)
_, err = s.NewJob(
gocron.CronJob("0 0 * * *", false),
gocron.NewTask(dailyBillReconcile),
gocron.WithName("账单对账"),
// 同一任务的上一次还在运行时,跳过这次(防止并发执行)
gocron.WithSingletonMode(gocron.LimitModeSkip),
)
if err != nil {
log.Fatal(err)
}
// 每月1号0点执行(复杂场景用 Cron 表达式)
_, err = s.NewJob(
gocron.CronJob("0 0 1 * *", false),
gocron.NewTask(monthlyReport),
gocron.WithName("月度报表"),
)
s.Start()
defer s.Shutdown()
// 阻塞主 goroutine(或者在 web 服务里,让 scheduler 在后台运行)
select {}
}
func dailyBillReconcile() {
log.Println("开始账单对账...")
// 实际对账逻辑
time.Sleep(30 * time.Second) // 模拟耗时
log.Println("账单对账完成")
}
func monthlyReport() {
log.Println("生成月度报表...")
}Java 对比:
@Scheduled(fixedRate = 300000)→gocron.DurationJob(5*time.Minute)@Scheduled(cron = "0 0 * * *")→gocron.CronJob("0 0 * * *", false)spring.task.scheduling.pool.size→ Go 里默认并发,用WithSingletonMode控制单例
方案二:分布式定时任务(Redis 分布式锁方案)
单机 cron 在多实例部署时,所有实例都会执行。解决方案是在任务执行前,先抢占一个分布式锁。
package distlock
import (
"context"
"fmt"
"log"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/redis/go-redis/v9"
)
// RedisDistLockLocker 实现 gocron 的分布式锁接口
type RedisDistLockLocker struct {
redisClient *redis.Client
ttl time.Duration
}
func NewRedisLocker(client *redis.Client, ttl time.Duration) *RedisDistLockLocker {
return &RedisDistLockLocker{redisClient: client, ttl: ttl}
}
// Lock 尝试获取分布式锁(gocron 接口要求)
func (l *RedisDistLockLocker) Lock(ctx context.Context, key string) (bool, error) {
lockKey := fmt.Sprintf("cron:lock:%s", key)
ok, err := l.redisClient.SetNX(ctx, lockKey, "1", l.ttl).Result()
if err != nil {
return false, fmt.Errorf("获取分布式锁失败: %w", err)
}
return ok, nil
}
// Unlock 释放分布式锁
func (l *RedisDistLockLocker) Unlock(ctx context.Context, key string) error {
lockKey := fmt.Sprintf("cron:lock:%s", key)
return l.redisClient.Del(ctx, lockKey).Err()
}
// NewDistributedScheduler 创建带分布式锁的调度器
func NewDistributedScheduler(redisClient *redis.Client) (gocron.Scheduler, error) {
locker := NewRedisLocker(redisClient, 10*time.Minute)
s, err := gocron.NewScheduler(
gocron.WithLocation(time.Local),
gocron.WithDistributedLocker(locker),
)
return s, err
}使用分布式锁调度器
func main() {
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
s, err := NewDistributedScheduler(redisClient)
if err != nil {
log.Fatal(err)
}
// 注册任务:多实例下,只有一台机器抢到锁才会执行
_, err = s.NewJob(
gocron.CronJob("0 0 * * *", false),
gocron.NewTask(dailyBillReconcile),
gocron.WithName("账单对账"),
// 分布式锁的 key 就是任务名,需要唯一
)
s.Start()
defer s.Shutdown()
select {}
}方案三:完整的分布式任务调度系统
如果业务复杂,需要任务管理面板、任务历史记录、失败重试,推荐用 asynq + Redis:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
// 定义任务类型常量
const (
TypeDailyBill = "task:daily_bill"
TypeSendEmail = "task:send_email"
TypeMonthlyReport = "task:monthly_report"
)
// EmailPayload 邮件任务参数
type EmailPayload struct {
UserID int64 `json:"user_id"`
Subject string `json:"subject"`
Body string `json:"body"`
}
// NewEmailTask 创建邮件任务
func NewEmailTask(userID int64, subject, body string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailPayload{UserID: userID, Subject: subject, Body: body})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeSendEmail, payload), nil
}
// ---- Worker(处理任务)----
type EmailProcessor struct{}
func (p *EmailProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload EmailPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务参数失败: %w", err)
}
log.Printf("发送邮件: userID=%d, subject=%s", payload.UserID, payload.Subject)
// 实际发送逻辑...
return nil
}
// ---- 启动 Worker 和 Scheduler ----
func main() {
redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}
// 启动任务处理器(Worker)
srv := asynq.NewServer(redisOpt, asynq.Config{
Concurrency: 10, // 最大并发处理数
Queues: map[string]int{
"critical": 6, // critical 队列权重60%
"default": 3,
"low": 1,
},
// 任务失败重试配置
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// 指数退避重试
return time.Duration(n*n) * time.Second
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
log.Printf("任务失败(不再重试): type=%s, err=%v", task.Type(), err)
}),
})
mux := asynq.NewServeMux()
mux.HandleFunc(TypeSendEmail, (&EmailProcessor{}).ProcessTask)
go func() {
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}()
// 启动定时调度器
scheduler := asynq.NewScheduler(redisOpt, nil)
// 每天0点执行账单对账
if _, err := scheduler.Register("0 0 * * *",
asynq.NewTask(TypeDailyBill, nil),
asynq.Queue("critical"),
// 任务唯一性:相同任务在24小时内只入队一次(防止重复)
asynq.Unique(24*time.Hour),
); err != nil {
log.Fatal(err)
}
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}踩坑实录
坑1:分布式锁 TTL 设置比任务执行时间短,锁过期后多个实例同时执行
现象: 账单对账任务偶发重复执行,对账结果翻倍。仔细一查,是锁的有效期(5分钟)比任务实际执行时间(8分钟)短。
原因: Redis 锁过期后,其他实例成功抢锁并开始执行,而原来的实例还在跑,导致两个实例并发执行同一任务。
解法:
- 锁的 TTL 要设置为任务最大执行时间的2-3倍,留足余量
- 或者在任务执行过程中定期续期锁(Watchdog 机制,Redisson 默认有这个):
go func() {
ticker := time.NewTicker(lockTTL / 3)
defer ticker.Stop()
for range ticker.C {
redisClient.Expire(ctx, lockKey, lockTTL)
}
}()坑2:Cron 表达式时区搞错,任务在错误的时间执行
现象: 设置的是"每天0点执行",但实际执行时间是早上8点(UTC+8 = UTC 0点)。
原因: robfig/cron 默认用 UTC 时间,但我们的业务时间是 Asia/Shanghai(UTC+8)。
解法: 创建调度器时明确指定时区:
loc, _ := time.LoadLocation("Asia/Shanghai")
s, _ := gocron.NewScheduler(gocron.WithLocation(loc))坑3:任务上次还没跑完,下一次触发时间到了,产生并发执行
现象: 一个邮件发送任务,正常情况20秒跑完,但某次因为下游服务慢,跑了3分钟还没结束。3分钟内触发的下几次调度和上次并发执行,发送了大量重复邮件。
原因: 默认情况下,gocron 不等上一次任务完成,时间到了就触发新一次。
解法: 设置 WithSingletonMode:
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(sendEmails),
gocron.WithSingletonMode(gocron.LimitModeSkip), // 上次还在跑就跳过本次
)或者用 LimitModeWait(等待上次完成再开始)。
我的定时任务技术选型建议
| 场景 | 推荐方案 |
|---|---|
| 单机,任务简单 | go-co-op/gocron |
| 多实例,任务幂等,偶发重复无害 | gocron + Redis 分布式锁 |
| 多实例,任务不能重复,需要重试记录 | asynq |
| 高可用、有 UI 管理面板 | xxl-job(Go 客户端)或 asynq + asynqmon |
对于我们的账单对账场景,最终选了 asynq:任务有明确的唯一性保证、有失败重试、有执行记录,复盘问题时能查到每次任务的执行状态。
