diff --git a/internal/core/initializers.go b/internal/core/initializers.go index b9819b3..91d8a6a 100644 --- a/internal/core/initializers.go +++ b/internal/core/initializers.go @@ -11,6 +11,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -125,8 +126,9 @@ type DomainServices struct { PigBatchDomain pig.PigBatchService TimedCollector collection.Collector GeneralDeviceService device.Service - AnalysisPlanTaskManager *task.AnalysisPlanTaskManager - Scheduler *task.Scheduler + taskFactory scheduler.TaskFactory + AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager + Scheduler *scheduler.Scheduler } // initDomainServices 初始化所有的领域服务。 @@ -148,16 +150,20 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. ) // 计划任务管理器 - analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + + // 任务工厂 + taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService) // 任务执行器 - scheduler := task.NewScheduler( + planScheduler := scheduler.NewScheduler( infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, infra.Repos.DeviceRepo, infra.Repos.SensorDataRepo, infra.Repos.PlanRepo, analysisPlanTaskManager, + taskFactory, logger, generalDeviceService, time.Duration(cfg.Task.Interval)*time.Second, @@ -179,7 +185,8 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. PigBatchDomain: pigBatchDomain, GeneralDeviceService: generalDeviceService, AnalysisPlanTaskManager: analysisPlanTaskManager, - Scheduler: scheduler, + taskFactory: taskFactory, + Scheduler: planScheduler, TimedCollector: timedCollector, } } diff --git a/internal/domain/task/analysis_plan_task_manager.go b/internal/domain/scheduler/analysis_plan_task_manager.go similarity index 99% rename from internal/domain/task/analysis_plan_task_manager.go rename to internal/domain/scheduler/analysis_plan_task_manager.go index 2a77f16..a443e78 100644 --- a/internal/domain/task/analysis_plan_task_manager.go +++ b/internal/domain/scheduler/analysis_plan_task_manager.go @@ -1,4 +1,4 @@ -package task +package scheduler import ( "fmt" diff --git a/internal/domain/task/scheduler.go b/internal/domain/scheduler/scheduler.go similarity index 96% rename from internal/domain/task/scheduler.go rename to internal/domain/scheduler/scheduler.go index ebb8ca6..9ef01a9 100644 --- a/internal/domain/task/scheduler.go +++ b/internal/domain/scheduler/scheduler.go @@ -1,4 +1,4 @@ -package task +package scheduler import ( "errors" @@ -83,6 +83,7 @@ type Scheduler struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository + taskFactory TaskFactory analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker deviceService device.Service @@ -100,6 +101,7 @@ func NewScheduler( sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, analysisPlanTaskManager *AnalysisPlanTaskManager, + taskFactory TaskFactory, logger *logs.Logger, deviceService device.Service, interval time.Duration, @@ -112,6 +114,7 @@ func NewScheduler( sensorDataRepo: sensorDataRepo, planRepo: planRepo, analysisPlanTaskManager: analysisPlanTaskManager, + taskFactory: taskFactory, logger: logger, deviceService: deviceService, pollingInterval: interval, @@ -271,7 +274,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } else { // 执行普通任务 - task := s.taskFactory(claimedLog) + task := s.taskFactory.Production(claimedLog) if err := task.Execute(); err != nil { s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) @@ -283,20 +286,6 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { return nil } -// taskFactory 会根据任务类型初始化对应任务 -func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { - switch claimedLog.Task.Type { - case models.TaskTypeWaiting: - return NewDelayTask(s.logger, claimedLog) - case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger) - - default: - // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 - panic("不支持的任务类型") - } -} - // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 diff --git a/internal/domain/scheduler/task.go b/internal/domain/scheduler/task.go new file mode 100644 index 0000000..a8019be --- /dev/null +++ b/internal/domain/scheduler/task.go @@ -0,0 +1,23 @@ +package scheduler + +import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + +// Task 定义了所有可被调度器执行的任务必须实现的接口。 +type Task interface { + // Execute 是任务的核心执行逻辑。 + // ctx: 用于控制任务的超时或取消。 + // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 + // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 + Execute() error + + // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 + // log: 任务执行的上下文。 + // executeErr: 从 Execute 方法返回的原始错误。 + OnFailure(executeErr error) +} + +// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。 +type TaskFactory interface { + // Production 根据指定的任务执行日志创建一个任务实例。 + Production(claimedLog *models.TaskExecutionLog) Task +} diff --git a/internal/domain/task/delay_task.go b/internal/domain/task/delay_task.go index dcfdb41..62eb53b 100644 --- a/internal/domain/task/delay_task.go +++ b/internal/domain/task/delay_task.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) @@ -19,7 +20,7 @@ type DelayTask struct { logger *logs.Logger } -func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task { return &DelayTask{ executionTask: executionTask, logger: logger, diff --git a/internal/domain/task/release_feed_weight_task.go b/internal/domain/task/release_feed_weight_task.go index a8c7d15..0e0a3bd 100644 --- a/internal/domain/task/release_feed_weight_task.go +++ b/internal/domain/task/release_feed_weight_task.go @@ -6,6 +6,7 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -40,12 +41,12 @@ func NewReleaseFeedWeightTask( deviceRepo repository.DeviceRepository, deviceService device.Service, logger *logs.Logger, -) Task { +) scheduler.Task { return &ReleaseFeedWeightTask{ claimedLog: claimedLog, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, - feedPort: deviceService, // 直接注入 + feedPort: deviceService, logger: logger, } } diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 77623aa..6a5dba4 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -1,30 +1,43 @@ package task import ( + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) -// Task 定义了所有可被调度器执行的任务必须实现的接口。 -type Task interface { - // Execute 是任务的核心执行逻辑。 - // ctx: 用于控制任务的超时或取消。 - // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 - // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 - Execute() error - - // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 - // log: 任务执行的上下文。 - // executeErr: 从 Execute 方法返回的原始错误。 - OnFailure(executeErr error) +type taskFactory struct { + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceService device.Service } -// TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数 -var TaskFactory = func(tt models.TaskType) Task { - switch tt { - case models.TaskTypeWaiting: - return &DelayTask{} - default: - // 出现位置任务类型说明业务逻辑出现重大问题, 一个异常任务被创建了出来 - panic("发现未知任务类型") +func NewTaskFactory( + logger *logs.Logger, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceService device.Service, +) scheduler.TaskFactory { + return &taskFactory{ + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceService: deviceService, + } +} + +func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task { + switch claimedLog.Task.Type { + case models.TaskTypeWaiting: + return NewDelayTask(t.logger, claimedLog) + case models.TaskTypeReleaseFeedWeight: + return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger) + default: + // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 + t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) + panic("不支持的任务类型") // 显式panic防编译器报错 } }