重构 #4
| @@ -175,22 +175,21 @@ func (s *Scheduler) claimAndSubmit() { | |||||||
| 			// 提交失败,必须释放刚刚获取的锁 | 			// 提交失败,必须释放刚刚获取的锁 | ||||||
| 			s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) | 			s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) | ||||||
| 			// 同样需要将任务安全放回 | 			// 同样需要将任务安全放回 | ||||||
| 			s.handleRequeue(pendingTask) | 			s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) | ||||||
| 		} | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 | 		// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 | ||||||
| 		s.handleRequeue(pendingTask) | 		s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 | // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 | ||||||
| func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { | func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { | ||||||
| 	planLogID := taskToRequeue.PlanExecutionLogID | 	s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) | ||||||
| 	s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planLogID, taskToRequeue.ID, taskToRequeue.TaskID) |  | ||||||
|  |  | ||||||
| 	// 1. 阻塞式地等待,直到可以获取到该计划的锁。 | 	// 1. 阻塞式地等待,直到可以获取到该计划的锁。 | ||||||
| 	s.progressTracker.Lock(planLogID) | 	s.progressTracker.Lock(planExecutionLogID) | ||||||
| 	defer s.progressTracker.Unlock(planLogID) | 	defer s.progressTracker.Unlock(planExecutionLogID) | ||||||
|  |  | ||||||
| 	// 2. 在持有锁的情况下,将任务安全地放回队列。 | 	// 2. 在持有锁的情况下,将任务安全地放回队列。 | ||||||
| 	if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { | 	if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { | ||||||
| @@ -198,7 +197,7 @@ func (s *Scheduler) handleRequeue(taskToRequeue *models.PendingTask) { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planLogID) | 	s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) | ||||||
| } | } | ||||||
|  |  | ||||||
| // processTask 处理单个任务的逻辑 (当前为占位符) | // processTask 处理单个任务的逻辑 (当前为占位符) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user