增加延时Task

This commit is contained in:
2025-09-13 12:25:27 +08:00
parent 2593097989
commit 4035172a4b
4 changed files with 136 additions and 15 deletions

View File

@@ -0,0 +1,53 @@
package task
import (
"fmt"
"time"
)
// DelayTask 是一个用于模拟延迟的 Task 实现
type DelayTask struct {
id string
duration time.Duration
priority int
done bool
}
// NewDelayTask 创建一个新的 DelayTask 实例
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
return &DelayTask{
id: id,
duration: duration,
priority: priority,
done: false,
}
}
// Execute 执行延迟任务,等待指定的时间
func (d *DelayTask) Execute() error {
fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
time.Sleep(d.duration)
fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
d.done = true
return nil
}
// GetID 获取任务ID
func (d *DelayTask) GetID() string {
return d.id
}
// GetPriority 获取任务优先级
func (d *DelayTask) GetPriority() int {
return d.priority
}
// IsDone 检查任务是否已完成
func (d *DelayTask) IsDone() bool {
return d.done
}
// GetDescription 获取任务说明根据任务ID和延迟时间生成
func (d *DelayTask) GetDescription() string {
return fmt.Sprintf("延迟任务ID: %s延迟时间: %s", d.id, d.duration)
}

View File

@@ -0,0 +1,61 @@
package task_test
import (
"fmt"
"testing"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
)
func TestNewDelayTask(t *testing.T) {
id := "test-delay-task-1"
duration := 100 * time.Millisecond
priority := 1
dt := task.NewDelayTask(id, duration, priority)
if dt.GetID() != id {
t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID())
}
if dt.GetPriority() != priority {
t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority())
}
if dt.IsDone() != false {
t.Error("任务初始状态不应为已完成")
}
// 动态生成的描述,需要匹配 GetDescription 的实现
expectedDesc := fmt.Sprintf("延迟任务ID: %s延迟时间: %s", id, duration)
if dt.GetDescription() != expectedDesc {
t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription())
}
}
func TestDelayTaskExecute(t *testing.T) {
id := "test-delay-task-execute"
duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度
priority := 1
dt := task.NewDelayTask(id, duration, priority)
if dt.IsDone() {
t.Error("任务执行前不应为已完成状态")
}
startTime := time.Now()
err := dt.Execute()
endTime := time.Now()
if err != nil {
t.Errorf("Execute 方法返回错误: %v", err)
}
if !dt.IsDone() {
t.Error("任务执行后应为已完成状态")
}
// 验证延迟时间大致正确,允许一些误差
elapsed := endTime.Sub(startTime)
if elapsed < duration || elapsed > duration*2 {
t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed)
}
}

View File

@@ -23,6 +23,9 @@ type Task interface {
// IsDone 检查任务是否已完成
IsDone() bool
// GetDescription 获取任务说明
GetDescription() string
}
// taskItem 任务队列中的元素
@@ -65,7 +68,7 @@ func (q *Queue) AddTask(task Task) {
priority: task.GetPriority(),
}
heap.Push(q.queue, item)
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID())
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
@@ -78,7 +81,7 @@ func (q *Queue) GetNextTask() Task {
}
item := heap.Pop(q.queue).(*taskItem)
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID())
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
return item.task
}
@@ -185,7 +188,7 @@ func (e *Executor) Stop() {
// SubmitTask 提交任务到执行器
func (e *Executor) SubmitTask(task Task) {
e.queue.AddTask(task)
e.logger.Infow("任务已提交", "任务ID", task.GetID())
e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
// worker 工作协程
@@ -203,13 +206,13 @@ func (e *Executor) worker(id int) {
// 获取下一个任务
task := e.queue.GetNextTask()
if task != nil {
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
// 执行任务
if err := task.Execute(); err != nil {
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err)
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
} else {
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID())
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
}
} else {
// 没有任务时短暂休眠

View File

@@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 {
return atomic.LoadInt32(&m.executed)
}
func (m *MockTask) GetDescription() string {
return "Mock Task"
}
// --- 健壮等待的辅助函数 ---
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
waitChan := make(chan struct{})
@@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
// --- 任务队列测试 (无需更改) ---
func TestNewTaskQueue(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestNewQueue(t *testing.T) {
tq := task.NewQueue(testLogger)
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
}
func TestTaskQueue_AddTask(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestQueue_AddTask(t *testing.T) {
tq := task.NewQueue(testLogger)
mockTask := &MockTask{id: "task1", priority: 1}
tq.AddTask(mockTask)
@@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) {
}
// ... (其他任务队列测试保持不变)
func TestTaskQueue_GetNextTask(t *testing.T) {
func TestQueue_GetNextTask(t *testing.T) {
t.Run("从空队列获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
tq := task.NewQueue(testLogger)
nextTask := tq.GetNextTask()
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
})
t.Run("按优先级获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
tq := task.NewQueue(testLogger)
task1 := &MockTask{id: "task1", priority: 10}
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
task3 := &MockTask{id: "task3", priority: 5}
@@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) {
})
}
func TestTaskQueue_Concurrency(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
func TestQueue_Concurrency(t *testing.T) {
tq := task.NewQueue(testLogger)
var wg sync.WaitGroup
taskCount := 100