From bb4214797428a4a90ebac58e804a86016bd65ca9 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 2 Nov 2025 19:46:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8plan=20service=20=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2=E5=AD=90=E9=A2=86=E5=9F=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/application.go | 4 ++-- internal/core/component_initializers.go | 11 ++++++++--- internal/core/data_initializer.go | 4 ++-- internal/domain/plan/plan_service.go | 20 -------------------- 4 files changed, 12 insertions(+), 27 deletions(-) diff --git a/internal/core/application.go b/internal/core/application.go index 33e61c3..49f9649 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -84,7 +84,7 @@ func (app *Application) Start() error { } // 3. 启动后台工作协程 - app.Domain.PlanExecutionManager.Start() + app.Domain.planService.Start() // 4. 启动 API 服务器 app.API.Start() @@ -106,7 +106,7 @@ func (app *Application) Stop() error { app.API.Stop() // 关闭任务执行器 - app.Domain.PlanExecutionManager.Stop() + app.Domain.planService.Stop() // 断开数据库连接 if err := app.Infra.Storage.Disconnect(); err != nil { diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index fcbf121..97dbec7 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -127,6 +127,7 @@ type DomainServices struct { taskFactory plan.TaskFactory PlanExecutionManager plan.ExecutionManager AnalysisPlanTaskManager plan.AnalysisPlanTaskManager + planService plan.Service } // initDomainServices 初始化所有的领域服务。 @@ -147,12 +148,12 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. infra.Lora.Comm, ) - // 计划任务管理器 - analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) - // 任务工厂 taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService) + // 计划任务管理器 + analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + // 任务执行器 planExecutionManager := plan.NewPlanExecutionManager( infra.Repos.PendingTaskRepo, @@ -168,6 +169,9 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. cfg.Task.NumWorkers, ) + // 计划管理器 + planService := plan.NewPlanService(planExecutionManager, analysisPlanTaskManager, logger) + return &DomainServices{ PigPenTransferManager: pigPenTransferManager, PigTradeManager: pigTradeManager, @@ -177,6 +181,7 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. AnalysisPlanTaskManager: analysisPlanTaskManager, taskFactory: taskFactory, PlanExecutionManager: planExecutionManager, + planService: planService, } } diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index ab8bce9..a17ce8d 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -141,7 +141,7 @@ func (app *Application) initializePendingTasks() error { planRepo := app.Infra.Repos.PlanRepo pendingTaskRepo := app.Infra.Repos.PendingTaskRepo executionLogRepo := app.Infra.Repos.ExecutionLogRepo - analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager + planService := app.Domain.planService logger.Info("开始初始化待执行任务列表...") @@ -235,7 +235,7 @@ func (app *Application) initializePendingTasks() error { // 阶段三:初始刷新 logger.Info("阶段三:开始刷新待执行列表...") - if err := analysisPlanTaskManager.Refresh(); err != nil { + if err := planService.RefreshPlanTriggers(); err != nil { return fmt.Errorf("刷新待执行任务列表失败: %w", err) } logger.Info("阶段三:待执行任务列表初始化完成。") diff --git a/internal/domain/plan/plan_service.go b/internal/domain/plan/plan_service.go index 24c9447..5dc108f 100644 --- a/internal/domain/plan/plan_service.go +++ b/internal/domain/plan/plan_service.go @@ -12,12 +12,6 @@ type Service interface { Stop() // RefreshPlanTriggers 刷新计划触发器,同步数据库中的计划状态和待执行队列中的触发器任务。 RefreshPlanTriggers() error - // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 - // 如果触发器已存在,会根据计划类型更新其执行时间。 - CreateOrUpdateTrigger(planID uint) error - // EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。 - // 如果不存在,则会自动创建。此方法不涉及待执行队列。 - EnsureAnalysisTaskDefinition(planID uint) error // TODO: 在这里添加其他与计划相关的领域服务方法 } @@ -58,17 +52,3 @@ func (s *planServiceImpl) RefreshPlanTriggers() error { s.logger.Infof("PlanService 正在刷新计划触发器...") return s.taskManager.Refresh() } - -// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 -// 如果触发器已存在,会根据计划类型更新其执行时间。 -func (s *planServiceImpl) CreateOrUpdateTrigger(planID uint) error { - s.logger.Infof("PlanService 正在为计划 %d 创建或更新触发器...", planID) - return s.taskManager.CreateOrUpdateTrigger(planID) -} - -// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。 -// 如果不存在,则会自动创建。此方法不涉及待执行队列。 -func (s *planServiceImpl) EnsureAnalysisTaskDefinition(planID uint) error { - s.logger.Infof("PlanService 正在确保计划 %d 的分析任务定义...", planID) - return s.taskManager.EnsureAnalysisTaskDefinition(planID) -}