Files
pig-farm-controller/internal/task/task.go

223 lines
4.5 KiB
Go

// Package task 提供任务队列和执行框架
// 负责管理任务队列、调度和执行各种控制任务
package task
import (
"container/heap"
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
)
// Task 代表一个任务接口
// 所有任务都需要实现此接口
type Task interface {
// Execute 执行任务
Execute() error
// GetID 获取任务ID
GetID() string
// GetPriority 获取任务优先级
GetPriority() int
}
// taskItem 任务队列中的元素
type taskItem struct {
task Task
priority int
index int
}
// TaskQueue 代表任务队列
type TaskQueue struct {
// queue 任务队列(按优先级排序)
queue *priorityQueue
// mutex 互斥锁
mutex sync.Mutex
// logger 日志记录器
logger *logs.Logger
}
// NewTaskQueue 创建并返回一个新的任务队列实例
func NewTaskQueue() *TaskQueue {
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &TaskQueue{
queue: &pq,
logger: logs.NewLogger(),
}
}
// AddTask 向队列中添加任务
func (tq *TaskQueue) AddTask(task Task) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
item := &taskItem{
task: task,
priority: task.GetPriority(),
}
heap.Push(tq.queue, item)
tq.logger.Info("Task added to queue: " + task.GetID())
}
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
func (tq *TaskQueue) GetNextTask() Task {
tq.mutex.Lock()
defer tq.mutex.Unlock()
if tq.queue.Len() == 0 {
return nil
}
// 获取优先级最高的任务
item := heap.Pop(tq.queue).(*taskItem)
tq.logger.Info("Task retrieved from queue: " + item.task.GetID())
return item.task
}
// GetTaskCount 获取队列中的任务数量
func (tq *TaskQueue) GetTaskCount() int {
tq.mutex.Lock()
defer tq.mutex.Unlock()
return tq.queue.Len()
}
// priorityQueue 实现优先级队列
type priorityQueue []*taskItem
func (pq priorityQueue) Len() int { return len(pq) }
// Less 优先级小的优先级更高
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*taskItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // 避免内存泄漏
item.index = -1 // 无效索引
*pq = old[0 : n-1]
return item
}
// Executor 代表任务执行器
type Executor struct {
// taskQueue 任务队列
taskQueue *TaskQueue
// workers 工作协程数量
workers int
// ctx 执行上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
// wg 等待组
wg sync.WaitGroup
// logger 日志记录器
logger *logs.Logger
}
// NewExecutor 创建并返回一个新的任务执行器实例
func NewExecutor(workers int) *Executor {
ctx, cancel := context.WithCancel(context.Background())
return &Executor{
taskQueue: NewTaskQueue(),
workers: workers,
ctx: ctx,
cancel: cancel,
logger: logs.NewLogger(),
}
}
// Start 启动任务执行器
func (e *Executor) Start() {
e.logger.Info(fmt.Sprintf("Starting task executor with %d workers", e.workers))
// 启动工作协程
for i := 0; i < e.workers; i++ {
e.wg.Add(1)
go e.worker(i)
}
e.logger.Info("Task executor started successfully")
}
// Stop 停止任务执行器
func (e *Executor) Stop() {
e.logger.Info("Stopping task executor")
// 取消上下文
e.cancel()
// 等待所有工作协程结束
e.wg.Wait()
e.logger.Info("Task executor stopped successfully")
}
// SubmitTask 提交任务到执行器
func (e *Executor) SubmitTask(task Task) {
e.taskQueue.AddTask(task)
e.logger.Info("Task submitted: " + task.GetID())
}
// worker 工作协程
func (e *Executor) worker(id int) {
defer e.wg.Done()
e.logger.Info(fmt.Sprintf("Worker %d started", id))
for {
select {
case <-e.ctx.Done():
e.logger.Info(fmt.Sprintf("Worker %d stopped", id))
return
default:
// 获取下一个任务
task := e.taskQueue.GetNextTask()
if task != nil {
e.logger.Info(fmt.Sprintf("Worker %d executing task: %s", id, task.GetID()))
// 执行任务
if err := task.Execute(); err != nil {
e.logger.Error("Task execution failed: " + task.GetID() + ", error: " + err.Error())
} else {
e.logger.Info("Task executed successfully: " + task.GetID())
}
} else {
// 没有任务时短暂休眠
time.Sleep(100 * time.Millisecond)
}
}
}
}