diff --git a/internal/task/task_test.go b/internal/task/task_test.go new file mode 100644 index 0000000..c74f849 --- /dev/null +++ b/internal/task/task_test.go @@ -0,0 +1,254 @@ +// Package task_test 包含对 task 包的单元测试 +package task_test + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/task" + "github.com/stretchr/testify/assert" +) + +// MockTask 用于测试的模拟任务 +type MockTask struct { + id string + priority int + isDone bool + execute func() error + executed int32 // 使用原子操作来跟踪执行次数 +} + +// Execute 实现了 Task 接口,并确保每次调用都增加执行计数 +func (m *MockTask) Execute() error { + // 核心修复:无论 execute 函数是否为 nil,都应增加计数 + atomic.AddInt32(&m.executed, 1) + if m.execute != nil { + return m.execute() + } + return nil +} + +func (m *MockTask) GetID() string { + return m.id +} + +func (m *MockTask) GetPriority() int { + return m.priority +} + +func (m *MockTask) IsDone() bool { + return m.isDone +} + +// ExecutedCount 返回任务被执行的次数 +func (m *MockTask) ExecutedCount() int32 { + return atomic.LoadInt32(&m.executed) +} + +// TestNewTaskQueue 测试创建新的任务队列 +func TestNewTaskQueue(t *testing.T) { + tq := task.NewTaskQueue() + assert.NotNil(t, tq, "新创建的任务队列不应为 nil") + assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空") +} + +// TestTaskQueue_AddTask 测试向队列中添加任务 +func TestTaskQueue_AddTask(t *testing.T) { + tq := task.NewTaskQueue() + mockTask := &MockTask{id: "task1", priority: 1} + + tq.AddTask(mockTask) + assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1") +} + +// TestTaskQueue_GetNextTask 测试从队列中获取任务 +func TestTaskQueue_GetNextTask(t *testing.T) { + t.Run("从空队列获取任务", func(t *testing.T) { + tq := task.NewTaskQueue() + nextTask := tq.GetNextTask() + assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil") + }) + + t.Run("按优先级获取任务", func(t *testing.T) { + tq := task.NewTaskQueue() + task1 := &MockTask{id: "task1", priority: 10} + task2 := &MockTask{id: "task2", priority: 1} // 优先级更高 + task3 := &MockTask{id: "task3", priority: 5} + + tq.AddTask(task1) + tq.AddTask(task2) + tq.AddTask(task3) + + assert.Equal(t, 3, tq.GetTaskCount(), "添加三个任务后,队列中的任务数应为 3") + + nextTask := tq.GetNextTask() + assert.NotNil(t, nextTask) + assert.Equal(t, "task2", nextTask.GetID(), "应首先获取优先级最高的任务 (task2)") + + nextTask = tq.GetNextTask() + assert.NotNil(t, nextTask) + assert.Equal(t, "task3", nextTask.GetID(), "应获取下一个优先级最高的任务 (task3)") + + nextTask = tq.GetNextTask() + assert.NotNil(t, nextTask) + assert.Equal(t, "task1", nextTask.GetID(), "应最后获取优先级最低的任务 (task1)") + + assert.Equal(t, 0, tq.GetTaskCount(), "获取所有任务后,队列应为空") + }) +} + +// TestTaskQueue_Concurrency 测试任务队列的并发安全性 +func TestTaskQueue_Concurrency(t *testing.T) { + tq := task.NewTaskQueue() + var wg sync.WaitGroup + taskCount := 100 + + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + go func(i int) { + defer wg.Done() + tq.AddTask(&MockTask{id: fmt.Sprintf("task-%d", i), priority: i}) + }(i) + } + wg.Wait() + + assert.Equal(t, taskCount, tq.GetTaskCount(), "并发添加任务后,队列中的任务数应为 %d", taskCount) + + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + go func() { + defer wg.Done() + task := tq.GetNextTask() + assert.NotNil(t, task) + }() + } + wg.Wait() + + assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空") +} + +// TestNewExecutor 测试创建新的任务执行器 +func TestNewExecutor(t *testing.T) { + executor := task.NewExecutor(5) + assert.NotNil(t, executor, "新创建的执行器不应为 nil") +} + +// TestExecutor_StartStop 测试执行器的启动和停止 +func TestExecutor_StartStop(t *testing.T) { + executor := task.NewExecutor(2) + executor.Start() + // 没有简单的方法来断言 worker 已启动,但我们可以立即停止它 + // 以确保没有死锁或竞争条件。 + executor.Stop() +} + +// TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构) +func TestExecutor_SubmitAndExecuteTask(t *testing.T) { + executor := task.NewExecutor(1) + mockTask := &MockTask{ + id: "task1", + priority: 1, + // execute 函数可以为空,我们只关心它是否被调用 + execute: nil, + } + + executor.Start() + executor.SubmitTask(mockTask) + + // 等待任务执行 + time.Sleep(200 * time.Millisecond) + + executor.Stop() + + assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行") +} + +// TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构) +func TestExecutor_ExecuteMultipleTasks(t *testing.T) { + executor := task.NewExecutor(3) + taskCount := 10 + + mockTasks := make([]*MockTask, taskCount) + for i := 0; i < taskCount; i++ { + mockTasks[i] = &MockTask{ + id: fmt.Sprintf("task-%d", i), + priority: i, + } + } + + executor.Start() + for _, task := range mockTasks { + executor.SubmitTask(task) + } + + // 等待所有任务完成,可以适当增加延时以应对慢速环境 + time.Sleep(500 * time.Millisecond) + + executor.Stop() + + var totalExecuted int32 + for _, task := range mockTasks { + totalExecuted += task.ExecutedCount() + } + + assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行") +} + +// TestExecutor_TaskExecutionError 测试任务执行失败的场景 (失败的测试) +func TestExecutor_TaskExecutionError(t *testing.T) { + // 日志记录了错误,这里我们只测试执行流程是否继续 + executor := task.NewExecutor(1) + errorTask := &MockTask{ + id: "errorTask", + priority: 1, + execute: func() error { + return errors.New("执行失败") + }, + } + + successTask := &MockTask{ + id: "successTask", + priority: 2, // 后执行 + } + + executor.Start() + executor.SubmitTask(errorTask) + executor.SubmitTask(successTask) + + // 等待任务执行 + time.Sleep(300 * time.Millisecond) + executor.Stop() + + assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次") + assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行") +} + +// TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 +func TestExecutor_StopWithPendingTasks(t *testing.T) { + executor := task.NewExecutor(1) + task1 := &MockTask{ + id: "task1", + priority: 1, + execute: func() error { + // 模拟一个耗时任务 + time.Sleep(500 * time.Millisecond) + return nil + }, + } + task2 := &MockTask{id: "task2", priority: 2} + + executor.Start() + executor.SubmitTask(task1) + executor.SubmitTask(task2) + + // 给 task1 一点时间启动,然后停止执行器 + time.Sleep(100 * time.Millisecond) + executor.Stop() + + assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行") + assert.Equal(t, int32(0), task2.ExecutedCount(), "task2 不应该被执行,因为执行器已停止") +}