diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 058260e..6e57de3 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -20,6 +20,10 @@ type DelayTask struct { logger *logs.Logger } +func NewDelayTask() Task { + return &DelayTask{} +} + // Execute 执行延迟任务,等待指定的时间 func (d *DelayTask) Execute() error { d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index caa135e..0208eea 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -83,7 +83,6 @@ type Scheduler struct { planRepo repository.PlanRepository analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager progressTracker *ProgressTracker - taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -96,7 +95,6 @@ func NewScheduler( executionLogRepo repository.ExecutionLogRepository, planRepo repository.PlanRepository, analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager - taskFactory func(taskType models.TaskType) Task, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { @@ -109,7 +107,6 @@ func NewScheduler( pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), - taskFactory: taskFactory, stopChan: make(chan struct{}), // 初始化停止信号通道 } } @@ -266,15 +263,12 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { // 执行普通任务 task := s.taskFactory(claimedLog.Task.Type) if err := task.ParseParams(s.logger, claimedLog); err != nil { - s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) return err } if err := task.Execute(); err != nil { - s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - task.OnFailure(err) return err } @@ -283,6 +277,17 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { return nil } +// taskFactory 会根据任务类型初始化对应任务 +func (s *Scheduler) taskFactory(taskType models.TaskType) Task { + switch taskType { + case models.TaskTypeWaiting: + return NewDelayTask() + default: + // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 + panic("不支持的任务类型") + } +} + // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 diff --git a/internal/core/application.go b/internal/core/application.go index 2beddd6..d9af392 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -82,7 +82,7 @@ func NewApplication(configPath string) (*Application, error) { analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) // 初始化任务执行器 - executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)