重构 #4
| @@ -9,10 +9,22 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/config" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/logs" | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/task" | 	"git.huangwc.com/pig/pig-farm-controller/internal/task" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // testLogger 是一个用于所有测试用例的静默 logger 实例。 | ||||||
|  | var testLogger *logs.Logger | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	// 使用 "fatal" 级别来创建一个在测试期间不会产生任何输出的 logger。 | ||||||
|  | 	// 这避免了在运行 `go test` 时被日志淹没。 | ||||||
|  | 	cfg := config.LogConfig{Level: "fatal"} | ||||||
|  | 	testLogger = logs.NewLogger(cfg) | ||||||
|  | } | ||||||
|  |  | ||||||
| // MockTask 用于测试的模拟任务 | // MockTask 用于测试的模拟任务 | ||||||
| type MockTask struct { | type MockTask struct { | ||||||
| 	id       string | 	id       string | ||||||
| @@ -24,7 +36,6 @@ type MockTask struct { | |||||||
|  |  | ||||||
| // Execute 实现了 Task 接口,并确保每次调用都增加执行计数 | // Execute 实现了 Task 接口,并确保每次调用都增加执行计数 | ||||||
| func (m *MockTask) Execute() error { | func (m *MockTask) Execute() error { | ||||||
| 	// 核心修复:无论 execute 函数是否为 nil,都应增加计数 |  | ||||||
| 	atomic.AddInt32(&m.executed, 1) | 	atomic.AddInt32(&m.executed, 1) | ||||||
| 	if m.execute != nil { | 	if m.execute != nil { | ||||||
| 		return m.execute() | 		return m.execute() | ||||||
| @@ -49,32 +60,48 @@ func (m *MockTask) ExecutedCount() int32 { | |||||||
| 	return atomic.LoadInt32(&m.executed) | 	return atomic.LoadInt32(&m.executed) | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestNewTaskQueue 测试创建新的任务队列 | // --- Helper function for robust waiting --- | ||||||
|  | func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { | ||||||
|  | 	waitChan := make(chan struct{}) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(waitChan) | ||||||
|  | 		wg.Wait() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	select { | ||||||
|  | 	case <-waitChan: | ||||||
|  | 		// Wait succeeded | ||||||
|  | 	case <-time.After(timeout): | ||||||
|  | 		t.Fatal("timed out waiting for tasks to complete") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // --- TaskQueue Tests (No changes needed) --- | ||||||
|  |  | ||||||
| func TestNewTaskQueue(t *testing.T) { | func TestNewTaskQueue(t *testing.T) { | ||||||
| 	tq := task.NewTaskQueue() | 	tq := task.NewTaskQueue(testLogger) | ||||||
| 	assert.NotNil(t, tq, "新创建的任务队列不应为 nil") | 	assert.NotNil(t, tq, "新创建的任务队列不应为 nil") | ||||||
| 	assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空") | 	assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestTaskQueue_AddTask 测试向队列中添加任务 |  | ||||||
| func TestTaskQueue_AddTask(t *testing.T) { | func TestTaskQueue_AddTask(t *testing.T) { | ||||||
| 	tq := task.NewTaskQueue() | 	tq := task.NewTaskQueue(testLogger) | ||||||
| 	mockTask := &MockTask{id: "task1", priority: 1} | 	mockTask := &MockTask{id: "task1", priority: 1} | ||||||
|  |  | ||||||
| 	tq.AddTask(mockTask) | 	tq.AddTask(mockTask) | ||||||
| 	assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1") | 	assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestTaskQueue_GetNextTask 测试从队列中获取任务 | // ... (other TaskQueue tests remain the same) | ||||||
| func TestTaskQueue_GetNextTask(t *testing.T) { | func TestTaskQueue_GetNextTask(t *testing.T) { | ||||||
| 	t.Run("从空队列获取任务", func(t *testing.T) { | 	t.Run("从空队列获取任务", func(t *testing.T) { | ||||||
| 		tq := task.NewTaskQueue() | 		tq := task.NewTaskQueue(testLogger) | ||||||
| 		nextTask := tq.GetNextTask() | 		nextTask := tq.GetNextTask() | ||||||
| 		assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil") | 		assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil") | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
| 	t.Run("按优先级获取任务", func(t *testing.T) { | 	t.Run("按优先级获取任务", func(t *testing.T) { | ||||||
| 		tq := task.NewTaskQueue() | 		tq := task.NewTaskQueue(testLogger) | ||||||
| 		task1 := &MockTask{id: "task1", priority: 10} | 		task1 := &MockTask{id: "task1", priority: 10} | ||||||
| 		task2 := &MockTask{id: "task2", priority: 1} // 优先级更高 | 		task2 := &MockTask{id: "task2", priority: 1} // 优先级更高 | ||||||
| 		task3 := &MockTask{id: "task3", priority: 5} | 		task3 := &MockTask{id: "task3", priority: 5} | ||||||
| @@ -101,9 +128,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) { | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestTaskQueue_Concurrency 测试任务队列的并发安全性 |  | ||||||
| func TestTaskQueue_Concurrency(t *testing.T) { | func TestTaskQueue_Concurrency(t *testing.T) { | ||||||
| 	tq := task.NewTaskQueue() | 	tq := task.NewTaskQueue(testLogger) | ||||||
| 	var wg sync.WaitGroup | 	var wg sync.WaitGroup | ||||||
| 	taskCount := 100 | 	taskCount := 100 | ||||||
|  |  | ||||||
| @@ -131,52 +157,62 @@ func TestTaskQueue_Concurrency(t *testing.T) { | |||||||
| 	assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空") | 	assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestNewExecutor 测试创建新的任务执行器 | // --- Executor Tests (Refactored for reliability) --- | ||||||
|  |  | ||||||
| func TestNewExecutor(t *testing.T) { | func TestNewExecutor(t *testing.T) { | ||||||
| 	executor := task.NewExecutor(5) | 	executor := task.NewExecutor(5, testLogger) | ||||||
| 	assert.NotNil(t, executor, "新创建的执行器不应为 nil") | 	assert.NotNil(t, executor, "新创建的执行器不应为 nil") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestExecutor_StartStop 测试执行器的启动和停止 |  | ||||||
| func TestExecutor_StartStop(t *testing.T) { | func TestExecutor_StartStop(t *testing.T) { | ||||||
| 	executor := task.NewExecutor(2) | 	executor := task.NewExecutor(2, testLogger) | ||||||
| 	executor.Start() | 	executor.Start() | ||||||
| 	// 没有简单的方法来断言 worker 已启动,但我们可以立即停止它 | 	// 确保立即停止不会导致死锁或竞争条件。 | ||||||
| 	// 以确保没有死锁或竞争条件。 |  | ||||||
| 	executor.Stop() | 	executor.Stop() | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构) | // TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构,更可靠) | ||||||
| func TestExecutor_SubmitAndExecuteTask(t *testing.T) { | func TestExecutor_SubmitAndExecuteTask(t *testing.T) { | ||||||
| 	executor := task.NewExecutor(1) | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(1) | ||||||
|  |  | ||||||
|  | 	executor := task.NewExecutor(1, testLogger) | ||||||
| 	mockTask := &MockTask{ | 	mockTask := &MockTask{ | ||||||
| 		id:       "task1", | 		id:       "task1", | ||||||
| 		priority: 1, | 		priority: 1, | ||||||
| 		// execute 函数可以为空,我们只关心它是否被调用 | 		execute: func() error { | ||||||
| 		execute: nil, | 			wg.Done() // 任务完成时通知 WaitGroup | ||||||
|  | 			return nil | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	executor.Start() | 	executor.Start() | ||||||
| 	executor.SubmitTask(mockTask) | 	executor.SubmitTask(mockTask) | ||||||
|  |  | ||||||
| 	// 等待任务执行 | 	// 等待任务完成,设置一个合理的超时时间 | ||||||
| 	time.Sleep(200 * time.Millisecond) | 	waitForWaitGroup(t, &wg, 2*time.Second) | ||||||
|  |  | ||||||
| 	executor.Stop() | 	executor.Stop() | ||||||
|  |  | ||||||
| 	assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行") | 	assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构) | // TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构,更可靠) | ||||||
| func TestExecutor_ExecuteMultipleTasks(t *testing.T) { | func TestExecutor_ExecuteMultipleTasks(t *testing.T) { | ||||||
| 	executor := task.NewExecutor(3) |  | ||||||
| 	taskCount := 10 | 	taskCount := 10 | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(taskCount) | ||||||
|  |  | ||||||
|  | 	executor := task.NewExecutor(3, testLogger) | ||||||
| 	mockTasks := make([]*MockTask, taskCount) | 	mockTasks := make([]*MockTask, taskCount) | ||||||
| 	for i := 0; i < taskCount; i++ { | 	for i := 0; i < taskCount; i++ { | ||||||
| 		mockTasks[i] = &MockTask{ | 		mockTasks[i] = &MockTask{ | ||||||
| 			id:       fmt.Sprintf("task-%d", i), | 			id:       fmt.Sprintf("task-%d", i), | ||||||
| 			priority: i, | 			priority: i, | ||||||
|  | 			execute: func() error { | ||||||
|  | 				wg.Done() // 每个任务完成时都通知 WaitGroup | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -185,8 +221,8 @@ func TestExecutor_ExecuteMultipleTasks(t *testing.T) { | |||||||
| 		executor.SubmitTask(task) | 		executor.SubmitTask(task) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// 等待所有任务完成,可以适当增加延时以应对慢速环境 | 	// 等待所有任务完成 | ||||||
| 	time.Sleep(500 * time.Millisecond) | 	waitForWaitGroup(t, &wg, 2*time.Second) | ||||||
|  |  | ||||||
| 	executor.Stop() | 	executor.Stop() | ||||||
|  |  | ||||||
| @@ -198,14 +234,17 @@ func TestExecutor_ExecuteMultipleTasks(t *testing.T) { | |||||||
| 	assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行") | 	assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestExecutor_TaskExecutionError 测试任务执行失败的场景 (失败的测试) | // TestExecutor_TaskExecutionError 测试任务执行失败的场景 (已重构,更可靠) | ||||||
| func TestExecutor_TaskExecutionError(t *testing.T) { | func TestExecutor_TaskExecutionError(t *testing.T) { | ||||||
| 	// 日志记录了错误,这里我们只测试执行流程是否继续 | 	var wg sync.WaitGroup | ||||||
| 	executor := task.NewExecutor(1) | 	wg.Add(2) // 我们期望两个任务都被执行 | ||||||
|  |  | ||||||
|  | 	executor := task.NewExecutor(1, testLogger) | ||||||
| 	errorTask := &MockTask{ | 	errorTask := &MockTask{ | ||||||
| 		id:       "errorTask", | 		id:       "errorTask", | ||||||
| 		priority: 1, | 		priority: 1, | ||||||
| 		execute: func() error { | 		execute: func() error { | ||||||
|  | 			wg.Done() | ||||||
| 			return errors.New("执行失败") | 			return errors.New("执行失败") | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| @@ -213,29 +252,34 @@ func TestExecutor_TaskExecutionError(t *testing.T) { | |||||||
| 	successTask := &MockTask{ | 	successTask := &MockTask{ | ||||||
| 		id:       "successTask", | 		id:       "successTask", | ||||||
| 		priority: 2, // 后执行 | 		priority: 2, // 后执行 | ||||||
|  | 		execute: func() error { | ||||||
|  | 			wg.Done() | ||||||
|  | 			return nil | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	executor.Start() | 	executor.Start() | ||||||
| 	executor.SubmitTask(errorTask) | 	executor.SubmitTask(errorTask) | ||||||
| 	executor.SubmitTask(successTask) | 	executor.SubmitTask(successTask) | ||||||
|  |  | ||||||
| 	// 等待任务执行 | 	waitForWaitGroup(t, &wg, 2*time.Second) | ||||||
| 	time.Sleep(300 * time.Millisecond) |  | ||||||
| 	executor.Stop() | 	executor.Stop() | ||||||
|  |  | ||||||
| 	assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次") | 	assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次") | ||||||
| 	assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行") | 	assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行") | ||||||
| } | } | ||||||
|  |  | ||||||
| // TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 | // TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 (已重构,更可靠) | ||||||
| func TestExecutor_StopWithPendingTasks(t *testing.T) { | func TestExecutor_StopWithPendingTasks(t *testing.T) { | ||||||
| 	executor := task.NewExecutor(1) | 	executor := task.NewExecutor(1, testLogger) | ||||||
|  | 	task1Started := make(chan struct{}) | ||||||
|  |  | ||||||
| 	task1 := &MockTask{ | 	task1 := &MockTask{ | ||||||
| 		id:       "task1", | 		id:       "task1", | ||||||
| 		priority: 1, | 		priority: 1, | ||||||
| 		execute: func() error { | 		execute: func() error { | ||||||
| 			// 模拟一个耗时任务 | 			close(task1Started)                // 发送信号,通知测试 task1 已开始执行 | ||||||
| 			time.Sleep(500 * time.Millisecond) | 			time.Sleep(200 * time.Millisecond) // 模拟耗时操作 | ||||||
| 			return nil | 			return nil | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| @@ -245,8 +289,14 @@ func TestExecutor_StopWithPendingTasks(t *testing.T) { | |||||||
| 	executor.SubmitTask(task1) | 	executor.SubmitTask(task1) | ||||||
| 	executor.SubmitTask(task2) | 	executor.SubmitTask(task2) | ||||||
|  |  | ||||||
| 	// 给 task1 一点时间启动,然后停止执行器 | 	// 等待 task1 开始执行的信号,而不是依赖不确定的 sleep | ||||||
| 	time.Sleep(100 * time.Millisecond) | 	select { | ||||||
|  | 	case <-task1Started: | ||||||
|  | 		// task1 已开始,可以安全地停止执行器了 | ||||||
|  | 	case <-time.After(1 * time.Second): | ||||||
|  | 		t.Fatal("timed out waiting for task1 to start") | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	executor.Stop() | 	executor.Stop() | ||||||
|  |  | ||||||
| 	assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行") | 	assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user