package task import ( "container/heap" "context" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" ) // Task 代表一个任务接口 // 所有任务都需要实现此接口 type Task interface { // Execute 执行任务 Execute() error // GetID 获取任务ID GetID() string // GetPriority 获取任务优先级 GetPriority() int // IsDone 检查任务是否已完成 IsDone() bool } // taskItem 任务队列中的元素 type taskItem struct { task Task priority int index int } // Queue 代表任务队列 type Queue struct { // queue 任务队列(按优先级排序) queue *priorityQueue // mutex 互斥锁 mutex sync.Mutex // logger 日志记录器 logger *logs.Logger } // NewQueue 创建并返回一个新的任务队列实例。 func NewQueue(logger *logs.Logger) *Queue { pq := make(priorityQueue, 0) heap.Init(&pq) return &Queue{ queue: &pq, logger: logger, } } // AddTask 向队列中添加任务 func (q *Queue) AddTask(task Task) { q.mutex.Lock() defer q.mutex.Unlock() item := &taskItem{ task: task, priority: task.GetPriority(), } heap.Push(q.queue, item) q.logger.Infow("任务已添加到队列", "任务ID", task.GetID()) } // GetNextTask 获取下一个要执行的任务(优先级最高的任务) func (q *Queue) GetNextTask() Task { q.mutex.Lock() defer q.mutex.Unlock() if q.queue.Len() == 0 { return nil } item := heap.Pop(q.queue).(*taskItem) q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID()) return item.task } // GetTaskCount 获取队列中的任务数量 func (q *Queue) GetTaskCount() int { q.mutex.Lock() defer q.mutex.Unlock() return q.queue.Len() } // priorityQueue 实现优先级队列 type priorityQueue []*taskItem func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { return pq[i].priority < pq[j].priority } func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } func (pq *priorityQueue) Push(x interface{}) { n := len(*pq) item := x.(*taskItem) item.index = n *pq = append(*pq, item) } func (pq *priorityQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] old[n-1] = nil // 避免内存泄漏 item.index = -1 // 无效索引 *pq = old[0 : n-1] return item } // Executor 代表任务执行器 type Executor struct { // queue 任务队列 queue *Queue // workers 工作协程数量 workers int // ctx 执行上下文 ctx context.Context // cancel 取消函数 cancel context.CancelFunc // wg 等待组 wg sync.WaitGroup // logger 日志记录器 logger *logs.Logger } // NewExecutor 创建并返回一个新的任务执行器实例。 func NewExecutor(workers int, logger *logs.Logger) *Executor { ctx, cancel := context.WithCancel(context.Background()) return &Executor{ queue: NewQueue(logger), // 将 logger 传递给 Queue workers: workers, ctx: ctx, cancel: cancel, logger: logger, } } // Start 启动任务执行器 func (e *Executor) Start() { e.logger.Infow("正在启动任务执行器", "工作协程数", e.workers) // 启动工作协程 for i := 0; i < e.workers; i++ { e.wg.Add(1) go e.worker(i) } e.logger.Info("任务执行器启动成功") } // Stop 停止任务执行器 func (e *Executor) Stop() { e.logger.Info("正在停止任务执行器") // 取消上下文 e.cancel() // 等待所有工作协程结束 e.wg.Wait() e.logger.Info("任务执行器已停止") } // SubmitTask 提交任务到执行器 func (e *Executor) SubmitTask(task Task) { e.queue.AddTask(task) e.logger.Infow("任务已提交", "任务ID", task.GetID()) } // worker 工作协程 func (e *Executor) worker(id int) { defer e.wg.Done() e.logger.Infow("工作协程已启动", "工作协程ID", id) for { select { case <-e.ctx.Done(): e.logger.Infow("工作协程已停止", "工作协程ID", id) return default: // 获取下一个任务 task := e.queue.GetNextTask() if task != nil { e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID()) // 执行任务 if err := task.Execute(); err != nil { e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err) } else { e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID()) } } else { // 没有任务时短暂休眠 time.Sleep(100 * time.Millisecond) } } } }