package task import ( "context" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" ) // AnalysisPlanTaskManager 封装了创建和更新计划分析任务(即触发器)的逻辑。 // 这是一个可被 Scheduler 和其他应用服务(如 PlanService)共享的无状态组件。 type AnalysisPlanTaskManager struct { planRepo repository.PlanRepository pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository logger *logs.Logger } // NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。 func NewAnalysisPlanTaskManager( planRepo repository.PlanRepository, pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, logger *logs.Logger, ) *AnalysisPlanTaskManager { return &AnalysisPlanTaskManager{ planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, logger: logger, } } // CreateOrUpdateTrigger 为给定的 planID 创建或更新其关联的下一次触发任务。 // 这个方法是幂等的,可以安全地被多次调用。 func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(ctx context.Context, planID uint) error { // 获取计划信息 plan, err := m.planRepo.GetBasicPlanByID(planID) if err != nil { m.logger.Errorf("[严重] 获取计划失败, 错误: %v", err) return err } // 获取触发任务 task, err := m.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID) if err != nil { m.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err) return err } // 写入执行日志 taskLog := &models.TaskExecutionLog{ TaskID: task.ID, Status: models.ExecutionStatusWaiting, } if err := m.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil { m.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err) return err } // 写入待执行队列 next, err := utils.GetNextCronTime(plan.CronExpression) if err != nil { m.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err) return err } pendingTask := &models.PendingTask{ TaskID: task.ID, ExecuteAt: next, TaskExecutionLogID: taskLog.ID, } err = m.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask}) if err != nil { m.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err) return err } m.logger.Infof("成功为 Plan %d 创建/更新了下一次的触发任务,执行时间: %v", planID, next) return nil }