Files
pig-farm-controller/internal/task/task.go
2025-09-10 13:04:25 +08:00

229 lines
4.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package task 提供任务队列和执行框架
// 负责管理任务队列、调度和执行各种控制任务
package task
import (
"container/heap"
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
)
// Task 代表一个任务接口
// 所有任务都需要实现此接口
type Task interface {
// Execute 执行任务
Execute() error
// GetID 获取任务ID
GetID() string
// GetPriority 获取任务优先级
GetPriority() int
// Done 返回一个channel当任务执行完毕时该channel会被关闭
Done() <-chan struct{}
// IsDone 检查任务是否已完成
IsDone() bool
}
// taskItem 任务队列中的元素
type taskItem struct {
task Task
priority int
index int
}
// TaskQueue 代表任务队列
type TaskQueue struct {
// queue 任务队列(按优先级排序)
queue *priorityQueue
// mutex 互斥锁
mutex sync.Mutex
// logger 日志记录器
logger *logs.Logger
}
// NewTaskQueue 创建并返回一个新的任务队列实例
func NewTaskQueue() *TaskQueue {
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &TaskQueue{
queue: &pq,
logger: logs.NewLogger(),
}
}
// AddTask 向队列中添加任务
func (tq *TaskQueue) AddTask(task Task) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
item := &taskItem{
task: task,
priority: task.GetPriority(),
}
heap.Push(tq.queue, item)
tq.logger.Info("任务已添加到队列: " + task.GetID())
}
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
func (tq *TaskQueue) GetNextTask() Task {
tq.mutex.Lock()
defer tq.mutex.Unlock()
if tq.queue.Len() == 0 {
return nil
}
// 获取优先级最高的任务
item := heap.Pop(tq.queue).(*taskItem)
tq.logger.Info("从队列中获取任务: " + item.task.GetID())
return item.task
}
// GetTaskCount 获取队列中的任务数量
func (tq *TaskQueue) GetTaskCount() int {
tq.mutex.Lock()
defer tq.mutex.Unlock()
return tq.queue.Len()
}
// priorityQueue 实现优先级队列
type priorityQueue []*taskItem
func (pq priorityQueue) Len() int { return len(pq) }
// Less 优先级小的优先级更高
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*taskItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // 避免内存泄漏
item.index = -1 // 无效索引
*pq = old[0 : n-1]
return item
}
// Executor 代表任务执行器
type Executor struct {
// taskQueue 任务队列
taskQueue *TaskQueue
// workers 工作协程数量
workers int
// ctx 执行上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
// wg 等待组
wg sync.WaitGroup
// logger 日志记录器
logger *logs.Logger
}
// NewExecutor 创建并返回一个新的任务执行器实例
func NewExecutor(workers int) *Executor {
ctx, cancel := context.WithCancel(context.Background())
return &Executor{
taskQueue: NewTaskQueue(),
workers: workers,
ctx: ctx,
cancel: cancel,
logger: logs.NewLogger(),
}
}
// Start 启动任务执行器
func (e *Executor) Start() {
e.logger.Info(fmt.Sprintf("正在启动任务执行器,工作协程数: %d", e.workers))
// 启动工作协程
for i := 0; i < e.workers; i++ {
e.wg.Add(1)
go e.worker(i)
}
e.logger.Info("任务执行器启动成功")
}
// Stop 停止任务执行器
func (e *Executor) Stop() {
e.logger.Info("正在停止任务执行器")
// 取消上下文
e.cancel()
// 等待所有工作协程结束
e.wg.Wait()
e.logger.Info("任务执行器已停止")
}
// SubmitTask 提交任务到执行器
func (e *Executor) SubmitTask(task Task) {
e.taskQueue.AddTask(task)
e.logger.Info("任务已提交: " + task.GetID())
}
// worker 工作协程
func (e *Executor) worker(id int) {
defer e.wg.Done()
e.logger.Info(fmt.Sprintf("工作协程(id = %d)已启动", id))
for {
select {
case <-e.ctx.Done():
e.logger.Info(fmt.Sprintf("工作协程 %d 已停止", id))
return
default:
// 获取下一个任务
task := e.taskQueue.GetNextTask()
if task != nil {
e.logger.Info(fmt.Sprintf("工作协程 %d 正在执行任务: %s", id, task.GetID()))
// 执行任务
if err := task.Execute(); err != nil {
e.logger.Error("任务执行失败: " + task.GetID() + ", 错误: " + err.Error())
} else {
e.logger.Info("任务执行成功: " + task.GetID())
}
} else {
// 没有任务时短暂休眠
time.Sleep(100 * time.Millisecond)
}
}
}
}