From 4035172a4b0eb782ac84ffa6d484a72f0579f9cc Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sat, 13 Sep 2025 12:25:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=BB=B6=E6=97=B6Task?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/task/delay_task.go | 53 ++++++++++++++++++++++ internal/infra/task/delay_task_test.go | 61 ++++++++++++++++++++++++++ internal/infra/task/task.go | 15 ++++--- internal/infra/task/task_test.go | 22 ++++++---- 4 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 internal/infra/task/delay_task.go create mode 100644 internal/infra/task/delay_task_test.go diff --git a/internal/infra/task/delay_task.go b/internal/infra/task/delay_task.go new file mode 100644 index 0000000..0b90eeb --- /dev/null +++ b/internal/infra/task/delay_task.go @@ -0,0 +1,53 @@ +package task + +import ( + "fmt" + "time" +) + +// DelayTask 是一个用于模拟延迟的 Task 实现 +type DelayTask struct { + id string + duration time.Duration + priority int + done bool +} + +// NewDelayTask 创建一个新的 DelayTask 实例 +func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask { + return &DelayTask{ + id: id, + duration: duration, + priority: priority, + done: false, + } +} + +// Execute 执行延迟任务,等待指定的时间 +func (d *DelayTask) Execute() error { + fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration) + time.Sleep(d.duration) + fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription()) + d.done = true + return nil +} + +// GetID 获取任务ID +func (d *DelayTask) GetID() string { + return d.id +} + +// GetPriority 获取任务优先级 +func (d *DelayTask) GetPriority() int { + return d.priority +} + +// IsDone 检查任务是否已完成 +func (d *DelayTask) IsDone() bool { + return d.done +} + +// GetDescription 获取任务说明,根据任务ID和延迟时间生成 +func (d *DelayTask) GetDescription() string { + return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration) +} diff --git a/internal/infra/task/delay_task_test.go b/internal/infra/task/delay_task_test.go new file mode 100644 index 0000000..bc1848a --- /dev/null +++ b/internal/infra/task/delay_task_test.go @@ -0,0 +1,61 @@ +package task_test + +import ( + "fmt" + "testing" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/task" +) + +func TestNewDelayTask(t *testing.T) { + id := "test-delay-task-1" + duration := 100 * time.Millisecond + priority := 1 + + dt := task.NewDelayTask(id, duration, priority) + + if dt.GetID() != id { + t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID()) + } + if dt.GetPriority() != priority { + t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority()) + } + if dt.IsDone() != false { + t.Error("任务初始状态不应为已完成") + } + // 动态生成的描述,需要匹配 GetDescription 的实现 + expectedDesc := fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", id, duration) + if dt.GetDescription() != expectedDesc { + t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription()) + } +} + +func TestDelayTaskExecute(t *testing.T) { + id := "test-delay-task-execute" + duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度 + priority := 1 + + dt := task.NewDelayTask(id, duration, priority) + + if dt.IsDone() { + t.Error("任务执行前不应为已完成状态") + } + + startTime := time.Now() + err := dt.Execute() + endTime := time.Now() + + if err != nil { + t.Errorf("Execute 方法返回错误: %v", err) + } + if !dt.IsDone() { + t.Error("任务执行后应为已完成状态") + } + + // 验证延迟时间大致正确,允许一些误差 + elapsed := endTime.Sub(startTime) + if elapsed < duration || elapsed > duration*2 { + t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed) + } +} diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index 1128e40..9ae418a 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -23,6 +23,9 @@ type Task interface { // IsDone 检查任务是否已完成 IsDone() bool + + // GetDescription 获取任务说明 + GetDescription() string } // taskItem 任务队列中的元素 @@ -65,7 +68,7 @@ func (q *Queue) AddTask(task Task) { priority: task.GetPriority(), } heap.Push(q.queue, item) - q.logger.Infow("任务已添加到队列", "任务ID", task.GetID()) + q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription()) } // GetNextTask 获取下一个要执行的任务(优先级最高的任务) @@ -78,7 +81,7 @@ func (q *Queue) GetNextTask() Task { } item := heap.Pop(q.queue).(*taskItem) - q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID()) + q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription()) return item.task } @@ -185,7 +188,7 @@ func (e *Executor) Stop() { // SubmitTask 提交任务到执行器 func (e *Executor) SubmitTask(task Task) { e.queue.AddTask(task) - e.logger.Infow("任务已提交", "任务ID", task.GetID()) + e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription()) } // worker 工作协程 @@ -203,13 +206,13 @@ func (e *Executor) worker(id int) { // 获取下一个任务 task := e.queue.GetNextTask() if task != nil { - e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID()) + e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription()) // 执行任务 if err := task.Execute(); err != nil { - e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err) + e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err) } else { - e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID()) + e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription()) } } else { // 没有任务时短暂休眠 diff --git a/internal/infra/task/task_test.go b/internal/infra/task/task_test.go index e7518a2..52fb78d 100644 --- a/internal/infra/task/task_test.go +++ b/internal/infra/task/task_test.go @@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 { return atomic.LoadInt32(&m.executed) } +func (m *MockTask) GetDescription() string { + return "Mock Task" +} + // --- 健壮等待的辅助函数 --- func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { waitChan := make(chan struct{}) @@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { // --- 任务队列测试 (无需更改) --- -func TestNewTaskQueue(t *testing.T) { - tq := task.NewTaskQueue(testLogger) +func TestNewQueue(t *testing.T) { + tq := task.NewQueue(testLogger) assert.NotNil(t, tq, "新创建的任务队列不应为 nil") assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空") } -func TestTaskQueue_AddTask(t *testing.T) { - tq := task.NewTaskQueue(testLogger) +func TestQueue_AddTask(t *testing.T) { + tq := task.NewQueue(testLogger) mockTask := &MockTask{id: "task1", priority: 1} tq.AddTask(mockTask) @@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) { } // ... (其他任务队列测试保持不变) -func TestTaskQueue_GetNextTask(t *testing.T) { +func TestQueue_GetNextTask(t *testing.T) { t.Run("从空队列获取任务", func(t *testing.T) { - tq := task.NewTaskQueue(testLogger) + tq := task.NewQueue(testLogger) nextTask := tq.GetNextTask() assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil") }) t.Run("按优先级获取任务", func(t *testing.T) { - tq := task.NewTaskQueue(testLogger) + tq := task.NewQueue(testLogger) task1 := &MockTask{id: "task1", priority: 10} task2 := &MockTask{id: "task2", priority: 1} // 优先级更高 task3 := &MockTask{id: "task3", priority: 5} @@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) { }) } -func TestTaskQueue_Concurrency(t *testing.T) { - tq := task.NewTaskQueue(testLogger) +func TestQueue_Concurrency(t *testing.T) { + tq := task.NewQueue(testLogger) var wg sync.WaitGroup taskCount := 100