From 056279bdc22dfffc87429a8b42bfe07d87c619ab Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sat, 20 Sep 2025 22:32:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84AnalysisPlanTaskManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/controller/plan/plan_controller.go | 12 +-- .../task/analysis_plan_task_manager.go | 65 +++++++++++++++- internal/app/service/task/scheduler.go | 74 ++++++++++++------- 3 files changed, 116 insertions(+), 35 deletions(-) diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index fc5bb08..5eb85bd 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -138,10 +138,10 @@ func (c *Controller) CreatePlan(ctx *gin.Context) { return } - // 创建成功后,调用 manager 创建或更新触发器 - if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil { + // 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列 + if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToCreate.ID); err != nil { // 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功 - c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err) + c.logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err) } // 使用已有的转换函数将创建后的模型转换为响应对象 @@ -287,10 +287,10 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { return } - // 更新成功后,调用 manager 创建或更新触发器 - if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil { + // 更新成功后,调用 manager 确保触发器任务定义存在 + if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToUpdate.ID); err != nil { // 这是一个非阻塞性错误,我们只记录日志 - c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err) + c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err) } // 6. 获取更新后的完整计划用于响应 diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go index 2fc1601..6576239 100644 --- a/internal/app/service/task/analysis_plan_task_manager.go +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -68,6 +68,7 @@ func (m *AnalysisPlanTaskManager) Refresh() error { // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 // 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。 +// 关键修改:如果触发器已存在,会根据计划类型更新其执行时间。 func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { m.mu.Lock() defer m.mu.Unlock() @@ -78,23 +79,79 @@ func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { return fmt.Errorf("获取计划基本信息失败: %w", err) } if plan.Status != models.PlanStatusEnabled { - return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建触发器", planID, plan.Status) + return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建或更新触发器", planID, plan.Status) } - // 幂等性检查:如果触发器已存在,则直接返回 + // 查找现有触发器 existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID) if err != nil { return fmt.Errorf("查找现有触发器失败: %w", err) } + + // 如果触发器已存在,则根据计划类型更新其执行时间 if existingTrigger != nil { - m.logger.Infof("计划 #%d 的触发器已存在于待执行队列中,无需重复创建。", planID) - return nil + var expectedExecuteAt time.Time + if plan.ExecutionType == models.PlanExecutionTypeManual { + // 手动计划,如果再次触发,则立即执行 + expectedExecuteAt = time.Now() + } else { // 自动计划 + // 自动计划,根据 Cron 表达式计算下一次执行时间 + next, err := utils.GetNextCronTime(plan.CronExpression) + if err != nil { + m.logger.Errorf("为计划 #%d 解析Cron表达式失败,无法更新触发器: %v", plan.ID, err) + return fmt.Errorf("解析 Cron 表达式失败: %w", err) + } + expectedExecuteAt = next + } + + // 如果计算出的执行时间与当前待执行任务的时间不一致,则更新 + if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) { + m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt) + if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil { + m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err) + return fmt.Errorf("更新触发器执行时间失败: %w", err) + } + } else { + m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID) + } + return nil // 触发器已存在且已处理更新,直接返回 } + // 如果触发器不存在,则创建新的触发器 m.logger.Infof("为计划 #%d 创建新的触发器...", planID) return m.createTriggerTask(plan) } +// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。 +// 如果不存在,则会自动创建。此方法不涉及待执行队列。 +func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error { + m.mu.Lock() + defer m.mu.Unlock() + + plan, err := m.planRepo.GetBasicPlanByID(planID) + if err != nil { + return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err) + } + + analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID) + if err != nil { + return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err) + } + + if analysisTask == nil { + m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID) + _, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error + if err != nil { + return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err) + } + m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID) + } else { + m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID) + } + + return nil +} + // --- 内部私有方法 --- // getRefreshData 从数据库获取刷新所需的所有数据。 diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 18523bf..e12300e 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -1,7 +1,7 @@ package task import ( - "context" + "encoding/json" "errors" "sync" "time" @@ -112,10 +112,9 @@ type Scheduler struct { progressTracker *ProgressTracker taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 - pool *ants.Pool // 使用 ants 协程池来管理并发 - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + pool *ants.Pool // 使用 ants 协程池来管理并发 + wg sync.WaitGroup + stopChan chan struct{} // 用于停止主循环的信号通道 } // NewScheduler 创建一个新的调度器实例 @@ -128,8 +127,6 @@ func NewScheduler( logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { - ctx, cancel := context.WithCancel(context.Background()) - return &Scheduler{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, @@ -140,8 +137,7 @@ func NewScheduler( workers: numWorkers, progressTracker: NewProgressTracker(), taskFactory: taskFactory, - ctx: ctx, - cancel: cancel, + stopChan: make(chan struct{}), // 初始化停止信号通道 } } @@ -164,9 +160,9 @@ func (s *Scheduler) Start() { // Stop 优雅地停止调度器 func (s *Scheduler) Stop() { s.logger.Warnf("正在停止任务调度器...") - s.cancel() // 1. 发出取消信号,停止主循环 - s.wg.Wait() // 2. 等待主循环完成 - s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) + close(s.stopChan) // 1. 发出停止信号,停止主循环 + s.wg.Wait() // 2. 等待主循环完成 + s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) s.logger.Warnf("任务调度器已安全停止") } @@ -178,9 +174,11 @@ func (s *Scheduler) run() { for { select { - case <-s.ctx.Done(): + case <-s.stopChan: + // 收到停止信号,退出循环 return case <-ticker.C: + // 定时触发任务认领和提交 go s.claimAndSubmit() } } @@ -256,10 +254,26 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { - // 调用共享的 Manager 来处理触发器更新逻辑 - err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID) + // --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 --- + planID := claimedLog.Task.PlanID + + // 获取计划的最新数据 + plan, err := s.planRepo.GetBasicPlanByID(planID) if err != nil { - s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err) + s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err) + return + } + + // 更新计划的执行计数器 + plan.ExecuteCount++ + if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象 + s.logger.Errorf("更新计划 %d 的执行计数失败: %v", planID, err) + return + } + + // 调用共享的 Manager 来处理触发器更新逻辑 + if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil { + s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err) } } @@ -301,8 +315,18 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 + // 从任务的 Parameters 中解析出真实的 PlanID + var params struct { + PlanID uint `json:"plan_id"` + } + if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil { + s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + realPlanID := params.PlanID + planLog := &models.PlanExecutionLog{ - PlanID: claimedLog.Task.PlanID, + PlanID: realPlanID, // 使用从参数中解析出的真实 PlanID Status: models.ExecutionStatusStarted, StartedAt: time.Now(), } @@ -312,7 +336,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { } // 解析出Task列表 - tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID) + tasks, err := s.planRepo.FlattenPlanTasks(realPlanID) if err != nil { s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) return err @@ -320,12 +344,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 写入执行历史 taskLogs := make([]*models.TaskExecutionLog, len(tasks)) - for _, task := range tasks { - taskLogs = append(taskLogs, &models.TaskExecutionLog{ + for i, task := range tasks { + taskLogs[i] = &models.TaskExecutionLog{ PlanExecutionLogID: planLog.ID, TaskID: task.ID, Status: models.ExecutionStatusWaiting, - }) + } } err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs) @@ -337,13 +361,13 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 写入待执行队列 pendingTasks := make([]*models.PendingTask, len(tasks)) for i, task := range tasks { - pendingTasks = append(pendingTasks, &models.PendingTask{ + pendingTasks[i] = &models.PendingTask{ TaskID: task.ID, - TaskExecutionLogID: pendingTasks[i].ID, + TaskExecutionLogID: taskLogs[i].ID, // 使用正确的 TaskExecutionLogID // 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发 ExecuteAt: time.Now().Add(time.Duration(i) * time.Second), - }) + } } err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks) if err != nil { @@ -352,7 +376,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { } // 将Task列表加入待执行队列中 - s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks)) + s.progressTracker.AddNewPlan(planLog.ID, len(tasks)) return nil }