package task import ( "fmt" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" ) // AnalysisPlanTaskManager 负责管理分析计划的触发器任务。 // 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。 // 这是一个有状态的组件,包含一个互斥锁以确保并发安全。 type AnalysisPlanTaskManager struct { planRepo repository.PlanRepository pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository logger *logs.Logger mu sync.Mutex } // NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。 func NewAnalysisPlanTaskManager( planRepo repository.PlanRepository, pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, logger *logs.Logger, ) *AnalysisPlanTaskManager { return &AnalysisPlanTaskManager{ planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, logger: logger, } } // Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。 // 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。 func (m *AnalysisPlanTaskManager) Refresh() error { m.mu.Lock() defer m.mu.Unlock() m.logger.Info("开始同步计划任务管理器...") // 1. 一次性获取所有需要的数据 runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData() if err != nil { return fmt.Errorf("获取刷新数据失败: %w", err) } // 2. 清理所有与失效计划相关的待执行任务 if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil { // 仅记录错误,清理失败不应阻止新任务的添加 m.logger.Errorf("清理无效任务时出错: %v", err) } // 3. 添加或更新触发器 if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil { return fmt.Errorf("添加或更新触发器时出错: %w", err) } m.logger.Info("计划任务管理器同步完成.") return nil } // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 // 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。 func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { m.mu.Lock() defer m.mu.Unlock() // 检查计划是否可执行 plan, err := m.planRepo.GetBasicPlanByID(planID) if err != nil { return fmt.Errorf("获取计划基本信息失败: %w", err) } if plan.Status != models.PlanStatusEnabled { return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建触发器", planID, plan.Status) } // 幂等性检查:如果触发器已存在,则直接返回 existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID) if err != nil { return fmt.Errorf("查找现有触发器失败: %w", err) } if existingTrigger != nil { m.logger.Infof("计划 #%d 的触发器已存在于待执行队列中,无需重复创建。", planID) return nil } m.logger.Infof("为计划 #%d 创建新的触发器...", planID) return m.createTriggerTask(plan) } // --- 内部私有方法 --- // getRefreshData 从数据库获取刷新所需的所有数据。 func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) { runnablePlans, err = m.planRepo.FindRunnablePlans() if err != nil { m.logger.Errorf("获取可执行计划列表失败: %v", err) return } invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans() if err != nil { m.logger.Errorf("获取失效计划列表失败: %v", err) return } invalidPlanIDs = make([]uint, len(invalidPlans)) for i, p := range invalidPlans { invalidPlanIDs[i] = p.ID } pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks() if err != nil { m.logger.Errorf("获取所有待执行任务失败: %v", err) return } return } // cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。 func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error { if len(invalidPlanIDs) == 0 { return nil // 没有需要清理的计划 } invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs)) for _, id := range invalidPlanIDs { invalidPlanIDSet[id] = struct{}{} } var tasksToDeleteIDs []uint var logsToCancelIDs []uint for _, pt := range allPendingTasks { if pt.Task == nil { // 防御性编程,确保 Task 被预加载 continue } if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid { tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID) logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID) } } if len(tasksToDeleteIDs) == 0 { return nil // 没有找到需要清理的任务 } m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs)) // 批量删除待执行任务 if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil { return fmt.Errorf("批量删除待执行任务失败: %w", err) } // 批量更新相关执行日志状态为“已取消” if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil { // 这是一个非关键性错误,只记录日志 m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err) } return nil } // addOrUpdateTriggers 检查、更新或创建触发器。 func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error { // 创建一个映射,存放所有已在队列中的计划触发器 pendingTriggersMap := make(map[uint]models.PendingTask) for _, pt := range allPendingTasks { if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis { pendingTriggersMap[pt.Task.PlanID] = pt } } for _, plan := range runnablePlans { existingTrigger, exists := pendingTriggersMap[plan.ID] if exists { // --- 新增逻辑:检查并更新现有触发器 --- // 只对自动计划检查时间更新 if plan.ExecutionType == models.PlanExecutionTypeAutomatic { next, err := utils.GetNextCronTime(plan.CronExpression) if err != nil { m.logger.Errorf("为计划 #%d 解析Cron表达式失败,跳过更新: %v", plan.ID, err) continue } // 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致,则更新 if !existingTrigger.ExecuteAt.Equal(next) { m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next) if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil { m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err) } } } } else { // --- 原有逻辑:为缺失的计划创建新触发器 --- m.logger.Infof("发现应执行但队列中缺失的计划 #%d,正在为其创建触发器...", plan.ID) if err := m.createTriggerTask(plan); err != nil { m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err) // 继续处理下一个,不因单点失败而中断 } } } return nil } // createTriggerTask 是创建触发器任务的内部核心逻辑。 func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error { analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID) if err != nil { return fmt.Errorf("查找计划分析任务失败: %w", err) } // --- 如果触发器任务定义不存在,则自动创建 --- if analysisTask == nil { m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID) newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan) if err != nil { return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err) } analysisTask = newAnalysisTask m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID) } var executeAt time.Time if plan.ExecutionType == models.PlanExecutionTypeManual { executeAt = time.Now() } else { next, err := utils.GetNextCronTime(plan.CronExpression) if err != nil { return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err) } executeAt = next } taskLog := &models.TaskExecutionLog{ TaskID: analysisTask.ID, Status: models.ExecutionStatusWaiting, } if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil { return fmt.Errorf("创建任务执行日志失败: %w", err) } pendingTask := &models.PendingTask{ TaskID: analysisTask.ID, ExecuteAt: executeAt, TaskExecutionLogID: taskLog.ID, } if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil { return fmt.Errorf("创建待执行任务失败: %w", err) } m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt) return nil }