diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index a5b57a9..dfee75b 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -11,8 +11,11 @@ import ( // PendingTaskRepository 定义了与待执行任务队列交互的接口。 type PendingTaskRepository interface { CreatePendingTasksInBatch(tasks []*models.PendingTask) error - ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, error) - RequeueTask(log *models.TaskExecutionLog) error + // ClaimNextAvailableTask 原子地认领下一个可用的任务。 + // 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。 + ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) + // RequeueTask 安全地将一个任务重新放回队列。 + RequeueTask(originalPendingTask *models.PendingTask) error } // pendingTaskRepository 是使用 GORM 的具体实现。 @@ -31,11 +34,11 @@ func (r *pendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.Pendin } // ClaimNextAvailableTask 以原子方式认领下一个可用的任务。 -func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, error) { +func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { var log models.TaskExecutionLog + var pendingTask models.PendingTask err := r.db.Transaction(func(tx *gorm.DB) error { - var pendingTask models.PendingTask query := tx.Clauses(clause.Locking{Strength: "UPDATE"}). Where("execute_at <= ?", time.Now()). Order("execute_at ASC") @@ -68,27 +71,26 @@ func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (* }) if err != nil { - return nil, err + return nil, nil, err } - return &log, nil + return &log, &pendingTask, nil } -// RequeueTask 安全地将一个已被认领但无法执行的任务放回队列。 -// 它在一个事务中原子地将日志状态恢复为 'waiting',并重新创建待办任务。 -func (r *pendingTaskRepository) RequeueTask(log *models.TaskExecutionLog) error { +// RequeueTask 安全地将一个任务重新放回队列。 +// 它通过将原始 PendingTask 的 ID 重置为 0,并重新创建它来实现。 +func (r *pendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error { return r.db.Transaction(func(tx *gorm.DB) error { // 1. 将日志状态恢复为 waiting - if err := tx.Model(log).Update("status", models.ExecutionStatusWaiting).Error; err != nil { + if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil { return err } - // 2. 重新创建待办任务,立即执行 - newPendingTask := models.PendingTask{ - TaskID: log.TaskID, - TaskExecutionLogID: log.ID, - ExecuteAt: time.Now(), - } - return tx.Create(&newPendingTask).Error + // 2. 关键:将传入的 PendingTask 的 ID 重置为 0。 + // 这会告诉 GORM,这是一个需要创建(INSERT)的新记录,而不是更新。 + originalPendingTask.ID = 0 + + // 3. 重新创建待办任务。GORM 会忽略掉已被重置的 ID,并让数据库生成一个新的主键。 + return tx.Create(originalPendingTask).Error }) } diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index 7719500..87de76f 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -26,6 +26,7 @@ type ProgressTracker struct { runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) } +// NewProgressTracker 创建一个新的进度跟踪器 func NewProgressTracker() *ProgressTracker { t := &ProgressTracker{ totalTasks: make(map[uint]int), @@ -149,11 +150,11 @@ func (s *Scheduler) run() { } } -// claimAndSubmit 实现了“认领-锁定-执行 或 等待-放回”的健壮逻辑 +// claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 func (s *Scheduler) claimAndSubmit() { runningPlanIDs := s.progressTracker.GetRunningPlanIDs() - claimedLog, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) + claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { s.logger.Printf("认领任务时发生错误: %v", err) @@ -171,32 +172,36 @@ func (s *Scheduler) claimAndSubmit() { }) if err != nil { s.logger.Printf("向协程池提交任务失败: %v", err) - s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) // 提交失败,必须释放刚刚获取的锁 + // 提交失败,必须释放刚刚获取的锁 + s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) // 同样需要将任务安全放回 - s.handleRequeue(claimedLog) + s.handleRequeue(pendingTask) } } else { // 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 - s.handleRequeue(claimedLog) + s.handleRequeue(pendingTask) } } -// handleRequeue 处理需要被安全放回队列的任务 -func (s *Scheduler) handleRequeue(log *models.TaskExecutionLog) { - s.logger.Printf("计划 %d 正在执行,任务 %d 将等待并重新入队...", log.PlanExecutionLogID, log.ID) +// handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 +func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { + planLogID := taskToRequeue.PlanExecutionLogID + s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planLogID, taskToRequeue.ID, taskToRequeue.TaskID) - // 1. 阻塞式地等待,直到可以获取到该计划的锁 - s.progressTracker.Lock(log.PlanExecutionLogID) - // 2. 在持有锁的情况下,将任务安全地放回队列 - // 增加一个小的延迟(例如1秒),以避免与主循环发生过于频繁的竞争 - if err := s.pendingTaskRepo.RequeueTask(log); err != nil { - s.logger.Printf("任务重新入队失败, 日志ID: %d, 错误: %v", log.ID, err) + // 1. 阻塞式地等待,直到可以获取到该计划的锁。 + s.progressTracker.Lock(planLogID) + defer s.progressTracker.Unlock(planLogID) + + // 2. 在持有锁的情况下,将任务安全地放回队列。 + if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { + s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) + return } - // 3. 释放锁,让其他等待的任务(或主循环)可以继续 - s.progressTracker.Unlock(log.PlanExecutionLogID) + + s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planLogID) } -// processTask 处理单个任务的逻辑 (保持不变) +// processTask 处理单个任务的逻辑 (当前为占位符) func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)