golang封装可扩展的crontab
1. 导入cron第三方包
go get github.com/robfig/cron/v32. 申明变量
// 单个cron的interface
type RunCronItem interface {Run() // 计划任务执行体Start() error // 初始化用的,第一次启动的时候,调用,后续不再执行Name() string // 计划任务配置名
}// 申明所有计划任务的配置
type CronTaskItem struct {name stringrunItem RunCronItem
}// JobStatus 任务状态
type JobStatus struct {LastRun time.TimeLastSuccess boolIsRunning boolMutex sync.Mutex
}// CronManager Cron任务管理器
type CronManager struct {cron *cron.Cronjobs map[string]func()jobStatus map[string]*JobStatusconfig config.ProxyConfig // 此处为配置文件
}3. 配置文件
config.yaml
cron_jobs:start_file_resend:Schedule: "0 */2 * * * *" # 每2分钟将trace文件发送到客户端Enabled: falseDescription: "每2分钟将文件发到云端"config.go
import ("flag""github.com/spf13/viper""log"
)type ProxyConfig struct {
...LogLevel uint32 `mapstructure:"log_level"`Log LogConfig `mapstructure:"log"`CronJobs map[string]CronJobConfig `mapstructure:"cron_jobs"` //此处为计划任务的配置
....}4. 创建计划任务管理器
// NewCronManager 创建Cron管理器
func NewCronManager(appConfig config.ProxyConfig) *CronManager {// 创建Cron实例,使用秒级精度(支持6位cron表达式)c := cron.New(cron.WithSeconds())manager := &CronManager{cron: c,jobs: make(map[string]func()),jobStatus: make(map[string]*JobStatus),config: appConfig,}return manager
}// AddJob 添加任务
func (m *CronManager) AddJob(name string, jobItem RunCronItem) error {// 查找配置jobConfig, exists := m.config.CronJobs[name]if !exists {return fmt.Errorf("定时任务名称找不到:%s", name)}if !jobConfig.Enabled {logger.Log.Error("定时任务未启用", zap.String("name", name))return nil}// 初始化任务状态m.jobStatus[name] = &JobStatus{}// 包装任务函数,添加并发控制wrappedFunc := func() {status := m.jobStatus[name]// 检查任务是否正在运行status.Mutex.Lock()if status.IsRunning {logger.Log.Info("定时任务已经运行中,请稍后", zap.String("name", name))status.Mutex.Unlock()return}// 标记任务开始运行status.IsRunning = truestatus.LastRun = time.Now()status.Mutex.Unlock()// 执行任务logger.Log.Debug("定时任务正在执行", zap.String("name", name))startTime := time.Now()defer func() {// 捕获panic,确保任务状态正确更新if r := recover(); r != nil {logger.Log.Error(fmt.Sprintf("定时任务 %s panicked: %v", name, r))status.LastSuccess = false}// 更新任务状态status.Mutex.Lock()status.IsRunning = falsestatus.Mutex.Unlock()duration := time.Since(startTime)logger.Log.Debug(fmt.Sprintf("定时任务完成:%s, duration: %v", name, duration))}()// 执行实际任务jobItem.Run()status.LastSuccess = true}// 添加任务到cronm.jobs[name] = wrappedFunc_, err := m.cron.AddFunc(jobConfig.Schedule, wrappedFunc)if err != nil {return fmt.Errorf("failed to add job %s: %v", name, err)}logger.Log.Debug(fmt.Sprintf("添加定时任务成功: %s with schedule: %s", name, jobConfig.Schedule))return nil
}// StartAllJobs 启动所有配置中启用的任务
func (m *CronManager) StartAllJobs() error {for name := range m.config.CronJobs {// 这里不直接添加任务,因为任务函数需要外部提供// 外部需要先调用 AddJob 添加具体的任务函数logger.Log.Debug(fmt.Sprintf("启用所有配置中的定时任务: %s", name))}m.cron.Start()logger.Log.Debug("定时任务已全部开启")return nil
}// Start 启动所有任务
func (m *CronManager) Start() {m.cron.Start()logger.Log.Debug("Cron manager started")
}// Stop 停止所有任务
func (m *CronManager) Stop() {m.cron.Stop()logger.Log.Debug("Cron manager stopped")
}// GetJobStatus 获取任务状态
func (m *CronManager) GetJobStatus(name string) (*JobStatus, error) {status, exists := m.jobStatus[name]if !exists {return nil, fmt.Errorf("job %s not found", name)}return status, nil
}// ListJobs 列出所有任务
func (m *CronManager) ListJobs() []string {var jobs []stringfor name := range m.jobs {jobs = append(jobs, name)}return jobs
}5. 配置cron和启动
// 此处配置您的计划任务列表
func crontMangerList() []CronTaskItem {cronTaskList := []RunCronItem{... // 这里可以配置你自己的计划任务的struct,返回满足interface RunCronItem}crontaskItems := make([]CronTaskItem, 0, len(cronTaskList))for _, item := range cronTaskList {crontaskItems = append(crontaskItems, CronTaskItem{name: item.Name(),runItem: item,})}return crontaskItems
}// 启动job客户端
func InitCronJobClient(config config.ProxyConfig) {manager := NewCronManager(config)// 添加任务,使用 WaitGroup 跟踪cronList := crontMangerList()for _, cronItem := range cronList {err := manager.AddJob(cronItem.name, cronItem.runItem)if err != nil {logger.Log.Error("加入定时任务失败job:%s err:%v", zap.String("job_name", cronItem.name), zap.Error(err))}err = cronItem.runItem.Start()if err != nil {panic(fmt.Sprintf("计划任务初始化失败:%v", err))}}// 启动 Cron 管理器manager.Start()logger.Log.Info("CronJob InitCronJobClient")
}