package task import ( "context" "errors" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) // Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件 type Logger interface { Printf(format string, v ...interface{}) } // ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 type ProgressTracker struct { mu sync.Mutex cond *sync.Cond // 用于实现阻塞锁 totalTasks map[uint]int // key: planExecutionLogID, value: total tasks completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) } // NewProgressTracker 创建一个新的进度跟踪器 func NewProgressTracker() *ProgressTracker { t := &ProgressTracker{ totalTasks: make(map[uint]int), completedTasks: make(map[uint]int), runningPlans: make(map[uint]bool), } t.cond = sync.NewCond(&t.mu) return t } // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 func (t *ProgressTracker) TryLock(planLogID uint) bool { t.mu.Lock() defer t.mu.Unlock() if t.runningPlans[planLogID] { return false // 已被锁定 } t.runningPlans[planLogID] = true return true } // Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。 func (t *ProgressTracker) Lock(planLogID uint) { t.mu.Lock() // 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。 // 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。 for t.runningPlans[planLogID] { t.cond.Wait() } // 获取到锁 t.runningPlans[planLogID] = true t.mu.Unlock() } // Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。 func (t *ProgressTracker) Unlock(planLogID uint) { t.mu.Lock() defer t.mu.Unlock() delete(t.runningPlans, planLogID) // 唤醒所有在此条件上等待的协程 t.cond.Broadcast() } // GetRunningPlanIDs 获取当前所有正在执行的计划ID列表 func (t *ProgressTracker) GetRunningPlanIDs() []uint { t.mu.Lock() defer t.mu.Unlock() ids := make([]uint, 0, len(t.runningPlans)) for id := range t.runningPlans { ids = append(ids, id) } return ids } // Scheduler 是核心的、持久化的任务调度器 type Scheduler struct { logger Logger pollingInterval time.Duration workers int pendingTaskRepo repository.PendingTaskRepository progressTracker *ProgressTracker pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } // NewScheduler 创建一个新的调度器实例 func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler { ctx, cancel := context.WithCancel(context.Background()) return &Scheduler{ pendingTaskRepo: pendingTaskRepo, logger: logger, pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), ctx: ctx, cancel: cancel, } } // Start 启动调度器,包括初始化协程池和启动主轮询循环 func (s *Scheduler) Start() { s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers) pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { s.logger.Printf("[严重] 任务执行时发生 panic: %v", err) })) if err != nil { panic("初始化协程池失败: " + err.Error()) } s.pool = pool s.wg.Add(1) go s.run() s.logger.Printf("任务调度器已成功启动") } // Stop 优雅地停止调度器 func (s *Scheduler) Stop() { s.logger.Printf("正在停止任务调度器...") s.cancel() // 1. 发出取消信号,停止主循环 s.wg.Wait() // 2. 等待主循环完成 s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) s.logger.Printf("任务调度器已安全停止") } // run 是主轮询循环,负责从数据库认领任务并提交到协程池 func (s *Scheduler) run() { defer s.wg.Done() ticker := time.NewTicker(s.pollingInterval) defer ticker.Stop() for { select { case <-s.ctx.Done(): return case <-ticker.C: go s.claimAndSubmit() } } } // claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 func (s *Scheduler) claimAndSubmit() { runningPlanIDs := s.progressTracker.GetRunningPlanIDs() claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { s.logger.Printf("认领任务时发生错误: %v", err) } // gorm.ErrRecordNotFound 说明没任务要执行 return } // 尝试获取内存执行锁 if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) { // 成功获取锁,正常派发任务 err = s.pool.Submit(func() { defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) s.processTask(claimedLog) }) if err != nil { s.logger.Printf("向协程池提交任务失败: %v", err) // 提交失败,必须释放刚刚获取的锁 s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) // 同样需要将任务安全放回 s.handleRequeue(pendingTask) } } else { // 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 s.handleRequeue(pendingTask) } } // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { planLogID := taskToRequeue.PlanExecutionLogID s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planLogID, taskToRequeue.ID, taskToRequeue.TaskID) // 1. 阻塞式地等待,直到可以获取到该计划的锁。 s.progressTracker.Lock(planLogID) defer s.progressTracker.Unlock(planLogID) // 2. 在持有锁的情况下,将任务安全地放回队列。 if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) return } s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planLogID) } // processTask 处理单个任务的逻辑 (当前为占位符) func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) time.Sleep(2 * time.Second) // 模拟任务执行 s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID) }