Compare commits

..

2 Commits

Author SHA1 Message Date
4035172a4b 增加延时Task 2025-09-13 12:25:27 +08:00
2593097989 优化命名 2025-09-13 12:12:38 +08:00
4 changed files with 163 additions and 44 deletions

View File

@@ -0,0 +1,53 @@
package task
import (
"fmt"
"time"
)
// DelayTask 是一个用于模拟延迟的 Task 实现
type DelayTask struct {
id string
duration time.Duration
priority int
done bool
}
// NewDelayTask 创建一个新的 DelayTask 实例
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
return &DelayTask{
id: id,
duration: duration,
priority: priority,
done: false,
}
}
// Execute 执行延迟任务,等待指定的时间
func (d *DelayTask) Execute() error {
fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
time.Sleep(d.duration)
fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
d.done = true
return nil
}
// GetID 获取任务ID
func (d *DelayTask) GetID() string {
return d.id
}
// GetPriority 获取任务优先级
func (d *DelayTask) GetPriority() int {
return d.priority
}
// IsDone 检查任务是否已完成
func (d *DelayTask) IsDone() bool {
return d.done
}
// GetDescription 获取任务说明根据任务ID和延迟时间生成
func (d *DelayTask) GetDescription() string {
return fmt.Sprintf("延迟任务ID: %s延迟时间: %s", d.id, d.duration)
}

View File

@@ -0,0 +1,61 @@
package task_test
import (
"fmt"
"testing"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
)
func TestNewDelayTask(t *testing.T) {
id := "test-delay-task-1"
duration := 100 * time.Millisecond
priority := 1
dt := task.NewDelayTask(id, duration, priority)
if dt.GetID() != id {
t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID())
}
if dt.GetPriority() != priority {
t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority())
}
if dt.IsDone() != false {
t.Error("任务初始状态不应为已完成")
}
// 动态生成的描述,需要匹配 GetDescription 的实现
expectedDesc := fmt.Sprintf("延迟任务ID: %s延迟时间: %s", id, duration)
if dt.GetDescription() != expectedDesc {
t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription())
}
}
func TestDelayTaskExecute(t *testing.T) {
id := "test-delay-task-execute"
duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度
priority := 1
dt := task.NewDelayTask(id, duration, priority)
if dt.IsDone() {
t.Error("任务执行前不应为已完成状态")
}
startTime := time.Now()
err := dt.Execute()
endTime := time.Now()
if err != nil {
t.Errorf("Execute 方法返回错误: %v", err)
}
if !dt.IsDone() {
t.Error("任务执行后应为已完成状态")
}
// 验证延迟时间大致正确,允许一些误差
elapsed := endTime.Sub(startTime)
if elapsed < duration || elapsed > duration*2 {
t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed)
}
}

View File

@@ -1,5 +1,3 @@
// Package task 提供任务队列和执行框架
// 负责管理任务队列、调度和执行各种控制任务
package task
import (
@@ -25,6 +23,9 @@ type Task interface {
// IsDone 检查任务是否已完成
IsDone() bool
// GetDescription 获取任务说明
GetDescription() string
}
// taskItem 任务队列中的元素
@@ -34,8 +35,8 @@ type taskItem struct {
index int
}
// TaskQueue 代表任务队列
type TaskQueue struct {
// Queue 代表任务队列
type Queue struct {
// queue 任务队列(按优先级排序)
queue *priorityQueue
@@ -46,50 +47,50 @@ type TaskQueue struct {
logger *logs.Logger
}
// NewTaskQueue 创建并返回一个新的任务队列实例。
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
// NewQueue 创建并返回一个新的任务队列实例。
func NewQueue(logger *logs.Logger) *Queue {
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &TaskQueue{
return &Queue{
queue: &pq,
logger: logger,
}
}
// AddTask 向队列中添加任务
func (tq *TaskQueue) AddTask(task Task) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
func (q *Queue) AddTask(task Task) {
q.mutex.Lock()
defer q.mutex.Unlock()
item := &taskItem{
task: task,
priority: task.GetPriority(),
}
heap.Push(tq.queue, item)
tq.logger.Infow("任务已添加到队列", "任务ID", task.GetID())
heap.Push(q.queue, item)
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
func (tq *TaskQueue) GetNextTask() Task {
tq.mutex.Lock()
defer tq.mutex.Unlock()
func (q *Queue) GetNextTask() Task {
q.mutex.Lock()
defer q.mutex.Unlock()
if tq.queue.Len() == 0 {
if q.queue.Len() == 0 {
return nil
}
item := heap.Pop(tq.queue).(*taskItem)
tq.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID())
item := heap.Pop(q.queue).(*taskItem)
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
return item.task
}
// GetTaskCount 获取队列中的任务数量
func (tq *TaskQueue) GetTaskCount() int {
tq.mutex.Lock()
defer tq.mutex.Unlock()
func (q *Queue) GetTaskCount() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return tq.queue.Len()
return q.queue.Len()
}
// priorityQueue 实现优先级队列
@@ -126,8 +127,8 @@ func (pq *priorityQueue) Pop() interface{} {
// Executor 代表任务执行器
type Executor struct {
// taskQueue 任务队列
taskQueue *TaskQueue
// queue 任务队列
queue *Queue
// workers 工作协程数量
workers int
@@ -150,11 +151,11 @@ func NewExecutor(workers int, logger *logs.Logger) *Executor {
ctx, cancel := context.WithCancel(context.Background())
return &Executor{
taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
workers: workers,
ctx: ctx,
cancel: cancel,
logger: logger,
queue: NewQueue(logger), // 将 logger 传递给 Queue
workers: workers,
ctx: ctx,
cancel: cancel,
logger: logger,
}
}
@@ -186,8 +187,8 @@ func (e *Executor) Stop() {
// SubmitTask 提交任务到执行器
func (e *Executor) SubmitTask(task Task) {
e.taskQueue.AddTask(task)
e.logger.Infow("任务已提交", "任务ID", task.GetID())
e.queue.AddTask(task)
e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
// worker 工作协程
@@ -203,15 +204,15 @@ func (e *Executor) worker(id int) {
return
default:
// 获取下一个任务
task := e.taskQueue.GetNextTask()
task := e.queue.GetNextTask()
if task != nil {
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
// 执行任务
if err := task.Execute(); err != nil {
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err)
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
} else {
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID())
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
} else {
// 没有任务时短暂休眠

View File

@@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 {
return atomic.LoadInt32(&m.executed)
}
func (m *MockTask) GetDescription() string {
return "Mock Task"
}
// --- 健壮等待的辅助函数 ---
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
waitChan := make(chan struct{})
@@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
// --- 任务队列测试 (无需更改) ---
func TestNewTaskQueue(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestNewQueue(t *testing.T) {
tq := task.NewQueue(testLogger)
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
}
func TestTaskQueue_AddTask(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestQueue_AddTask(t *testing.T) {
tq := task.NewQueue(testLogger)
mockTask := &MockTask{id: "task1", priority: 1}
tq.AddTask(mockTask)
@@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) {
}
// ... (其他任务队列测试保持不变)
func TestTaskQueue_GetNextTask(t *testing.T) {
func TestQueue_GetNextTask(t *testing.T) {
t.Run("从空队列获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
tq := task.NewQueue(testLogger)
nextTask := tq.GetNextTask()
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
})
t.Run("按优先级获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
tq := task.NewQueue(testLogger)
task1 := &MockTask{id: "task1", priority: 10}
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
task3 := &MockTask{id: "task3", priority: 5}
@@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) {
})
}
func TestTaskQueue_Concurrency(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestQueue_Concurrency(t *testing.T) {
tq := task.NewQueue(testLogger)
var wg sync.WaitGroup
taskCount := 100