Files
pig-farm-controller/internal/task

Task包使用说明

概述

Task包提供了一个基本的任务队列和执行框架用于管理、调度和执行各种控制任务。它支持并发执行、任务优先级和优雅的任务管理。

核心组件

1. Task接口

所有任务都需要实现Task接口包含以下方法

  • Execute() error - 执行任务
  • GetID() string - 获取任务ID
  • GetPriority() int - 获取任务优先级

2. Executor执行器

负责管理任务队列和执行任务,支持并发执行。

3. TaskQueue任务队列

用于存储和管理待执行的任务。

使用方法

1. 实现任务

首先需要实现Task接口来创建自定义任务

type MyTask struct {
    id       string
    priority int
    // 其他任务特定字段
}

func (t *MyTask) Execute() error {
    // 实现任务逻辑
    return nil
}

func (t *MyTask) GetID() string {
    return t.id
}

func (t *MyTask) GetPriority() int {
    return t.priority
}

2. 创建和启动执行器

// 创建执行器,指定工作协程数量
executor := task.NewExecutor(5) // 5个工作协程

// 启动执行器
executor.Start()

3. 提交任务

// 创建任务实例
myTask := NewMyTask("task-1", 1)

// 提交任务到执行器
executor.SubmitTask(myTask)

4. 停止执行器

// 停止执行器(会等待所有正在执行的任务完成)
executor.Stop()

处理定时循环任务

对于定时循环任务,建议采用以下方式:

  1. 使用time.Ticker定期创建任务
  2. 将任务提交到执行器
func RunScheduledTasks(executor *task.Executor) {
    // 启动一个协程来定期提交定时任务
    go func() {
        ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // 创建定时任务并提交
                scheduledTask := NewScheduledTask("scheduled-task", 1)
                executor.SubmitTask(scheduledTask)
            case <-executor.ctx.Done():
                return // 执行器已停止
            }
        }
    }()
}

处理互不相关的独立任务

对于互不相关的独立任务,可以直接创建并提交:

// 创建多个独立任务
task1 := NewIndependentTask("task-1", "data1", 1)
task2 := NewIndependentTask("task-2", "data2", 2)
task3 := NewIndependentTask("task-3", "data3", 1)

// 提交所有任务
executor.SubmitTask(task1)
executor.SubmitTask(task2)
executor.SubmitTask(task3)

最佳实践

  1. 合理设置工作协程数量:根据系统资源和任务特性设置适当的工作协程数量
  2. 正确处理任务错误在任务的Execute方法中正确处理和返回错误
  3. 合理设置任务优先级:重要的任务可以设置更高的优先级
  4. 优雅关闭使用Stop方法确保所有任务都能正确完成
  5. 避免任务阻塞:任务执行时间过长会阻塞工作协程

示例

请参考 example_task.gousage_example.go 文件获取完整的使用示例。