diff --git a/internal/task/README.md b/internal/task/README.md new file mode 100644 index 0000000..a5a7891 --- /dev/null +++ b/internal/task/README.md @@ -0,0 +1,129 @@ +# Task包使用说明 + +## 概述 + +Task包提供了一个基本的任务队列和执行框架,用于管理、调度和执行各种控制任务。它支持并发执行、任务优先级和优雅的任务管理。 + +## 核心组件 + +### 1. Task接口 +所有任务都需要实现Task接口,包含以下方法: +- `Execute() error` - 执行任务 +- `GetID() string` - 获取任务ID +- `GetPriority() int` - 获取任务优先级 + +### 2. Executor(执行器) +负责管理任务队列和执行任务,支持并发执行。 + +### 3. TaskQueue(任务队列) +用于存储和管理待执行的任务。 + +## 使用方法 + +### 1. 实现任务 + +首先,需要实现Task接口来创建自定义任务: + +```go +type MyTask struct { + id string + priority int + // 其他任务特定字段 +} + +func (t *MyTask) Execute() error { + // 实现任务逻辑 + return nil +} + +func (t *MyTask) GetID() string { + return t.id +} + +func (t *MyTask) GetPriority() int { + return t.priority +} +``` + +### 2. 创建和启动执行器 + +```go +// 创建执行器,指定工作协程数量 +executor := task.NewExecutor(5) // 5个工作协程 + +// 启动执行器 +executor.Start() +``` + +### 3. 提交任务 + +```go +// 创建任务实例 +myTask := NewMyTask("task-1", 1) + +// 提交任务到执行器 +executor.SubmitTask(myTask) +``` + +### 4. 停止执行器 + +```go +// 停止执行器(会等待所有正在执行的任务完成) +executor.Stop() +``` + +## 处理定时循环任务 + +对于定时循环任务,建议采用以下方式: + +1. 使用`time.Ticker`定期创建任务 +2. 将任务提交到执行器 + +```go +func RunScheduledTasks(executor *task.Executor) { + // 启动一个协程来定期提交定时任务 + go func() { + ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次 + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 创建定时任务并提交 + scheduledTask := NewScheduledTask("scheduled-task", 1) + executor.SubmitTask(scheduledTask) + case <-executor.ctx.Done(): + return // 执行器已停止 + } + } + }() +} +``` + +## 处理互不相关的独立任务 + +对于互不相关的独立任务,可以直接创建并提交: + +```go +// 创建多个独立任务 +task1 := NewIndependentTask("task-1", "data1", 1) +task2 := NewIndependentTask("task-2", "data2", 2) +task3 := NewIndependentTask("task-3", "data3", 1) + +// 提交所有任务 +executor.SubmitTask(task1) +executor.SubmitTask(task2) +executor.SubmitTask(task3) +``` + +## 最佳实践 + +1. **合理设置工作协程数量**:根据系统资源和任务特性设置适当的工作协程数量 +2. **正确处理任务错误**:在任务的Execute方法中正确处理和返回错误 +3. **合理设置任务优先级**:重要的任务可以设置更高的优先级 +4. **优雅关闭**:使用Stop方法确保所有任务都能正确完成 +5. **避免任务阻塞**:任务执行时间过长会阻塞工作协程 + +## 示例 + +请参考 [example_task.go](./example_task.go) 和 [usage_example.go](./usage_example.go) 文件获取完整的使用示例。 \ No newline at end of file diff --git a/internal/task/executor.go b/internal/task/executor.go deleted file mode 100644 index 96679c8..0000000 --- a/internal/task/executor.go +++ /dev/null @@ -1,16 +0,0 @@ -// Package task 提供任务执行器功能 -// 负责执行各种控制任务,如环境调节、饲料投放等 -// 管理任务队列和并发执行 -package task - -// Executor 代表任务执行器 -// 负责调度和执行所有控制任务 -type Executor struct { - // TODO: 定义任务执行器结构 -} - -// NewExecutor 创建并返回一个新的任务执行器实例 -func NewExecutor() *Executor { - // TODO: 实现任务执行器初始化 - return nil -} diff --git a/internal/task/task.go b/internal/task/task.go new file mode 100644 index 0000000..7c89171 --- /dev/null +++ b/internal/task/task.go @@ -0,0 +1,222 @@ +// Package task 提供任务队列和执行框架 +// 负责管理任务队列、调度和执行各种控制任务 +package task + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/logs" +) + +// Task 代表一个任务接口 +// 所有任务都需要实现此接口 +type Task interface { + // Execute 执行任务 + Execute() error + + // GetID 获取任务ID + GetID() string + + // GetPriority 获取任务优先级 + GetPriority() int +} + +// taskItem 任务队列中的元素 +type taskItem struct { + task Task + priority int + index int +} + +// TaskQueue 代表任务队列 +type TaskQueue struct { + // queue 任务队列(按优先级排序) + queue *priorityQueue + + // mutex 互斥锁 + mutex sync.Mutex + + // logger 日志记录器 + logger *logs.Logger +} + +// NewTaskQueue 创建并返回一个新的任务队列实例 +func NewTaskQueue() *TaskQueue { + pq := make(priorityQueue, 0) + heap.Init(&pq) + + return &TaskQueue{ + queue: &pq, + logger: logs.NewLogger(), + } +} + +// AddTask 向队列中添加任务 +func (tq *TaskQueue) AddTask(task Task) { + tq.mutex.Lock() + defer tq.mutex.Unlock() + + item := &taskItem{ + task: task, + priority: task.GetPriority(), + } + heap.Push(tq.queue, item) + tq.logger.Info("Task added to queue: " + task.GetID()) +} + +// GetNextTask 获取下一个要执行的任务(优先级最高的任务) +func (tq *TaskQueue) GetNextTask() Task { + tq.mutex.Lock() + defer tq.mutex.Unlock() + + if tq.queue.Len() == 0 { + return nil + } + + // 获取优先级最高的任务 + item := heap.Pop(tq.queue).(*taskItem) + tq.logger.Info("Task retrieved from queue: " + item.task.GetID()) + return item.task +} + +// GetTaskCount 获取队列中的任务数量 +func (tq *TaskQueue) GetTaskCount() int { + tq.mutex.Lock() + defer tq.mutex.Unlock() + + return tq.queue.Len() +} + +// priorityQueue 实现优先级队列 +type priorityQueue []*taskItem + +func (pq priorityQueue) Len() int { return len(pq) } + +// Less 优先级小的优先级更高 +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*taskItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // 避免内存泄漏 + item.index = -1 // 无效索引 + *pq = old[0 : n-1] + return item +} + +// Executor 代表任务执行器 +type Executor struct { + // taskQueue 任务队列 + taskQueue *TaskQueue + + // workers 工作协程数量 + workers int + + // ctx 执行上下文 + ctx context.Context + + // cancel 取消函数 + cancel context.CancelFunc + + // wg 等待组 + wg sync.WaitGroup + + // logger 日志记录器 + logger *logs.Logger +} + +// NewExecutor 创建并返回一个新的任务执行器实例 +func NewExecutor(workers int) *Executor { + ctx, cancel := context.WithCancel(context.Background()) + + return &Executor{ + taskQueue: NewTaskQueue(), + workers: workers, + ctx: ctx, + cancel: cancel, + logger: logs.NewLogger(), + } +} + +// Start 启动任务执行器 +func (e *Executor) Start() { + e.logger.Info(fmt.Sprintf("Starting task executor with %d workers", e.workers)) + + // 启动工作协程 + for i := 0; i < e.workers; i++ { + e.wg.Add(1) + go e.worker(i) + } + + e.logger.Info("Task executor started successfully") +} + +// Stop 停止任务执行器 +func (e *Executor) Stop() { + e.logger.Info("Stopping task executor") + + // 取消上下文 + e.cancel() + + // 等待所有工作协程结束 + e.wg.Wait() + + e.logger.Info("Task executor stopped successfully") +} + +// SubmitTask 提交任务到执行器 +func (e *Executor) SubmitTask(task Task) { + e.taskQueue.AddTask(task) + e.logger.Info("Task submitted: " + task.GetID()) +} + +// worker 工作协程 +func (e *Executor) worker(id int) { + defer e.wg.Done() + + e.logger.Info(fmt.Sprintf("Worker %d started", id)) + + for { + select { + case <-e.ctx.Done(): + e.logger.Info(fmt.Sprintf("Worker %d stopped", id)) + return + default: + // 获取下一个任务 + task := e.taskQueue.GetNextTask() + if task != nil { + e.logger.Info(fmt.Sprintf("Worker %d executing task: %s", id, task.GetID())) + + // 执行任务 + if err := task.Execute(); err != nil { + e.logger.Error("Task execution failed: " + task.GetID() + ", error: " + err.Error()) + } else { + e.logger.Info("Task executed successfully: " + task.GetID()) + } + } else { + // 没有任务时短暂休眠 + time.Sleep(100 * time.Millisecond) + } + } + } +}