224 lines
4.7 KiB
Go
224 lines
4.7 KiB
Go
package task
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
|
)
|
|
|
|
// Task 代表一个任务接口
|
|
// 所有任务都需要实现此接口
|
|
type Task interface {
|
|
// Execute 执行任务
|
|
Execute() error
|
|
|
|
// GetID 获取任务ID
|
|
GetID() string
|
|
|
|
// GetPriority 获取任务优先级
|
|
GetPriority() int
|
|
|
|
// IsDone 检查任务是否已完成
|
|
IsDone() bool
|
|
|
|
// GetDescription 获取任务说明
|
|
GetDescription() string
|
|
}
|
|
|
|
// taskItem 任务队列中的元素
|
|
type taskItem struct {
|
|
task Task
|
|
priority int
|
|
index int
|
|
}
|
|
|
|
// Queue 代表任务队列
|
|
type Queue struct {
|
|
// queue 任务队列(按优先级排序)
|
|
queue *priorityQueue
|
|
|
|
// mutex 互斥锁
|
|
mutex sync.Mutex
|
|
|
|
// logger 日志记录器
|
|
logger *logs.Logger
|
|
}
|
|
|
|
// NewQueue 创建并返回一个新的任务队列实例。
|
|
func NewQueue(logger *logs.Logger) *Queue {
|
|
pq := make(priorityQueue, 0)
|
|
heap.Init(&pq)
|
|
|
|
return &Queue{
|
|
queue: &pq,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// AddTask 向队列中添加任务
|
|
func (q *Queue) AddTask(task Task) {
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
item := &taskItem{
|
|
task: task,
|
|
priority: task.GetPriority(),
|
|
}
|
|
heap.Push(q.queue, item)
|
|
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
|
}
|
|
|
|
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
|
func (q *Queue) GetNextTask() Task {
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
if q.queue.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
item := heap.Pop(q.queue).(*taskItem)
|
|
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
|
|
return item.task
|
|
}
|
|
|
|
// GetTaskCount 获取队列中的任务数量
|
|
func (q *Queue) GetTaskCount() int {
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
return q.queue.Len()
|
|
}
|
|
|
|
// priorityQueue 实现优先级队列
|
|
type priorityQueue []*taskItem
|
|
|
|
func (pq priorityQueue) Len() int { return len(pq) }
|
|
|
|
func (pq priorityQueue) Less(i, j int) bool {
|
|
return pq[i].priority < pq[j].priority
|
|
}
|
|
|
|
func (pq priorityQueue) Swap(i, j int) {
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
pq[i].index = i
|
|
pq[j].index = j
|
|
}
|
|
|
|
func (pq *priorityQueue) Push(x interface{}) {
|
|
n := len(*pq)
|
|
item := x.(*taskItem)
|
|
item.index = n
|
|
*pq = append(*pq, item)
|
|
}
|
|
|
|
func (pq *priorityQueue) Pop() interface{} {
|
|
old := *pq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = nil // 避免内存泄漏
|
|
item.index = -1 // 无效索引
|
|
*pq = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// Executor 代表任务执行器
|
|
type Executor struct {
|
|
// queue 任务队列
|
|
queue *Queue
|
|
|
|
// workers 工作协程数量
|
|
workers int
|
|
|
|
// ctx 执行上下文
|
|
ctx context.Context
|
|
|
|
// cancel 取消函数
|
|
cancel context.CancelFunc
|
|
|
|
// wg 等待组
|
|
wg sync.WaitGroup
|
|
|
|
// logger 日志记录器
|
|
logger *logs.Logger
|
|
}
|
|
|
|
// NewExecutor 创建并返回一个新的任务执行器实例。
|
|
func NewExecutor(workers int, logger *logs.Logger) *Executor {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &Executor{
|
|
queue: NewQueue(logger), // 将 logger 传递给 Queue
|
|
workers: workers,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Start 启动任务执行器
|
|
func (e *Executor) Start() {
|
|
e.logger.Infow("正在启动任务执行器", "工作协程数", e.workers)
|
|
|
|
// 启动工作协程
|
|
for i := 0; i < e.workers; i++ {
|
|
e.wg.Add(1)
|
|
go e.worker(i)
|
|
}
|
|
|
|
e.logger.Info("任务执行器启动成功")
|
|
}
|
|
|
|
// Stop 停止任务执行器
|
|
func (e *Executor) Stop() {
|
|
e.logger.Info("正在停止任务执行器")
|
|
|
|
// 取消上下文
|
|
e.cancel()
|
|
|
|
// 等待所有工作协程结束
|
|
e.wg.Wait()
|
|
|
|
e.logger.Info("任务执行器已停止")
|
|
}
|
|
|
|
// SubmitTask 提交任务到执行器
|
|
func (e *Executor) SubmitTask(task Task) {
|
|
e.queue.AddTask(task)
|
|
e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
|
}
|
|
|
|
// worker 工作协程
|
|
func (e *Executor) worker(id int) {
|
|
defer e.wg.Done()
|
|
|
|
e.logger.Infow("工作协程已启动", "工作协程ID", id)
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
e.logger.Infow("工作协程已停止", "工作协程ID", id)
|
|
return
|
|
default:
|
|
// 获取下一个任务
|
|
task := e.queue.GetNextTask()
|
|
if task != nil {
|
|
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
|
|
|
// 执行任务
|
|
if err := task.Execute(); err != nil {
|
|
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
|
|
} else {
|
|
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
|
}
|
|
} else {
|
|
// 没有任务时短暂休眠
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
}
|