Files
pig-farm-controller/internal/task/task_test.go
2025-09-11 20:13:09 +08:00

255 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package task_test 包含对 task 包的单元测试
package task_test
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/task"
"github.com/stretchr/testify/assert"
)
// MockTask 用于测试的模拟任务
type MockTask struct {
id string
priority int
isDone bool
execute func() error
executed int32 // 使用原子操作来跟踪执行次数
}
// Execute 实现了 Task 接口,并确保每次调用都增加执行计数
func (m *MockTask) Execute() error {
// 核心修复:无论 execute 函数是否为 nil都应增加计数
atomic.AddInt32(&m.executed, 1)
if m.execute != nil {
return m.execute()
}
return nil
}
func (m *MockTask) GetID() string {
return m.id
}
func (m *MockTask) GetPriority() int {
return m.priority
}
func (m *MockTask) IsDone() bool {
return m.isDone
}
// ExecutedCount 返回任务被执行的次数
func (m *MockTask) ExecutedCount() int32 {
return atomic.LoadInt32(&m.executed)
}
// TestNewTaskQueue 测试创建新的任务队列
func TestNewTaskQueue(t *testing.T) {
tq := task.NewTaskQueue()
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
}
// TestTaskQueue_AddTask 测试向队列中添加任务
func TestTaskQueue_AddTask(t *testing.T) {
tq := task.NewTaskQueue()
mockTask := &MockTask{id: "task1", priority: 1}
tq.AddTask(mockTask)
assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1")
}
// TestTaskQueue_GetNextTask 测试从队列中获取任务
func TestTaskQueue_GetNextTask(t *testing.T) {
t.Run("从空队列获取任务", func(t *testing.T) {
tq := task.NewTaskQueue()
nextTask := tq.GetNextTask()
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
})
t.Run("按优先级获取任务", func(t *testing.T) {
tq := task.NewTaskQueue()
task1 := &MockTask{id: "task1", priority: 10}
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
task3 := &MockTask{id: "task3", priority: 5}
tq.AddTask(task1)
tq.AddTask(task2)
tq.AddTask(task3)
assert.Equal(t, 3, tq.GetTaskCount(), "添加三个任务后,队列中的任务数应为 3")
nextTask := tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task2", nextTask.GetID(), "应首先获取优先级最高的任务 (task2)")
nextTask = tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task3", nextTask.GetID(), "应获取下一个优先级最高的任务 (task3)")
nextTask = tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task1", nextTask.GetID(), "应最后获取优先级最低的任务 (task1)")
assert.Equal(t, 0, tq.GetTaskCount(), "获取所有任务后,队列应为空")
})
}
// TestTaskQueue_Concurrency 测试任务队列的并发安全性
func TestTaskQueue_Concurrency(t *testing.T) {
tq := task.NewTaskQueue()
var wg sync.WaitGroup
taskCount := 100
wg.Add(taskCount)
for i := 0; i < taskCount; i++ {
go func(i int) {
defer wg.Done()
tq.AddTask(&MockTask{id: fmt.Sprintf("task-%d", i), priority: i})
}(i)
}
wg.Wait()
assert.Equal(t, taskCount, tq.GetTaskCount(), "并发添加任务后,队列中的任务数应为 %d", taskCount)
wg.Add(taskCount)
for i := 0; i < taskCount; i++ {
go func() {
defer wg.Done()
task := tq.GetNextTask()
assert.NotNil(t, task)
}()
}
wg.Wait()
assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空")
}
// TestNewExecutor 测试创建新的任务执行器
func TestNewExecutor(t *testing.T) {
executor := task.NewExecutor(5)
assert.NotNil(t, executor, "新创建的执行器不应为 nil")
}
// TestExecutor_StartStop 测试执行器的启动和停止
func TestExecutor_StartStop(t *testing.T) {
executor := task.NewExecutor(2)
executor.Start()
// 没有简单的方法来断言 worker 已启动,但我们可以立即停止它
// 以确保没有死锁或竞争条件。
executor.Stop()
}
// TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构)
func TestExecutor_SubmitAndExecuteTask(t *testing.T) {
executor := task.NewExecutor(1)
mockTask := &MockTask{
id: "task1",
priority: 1,
// execute 函数可以为空,我们只关心它是否被调用
execute: nil,
}
executor.Start()
executor.SubmitTask(mockTask)
// 等待任务执行
time.Sleep(200 * time.Millisecond)
executor.Stop()
assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行")
}
// TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构)
func TestExecutor_ExecuteMultipleTasks(t *testing.T) {
executor := task.NewExecutor(3)
taskCount := 10
mockTasks := make([]*MockTask, taskCount)
for i := 0; i < taskCount; i++ {
mockTasks[i] = &MockTask{
id: fmt.Sprintf("task-%d", i),
priority: i,
}
}
executor.Start()
for _, task := range mockTasks {
executor.SubmitTask(task)
}
// 等待所有任务完成,可以适当增加延时以应对慢速环境
time.Sleep(500 * time.Millisecond)
executor.Stop()
var totalExecuted int32
for _, task := range mockTasks {
totalExecuted += task.ExecutedCount()
}
assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行")
}
// TestExecutor_TaskExecutionError 测试任务执行失败的场景 (失败的测试)
func TestExecutor_TaskExecutionError(t *testing.T) {
// 日志记录了错误,这里我们只测试执行流程是否继续
executor := task.NewExecutor(1)
errorTask := &MockTask{
id: "errorTask",
priority: 1,
execute: func() error {
return errors.New("执行失败")
},
}
successTask := &MockTask{
id: "successTask",
priority: 2, // 后执行
}
executor.Start()
executor.SubmitTask(errorTask)
executor.SubmitTask(successTask)
// 等待任务执行
time.Sleep(300 * time.Millisecond)
executor.Stop()
assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次")
assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行")
}
// TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务
func TestExecutor_StopWithPendingTasks(t *testing.T) {
executor := task.NewExecutor(1)
task1 := &MockTask{
id: "task1",
priority: 1,
execute: func() error {
// 模拟一个耗时任务
time.Sleep(500 * time.Millisecond)
return nil
},
}
task2 := &MockTask{id: "task2", priority: 2}
executor.Start()
executor.SubmitTask(task1)
executor.SubmitTask(task2)
// 给 task1 一点时间启动,然后停止执行器
time.Sleep(100 * time.Millisecond)
executor.Stop()
assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行")
assert.Equal(t, int32(0), task2.ExecutedCount(), "task2 不应该被执行,因为执行器已停止")
}