From 4a24e1a08d656e1817fa927095cc4acaf7371b74 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 17 Sep 2025 15:56:08 +0800 Subject: [PATCH] =?UTF-8?q?task.go=20=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/task/delay_task.go | 5 - internal/infra/task/scheduler.go | 209 ++++++++++++++++++++++++ internal/infra/task/task.go | 208 ----------------------- internal/infra/task/task_test.go | 77 --------- 4 files changed, 209 insertions(+), 290 deletions(-) create mode 100644 internal/infra/task/scheduler.go delete mode 100644 internal/infra/task/task_test.go diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 0b90eeb..a39779a 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -37,11 +37,6 @@ func (d *DelayTask) GetID() string { return d.id } -// GetPriority 获取任务优先级 -func (d *DelayTask) GetPriority() int { - return d.priority -} - // IsDone 检查任务是否已完成 func (d *DelayTask) IsDone() bool { return d.done diff --git a/internal/infra/task/scheduler.go b/internal/infra/task/scheduler.go new file mode 100644 index 0000000..387e02c --- /dev/null +++ b/internal/infra/task/scheduler.go @@ -0,0 +1,209 @@ +package task + +import ( + "context" + "errors" + "sync" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "github.com/panjf2000/ants/v2" + "gorm.io/gorm" +) + +// Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件 +type Logger interface { + Printf(format string, v ...interface{}) +} + +// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 +type ProgressTracker struct { + mu sync.Mutex + cond *sync.Cond // 用于实现阻塞锁 + totalTasks map[uint]int // key: planExecutionLogID, value: total tasks + completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks + runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) +} + +// NewProgressTracker 创建一个新的进度跟踪器 +func NewProgressTracker() *ProgressTracker { + t := &ProgressTracker{ + totalTasks: make(map[uint]int), + completedTasks: make(map[uint]int), + runningPlans: make(map[uint]bool), + } + t.cond = sync.NewCond(&t.mu) + return t +} + +// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 +func (t *ProgressTracker) TryLock(planLogID uint) bool { + t.mu.Lock() + defer t.mu.Unlock() + if t.runningPlans[planLogID] { + return false // 已被锁定 + } + t.runningPlans[planLogID] = true + return true +} + +// Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。 +func (t *ProgressTracker) Lock(planLogID uint) { + t.mu.Lock() + // 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。 + // 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。 + for t.runningPlans[planLogID] { + t.cond.Wait() + } + // 获取到锁 + t.runningPlans[planLogID] = true + t.mu.Unlock() +} + +// Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。 +func (t *ProgressTracker) Unlock(planLogID uint) { + t.mu.Lock() + defer t.mu.Unlock() + delete(t.runningPlans, planLogID) + // 唤醒所有在此条件上等待的协程 + t.cond.Broadcast() +} + +// GetRunningPlanIDs 获取当前所有正在执行的计划ID列表 +func (t *ProgressTracker) GetRunningPlanIDs() []uint { + t.mu.Lock() + defer t.mu.Unlock() + ids := make([]uint, 0, len(t.runningPlans)) + for id := range t.runningPlans { + ids = append(ids, id) + } + return ids +} + +// Scheduler 是核心的、持久化的任务调度器 +type Scheduler struct { + logger Logger + pollingInterval time.Duration + workers int + pendingTaskRepo repository.PendingTaskRepository + progressTracker *ProgressTracker + + pool *ants.Pool // 使用 ants 协程池来管理并发 + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +// NewScheduler 创建一个新的调度器实例 +func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler { + ctx, cancel := context.WithCancel(context.Background()) + return &Scheduler{ + pendingTaskRepo: pendingTaskRepo, + logger: logger, + pollingInterval: interval, + workers: numWorkers, + progressTracker: NewProgressTracker(), + ctx: ctx, + cancel: cancel, + } +} + +// Start 启动调度器,包括初始化协程池和启动主轮询循环 +func (s *Scheduler) Start() { + s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers) + pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { + s.logger.Printf("[严重] 任务执行时发生 panic: %v", err) + })) + if err != nil { + panic("初始化协程池失败: " + err.Error()) + } + s.pool = pool + + s.wg.Add(1) + go s.run() + s.logger.Printf("任务调度器已成功启动") +} + +// Stop 优雅地停止调度器 +func (s *Scheduler) Stop() { + s.logger.Printf("正在停止任务调度器...") + s.cancel() // 1. 发出取消信号,停止主循环 + s.wg.Wait() // 2. 等待主循环完成 + s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) + s.logger.Printf("任务调度器已安全停止") +} + +// run 是主轮询循环,负责从数据库认领任务并提交到协程池 +func (s *Scheduler) run() { + defer s.wg.Done() + ticker := time.NewTicker(s.pollingInterval) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + go s.claimAndSubmit() + } + } +} + +// claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 +func (s *Scheduler) claimAndSubmit() { + runningPlanIDs := s.progressTracker.GetRunningPlanIDs() + + claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + s.logger.Printf("认领任务时发生错误: %v", err) + } + // gorm.ErrRecordNotFound 说明没任务要执行 + return + } + + // 尝试获取内存执行锁 + if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) { + // 成功获取锁,正常派发任务 + err = s.pool.Submit(func() { + defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) + s.processTask(claimedLog) + }) + if err != nil { + s.logger.Printf("向协程池提交任务失败: %v", err) + // 提交失败,必须释放刚刚获取的锁 + s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) + // 同样需要将任务安全放回 + s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) + } + } else { + // 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 + s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) + } +} + +// handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 +func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { + s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) + + // 1. 阻塞式地等待,直到可以获取到该计划的锁。 + s.progressTracker.Lock(planExecutionLogID) + defer s.progressTracker.Unlock(planExecutionLogID) + + // 2. 在持有锁的情况下,将任务安全地放回队列。 + if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { + s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) + return + } + + s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) +} + +// processTask 处理单个任务的逻辑 (当前为占位符) +func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { + s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", + claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) + time.Sleep(2 * time.Second) // 模拟任务执行 + s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID) +} diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index 387e02c..b028dfd 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -1,209 +1 @@ package task - -import ( - "context" - "errors" - "sync" - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "github.com/panjf2000/ants/v2" - "gorm.io/gorm" -) - -// Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件 -type Logger interface { - Printf(format string, v ...interface{}) -} - -// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 -type ProgressTracker struct { - mu sync.Mutex - cond *sync.Cond // 用于实现阻塞锁 - totalTasks map[uint]int // key: planExecutionLogID, value: total tasks - completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks - runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) -} - -// NewProgressTracker 创建一个新的进度跟踪器 -func NewProgressTracker() *ProgressTracker { - t := &ProgressTracker{ - totalTasks: make(map[uint]int), - completedTasks: make(map[uint]int), - runningPlans: make(map[uint]bool), - } - t.cond = sync.NewCond(&t.mu) - return t -} - -// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 -func (t *ProgressTracker) TryLock(planLogID uint) bool { - t.mu.Lock() - defer t.mu.Unlock() - if t.runningPlans[planLogID] { - return false // 已被锁定 - } - t.runningPlans[planLogID] = true - return true -} - -// Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。 -func (t *ProgressTracker) Lock(planLogID uint) { - t.mu.Lock() - // 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。 - // 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。 - for t.runningPlans[planLogID] { - t.cond.Wait() - } - // 获取到锁 - t.runningPlans[planLogID] = true - t.mu.Unlock() -} - -// Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。 -func (t *ProgressTracker) Unlock(planLogID uint) { - t.mu.Lock() - defer t.mu.Unlock() - delete(t.runningPlans, planLogID) - // 唤醒所有在此条件上等待的协程 - t.cond.Broadcast() -} - -// GetRunningPlanIDs 获取当前所有正在执行的计划ID列表 -func (t *ProgressTracker) GetRunningPlanIDs() []uint { - t.mu.Lock() - defer t.mu.Unlock() - ids := make([]uint, 0, len(t.runningPlans)) - for id := range t.runningPlans { - ids = append(ids, id) - } - return ids -} - -// Scheduler 是核心的、持久化的任务调度器 -type Scheduler struct { - logger Logger - pollingInterval time.Duration - workers int - pendingTaskRepo repository.PendingTaskRepository - progressTracker *ProgressTracker - - pool *ants.Pool // 使用 ants 协程池来管理并发 - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc -} - -// NewScheduler 创建一个新的调度器实例 -func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler { - ctx, cancel := context.WithCancel(context.Background()) - return &Scheduler{ - pendingTaskRepo: pendingTaskRepo, - logger: logger, - pollingInterval: interval, - workers: numWorkers, - progressTracker: NewProgressTracker(), - ctx: ctx, - cancel: cancel, - } -} - -// Start 启动调度器,包括初始化协程池和启动主轮询循环 -func (s *Scheduler) Start() { - s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers) - pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { - s.logger.Printf("[严重] 任务执行时发生 panic: %v", err) - })) - if err != nil { - panic("初始化协程池失败: " + err.Error()) - } - s.pool = pool - - s.wg.Add(1) - go s.run() - s.logger.Printf("任务调度器已成功启动") -} - -// Stop 优雅地停止调度器 -func (s *Scheduler) Stop() { - s.logger.Printf("正在停止任务调度器...") - s.cancel() // 1. 发出取消信号,停止主循环 - s.wg.Wait() // 2. 等待主循环完成 - s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) - s.logger.Printf("任务调度器已安全停止") -} - -// run 是主轮询循环,负责从数据库认领任务并提交到协程池 -func (s *Scheduler) run() { - defer s.wg.Done() - ticker := time.NewTicker(s.pollingInterval) - defer ticker.Stop() - - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - go s.claimAndSubmit() - } - } -} - -// claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 -func (s *Scheduler) claimAndSubmit() { - runningPlanIDs := s.progressTracker.GetRunningPlanIDs() - - claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) - if err != nil { - if !errors.Is(err, gorm.ErrRecordNotFound) { - s.logger.Printf("认领任务时发生错误: %v", err) - } - // gorm.ErrRecordNotFound 说明没任务要执行 - return - } - - // 尝试获取内存执行锁 - if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) { - // 成功获取锁,正常派发任务 - err = s.pool.Submit(func() { - defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) - s.processTask(claimedLog) - }) - if err != nil { - s.logger.Printf("向协程池提交任务失败: %v", err) - // 提交失败,必须释放刚刚获取的锁 - s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) - // 同样需要将任务安全放回 - s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) - } - } else { - // 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 - s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) - } -} - -// handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 -func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { - s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) - - // 1. 阻塞式地等待,直到可以获取到该计划的锁。 - s.progressTracker.Lock(planExecutionLogID) - defer s.progressTracker.Unlock(planExecutionLogID) - - // 2. 在持有锁的情况下,将任务安全地放回队列。 - if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { - s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) - return - } - - s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) -} - -// processTask 处理单个任务的逻辑 (当前为占位符) -func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { - s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", - claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) - time.Sleep(2 * time.Second) // 模拟任务执行 - s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID) -} diff --git a/internal/infra/task/task_test.go b/internal/infra/task/task_test.go deleted file mode 100644 index 2cc19e1..0000000 --- a/internal/infra/task/task_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Package task_test 包含对 task 包的单元测试 -package task_test - -import ( - "sync" - "sync/atomic" - "testing" - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" -) - -// testLogger 是一个用于所有测试用例的静默 logger 实例。 -var testLogger *logs.Logger - -func init() { - // 使用 "fatal" 级别来创建一个在测试期间不会产生任何输出的 logger。 - // 这避免了在运行 `go test` 时被日志淹没。 - cfg := config.LogConfig{Level: "fatal"} - testLogger = logs.NewLogger(cfg) -} - -// MockTask 用于测试的模拟任务 -type MockTask struct { - id string - priority int - isDone bool - execute func() error - executed int32 // 使用原子操作来跟踪执行次数 -} - -// Execute 实现了 Task 接口,并确保每次调用都增加执行计数 -func (m *MockTask) Execute() error { - atomic.AddInt32(&m.executed, 1) - if m.execute != nil { - return m.execute() - } - return nil -} - -func (m *MockTask) GetID() string { - return m.id -} - -func (m *MockTask) GetPriority() int { - return m.priority -} - -func (m *MockTask) IsDone() bool { - return m.isDone -} - -// ExecutedCount 返回任务被执行的次数 -func (m *MockTask) ExecutedCount() int32 { - return atomic.LoadInt32(&m.executed) -} - -func (m *MockTask) GetDescription() string { - return "Mock Task" -} - -// --- 健壮等待的辅助函数 --- -func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { - waitChan := make(chan struct{}) - go func() { - defer close(waitChan) - wg.Wait() - }() - - select { - case <-waitChan: - // 等待成功 - case <-time.After(timeout): - t.Fatal("等待任务完成超时") - } -}