diff --git a/internal/app/api/api.go b/internal/app/api/api.go index f418118..404c190 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -27,7 +27,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + domain_plan "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" @@ -39,21 +39,21 @@ import ( // API 结构体定义了 HTTP 服务器及其依赖 type API struct { - echo *echo.Echo // Echo 引擎实例,用于处理 HTTP 请求 - logger *logs.Logger // 日志记录器,用于输出日志信息 - userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 - tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 - auditService audit.Service // 审计服务,用于记录用户操作 - httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 - config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig - userController *user.Controller // 用户控制器实例 - deviceController *device.Controller // 设备控制器实例 - planController *plan.Controller // 计划控制器实例 - pigFarmController *management.PigFarmController // 猪场管理控制器实例 - pigBatchController *management.PigBatchController // 猪群控制器实例 - monitorController *monitor.Controller // 数据监控控制器实例 - listenHandler webhook.ListenHandler // 设备上行事件监听器 - analysisTaskManager *scheduler.AnalysisPlanTaskManager // 计划触发器管理器实例 + echo *echo.Echo // Echo 引擎实例,用于处理 HTTP 请求 + logger *logs.Logger // 日志记录器,用于输出日志信息 + userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 + tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 + auditService audit.Service // 审计服务,用于记录用户操作 + httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 + config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig + userController *user.Controller // 用户控制器实例 + deviceController *device.Controller // 设备控制器实例 + planController *plan.Controller // 计划控制器实例 + pigFarmController *management.PigFarmController // 猪场管理控制器实例 + pigBatchController *management.PigBatchController // 猪群控制器实例 + monitorController *monitor.Controller // 数据监控控制器实例 + listenHandler webhook.ListenHandler // 设备上行事件监听器 + analysisTaskManager *domain_plan.AnalysisPlanTaskManager // 计划触发器管理器实例 } // NewAPI 创建并返回一个新的 API 实例 diff --git a/internal/app/service/plan_service.go b/internal/app/service/plan_service.go index 30afb54..d59b618 100644 --- a/internal/app/service/plan_service.go +++ b/internal/app/service/plan_service.go @@ -4,7 +4,7 @@ import ( "errors" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -50,14 +50,14 @@ type PlanService interface { type planService struct { logger *logs.Logger planRepo repository.PlanRepository - analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager + analysisPlanTaskManager *plan.AnalysisPlanTaskManager } // NewPlanService 创建一个新的 PlanService 实例 func NewPlanService( logger *logs.Logger, planRepo repository.PlanRepository, - analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager, + analysisPlanTaskManager *plan.AnalysisPlanTaskManager, ) PlanService { return &planService{ logger: logger, diff --git a/internal/core/application.go b/internal/core/application.go index da87813..33e61c3 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -84,7 +84,7 @@ func (app *Application) Start() error { } // 3. 启动后台工作协程 - app.Domain.Scheduler.Start() + app.Domain.PlanExecutionManager.Start() // 4. 启动 API 服务器 app.API.Start() @@ -106,7 +106,7 @@ func (app *Application) Stop() error { app.API.Stop() // 关闭任务执行器 - app.Domain.Scheduler.Stop() + app.Domain.PlanExecutionManager.Stop() // 断开数据库连接 if err := app.Infra.Storage.Disconnect(); err != nil { diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index f4e4894..09aed72 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -10,7 +10,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -124,9 +124,9 @@ type DomainServices struct { PigSickManager pig.SickPigManager PigBatchDomain pig.PigBatchService GeneralDeviceService device.Service - taskFactory scheduler.TaskFactory - AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager - Scheduler *scheduler.Scheduler + taskFactory plan.TaskFactory + AnalysisPlanTaskManager *plan.AnalysisPlanTaskManager + PlanExecutionManager *plan.PlanExecutionManager } // initDomainServices 初始化所有的领域服务。 @@ -148,13 +148,13 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. ) // 计划任务管理器 - analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) // 任务工厂 taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService) // 任务执行器 - planScheduler := scheduler.NewScheduler( + planExecutionManager := plan.NewPlanExecutionManager( infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, infra.Repos.DeviceRepo, @@ -176,7 +176,7 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. GeneralDeviceService: generalDeviceService, AnalysisPlanTaskManager: analysisPlanTaskManager, taskFactory: taskFactory, - Scheduler: planScheduler, + PlanExecutionManager: planExecutionManager, } } diff --git a/internal/domain/scheduler/analysis_plan_task_manager.go b/internal/domain/plan/analysis_plan_task_manager.go similarity index 99% rename from internal/domain/scheduler/analysis_plan_task_manager.go rename to internal/domain/plan/analysis_plan_task_manager.go index a443e78..39be0c7 100644 --- a/internal/domain/scheduler/analysis_plan_task_manager.go +++ b/internal/domain/plan/analysis_plan_task_manager.go @@ -1,4 +1,4 @@ -package scheduler +package plan import ( "fmt" diff --git a/internal/domain/scheduler/scheduler.go b/internal/domain/plan/plan_execution_manager.go similarity index 93% rename from internal/domain/scheduler/scheduler.go rename to internal/domain/plan/plan_execution_manager.go index 4c4863d..867a5f3 100644 --- a/internal/domain/scheduler/scheduler.go +++ b/internal/domain/plan/plan_execution_manager.go @@ -1,4 +1,4 @@ -package scheduler +package plan import ( "errors" @@ -73,8 +73,8 @@ func (t *ProgressTracker) GetRunningPlanIDs() []uint { return ids } -// Scheduler 是核心的、持久化的任务调度器 -type Scheduler struct { +// PlanExecutionManager 是核心的、持久化的任务调度器 +type PlanExecutionManager struct { logger *logs.Logger pollingInterval time.Duration workers int @@ -93,8 +93,8 @@ type Scheduler struct { stopChan chan struct{} // 用于停止主循环的信号通道 } -// NewScheduler 创建一个新的调度器实例 -func NewScheduler( +// NewPlanExecutionManager 创建一个新的调度器实例 +func NewPlanExecutionManager( pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, deviceRepo repository.DeviceRepository, @@ -106,8 +106,8 @@ func NewScheduler( deviceService device.Service, interval time.Duration, numWorkers int, -) *Scheduler { - return &Scheduler{ +) *PlanExecutionManager { + return &PlanExecutionManager{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, deviceRepo: deviceRepo, @@ -125,7 +125,7 @@ func NewScheduler( } // Start 启动调度器,包括初始化协程池和启动主轮询循环 -func (s *Scheduler) Start() { +func (s *PlanExecutionManager) Start() { s.logger.Warnf("任务调度器正在启动,工作协程数: %d...", s.workers) pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { s.logger.Errorf("[严重] 任务执行时发生 panic: %v", err) @@ -141,7 +141,7 @@ func (s *Scheduler) Start() { } // Stop 优雅地停止调度器 -func (s *Scheduler) Stop() { +func (s *PlanExecutionManager) Stop() { s.logger.Warnf("正在停止任务调度器...") close(s.stopChan) // 1. 发出停止信号,停止主循环 s.wg.Wait() // 2. 等待主循环完成 @@ -150,7 +150,7 @@ func (s *Scheduler) Stop() { } // run 是主轮询循环,负责从数据库认领任务并提交到协程池 -func (s *Scheduler) run() { +func (s *PlanExecutionManager) run() { defer s.wg.Done() ticker := time.NewTicker(s.pollingInterval) defer ticker.Stop() @@ -168,7 +168,7 @@ func (s *Scheduler) run() { } // claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 -func (s *Scheduler) claimAndSubmit() { +func (s *PlanExecutionManager) claimAndSubmit() { runningPlanIDs := s.progressTracker.GetRunningPlanIDs() claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) @@ -201,7 +201,7 @@ func (s *Scheduler) claimAndSubmit() { } // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 -func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { +func (s *PlanExecutionManager) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { s.logger.Warnf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) // 1. 阻塞式地等待,直到可以获取到该计划的锁。 @@ -218,7 +218,7 @@ func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models } // processTask 处理单个任务的逻辑 -func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { +func (s *PlanExecutionManager) processTask(claimedLog *models.TaskExecutionLog) { s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s, 描述: %s", claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name, claimedLog.Task.Description) @@ -261,7 +261,7 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { } // runTask 用于执行具体任务 -func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { +func (s *PlanExecutionManager) runTask(claimedLog *models.TaskExecutionLog) error { // 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中 if claimedLog.Task.Type == models.TaskPlanAnalysis { // 解析plan @@ -287,7 +287,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 -func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { +func (s *PlanExecutionManager) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 // 从任务的 Parameters 中解析出真实的 PlanID var params struct { @@ -360,7 +360,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { } // updateTaskExecutionLogStatus 修改任务历史中的执行状态 -func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error { +func (s *PlanExecutionManager) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error { claimedLog.EndedAt = time.Now() if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil { @@ -372,7 +372,7 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio } // handlePlanTermination 集中处理计划的终止逻辑(失败或取消) -func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) { +func (s *PlanExecutionManager) handlePlanTermination(planLogID uint, reason string) { // 1. 从待执行队列中删除所有相关的子任务 if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil { s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err) @@ -415,7 +415,7 @@ func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) { } // handlePlanCompletion 集中处理计划成功完成后的所有逻辑 -func (s *Scheduler) handlePlanCompletion(planLogID uint) { +func (s *PlanExecutionManager) handlePlanCompletion(planLogID uint) { s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", planLogID) // 1. 通过 PlanExecutionLog 反查正确的顶层 PlanID diff --git a/internal/domain/scheduler/task.go b/internal/domain/plan/task.go similarity index 98% rename from internal/domain/scheduler/task.go rename to internal/domain/plan/task.go index a8019be..3c505e7 100644 --- a/internal/domain/scheduler/task.go +++ b/internal/domain/plan/task.go @@ -1,4 +1,4 @@ -package scheduler +package plan import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" diff --git a/internal/domain/task/delay_task.go b/internal/domain/task/delay_task.go index 62eb53b..9abc888 100644 --- a/internal/domain/task/delay_task.go +++ b/internal/domain/task/delay_task.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) @@ -20,7 +20,7 @@ type DelayTask struct { logger *logs.Logger } -func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task { +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) plan.Task { return &DelayTask{ executionTask: executionTask, logger: logger, diff --git a/internal/domain/task/full_collection_task.go b/internal/domain/task/full_collection_task.go index 8802600..fc7d7f9 100644 --- a/internal/domain/task/full_collection_task.go +++ b/internal/domain/task/full_collection_task.go @@ -9,7 +9,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) -// FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集 +// FullCollectionTask 实现了 plan.Task 接口,用于执行一次全量的设备数据采集 type FullCollectionTask struct { log *models.TaskExecutionLog deviceRepo repository.DeviceRepository diff --git a/internal/domain/task/release_feed_weight_task.go b/internal/domain/task/release_feed_weight_task.go index 0e0a3bd..38d8d2c 100644 --- a/internal/domain/task/release_feed_weight_task.go +++ b/internal/domain/task/release_feed_weight_task.go @@ -6,7 +6,7 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -41,7 +41,7 @@ func NewReleaseFeedWeightTask( deviceRepo repository.DeviceRepository, deviceService device.Service, logger *logs.Logger, -) scheduler.Task { +) plan.Task { return &ReleaseFeedWeightTask{ claimedLog: claimedLog, deviceRepo: deviceRepo, diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 54d23e1..e2cf98d 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -2,7 +2,7 @@ package task import ( "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -20,7 +20,7 @@ func NewTaskFactory( sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, deviceService device.Service, -) scheduler.TaskFactory { +) plan.TaskFactory { return &taskFactory{ logger: logger, sensorDataRepo: sensorDataRepo, @@ -29,7 +29,7 @@ func NewTaskFactory( } } -func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task { +func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) plan.Task { switch claimedLog.Task.Type { case models.TaskTypeWaiting: return NewDelayTask(t.logger, claimedLog)