From 06f327518aaaa248a725ff7b8e989034421b29b7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:19:39 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1=E8=BF=9B?= =?UTF-8?q?=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 调整任务调度器 --- internal/app/service/task/scheduler.go | 122 +++++++++++++------------ 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index cea91ee..0076f83 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -13,49 +13,22 @@ import ( "gorm.io/gorm" ) -// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 +// ProgressTracker 仅用于在内存中提供计划执行的并发锁 type ProgressTracker struct { - mu sync.Mutex - cond *sync.Cond // 用于实现阻塞锁 - totalTasks map[uint]int // key: planExecutionLogID, value: total tasks - completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks - runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) + mu sync.Mutex + cond *sync.Cond // 用于实现阻塞锁 + runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) } // NewProgressTracker 创建一个新的进度跟踪器 func NewProgressTracker() *ProgressTracker { t := &ProgressTracker{ - totalTasks: make(map[uint]int), - completedTasks: make(map[uint]int), - runningPlans: make(map[uint]bool), + runningPlans: make(map[uint]bool), } t.cond = sync.NewCond(&t.mu) return t } -// AddNewPlan 添加一个新的计划,并初始化进度跟踪器 -func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) { - t.mu.Lock() - t.totalTasks[planLogID] = totalTasks - t.completedTasks[planLogID] = 0 - t.mu.Unlock() -} - -// CompletedTask 通知计数器一个任务被完成 -func (t *ProgressTracker) CompletedTask(planLogID uint) { - t.mu.Lock() - t.completedTasks[planLogID]++ - t.mu.Unlock() -} - -// IsPlanOver 检查计划是否完成 -func (t *ProgressTracker) IsPlanOver(planLogID uint) bool { - t.mu.Lock() - defer t.mu.Unlock() - - return t.completedTasks[planLogID] >= t.totalTasks[planLogID] -} - // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 func (t *ProgressTracker) TryLock(planLogID uint) bool { t.mu.Lock() @@ -248,32 +221,44 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { if err != nil { claimedLog.Status = models.ExecutionStatusFailed claimedLog.Output = err.Error() + + // 任务失败时,调用统一的终止服务 + s.handlePlanTermination(claimedLog.PlanExecutionLogID, "子任务执行失败: "+err.Error()) return } + + // 如果是计划分析任务,它的职责是解析和分发任务,到此即完成,不参与后续的计划完成度检查。 + if claimedLog.Task.Type == models.TaskPlanAnalysis { + s.logger.Warnf("完成计划分析任务, 日志ID: %d", claimedLog.ID) + return + } + + // --- 以下是常规任务的完成逻辑 --- s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID) - // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 - if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { - // --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 --- - var planID uint - // 根据任务类型获取正确的 PlanID - if claimedLog.Task.Type == models.TaskPlanAnalysis { - var params struct { - PlanID uint `json:"plan_id"` - } - if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil { - s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err) - return - } - planID = params.PlanID - } else { - planID = claimedLog.Task.PlanID + // 检查是否是最后一个任务 + incompleteCount, err := s.executionLogRepo.CountIncompleteTasksByPlanLogID(claimedLog.PlanExecutionLogID) + if err != nil { + s.logger.Errorf("检查计划 %d 的未完成任务数时出错: %v", claimedLog.PlanExecutionLogID, err) + return + } + + // 如果此计划执行中已没有其他“等待中”或“进行中”的任务,则认为计划已完成 + if incompleteCount == 0 { + s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", claimedLog.PlanExecutionLogID) + + // 通过 PlanExecutionLog 反查正确的顶层 PlanID + planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(claimedLog.PlanExecutionLogID) + if err != nil { + s.logger.Errorf("获取计划执行日志 %d 失败: %v", claimedLog.PlanExecutionLogID, err) + return } + planID := planExecutionLog.PlanID // 这才是正确的顶层计划ID // 获取计划的最新数据 - plan, err := s.planRepo.GetPlanByID(planID) // Changed to GetPlanByID to include sub-plans + plan, err := s.planRepo.GetPlanByID(planID) if err != nil { - s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) // Updated error message + s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) return } @@ -287,7 +272,7 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { } // 保存更新后的计划状态和执行计数 - if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象 + if err := s.planRepo.UpdatePlan(plan); err != nil { s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err) return } @@ -295,15 +280,13 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { // 更新计划执行日志状态为完成 if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil { s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err) - // 这是一个非阻塞性错误,不中断后续流程 } - // 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器) + // 调用共享的 Manager 来处理触发器更新逻辑 if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil { s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err) } } - } // runTask 用于执行具体任务 @@ -402,9 +385,6 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { return err } - // 将Task列表加入待执行队列中 - s.progressTracker.AddNewPlan(planLog.ID, len(tasks)) - return nil } @@ -419,3 +399,31 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio return nil } + +// handlePlanTermination 集中处理计划的终止逻辑(失败或取消) +func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) { + // 1. 从待执行队列中删除所有相关的子任务 + if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil { + s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err) + } + + // 2. 将父计划的执行日志标记为失败 + if err := s.executionLogRepo.FailPlanExecution(planLogID, reason); err != nil { + s.logger.Errorf("标记计划执行日志 %d 为失败时出错: %v", planLogID, err) + } + + // 3. 将所有未完成的子任务日志标记为已取消 + if err := s.executionLogRepo.CancelIncompleteTasksByPlanLogID(planLogID, "父计划失败或被取消"); err != nil { + s.logger.Errorf("取消计划 %d 的后续任务日志时出错: %v", planLogID, err) + } + + // 4. 将计划本身的状态更新为失败 + planLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID) + if err != nil { + s.logger.Errorf("无法找到计划执行日志 %d 以更新父计划状态: %v", planLogID, err) + return + } + if err := s.planRepo.UpdatePlanStatus(planLog.PlanID, models.PlanStatusFailed); err != nil { + s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err) + } +}