From fe549fca4a76097b1d91d927e9e621827c618eb7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Thu, 25 Sep 2025 09:07:32 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4task=E5=B7=A5=E5=8E=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/task/delay_task.go | 4 ++++ internal/app/service/task/scheduler.go | 17 +++++++++++------ internal/core/application.go | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 058260e..6e57de3 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -20,6 +20,10 @@ type DelayTask struct { logger *logs.Logger } +func NewDelayTask() Task { + return &DelayTask{} +} + // Execute 执行延迟任务,等待指定的时间 func (d *DelayTask) Execute() error { d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index caa135e..0208eea 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -83,7 +83,6 @@ type Scheduler struct { planRepo repository.PlanRepository analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager progressTracker *ProgressTracker - taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -96,7 +95,6 @@ func NewScheduler( executionLogRepo repository.ExecutionLogRepository, planRepo repository.PlanRepository, analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager - taskFactory func(taskType models.TaskType) Task, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { @@ -109,7 +107,6 @@ func NewScheduler( pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), - taskFactory: taskFactory, stopChan: make(chan struct{}), // 初始化停止信号通道 } } @@ -266,15 +263,12 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { // 执行普通任务 task := s.taskFactory(claimedLog.Task.Type) if err := task.ParseParams(s.logger, claimedLog); err != nil { - s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) return err } if err := task.Execute(); err != nil { - s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - task.OnFailure(err) return err } @@ -283,6 +277,17 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { return nil } +// taskFactory 会根据任务类型初始化对应任务 +func (s *Scheduler) taskFactory(taskType models.TaskType) Task { + switch taskType { + case models.TaskTypeWaiting: + return NewDelayTask() + default: + // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 + panic("不支持的任务类型") + } +} + // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 diff --git a/internal/core/application.go b/internal/core/application.go index 2beddd6..d9af392 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -82,7 +82,7 @@ func NewApplication(configPath string) (*Application, error) { analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) // 初始化任务执行器 - executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)