Compare commits
2 Commits
6797db914a
...
4035172a4b
| Author | SHA1 | Date | |
|---|---|---|---|
| 4035172a4b | |||
| 2593097989 |
53
internal/infra/task/delay_task.go
Normal file
53
internal/infra/task/delay_task.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DelayTask 是一个用于模拟延迟的 Task 实现
|
||||
type DelayTask struct {
|
||||
id string
|
||||
duration time.Duration
|
||||
priority int
|
||||
done bool
|
||||
}
|
||||
|
||||
// NewDelayTask 创建一个新的 DelayTask 实例
|
||||
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
|
||||
return &DelayTask{
|
||||
id: id,
|
||||
duration: duration,
|
||||
priority: priority,
|
||||
done: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Execute 执行延迟任务,等待指定的时间
|
||||
func (d *DelayTask) Execute() error {
|
||||
fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
|
||||
time.Sleep(d.duration)
|
||||
fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
|
||||
d.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetID 获取任务ID
|
||||
func (d *DelayTask) GetID() string {
|
||||
return d.id
|
||||
}
|
||||
|
||||
// GetPriority 获取任务优先级
|
||||
func (d *DelayTask) GetPriority() int {
|
||||
return d.priority
|
||||
}
|
||||
|
||||
// IsDone 检查任务是否已完成
|
||||
func (d *DelayTask) IsDone() bool {
|
||||
return d.done
|
||||
}
|
||||
|
||||
// GetDescription 获取任务说明,根据任务ID和延迟时间生成
|
||||
func (d *DelayTask) GetDescription() string {
|
||||
return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration)
|
||||
}
|
||||
61
internal/infra/task/delay_task_test.go
Normal file
61
internal/infra/task/delay_task_test.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package task_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
|
||||
)
|
||||
|
||||
func TestNewDelayTask(t *testing.T) {
|
||||
id := "test-delay-task-1"
|
||||
duration := 100 * time.Millisecond
|
||||
priority := 1
|
||||
|
||||
dt := task.NewDelayTask(id, duration, priority)
|
||||
|
||||
if dt.GetID() != id {
|
||||
t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID())
|
||||
}
|
||||
if dt.GetPriority() != priority {
|
||||
t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority())
|
||||
}
|
||||
if dt.IsDone() != false {
|
||||
t.Error("任务初始状态不应为已完成")
|
||||
}
|
||||
// 动态生成的描述,需要匹配 GetDescription 的实现
|
||||
expectedDesc := fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", id, duration)
|
||||
if dt.GetDescription() != expectedDesc {
|
||||
t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayTaskExecute(t *testing.T) {
|
||||
id := "test-delay-task-execute"
|
||||
duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度
|
||||
priority := 1
|
||||
|
||||
dt := task.NewDelayTask(id, duration, priority)
|
||||
|
||||
if dt.IsDone() {
|
||||
t.Error("任务执行前不应为已完成状态")
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
err := dt.Execute()
|
||||
endTime := time.Now()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Execute 方法返回错误: %v", err)
|
||||
}
|
||||
if !dt.IsDone() {
|
||||
t.Error("任务执行后应为已完成状态")
|
||||
}
|
||||
|
||||
// 验证延迟时间大致正确,允许一些误差
|
||||
elapsed := endTime.Sub(startTime)
|
||||
if elapsed < duration || elapsed > duration*2 {
|
||||
t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed)
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,3 @@
|
||||
// Package task 提供任务队列和执行框架
|
||||
// 负责管理任务队列、调度和执行各种控制任务
|
||||
package task
|
||||
|
||||
import (
|
||||
@@ -25,6 +23,9 @@ type Task interface {
|
||||
|
||||
// IsDone 检查任务是否已完成
|
||||
IsDone() bool
|
||||
|
||||
// GetDescription 获取任务说明
|
||||
GetDescription() string
|
||||
}
|
||||
|
||||
// taskItem 任务队列中的元素
|
||||
@@ -34,8 +35,8 @@ type taskItem struct {
|
||||
index int
|
||||
}
|
||||
|
||||
// TaskQueue 代表任务队列
|
||||
type TaskQueue struct {
|
||||
// Queue 代表任务队列
|
||||
type Queue struct {
|
||||
// queue 任务队列(按优先级排序)
|
||||
queue *priorityQueue
|
||||
|
||||
@@ -46,50 +47,50 @@ type TaskQueue struct {
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewTaskQueue 创建并返回一个新的任务队列实例。
|
||||
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
|
||||
// NewQueue 创建并返回一个新的任务队列实例。
|
||||
func NewQueue(logger *logs.Logger) *Queue {
|
||||
pq := make(priorityQueue, 0)
|
||||
heap.Init(&pq)
|
||||
|
||||
return &TaskQueue{
|
||||
return &Queue{
|
||||
queue: &pq,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// AddTask 向队列中添加任务
|
||||
func (tq *TaskQueue) AddTask(task Task) {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
func (q *Queue) AddTask(task Task) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
item := &taskItem{
|
||||
task: task,
|
||||
priority: task.GetPriority(),
|
||||
}
|
||||
heap.Push(tq.queue, item)
|
||||
tq.logger.Infow("任务已添加到队列", "任务ID", task.GetID())
|
||||
heap.Push(q.queue, item)
|
||||
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||
}
|
||||
|
||||
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
||||
func (tq *TaskQueue) GetNextTask() Task {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
func (q *Queue) GetNextTask() Task {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
if tq.queue.Len() == 0 {
|
||||
if q.queue.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
item := heap.Pop(tq.queue).(*taskItem)
|
||||
tq.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID())
|
||||
item := heap.Pop(q.queue).(*taskItem)
|
||||
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
|
||||
return item.task
|
||||
}
|
||||
|
||||
// GetTaskCount 获取队列中的任务数量
|
||||
func (tq *TaskQueue) GetTaskCount() int {
|
||||
tq.mutex.Lock()
|
||||
defer tq.mutex.Unlock()
|
||||
func (q *Queue) GetTaskCount() int {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
return tq.queue.Len()
|
||||
return q.queue.Len()
|
||||
}
|
||||
|
||||
// priorityQueue 实现优先级队列
|
||||
@@ -126,8 +127,8 @@ func (pq *priorityQueue) Pop() interface{} {
|
||||
|
||||
// Executor 代表任务执行器
|
||||
type Executor struct {
|
||||
// taskQueue 任务队列
|
||||
taskQueue *TaskQueue
|
||||
// queue 任务队列
|
||||
queue *Queue
|
||||
|
||||
// workers 工作协程数量
|
||||
workers int
|
||||
@@ -150,11 +151,11 @@ func NewExecutor(workers int, logger *logs.Logger) *Executor {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &Executor{
|
||||
taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logger,
|
||||
queue: NewQueue(logger), // 将 logger 传递给 Queue
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,8 +187,8 @@ func (e *Executor) Stop() {
|
||||
|
||||
// SubmitTask 提交任务到执行器
|
||||
func (e *Executor) SubmitTask(task Task) {
|
||||
e.taskQueue.AddTask(task)
|
||||
e.logger.Infow("任务已提交", "任务ID", task.GetID())
|
||||
e.queue.AddTask(task)
|
||||
e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||
}
|
||||
|
||||
// worker 工作协程
|
||||
@@ -203,15 +204,15 @@ func (e *Executor) worker(id int) {
|
||||
return
|
||||
default:
|
||||
// 获取下一个任务
|
||||
task := e.taskQueue.GetNextTask()
|
||||
task := e.queue.GetNextTask()
|
||||
if task != nil {
|
||||
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())
|
||||
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||
|
||||
// 执行任务
|
||||
if err := task.Execute(); err != nil {
|
||||
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err)
|
||||
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
|
||||
} else {
|
||||
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID())
|
||||
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||
}
|
||||
} else {
|
||||
// 没有任务时短暂休眠
|
||||
|
||||
@@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 {
|
||||
return atomic.LoadInt32(&m.executed)
|
||||
}
|
||||
|
||||
func (m *MockTask) GetDescription() string {
|
||||
return "Mock Task"
|
||||
}
|
||||
|
||||
// --- 健壮等待的辅助函数 ---
|
||||
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
|
||||
waitChan := make(chan struct{})
|
||||
@@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
|
||||
|
||||
// --- 任务队列测试 (无需更改) ---
|
||||
|
||||
func TestNewTaskQueue(t *testing.T) {
|
||||
tq := task.NewTaskQueue(testLogger)
|
||||
func TestNewQueue(t *testing.T) {
|
||||
tq := task.NewQueue(testLogger)
|
||||
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
|
||||
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
|
||||
}
|
||||
|
||||
func TestTaskQueue_AddTask(t *testing.T) {
|
||||
tq := task.NewTaskQueue(testLogger)
|
||||
func TestQueue_AddTask(t *testing.T) {
|
||||
tq := task.NewQueue(testLogger)
|
||||
mockTask := &MockTask{id: "task1", priority: 1}
|
||||
|
||||
tq.AddTask(mockTask)
|
||||
@@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) {
|
||||
}
|
||||
|
||||
// ... (其他任务队列测试保持不变)
|
||||
func TestTaskQueue_GetNextTask(t *testing.T) {
|
||||
func TestQueue_GetNextTask(t *testing.T) {
|
||||
t.Run("从空队列获取任务", func(t *testing.T) {
|
||||
tq := task.NewTaskQueue(testLogger)
|
||||
tq := task.NewQueue(testLogger)
|
||||
nextTask := tq.GetNextTask()
|
||||
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
|
||||
})
|
||||
|
||||
t.Run("按优先级获取任务", func(t *testing.T) {
|
||||
tq := task.NewTaskQueue(testLogger)
|
||||
tq := task.NewQueue(testLogger)
|
||||
task1 := &MockTask{id: "task1", priority: 10}
|
||||
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
|
||||
task3 := &MockTask{id: "task3", priority: 5}
|
||||
@@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestTaskQueue_Concurrency(t *testing.T) {
|
||||
tq := task.NewTaskQueue(testLogger)
|
||||
func TestQueue_Concurrency(t *testing.T) {
|
||||
tq := task.NewQueue(testLogger)
|
||||
var wg sync.WaitGroup
|
||||
taskCount := 100
|
||||
|
||||
|
||||
Reference in New Issue
Block a user