diff --git a/internal/app/service/plan_service.go b/internal/app/service/plan_service.go index d59b618..de3c123 100644 --- a/internal/app/service/plan_service.go +++ b/internal/app/service/plan_service.go @@ -50,14 +50,14 @@ type PlanService interface { type planService struct { logger *logs.Logger planRepo repository.PlanRepository - analysisPlanTaskManager *plan.AnalysisPlanTaskManager + analysisPlanTaskManager plan.AnalysisPlanTaskManager } // NewPlanService 创建一个新的 PlanService 实例 func NewPlanService( logger *logs.Logger, planRepo repository.PlanRepository, - analysisPlanTaskManager *plan.AnalysisPlanTaskManager, + analysisPlanTaskManager plan.AnalysisPlanTaskManager, ) PlanService { return &planService{ logger: logger, diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index 09aed72..fcbf121 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -125,8 +125,8 @@ type DomainServices struct { PigBatchDomain pig.PigBatchService GeneralDeviceService device.Service taskFactory plan.TaskFactory - AnalysisPlanTaskManager *plan.AnalysisPlanTaskManager - PlanExecutionManager *plan.PlanExecutionManager + PlanExecutionManager plan.ExecutionManager + AnalysisPlanTaskManager plan.AnalysisPlanTaskManager } // initDomainServices 初始化所有的领域服务。 diff --git a/internal/domain/plan/analysis_plan_task_manager.go b/internal/domain/plan/analysis_plan_task_manager.go index 39be0c7..22f5357 100644 --- a/internal/domain/plan/analysis_plan_task_manager.go +++ b/internal/domain/plan/analysis_plan_task_manager.go @@ -11,10 +11,22 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" ) -// AnalysisPlanTaskManager 负责管理分析计划的触发器任务。 +// AnalysisPlanTaskManager 定义了分析计划任务管理器的接口。 +type AnalysisPlanTaskManager interface { + // Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。 + Refresh() error + // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 + // 如果触发器已存在,会根据计划类型更新其执行时间。 + CreateOrUpdateTrigger(planID uint) error + // EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。 + // 如果不存在,则会自动创建。此方法不涉及待执行队列。 + EnsureAnalysisTaskDefinition(planID uint) error +} + +// analysisPlanTaskManagerImpl 负责管理分析计划的触发器任务。 // 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。 // 这是一个有状态的组件,包含一个互斥锁以确保并发安全。 -type AnalysisPlanTaskManager struct { +type analysisPlanTaskManagerImpl struct { planRepo repository.PlanRepository pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository @@ -22,14 +34,14 @@ type AnalysisPlanTaskManager struct { mu sync.Mutex } -// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。 +// NewAnalysisPlanTaskManager 是 analysisPlanTaskManagerImpl 的构造函数。 func NewAnalysisPlanTaskManager( planRepo repository.PlanRepository, pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, logger *logs.Logger, -) *AnalysisPlanTaskManager { - return &AnalysisPlanTaskManager{ +) AnalysisPlanTaskManager { + return &analysisPlanTaskManagerImpl{ planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, @@ -39,7 +51,7 @@ func NewAnalysisPlanTaskManager( // Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。 // 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。 -func (m *AnalysisPlanTaskManager) Refresh() error { +func (m *analysisPlanTaskManagerImpl) Refresh() error { m.mu.Lock() defer m.mu.Unlock() @@ -68,7 +80,7 @@ func (m *AnalysisPlanTaskManager) Refresh() error { // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 // 如果触发器已存在,会根据计划类型更新其执行时间。 -func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { +func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error { m.mu.Lock() defer m.mu.Unlock() @@ -123,7 +135,7 @@ func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { // EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。 // 如果不存在,则会自动创建。此方法不涉及待执行队列。 -func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error { +func (m *analysisPlanTaskManagerImpl) EnsureAnalysisTaskDefinition(planID uint) error { m.mu.Lock() defer m.mu.Unlock() @@ -154,7 +166,7 @@ func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) erro // --- 内部私有方法 --- // getRefreshData 从数据库获取刷新所需的所有数据。 -func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) { +func (m *analysisPlanTaskManagerImpl) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) { runnablePlans, err = m.planRepo.FindRunnablePlans() if err != nil { m.logger.Errorf("获取可执行计划列表失败: %v", err) @@ -180,7 +192,7 @@ func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan } // cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。 -func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error { +func (m *analysisPlanTaskManagerImpl) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error { if len(invalidPlanIDs) == 0 { return nil // 没有需要清理的计划 } @@ -224,7 +236,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all } // addOrUpdateTriggers 检查、更新或创建触发器。 -func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error { +func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error { // 创建一个映射,存放所有已在队列中的计划触发器 pendingTriggersMap := make(map[uint]models.PendingTask) for _, pt := range allPendingTasks { @@ -266,7 +278,7 @@ func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Pl } // createTriggerTask 是创建触发器任务的内部核心逻辑。 -func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error { +func (m *analysisPlanTaskManagerImpl) createTriggerTask(plan *models.Plan) error { analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID) if err != nil { return fmt.Errorf("查找计划分析任务失败: %w", err) diff --git a/internal/domain/plan/plan_execution_manager.go b/internal/domain/plan/plan_execution_manager.go index 867a5f3..3a11688 100644 --- a/internal/domain/plan/plan_execution_manager.go +++ b/internal/domain/plan/plan_execution_manager.go @@ -13,6 +13,14 @@ import ( "gorm.io/gorm" ) +// ExecutionManager 定义了计划执行管理器的接口。 +type ExecutionManager interface { + // Start 启动计划执行管理器。 + Start() + // Stop 优雅地停止计划执行管理器。 + Stop() +} + // ProgressTracker 仅用于在内存中提供计划执行的并发锁 type ProgressTracker struct { mu sync.Mutex @@ -73,8 +81,8 @@ func (t *ProgressTracker) GetRunningPlanIDs() []uint { return ids } -// PlanExecutionManager 是核心的、持久化的任务调度器 -type PlanExecutionManager struct { +// planExecutionManagerImpl 是核心的、持久化的任务调度器 +type planExecutionManagerImpl struct { logger *logs.Logger pollingInterval time.Duration workers int @@ -84,7 +92,7 @@ type PlanExecutionManager struct { sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository taskFactory TaskFactory - analysisPlanTaskManager *AnalysisPlanTaskManager + analysisPlanTaskManager AnalysisPlanTaskManager progressTracker *ProgressTracker deviceService device.Service @@ -100,14 +108,14 @@ func NewPlanExecutionManager( deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, - analysisPlanTaskManager *AnalysisPlanTaskManager, + analysisPlanTaskManager AnalysisPlanTaskManager, taskFactory TaskFactory, logger *logs.Logger, deviceService device.Service, interval time.Duration, numWorkers int, -) *PlanExecutionManager { - return &PlanExecutionManager{ +) ExecutionManager { + return &planExecutionManagerImpl{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, deviceRepo: deviceRepo, @@ -125,7 +133,7 @@ func NewPlanExecutionManager( } // Start 启动调度器,包括初始化协程池和启动主轮询循环 -func (s *PlanExecutionManager) Start() { +func (s *planExecutionManagerImpl) Start() { s.logger.Warnf("任务调度器正在启动,工作协程数: %d...", s.workers) pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { s.logger.Errorf("[严重] 任务执行时发生 panic: %v", err) @@ -141,7 +149,7 @@ func (s *PlanExecutionManager) Start() { } // Stop 优雅地停止调度器 -func (s *PlanExecutionManager) Stop() { +func (s *planExecutionManagerImpl) Stop() { s.logger.Warnf("正在停止任务调度器...") close(s.stopChan) // 1. 发出停止信号,停止主循环 s.wg.Wait() // 2. 等待主循环完成 @@ -150,7 +158,7 @@ func (s *PlanExecutionManager) Stop() { } // run 是主轮询循环,负责从数据库认领任务并提交到协程池 -func (s *PlanExecutionManager) run() { +func (s *planExecutionManagerImpl) run() { defer s.wg.Done() ticker := time.NewTicker(s.pollingInterval) defer ticker.Stop() @@ -168,7 +176,7 @@ func (s *PlanExecutionManager) run() { } // claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 -func (s *PlanExecutionManager) claimAndSubmit() { +func (s *planExecutionManagerImpl) claimAndSubmit() { runningPlanIDs := s.progressTracker.GetRunningPlanIDs() claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) @@ -201,7 +209,7 @@ func (s *PlanExecutionManager) claimAndSubmit() { } // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 -func (s *PlanExecutionManager) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { +func (s *planExecutionManagerImpl) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { s.logger.Warnf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) // 1. 阻塞式地等待,直到可以获取到该计划的锁。 @@ -218,7 +226,7 @@ func (s *PlanExecutionManager) handleRequeue(planExecutionLogID uint, taskToRequ } // processTask 处理单个任务的逻辑 -func (s *PlanExecutionManager) processTask(claimedLog *models.TaskExecutionLog) { +func (s *planExecutionManagerImpl) processTask(claimedLog *models.TaskExecutionLog) { s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s, 描述: %s", claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name, claimedLog.Task.Description) @@ -261,7 +269,7 @@ func (s *PlanExecutionManager) processTask(claimedLog *models.TaskExecutionLog) } // runTask 用于执行具体任务 -func (s *PlanExecutionManager) runTask(claimedLog *models.TaskExecutionLog) error { +func (s *planExecutionManagerImpl) runTask(claimedLog *models.TaskExecutionLog) error { // 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中 if claimedLog.Task.Type == models.TaskPlanAnalysis { // 解析plan @@ -287,7 +295,7 @@ func (s *PlanExecutionManager) runTask(claimedLog *models.TaskExecutionLog) erro } // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 -func (s *PlanExecutionManager) analysisPlan(claimedLog *models.TaskExecutionLog) error { +func (s *planExecutionManagerImpl) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 // 从任务的 Parameters 中解析出真实的 PlanID var params struct { @@ -360,7 +368,7 @@ func (s *PlanExecutionManager) analysisPlan(claimedLog *models.TaskExecutionLog) } // updateTaskExecutionLogStatus 修改任务历史中的执行状态 -func (s *PlanExecutionManager) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error { +func (s *planExecutionManagerImpl) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error { claimedLog.EndedAt = time.Now() if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil { @@ -372,7 +380,7 @@ func (s *PlanExecutionManager) updateTaskExecutionLogStatus(claimedLog *models.T } // handlePlanTermination 集中处理计划的终止逻辑(失败或取消) -func (s *PlanExecutionManager) handlePlanTermination(planLogID uint, reason string) { +func (s *planExecutionManagerImpl) handlePlanTermination(planLogID uint, reason string) { // 1. 从待执行队列中删除所有相关的子任务 if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil { s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err) @@ -415,7 +423,7 @@ func (s *PlanExecutionManager) handlePlanTermination(planLogID uint, reason stri } // handlePlanCompletion 集中处理计划成功完成后的所有逻辑 -func (s *PlanExecutionManager) handlePlanCompletion(planLogID uint) { +func (s *planExecutionManagerImpl) handlePlanCompletion(planLogID uint) { s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", planLogID) // 1. 通过 PlanExecutionLog 反查正确的顶层 PlanID