From 2593097989707a1fc72b3dc4545d555aeedf2e6f Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sat, 13 Sep 2025 12:12:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/task/task.go | 60 ++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index fdfbf1e..1128e40 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -1,5 +1,3 @@ -// Package task 提供任务队列和执行框架 -// 负责管理任务队列、调度和执行各种控制任务 package task import ( @@ -34,8 +32,8 @@ type taskItem struct { index int } -// TaskQueue 代表任务队列 -type TaskQueue struct { +// Queue 代表任务队列 +type Queue struct { // queue 任务队列(按优先级排序) queue *priorityQueue @@ -46,50 +44,50 @@ type TaskQueue struct { logger *logs.Logger } -// NewTaskQueue 创建并返回一个新的任务队列实例。 -func NewTaskQueue(logger *logs.Logger) *TaskQueue { +// NewQueue 创建并返回一个新的任务队列实例。 +func NewQueue(logger *logs.Logger) *Queue { pq := make(priorityQueue, 0) heap.Init(&pq) - return &TaskQueue{ + return &Queue{ queue: &pq, logger: logger, } } // AddTask 向队列中添加任务 -func (tq *TaskQueue) AddTask(task Task) { - tq.mutex.Lock() - defer tq.mutex.Unlock() +func (q *Queue) AddTask(task Task) { + q.mutex.Lock() + defer q.mutex.Unlock() item := &taskItem{ task: task, priority: task.GetPriority(), } - heap.Push(tq.queue, item) - tq.logger.Infow("任务已添加到队列", "任务ID", task.GetID()) + heap.Push(q.queue, item) + q.logger.Infow("任务已添加到队列", "任务ID", task.GetID()) } // GetNextTask 获取下一个要执行的任务(优先级最高的任务) -func (tq *TaskQueue) GetNextTask() Task { - tq.mutex.Lock() - defer tq.mutex.Unlock() +func (q *Queue) GetNextTask() Task { + q.mutex.Lock() + defer q.mutex.Unlock() - if tq.queue.Len() == 0 { + if q.queue.Len() == 0 { return nil } - item := heap.Pop(tq.queue).(*taskItem) - tq.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID()) + item := heap.Pop(q.queue).(*taskItem) + q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID()) return item.task } // GetTaskCount 获取队列中的任务数量 -func (tq *TaskQueue) GetTaskCount() int { - tq.mutex.Lock() - defer tq.mutex.Unlock() +func (q *Queue) GetTaskCount() int { + q.mutex.Lock() + defer q.mutex.Unlock() - return tq.queue.Len() + return q.queue.Len() } // priorityQueue 实现优先级队列 @@ -126,8 +124,8 @@ func (pq *priorityQueue) Pop() interface{} { // Executor 代表任务执行器 type Executor struct { - // taskQueue 任务队列 - taskQueue *TaskQueue + // queue 任务队列 + queue *Queue // workers 工作协程数量 workers int @@ -150,11 +148,11 @@ func NewExecutor(workers int, logger *logs.Logger) *Executor { ctx, cancel := context.WithCancel(context.Background()) return &Executor{ - taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue - workers: workers, - ctx: ctx, - cancel: cancel, - logger: logger, + queue: NewQueue(logger), // 将 logger 传递给 Queue + workers: workers, + ctx: ctx, + cancel: cancel, + logger: logger, } } @@ -186,7 +184,7 @@ func (e *Executor) Stop() { // SubmitTask 提交任务到执行器 func (e *Executor) SubmitTask(task Task) { - e.taskQueue.AddTask(task) + e.queue.AddTask(task) e.logger.Infow("任务已提交", "任务ID", task.GetID()) } @@ -203,7 +201,7 @@ func (e *Executor) worker(id int) { return default: // 获取下一个任务 - task := e.taskQueue.GetNextTask() + task := e.queue.GetNextTask() if task != nil { e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())