diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go index 093490b..2fc1601 100644 --- a/internal/app/service/task/analysis_plan_task_manager.go +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -57,9 +57,9 @@ func (m *AnalysisPlanTaskManager) Refresh() error { m.logger.Errorf("清理无效任务时出错: %v", err) } - // 3. 为应执行但缺失的计划添加新触发器 - if err := m.addNewTriggers(runnablePlans, pendingTasks); err != nil { - return fmt.Errorf("添加新触发器时出错: %w", err) + // 3. 添加或更新触发器 + if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil { + return fmt.Errorf("添加或更新触发器时出错: %w", err) } m.logger.Info("计划任务管理器同步完成.") @@ -167,18 +167,38 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all return nil } -// addNewTriggers 检查并为应执行但缺失的计划添加新触发器。 -func (m *AnalysisPlanTaskManager) addNewTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error { - // 创建一个集合,存放所有已在队列中的计划触发器 - pendingTriggerPlanIDs := make(map[uint]struct{}) +// addOrUpdateTriggers 检查、更新或创建触发器。 +func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error { + // 创建一个映射,存放所有已在队列中的计划触发器 + pendingTriggersMap := make(map[uint]models.PendingTask) for _, pt := range allPendingTasks { if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis { - pendingTriggerPlanIDs[pt.Task.PlanID] = struct{}{} + pendingTriggersMap[pt.Task.PlanID] = pt } } for _, plan := range runnablePlans { - if _, exists := pendingTriggerPlanIDs[plan.ID]; !exists { + existingTrigger, exists := pendingTriggersMap[plan.ID] + + if exists { + // --- 新增逻辑:检查并更新现有触发器 --- + // 只对自动计划检查时间更新 + if plan.ExecutionType == models.PlanExecutionTypeAutomatic { + next, err := utils.GetNextCronTime(plan.CronExpression) + if err != nil { + m.logger.Errorf("为计划 #%d 解析Cron表达式失败,跳过更新: %v", plan.ID, err) + continue + } + // 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致,则更新 + if !existingTrigger.ExecuteAt.Equal(next) { + m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next) + if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil { + m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err) + } + } + } + } else { + // --- 原有逻辑:为缺失的计划创建新触发器 --- m.logger.Infof("发现应执行但队列中缺失的计划 #%d,正在为其创建触发器...", plan.ID) if err := m.createTriggerTask(plan); err != nil { m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err) @@ -195,8 +215,16 @@ func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error { if err != nil { return fmt.Errorf("查找计划分析任务失败: %w", err) } + + // --- 如果触发器任务定义不存在,则自动创建 --- if analysisTask == nil { - return fmt.Errorf("未找到计划 #%d 关联的 'plan_analysis' 任务", plan.ID) + m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID) + newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan) + if err != nil { + return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err) + } + analysisTask = newAnalysisTask + m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID) } var executeAt time.Time diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index 166496b..cbce386 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -2,6 +2,7 @@ package repository import ( "errors" + "fmt" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" @@ -16,6 +17,10 @@ type PendingTaskRepository interface { DeletePendingTasksByIDs(ids []uint) error CreatePendingTask(task *models.PendingTask) error CreatePendingTasksInBatch(tasks []*models.PendingTask) error + + // UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间 + UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error + // ClaimNextAvailableTask 原子地认领下一个可用的任务。 // 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。 ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) @@ -42,9 +47,10 @@ func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error) { var pendingTask models.PendingTask + // 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id err := r.db. Joins("JOIN tasks ON tasks.id = pending_tasks.task_id"). - Where("tasks.plan_id = ? AND tasks.type = ?", planID, models.TaskPlanAnalysis). + Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)). First(&pendingTask).Error if errors.Is(err, gorm.ErrRecordNotFound) { return nil, nil // 未找到不是错误 @@ -71,6 +77,11 @@ func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.Pe return r.db.Create(&tasks).Error } +// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间 +func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error { + return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error +} + // ClaimNextAvailableTask 以原子方式认领下一个可用的任务。 func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { var log models.TaskExecutionLog diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 66ebd8d..ee2e760 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -48,6 +48,9 @@ type PlanRepository interface { FindDisabledAndStoppedPlans() ([]*models.Plan, error) // FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务 FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) + + // CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它 + CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error) } // gormPlanRepository 是 PlanRepository 的 GORM 实现 @@ -180,7 +183,9 @@ func (r *gormPlanRepository) CreatePlan(plan *models.Plan) error { } // 3. 创建触发器Task - if err := r.createPlanAnalysisTask(tx, plan); err != nil { + // 关键修改:调用 createPlanAnalysisTask 并处理其返回的 Task 对象 + _, err := r.createPlanAnalysisTask(tx, plan) + if err != nil { return err } return nil @@ -523,52 +528,33 @@ func (r *gormPlanRepository) deleteTask(tx *gorm.DB, id int) error { // FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) { - return r.findPlanAnalysisTaskByParamsPlanID(r.db, paramsPlanID) -} - -// findPlanAnalysisTaskByParamsPlanID 使用指定db根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task -func (r *gormPlanRepository) findPlanAnalysisTaskByParamsPlanID(tx *gorm.DB, paramsPlanID uint) (*models.Task, error) { - var task models.Task - - // 构造JSON查询条件,查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务 - // TODO 在JSON字段中查找特定键值的语法取决于数据库类型,这里使用PostgreSQL的语法 - // TODO 如果使用的是MySQL,则需要相应调整查询条件 - result := tx.Where( - "type = ? AND parameters->>'plan_id' = ?", - models.TaskPlanAnalysis, - fmt.Sprintf("%d", paramsPlanID), - ).First(&task) - - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, fmt.Errorf("未找到Parameters.PlanID为%d的TaskPlanAnalysis类型任务", paramsPlanID) - } - return nil, fmt.Errorf("查找任务时出错: %w", result.Error) - } - - return &task, nil + return r.findPlanAnalysisTask(r.db, paramsPlanID) } // createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task -func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error { +// 关键修改:Task.PlanID 设置为 0,实际 PlanID 存储在 Parameters 中,并返回创建的 Task +func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) (*models.Task, error) { m := map[string]interface{}{ models.ParamsPlanID: plan.ID, } parameters, err := json.Marshal(m) if err != nil { - return err + return nil, err } task := &models.Task{ - PlanID: plan.ID, - Name: fmt.Sprintf("'%v'计划触发器", plan.Name), - Description: fmt.Sprintf("计划名: %v, 计划ID: %v", plan.Name, plan.ID), - ExecutionOrder: 0, + PlanID: 0, // 关键:设置为 0,避免被常规 PlanID 查询查到 + Name: fmt.Sprintf("'%s'计划触发器", plan.Name), + Description: fmt.Sprintf("计划名: %s, 计划ID: %d", plan.Name, plan.ID), + ExecutionOrder: 0, // 触发器任务的执行顺序通常为0或不关心 Type: models.TaskPlanAnalysis, Parameters: datatypes.JSON(parameters), } - return tx.Create(task).Error + if err := tx.Create(task).Error; err != nil { + return nil, err + } + return task, nil } // updatePlanAnalysisTask 使用更安全的方式更新触发器任务 @@ -580,7 +566,8 @@ func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Pl // 如果触发器任务不存在,则创建一个 if task == nil { - return r.createPlanAnalysisTask(tx, plan) + _, err := r.createPlanAnalysisTask(tx, plan) + return err } // 如果存在,则更新它的名称和描述以反映计划的最新信息 @@ -611,9 +598,10 @@ func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, erro } // findPlanAnalysisTask 是一个内部使用的、更高效的查找方法 +// 关键修改:通过查询 parameters JSON 字段来查找 func (r *gormPlanRepository) findPlanAnalysisTask(tx *gorm.DB, planID uint) (*models.Task, error) { var task models.Task - err := tx.Where("plan_id = ? AND type = ?", planID, models.TaskPlanAnalysis).First(&task).Error + err := tx.Where("type = ? AND parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).First(&task).Error if errors.Is(err, gorm.ErrRecordNotFound) { return nil, nil // 未找到不是错误,返回nil, nil } @@ -621,6 +609,19 @@ func (r *gormPlanRepository) findPlanAnalysisTask(tx *gorm.DB, planID uint) (*mo } // FindPlanAnalysisTaskByPlanID 是暴露给外部的公共方法 +// 关键修改:通过查询 parameters JSON 字段来查找 func (r *gormPlanRepository) FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) { return r.findPlanAnalysisTask(r.db, planID) } + +// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它 +// 这个方法是公开的,主要由 TaskManager 在发现触发器任务定义丢失时调用。 +func (r *gormPlanRepository) CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error) { + var createdTask *models.Task + err := r.db.Transaction(func(tx *gorm.DB) error { + var err error + createdTask, err = r.createPlanAnalysisTask(tx, plan) + return err + }) + return createdTask, err +}