实现一个基础的任务队列和执行框架
This commit is contained in:
129
internal/task/README.md
Normal file
129
internal/task/README.md
Normal file
@@ -0,0 +1,129 @@
|
||||
# Task包使用说明
|
||||
|
||||
## 概述
|
||||
|
||||
Task包提供了一个基本的任务队列和执行框架,用于管理、调度和执行各种控制任务。它支持并发执行、任务优先级和优雅的任务管理。
|
||||
|
||||
## 核心组件
|
||||
|
||||
### 1. Task接口
|
||||
所有任务都需要实现Task接口,包含以下方法:
|
||||
- `Execute() error` - 执行任务
|
||||
- `GetID() string` - 获取任务ID
|
||||
- `GetPriority() int` - 获取任务优先级
|
||||
|
||||
### 2. Executor(执行器)
|
||||
负责管理任务队列和执行任务,支持并发执行。
|
||||
|
||||
### 3. TaskQueue(任务队列)
|
||||
用于存储和管理待执行的任务。
|
||||
|
||||
## 使用方法
|
||||
|
||||
### 1. 实现任务
|
||||
|
||||
首先,需要实现Task接口来创建自定义任务:
|
||||
|
||||
```go
|
||||
type MyTask struct {
|
||||
id string
|
||||
priority int
|
||||
// 其他任务特定字段
|
||||
}
|
||||
|
||||
func (t *MyTask) Execute() error {
|
||||
// 实现任务逻辑
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *MyTask) GetID() string {
|
||||
return t.id
|
||||
}
|
||||
|
||||
func (t *MyTask) GetPriority() int {
|
||||
return t.priority
|
||||
}
|
||||
```
|
||||
|
||||
### 2. 创建和启动执行器
|
||||
|
||||
```go
|
||||
// 创建执行器,指定工作协程数量
|
||||
executor := task.NewExecutor(5) // 5个工作协程
|
||||
|
||||
// 启动执行器
|
||||
executor.Start()
|
||||
```
|
||||
|
||||
### 3. 提交任务
|
||||
|
||||
```go
|
||||
// 创建任务实例
|
||||
myTask := NewMyTask("task-1", 1)
|
||||
|
||||
// 提交任务到执行器
|
||||
executor.SubmitTask(myTask)
|
||||
```
|
||||
|
||||
### 4. 停止执行器
|
||||
|
||||
```go
|
||||
// 停止执行器(会等待所有正在执行的任务完成)
|
||||
executor.Stop()
|
||||
```
|
||||
|
||||
## 处理定时循环任务
|
||||
|
||||
对于定时循环任务,建议采用以下方式:
|
||||
|
||||
1. 使用`time.Ticker`定期创建任务
|
||||
2. 将任务提交到执行器
|
||||
|
||||
```go
|
||||
func RunScheduledTasks(executor *task.Executor) {
|
||||
// 启动一个协程来定期提交定时任务
|
||||
go func() {
|
||||
ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// 创建定时任务并提交
|
||||
scheduledTask := NewScheduledTask("scheduled-task", 1)
|
||||
executor.SubmitTask(scheduledTask)
|
||||
case <-executor.ctx.Done():
|
||||
return // 执行器已停止
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
```
|
||||
|
||||
## 处理互不相关的独立任务
|
||||
|
||||
对于互不相关的独立任务,可以直接创建并提交:
|
||||
|
||||
```go
|
||||
// 创建多个独立任务
|
||||
task1 := NewIndependentTask("task-1", "data1", 1)
|
||||
task2 := NewIndependentTask("task-2", "data2", 2)
|
||||
task3 := NewIndependentTask("task-3", "data3", 1)
|
||||
|
||||
// 提交所有任务
|
||||
executor.SubmitTask(task1)
|
||||
executor.SubmitTask(task2)
|
||||
executor.SubmitTask(task3)
|
||||
```
|
||||
|
||||
## 最佳实践
|
||||
|
||||
1. **合理设置工作协程数量**:根据系统资源和任务特性设置适当的工作协程数量
|
||||
2. **正确处理任务错误**:在任务的Execute方法中正确处理和返回错误
|
||||
3. **合理设置任务优先级**:重要的任务可以设置更高的优先级
|
||||
4. **优雅关闭**:使用Stop方法确保所有任务都能正确完成
|
||||
5. **避免任务阻塞**:任务执行时间过长会阻塞工作协程
|
||||
|
||||
## 示例
|
||||
|
||||
请参考 [example_task.go](./example_task.go) 和 [usage_example.go](./usage_example.go) 文件获取完整的使用示例。
|
||||
@@ -1,16 +0,0 @@
|
||||
// Package task 提供任务执行器功能
|
||||
// 负责执行各种控制任务,如环境调节、饲料投放等
|
||||
// 管理任务队列和并发执行
|
||||
package task
|
||||
|
||||
// Executor 代表任务执行器
|
||||
// 负责调度和执行所有控制任务
|
||||
type Executor struct {
|
||||
// TODO: 定义任务执行器结构
|
||||
}
|
||||
|
||||
// NewExecutor 创建并返回一个新的任务执行器实例
|
||||
func NewExecutor() *Executor {
|
||||
// TODO: 实现任务执行器初始化
|
||||
return nil
|
||||
}
|
||||
222
internal/task/task.go
Normal file
222
internal/task/task.go
Normal file
@@ -0,0 +1,222 @@
|
||||
// Package task 提供任务队列和执行框架
|
||||
// 负责管理任务队列、调度和执行各种控制任务
|
||||
package task
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
|
||||
)
|
||||
|
||||
// Task 代表一个任务接口
|
||||
// 所有任务都需要实现此接口
|
||||
type Task interface {
|
||||
// Execute 执行任务
|
||||
Execute() error
|
||||
|
||||
// GetID 获取任务ID
|
||||
GetID() string
|
||||
|
||||
// GetPriority 获取任务优先级
|
||||
GetPriority() int
|
||||
}
|
||||
|
||||
// taskItem 任务队列中的元素
|
||||
type taskItem struct {
|
||||
task Task
|
||||
priority int
|
||||
index int
|
||||
}
|
||||
|
||||
// TaskQueue 代表任务队列
|
||||
type TaskQueue struct {
|
||||
// queue 任务队列(按优先级排序)
|
||||
queue *priorityQueue
|
||||
|
||||
// mutex 互斥锁
|
||||
mutex sync.Mutex
|
||||
|
||||
// logger 日志记录器
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewTaskQueue 创建并返回一个新的任务队列实例
|
||||
func NewTaskQueue() *TaskQueue {
|
||||
pq := make(priorityQueue, 0)
|
||||
heap.Init(&pq)
|
||||
|
||||
return &TaskQueue{
|
||||
queue: &pq,
|
||||
logger: logs.NewLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
// AddTask 向队列中添加任务
|
||||
func (tq *TaskQueue) AddTask(task Task) {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
|
||||
item := &taskItem{
|
||||
task: task,
|
||||
priority: task.GetPriority(),
|
||||
}
|
||||
heap.Push(tq.queue, item)
|
||||
tq.logger.Info("Task added to queue: " + task.GetID())
|
||||
}
|
||||
|
||||
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
||||
func (tq *TaskQueue) GetNextTask() Task {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
|
||||
if tq.queue.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取优先级最高的任务
|
||||
item := heap.Pop(tq.queue).(*taskItem)
|
||||
tq.logger.Info("Task retrieved from queue: " + item.task.GetID())
|
||||
return item.task
|
||||
}
|
||||
|
||||
// GetTaskCount 获取队列中的任务数量
|
||||
func (tq *TaskQueue) GetTaskCount() int {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
|
||||
return tq.queue.Len()
|
||||
}
|
||||
|
||||
// priorityQueue 实现优先级队列
|
||||
type priorityQueue []*taskItem
|
||||
|
||||
func (pq priorityQueue) Len() int { return len(pq) }
|
||||
|
||||
// Less 优先级小的优先级更高
|
||||
func (pq priorityQueue) Less(i, j int) bool {
|
||||
return pq[i].priority < pq[j].priority
|
||||
}
|
||||
|
||||
func (pq priorityQueue) Swap(i, j int) {
|
||||
pq[i], pq[j] = pq[j], pq[i]
|
||||
pq[i].index = i
|
||||
pq[j].index = j
|
||||
}
|
||||
|
||||
func (pq *priorityQueue) Push(x interface{}) {
|
||||
n := len(*pq)
|
||||
item := x.(*taskItem)
|
||||
item.index = n
|
||||
*pq = append(*pq, item)
|
||||
}
|
||||
|
||||
func (pq *priorityQueue) Pop() interface{} {
|
||||
old := *pq
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
old[n-1] = nil // 避免内存泄漏
|
||||
item.index = -1 // 无效索引
|
||||
*pq = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// Executor 代表任务执行器
|
||||
type Executor struct {
|
||||
// taskQueue 任务队列
|
||||
taskQueue *TaskQueue
|
||||
|
||||
// workers 工作协程数量
|
||||
workers int
|
||||
|
||||
// ctx 执行上下文
|
||||
ctx context.Context
|
||||
|
||||
// cancel 取消函数
|
||||
cancel context.CancelFunc
|
||||
|
||||
// wg 等待组
|
||||
wg sync.WaitGroup
|
||||
|
||||
// logger 日志记录器
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewExecutor 创建并返回一个新的任务执行器实例
|
||||
func NewExecutor(workers int) *Executor {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &Executor{
|
||||
taskQueue: NewTaskQueue(),
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logs.NewLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动任务执行器
|
||||
func (e *Executor) Start() {
|
||||
e.logger.Info(fmt.Sprintf("Starting task executor with %d workers", e.workers))
|
||||
|
||||
// 启动工作协程
|
||||
for i := 0; i < e.workers; i++ {
|
||||
e.wg.Add(1)
|
||||
go e.worker(i)
|
||||
}
|
||||
|
||||
e.logger.Info("Task executor started successfully")
|
||||
}
|
||||
|
||||
// Stop 停止任务执行器
|
||||
func (e *Executor) Stop() {
|
||||
e.logger.Info("Stopping task executor")
|
||||
|
||||
// 取消上下文
|
||||
e.cancel()
|
||||
|
||||
// 等待所有工作协程结束
|
||||
e.wg.Wait()
|
||||
|
||||
e.logger.Info("Task executor stopped successfully")
|
||||
}
|
||||
|
||||
// SubmitTask 提交任务到执行器
|
||||
func (e *Executor) SubmitTask(task Task) {
|
||||
e.taskQueue.AddTask(task)
|
||||
e.logger.Info("Task submitted: " + task.GetID())
|
||||
}
|
||||
|
||||
// worker 工作协程
|
||||
func (e *Executor) worker(id int) {
|
||||
defer e.wg.Done()
|
||||
|
||||
e.logger.Info(fmt.Sprintf("Worker %d started", id))
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
e.logger.Info(fmt.Sprintf("Worker %d stopped", id))
|
||||
return
|
||||
default:
|
||||
// 获取下一个任务
|
||||
task := e.taskQueue.GetNextTask()
|
||||
if task != nil {
|
||||
e.logger.Info(fmt.Sprintf("Worker %d executing task: %s", id, task.GetID()))
|
||||
|
||||
// 执行任务
|
||||
if err := task.Execute(); err != nil {
|
||||
e.logger.Error("Task execution failed: " + task.GetID() + ", error: " + err.Error())
|
||||
} else {
|
||||
e.logger.Info("Task executed successfully: " + task.GetID())
|
||||
}
|
||||
} else {
|
||||
// 没有任务时短暂休眠
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user