From db753708731c2499318b97364852f7aecdd482eb Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 17 Sep 2025 15:47:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E6=84=8F=E5=A4=96=E8=8E=B7?= =?UTF-8?q?=E5=8F=96Task=E5=90=8E=E9=87=8D=E6=96=B0=E6=94=BE=E5=9B=9E?= =?UTF-8?q?=E5=8E=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/task/task.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index 87de76f..387e02c 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -175,22 +175,21 @@ func (s *Scheduler) claimAndSubmit() { // 提交失败,必须释放刚刚获取的锁 s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) // 同样需要将任务安全放回 - s.handleRequeue(pendingTask) + s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) } } else { // 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 - s.handleRequeue(pendingTask) + s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) } } // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 -func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { - planLogID := taskToRequeue.PlanExecutionLogID - s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planLogID, taskToRequeue.ID, taskToRequeue.TaskID) +func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { + s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) // 1. 阻塞式地等待,直到可以获取到该计划的锁。 - s.progressTracker.Lock(planLogID) - defer s.progressTracker.Unlock(planLogID) + s.progressTracker.Lock(planExecutionLogID) + defer s.progressTracker.Unlock(planExecutionLogID) // 2. 在持有锁的情况下,将任务安全地放回队列。 if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { @@ -198,7 +197,7 @@ func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { return } - s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planLogID) + s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) } // processTask 处理单个任务的逻辑 (当前为占位符)