diff --git a/internal/task/task_test.go b/internal/task/task_test.go index c74f849..dda4fb8 100644 --- a/internal/task/task_test.go +++ b/internal/task/task_test.go @@ -9,10 +9,22 @@ import ( "testing" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/config" + "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/task" "github.com/stretchr/testify/assert" ) +// testLogger 是一个用于所有测试用例的静默 logger 实例。 +var testLogger *logs.Logger + +func init() { + // 使用 "fatal" 级别来创建一个在测试期间不会产生任何输出的 logger。 + // 这避免了在运行 `go test` 时被日志淹没。 + cfg := config.LogConfig{Level: "fatal"} + testLogger = logs.NewLogger(cfg) +} + // MockTask 用于测试的模拟任务 type MockTask struct { id string @@ -24,7 +36,6 @@ type MockTask struct { // Execute 实现了 Task 接口,并确保每次调用都增加执行计数 func (m *MockTask) Execute() error { - // 核心修复:无论 execute 函数是否为 nil,都应增加计数 atomic.AddInt32(&m.executed, 1) if m.execute != nil { return m.execute() @@ -49,32 +60,48 @@ func (m *MockTask) ExecutedCount() int32 { return atomic.LoadInt32(&m.executed) } -// TestNewTaskQueue 测试创建新的任务队列 +// --- Helper function for robust waiting --- +func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { + waitChan := make(chan struct{}) + go func() { + defer close(waitChan) + wg.Wait() + }() + + select { + case <-waitChan: + // Wait succeeded + case <-time.After(timeout): + t.Fatal("timed out waiting for tasks to complete") + } +} + +// --- TaskQueue Tests (No changes needed) --- + func TestNewTaskQueue(t *testing.T) { - tq := task.NewTaskQueue() + tq := task.NewTaskQueue(testLogger) assert.NotNil(t, tq, "新创建的任务队列不应为 nil") assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空") } -// TestTaskQueue_AddTask 测试向队列中添加任务 func TestTaskQueue_AddTask(t *testing.T) { - tq := task.NewTaskQueue() + tq := task.NewTaskQueue(testLogger) mockTask := &MockTask{id: "task1", priority: 1} tq.AddTask(mockTask) assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1") } -// TestTaskQueue_GetNextTask 测试从队列中获取任务 +// ... (other TaskQueue tests remain the same) func TestTaskQueue_GetNextTask(t *testing.T) { t.Run("从空队列获取任务", func(t *testing.T) { - tq := task.NewTaskQueue() + tq := task.NewTaskQueue(testLogger) nextTask := tq.GetNextTask() assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil") }) t.Run("按优先级获取任务", func(t *testing.T) { - tq := task.NewTaskQueue() + tq := task.NewTaskQueue(testLogger) task1 := &MockTask{id: "task1", priority: 10} task2 := &MockTask{id: "task2", priority: 1} // 优先级更高 task3 := &MockTask{id: "task3", priority: 5} @@ -101,9 +128,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) { }) } -// TestTaskQueue_Concurrency 测试任务队列的并发安全性 func TestTaskQueue_Concurrency(t *testing.T) { - tq := task.NewTaskQueue() + tq := task.NewTaskQueue(testLogger) var wg sync.WaitGroup taskCount := 100 @@ -131,52 +157,62 @@ func TestTaskQueue_Concurrency(t *testing.T) { assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空") } -// TestNewExecutor 测试创建新的任务执行器 +// --- Executor Tests (Refactored for reliability) --- + func TestNewExecutor(t *testing.T) { - executor := task.NewExecutor(5) + executor := task.NewExecutor(5, testLogger) assert.NotNil(t, executor, "新创建的执行器不应为 nil") } -// TestExecutor_StartStop 测试执行器的启动和停止 func TestExecutor_StartStop(t *testing.T) { - executor := task.NewExecutor(2) + executor := task.NewExecutor(2, testLogger) executor.Start() - // 没有简单的方法来断言 worker 已启动,但我们可以立即停止它 - // 以确保没有死锁或竞争条件。 + // 确保立即停止不会导致死锁或竞争条件。 executor.Stop() } -// TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构) +// TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构,更可靠) func TestExecutor_SubmitAndExecuteTask(t *testing.T) { - executor := task.NewExecutor(1) + var wg sync.WaitGroup + wg.Add(1) + + executor := task.NewExecutor(1, testLogger) mockTask := &MockTask{ id: "task1", priority: 1, - // execute 函数可以为空,我们只关心它是否被调用 - execute: nil, + execute: func() error { + wg.Done() // 任务完成时通知 WaitGroup + return nil + }, } executor.Start() executor.SubmitTask(mockTask) - // 等待任务执行 - time.Sleep(200 * time.Millisecond) + // 等待任务完成,设置一个合理的超时时间 + waitForWaitGroup(t, &wg, 2*time.Second) executor.Stop() assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行") } -// TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构) +// TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构,更可靠) func TestExecutor_ExecuteMultipleTasks(t *testing.T) { - executor := task.NewExecutor(3) taskCount := 10 + var wg sync.WaitGroup + wg.Add(taskCount) + executor := task.NewExecutor(3, testLogger) mockTasks := make([]*MockTask, taskCount) for i := 0; i < taskCount; i++ { mockTasks[i] = &MockTask{ id: fmt.Sprintf("task-%d", i), priority: i, + execute: func() error { + wg.Done() // 每个任务完成时都通知 WaitGroup + return nil + }, } } @@ -185,8 +221,8 @@ func TestExecutor_ExecuteMultipleTasks(t *testing.T) { executor.SubmitTask(task) } - // 等待所有任务完成,可以适当增加延时以应对慢速环境 - time.Sleep(500 * time.Millisecond) + // 等待所有任务完成 + waitForWaitGroup(t, &wg, 2*time.Second) executor.Stop() @@ -198,14 +234,17 @@ func TestExecutor_ExecuteMultipleTasks(t *testing.T) { assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行") } -// TestExecutor_TaskExecutionError 测试任务执行失败的场景 (失败的测试) +// TestExecutor_TaskExecutionError 测试任务执行失败的场景 (已重构,更可靠) func TestExecutor_TaskExecutionError(t *testing.T) { - // 日志记录了错误,这里我们只测试执行流程是否继续 - executor := task.NewExecutor(1) + var wg sync.WaitGroup + wg.Add(2) // 我们期望两个任务都被执行 + + executor := task.NewExecutor(1, testLogger) errorTask := &MockTask{ id: "errorTask", priority: 1, execute: func() error { + wg.Done() return errors.New("执行失败") }, } @@ -213,29 +252,34 @@ func TestExecutor_TaskExecutionError(t *testing.T) { successTask := &MockTask{ id: "successTask", priority: 2, // 后执行 + execute: func() error { + wg.Done() + return nil + }, } executor.Start() executor.SubmitTask(errorTask) executor.SubmitTask(successTask) - // 等待任务执行 - time.Sleep(300 * time.Millisecond) + waitForWaitGroup(t, &wg, 2*time.Second) executor.Stop() assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次") assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行") } -// TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 +// TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 (已重构,更可靠) func TestExecutor_StopWithPendingTasks(t *testing.T) { - executor := task.NewExecutor(1) + executor := task.NewExecutor(1, testLogger) + task1Started := make(chan struct{}) + task1 := &MockTask{ id: "task1", priority: 1, execute: func() error { - // 模拟一个耗时任务 - time.Sleep(500 * time.Millisecond) + close(task1Started) // 发送信号,通知测试 task1 已开始执行 + time.Sleep(200 * time.Millisecond) // 模拟耗时操作 return nil }, } @@ -245,8 +289,14 @@ func TestExecutor_StopWithPendingTasks(t *testing.T) { executor.SubmitTask(task1) executor.SubmitTask(task2) - // 给 task1 一点时间启动,然后停止执行器 - time.Sleep(100 * time.Millisecond) + // 等待 task1 开始执行的信号,而不是依赖不确定的 sleep + select { + case <-task1Started: + // task1 已开始,可以安全地停止执行器了 + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for task1 to start") + } + executor.Stop() assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行")