diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go new file mode 100644 index 0000000..1d4dee6 --- /dev/null +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -0,0 +1,82 @@ +package task + +import ( + "context" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" +) + +// AnalysisPlanTaskManager 封装了创建和更新计划分析任务(即触发器)的逻辑。 +// 这是一个可被 Scheduler 和其他应用服务(如 PlanService)共享的无状态组件。 +type AnalysisPlanTaskManager struct { + planRepo repository.PlanRepository + pendingTaskRepo repository.PendingTaskRepository + executionLogRepo repository.ExecutionLogRepository + logger *logs.Logger +} + +// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。 +func NewAnalysisPlanTaskManager( + planRepo repository.PlanRepository, + pendingTaskRepo repository.PendingTaskRepository, + executionLogRepo repository.ExecutionLogRepository, + logger *logs.Logger, +) *AnalysisPlanTaskManager { + return &AnalysisPlanTaskManager{ + planRepo: planRepo, + pendingTaskRepo: pendingTaskRepo, + executionLogRepo: executionLogRepo, + logger: logger, + } +} + +// CreateOrUpdateTrigger 为给定的 planID 创建或更新其关联的下一次触发任务。 +// 这个方法是幂等的,可以安全地被多次调用。 +func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(ctx context.Context, planID uint) error { + // 获取计划信息 + plan, err := m.planRepo.GetBasicPlanByID(planID) + if err != nil { + m.logger.Errorf("[严重] 获取计划失败, 错误: %v", err) + return err + } + + // 获取触发任务 + task, err := m.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID) + if err != nil { + m.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err) + return err + } + + // 写入执行日志 + taskLog := &models.TaskExecutionLog{ + TaskID: task.ID, + Status: models.ExecutionStatusWaiting, + } + if err := m.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil { + m.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err) + return err + } + + // 写入待执行队列 + next, err := utils.GetNextCronTime(plan.CronExpression) + if err != nil { + m.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err) + return err + } + pendingTask := &models.PendingTask{ + TaskID: task.ID, + ExecuteAt: next, + TaskExecutionLogID: taskLog.ID, + } + err = m.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask}) + if err != nil { + m.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err) + return err + } + + m.logger.Infof("成功为 Plan %d 创建/更新了下一次的触发任务,执行时间: %v", planID, next) + return nil +} diff --git a/internal/infra/task/plan_analysis_task.go b/internal/app/service/task/plan_analysis_task.go similarity index 100% rename from internal/infra/task/plan_analysis_task.go rename to internal/app/service/task/plan_analysis_task.go diff --git a/internal/infra/task/scheduler.go b/internal/app/service/task/scheduler.go similarity index 83% rename from internal/infra/task/scheduler.go rename to internal/app/service/task/scheduler.go index d2be868..18523bf 100644 --- a/internal/infra/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -9,7 +9,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -103,14 +102,15 @@ func (t *ProgressTracker) GetRunningPlanIDs() []uint { // Scheduler 是核心的、持久化的任务调度器 type Scheduler struct { - logger *logs.Logger - pollingInterval time.Duration - workers int - pendingTaskRepo repository.PendingTaskRepository - executionLogRepo repository.ExecutionLogRepository - planRepo repository.PlanRepository - progressTracker *ProgressTracker - taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 + logger *logs.Logger + pollingInterval time.Duration + workers int + pendingTaskRepo repository.PendingTaskRepository + executionLogRepo repository.ExecutionLogRepository + planRepo repository.PlanRepository + analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager + progressTracker *ProgressTracker + taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -123,6 +123,7 @@ func NewScheduler( pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, planRepo repository.PlanRepository, + analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager taskFactory func(taskType models.TaskType) Task, logger *logs.Logger, interval time.Duration, @@ -130,16 +131,17 @@ func NewScheduler( ctx, cancel := context.WithCancel(context.Background()) return &Scheduler{ - pendingTaskRepo: pendingTaskRepo, - executionLogRepo: executionLogRepo, - planRepo: planRepo, - logger: logger, - pollingInterval: interval, - workers: numWorkers, - progressTracker: NewProgressTracker(), - taskFactory: taskFactory, - ctx: ctx, - cancel: cancel, + pendingTaskRepo: pendingTaskRepo, + executionLogRepo: executionLogRepo, + planRepo: planRepo, + analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager + logger: logger, + pollingInterval: interval, + workers: numWorkers, + progressTracker: NewProgressTracker(), + taskFactory: taskFactory, + ctx: ctx, + cancel: cancel, } } @@ -254,7 +256,8 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { - err = s.createNewAnalysisPlanTask(claimedLog.Task.PlanID) + // 调用共享的 Manager 来处理触发器更新逻辑 + err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID) if err != nil { s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err) } @@ -365,48 +368,3 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio return nil } - -// createNewAnalysisPlanTask 创建一个新的Plan解析任务用于下次触发plan执行 -func (s *Scheduler) createNewAnalysisPlanTask(planID uint) error { - // 获取计划信息 - plan, err := s.planRepo.GetBasicPlanByID(planID) - if err != nil { - s.logger.Errorf("[严重] 获取计划失败, 错误: %v", err) - return err - } - - // 获取触发任务 - task, err := s.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID) - if err != nil { - s.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err) - return err - } - - // 写入执行日志 - taskLog := &models.TaskExecutionLog{ - TaskID: task.ID, - Status: models.ExecutionStatusWaiting, - } - if err := s.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil { - s.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err) - return err - } - - // 写入待执行队列 - next, err := utils.GetNextCronTime(plan.CronExpression) - if err != nil { - s.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err) - return err - } - pendingTask := &models.PendingTask{ - TaskID: task.ID, - ExecuteAt: next, - TaskExecutionLogID: taskLog.ID, - } - err = s.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask}) - if err != nil { - s.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err) - return err - } - return nil -} diff --git a/internal/app/service/task/task.go b/internal/app/service/task/task.go index b66a289..5426fc7 100644 --- a/internal/app/service/task/task.go +++ b/internal/app/service/task/task.go @@ -1,12 +1,29 @@ package task import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/task" ) +// Task 定义了所有可被调度器执行的任务必须实现的接口。 +type Task interface { + // Execute 是任务的核心执行逻辑。 + // ctx: 用于控制任务的超时或取消。 + // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 + // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 + Execute() error + + // ParseParams 解析及校验参数 + ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error + + // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 + // log: 任务执行的上下文。 + // executeErr: 从 Execute 方法返回的原始错误。 + OnFailure(executeErr error) +} + // TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数 -var TaskFactory = func(tt models.TaskType) task.Task { +var TaskFactory = func(tt models.TaskType) Task { switch tt { case models.TaskTypeWaiting: return &DelayTask{} diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index efea8ce..f9d7a03 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -1,10 +1,12 @@ package repository import ( + "encoding/json" "errors" "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/datatypes" "gorm.io/gorm" ) @@ -170,6 +172,11 @@ func (r *gormPlanRepository) CreatePlan(plan *models.Plan) error { if err := tx.Create(plan).Error; err != nil { return err } + + // 3. 创建触发器Task + if err := r.createPlanAnalysisTask(tx, plan); err != nil { + return err + } return nil }) } @@ -186,7 +193,12 @@ func (r *gormPlanRepository) updatePlanTx(tx *gorm.DB, plan *models.Plan) error if err := r.validatePlanTree(tx, plan); err != nil { return err } - return r.reconcilePlanNode(tx, plan) + if err := r.reconcilePlanNode(tx, plan); err != nil { + return err + } + + // 更新Plan触发器 + return r.updatePlanAnalysisTask(tx, plan) } // validatePlanTree 对整个计划树进行全面的只读健康检查 @@ -461,51 +473,61 @@ func (r *gormPlanRepository) flattenPlanTasksRecursive(plan *models.Plan) ([]mod func (r *gormPlanRepository) DeleteTask(id int) error { // 使用事务确保操作的原子性 return r.db.Transaction(func(tx *gorm.DB) error { - // 1. 检查是否有待执行任务引用了这个任务 - var pendingTaskCount int64 - if err := tx.Model(&models.PendingTask{}).Where("task_id = ?", id).Count(&pendingTaskCount).Error; err != nil { - return fmt.Errorf("检查待执行任务时出错: %w", err) - } - - // 如果有待执行任务引用该任务,不能删除 - if pendingTaskCount > 0 { - return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 条待执行任务引用该任务", id, pendingTaskCount) - } - - // 2. 检查是否有计划仍在使用这个任务 - var planCount int64 - if err := tx.Model(&models.Plan{}).Joins("JOIN tasks ON plans.id = tasks.plan_id").Where("tasks.id = ?", id).Count(&planCount).Error; err != nil { - return fmt.Errorf("检查计划引用任务时出错: %w", err) - } - - // 如果有计划在使用该任务,不能删除 - if planCount > 0 { - return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 个计划仍在使用该任务", id, planCount) - } - - // 3. 执行删除操作 - result := tx.Delete(&models.Task{}, id) - if result.Error != nil { - return fmt.Errorf("删除任务失败: %w", result.Error) - } - - // 检查是否实际删除了记录 - if result.RowsAffected == 0 { - return gorm.ErrRecordNotFound - } - - return nil + return r.deleteTask(tx, id) }) } +// deleteTask 根据ID删除任务 +func (r *gormPlanRepository) deleteTask(tx *gorm.DB, id int) error { + // 1. 检查是否有待执行任务引用了这个任务 + var pendingTaskCount int64 + if err := tx.Model(&models.PendingTask{}).Where("task_id = ?", id).Count(&pendingTaskCount).Error; err != nil { + return fmt.Errorf("检查待执行任务时出错: %w", err) + } + + // 如果有待执行任务引用该任务,不能删除 + if pendingTaskCount > 0 { + return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 条待执行任务引用该任务", id, pendingTaskCount) + } + + // 2. 检查是否有计划仍在使用这个任务 + var planCount int64 + if err := tx.Model(&models.Plan{}).Joins("JOIN tasks ON plans.id = tasks.plan_id").Where("tasks.id = ?", id).Count(&planCount).Error; err != nil { + return fmt.Errorf("检查计划引用任务时出错: %w", err) + } + + // 如果有计划在使用该任务,不能删除 + if planCount > 0 { + return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 个计划仍在使用该任务", id, planCount) + } + + // 3. 执行删除操作 + result := tx.Delete(&models.Task{}, id) + if result.Error != nil { + return fmt.Errorf("删除任务失败: %w", result.Error) + } + + // 检查是否实际删除了记录 + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + + return nil +} + // FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) { + return r.findPlanAnalysisTaskByParamsPlanID(r.db, paramsPlanID) +} + +// findPlanAnalysisTaskByParamsPlanID 使用指定db根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task +func (r *gormPlanRepository) findPlanAnalysisTaskByParamsPlanID(tx *gorm.DB, paramsPlanID uint) (*models.Task, error) { var task models.Task // 构造JSON查询条件,查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务 // TODO 在JSON字段中查找特定键值的语法取决于数据库类型,这里使用PostgreSQL的语法 // TODO 如果使用的是MySQL,则需要相应调整查询条件 - result := r.db.Where( + result := tx.Where( "type = ? AND parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", paramsPlanID), @@ -520,3 +542,38 @@ func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uin return &task, nil } + +// createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task +func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error { + m := map[string]interface{}{ + models.ParamsPlanID: plan.ID, + } + parameters, err := json.Marshal(m) + if err != nil { + return err + } + + task := &models.Task{ + PlanID: plan.ID, + Name: fmt.Sprintf("'%v'计划触发器", plan.Name), + Description: fmt.Sprintf("计划名: %v, 计划ID: %v", plan.Name, plan.ID), + ExecutionOrder: 0, + Type: models.TaskPlanAnalysis, + Parameters: datatypes.JSON(parameters), + } + + return tx.Create(task).Error +} + +// updatePlanAnalysisTask 使用简单粗暴的删除再创建方式实现更新, 以控制AnalysisPlanTask的定义全部在createPlanAnalysisTask方法中 +func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error { + task, err := r.findPlanAnalysisTaskByParamsPlanID(tx, plan.ID) + if err != nil { + return err + } + err = r.deleteTask(tx, task.ID) + if err != nil { + return err + } + return r.createPlanAnalysisTask(tx, plan) +} diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go deleted file mode 100644 index 310b92c..0000000 --- a/internal/infra/task/task.go +++ /dev/null @@ -1,23 +0,0 @@ -package task - -import ( - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" -) - -// Task 定义了所有可被调度器执行的任务必须实现的接口。 -type Task interface { - // Execute 是任务的核心执行逻辑。 - // ctx: 用于控制任务的超时或取消。 - // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 - // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 - Execute() error - - // ParseParams 解析及校验参数 - ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error - - // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 - // log: 任务执行的上下文。 - // executeErr: 从 Execute 方法返回的原始错误。 - OnFailure(executeErr error) -}