Compare commits
	
		
			2 Commits
		
	
	
		
			6797db914a
			...
			4035172a4b
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 4035172a4b | |||
| 2593097989 | 
							
								
								
									
										53
									
								
								internal/infra/task/delay_task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								internal/infra/task/delay_task.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,53 @@
 | 
			
		||||
package task
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// DelayTask 是一个用于模拟延迟的 Task 实现
 | 
			
		||||
type DelayTask struct {
 | 
			
		||||
	id       string
 | 
			
		||||
	duration time.Duration
 | 
			
		||||
	priority int
 | 
			
		||||
	done     bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDelayTask 创建一个新的 DelayTask 实例
 | 
			
		||||
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
 | 
			
		||||
	return &DelayTask{
 | 
			
		||||
		id:       id,
 | 
			
		||||
		duration: duration,
 | 
			
		||||
		priority: priority,
 | 
			
		||||
		done:     false,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Execute 执行延迟任务,等待指定的时间
 | 
			
		||||
func (d *DelayTask) Execute() error {
 | 
			
		||||
	fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
 | 
			
		||||
	time.Sleep(d.duration)
 | 
			
		||||
	fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
 | 
			
		||||
	d.done = true
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetID 获取任务ID
 | 
			
		||||
func (d *DelayTask) GetID() string {
 | 
			
		||||
	return d.id
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetPriority 获取任务优先级
 | 
			
		||||
func (d *DelayTask) GetPriority() int {
 | 
			
		||||
	return d.priority
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsDone 检查任务是否已完成
 | 
			
		||||
func (d *DelayTask) IsDone() bool {
 | 
			
		||||
	return d.done
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetDescription 获取任务说明,根据任务ID和延迟时间生成
 | 
			
		||||
func (d *DelayTask) GetDescription() string {
 | 
			
		||||
	return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										61
									
								
								internal/infra/task/delay_task_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										61
									
								
								internal/infra/task/delay_task_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,61 @@
 | 
			
		||||
package task_test
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestNewDelayTask(t *testing.T) {
 | 
			
		||||
	id := "test-delay-task-1"
 | 
			
		||||
	duration := 100 * time.Millisecond
 | 
			
		||||
	priority := 1
 | 
			
		||||
 | 
			
		||||
	dt := task.NewDelayTask(id, duration, priority)
 | 
			
		||||
 | 
			
		||||
	if dt.GetID() != id {
 | 
			
		||||
		t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID())
 | 
			
		||||
	}
 | 
			
		||||
	if dt.GetPriority() != priority {
 | 
			
		||||
		t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority())
 | 
			
		||||
	}
 | 
			
		||||
	if dt.IsDone() != false {
 | 
			
		||||
		t.Error("任务初始状态不应为已完成")
 | 
			
		||||
	}
 | 
			
		||||
	// 动态生成的描述,需要匹配 GetDescription 的实现
 | 
			
		||||
	expectedDesc := fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", id, duration)
 | 
			
		||||
	if dt.GetDescription() != expectedDesc {
 | 
			
		||||
		t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDelayTaskExecute(t *testing.T) {
 | 
			
		||||
	id := "test-delay-task-execute"
 | 
			
		||||
	duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度
 | 
			
		||||
	priority := 1
 | 
			
		||||
 | 
			
		||||
	dt := task.NewDelayTask(id, duration, priority)
 | 
			
		||||
 | 
			
		||||
	if dt.IsDone() {
 | 
			
		||||
		t.Error("任务执行前不应为已完成状态")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	startTime := time.Now()
 | 
			
		||||
	err := dt.Execute()
 | 
			
		||||
	endTime := time.Now()
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("Execute 方法返回错误: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if !dt.IsDone() {
 | 
			
		||||
		t.Error("任务执行后应为已完成状态")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 验证延迟时间大致正确,允许一些误差
 | 
			
		||||
	elapsed := endTime.Sub(startTime)
 | 
			
		||||
	if elapsed < duration || elapsed > duration*2 {
 | 
			
		||||
		t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -1,5 +1,3 @@
 | 
			
		||||
// Package task 提供任务队列和执行框架
 | 
			
		||||
// 负责管理任务队列、调度和执行各种控制任务
 | 
			
		||||
package task
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
@@ -25,6 +23,9 @@ type Task interface {
 | 
			
		||||
 | 
			
		||||
	// IsDone 检查任务是否已完成
 | 
			
		||||
	IsDone() bool
 | 
			
		||||
 | 
			
		||||
	// GetDescription 获取任务说明
 | 
			
		||||
	GetDescription() string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// taskItem 任务队列中的元素
 | 
			
		||||
@@ -34,8 +35,8 @@ type taskItem struct {
 | 
			
		||||
	index    int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TaskQueue 代表任务队列
 | 
			
		||||
type TaskQueue struct {
 | 
			
		||||
// Queue 代表任务队列
 | 
			
		||||
type Queue struct {
 | 
			
		||||
	// queue 任务队列(按优先级排序)
 | 
			
		||||
	queue *priorityQueue
 | 
			
		||||
 | 
			
		||||
@@ -46,50 +47,50 @@ type TaskQueue struct {
 | 
			
		||||
	logger *logs.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewTaskQueue 创建并返回一个新的任务队列实例。
 | 
			
		||||
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
 | 
			
		||||
// NewQueue 创建并返回一个新的任务队列实例。
 | 
			
		||||
func NewQueue(logger *logs.Logger) *Queue {
 | 
			
		||||
	pq := make(priorityQueue, 0)
 | 
			
		||||
	heap.Init(&pq)
 | 
			
		||||
 | 
			
		||||
	return &TaskQueue{
 | 
			
		||||
	return &Queue{
 | 
			
		||||
		queue:  &pq,
 | 
			
		||||
		logger: logger,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddTask 向队列中添加任务
 | 
			
		||||
func (tq *TaskQueue) AddTask(task Task) {
 | 
			
		||||
	tq.mutex.Lock()
 | 
			
		||||
	defer tq.mutex.Unlock()
 | 
			
		||||
func (q *Queue) AddTask(task Task) {
 | 
			
		||||
	q.mutex.Lock()
 | 
			
		||||
	defer q.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	item := &taskItem{
 | 
			
		||||
		task:     task,
 | 
			
		||||
		priority: task.GetPriority(),
 | 
			
		||||
	}
 | 
			
		||||
	heap.Push(tq.queue, item)
 | 
			
		||||
	tq.logger.Infow("任务已添加到队列", "任务ID", task.GetID())
 | 
			
		||||
	heap.Push(q.queue, item)
 | 
			
		||||
	q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
 | 
			
		||||
func (tq *TaskQueue) GetNextTask() Task {
 | 
			
		||||
	tq.mutex.Lock()
 | 
			
		||||
	defer tq.mutex.Unlock()
 | 
			
		||||
func (q *Queue) GetNextTask() Task {
 | 
			
		||||
	q.mutex.Lock()
 | 
			
		||||
	defer q.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if tq.queue.Len() == 0 {
 | 
			
		||||
	if q.queue.Len() == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	item := heap.Pop(tq.queue).(*taskItem)
 | 
			
		||||
	tq.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID())
 | 
			
		||||
	item := heap.Pop(q.queue).(*taskItem)
 | 
			
		||||
	q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
 | 
			
		||||
	return item.task
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetTaskCount 获取队列中的任务数量
 | 
			
		||||
func (tq *TaskQueue) GetTaskCount() int {
 | 
			
		||||
	tq.mutex.Lock()
 | 
			
		||||
	defer tq.mutex.Unlock()
 | 
			
		||||
func (q *Queue) GetTaskCount() int {
 | 
			
		||||
	q.mutex.Lock()
 | 
			
		||||
	defer q.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	return tq.queue.Len()
 | 
			
		||||
	return q.queue.Len()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// priorityQueue 实现优先级队列
 | 
			
		||||
@@ -126,8 +127,8 @@ func (pq *priorityQueue) Pop() interface{} {
 | 
			
		||||
 | 
			
		||||
// Executor 代表任务执行器
 | 
			
		||||
type Executor struct {
 | 
			
		||||
	// taskQueue 任务队列
 | 
			
		||||
	taskQueue *TaskQueue
 | 
			
		||||
	// queue 任务队列
 | 
			
		||||
	queue *Queue
 | 
			
		||||
 | 
			
		||||
	// workers 工作协程数量
 | 
			
		||||
	workers int
 | 
			
		||||
@@ -150,7 +151,7 @@ func NewExecutor(workers int, logger *logs.Logger) *Executor {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
 | 
			
		||||
	return &Executor{
 | 
			
		||||
		taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
 | 
			
		||||
		queue:   NewQueue(logger), // 将 logger 传递给 Queue
 | 
			
		||||
		workers: workers,
 | 
			
		||||
		ctx:     ctx,
 | 
			
		||||
		cancel:  cancel,
 | 
			
		||||
@@ -186,8 +187,8 @@ func (e *Executor) Stop() {
 | 
			
		||||
 | 
			
		||||
// SubmitTask 提交任务到执行器
 | 
			
		||||
func (e *Executor) SubmitTask(task Task) {
 | 
			
		||||
	e.taskQueue.AddTask(task)
 | 
			
		||||
	e.logger.Infow("任务已提交", "任务ID", task.GetID())
 | 
			
		||||
	e.queue.AddTask(task)
 | 
			
		||||
	e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// worker 工作协程
 | 
			
		||||
@@ -203,15 +204,15 @@ func (e *Executor) worker(id int) {
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
			// 获取下一个任务
 | 
			
		||||
			task := e.taskQueue.GetNextTask()
 | 
			
		||||
			task := e.queue.GetNextTask()
 | 
			
		||||
			if task != nil {
 | 
			
		||||
				e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())
 | 
			
		||||
				e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
 | 
			
		||||
 | 
			
		||||
				// 执行任务
 | 
			
		||||
				if err := task.Execute(); err != nil {
 | 
			
		||||
					e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err)
 | 
			
		||||
					e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
 | 
			
		||||
				} else {
 | 
			
		||||
					e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID())
 | 
			
		||||
					e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				// 没有任务时短暂休眠
 | 
			
		||||
 
 | 
			
		||||
@@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 {
 | 
			
		||||
	return atomic.LoadInt32(&m.executed)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *MockTask) GetDescription() string {
 | 
			
		||||
	return "Mock Task"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// --- 健壮等待的辅助函数 ---
 | 
			
		||||
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
 | 
			
		||||
	waitChan := make(chan struct{})
 | 
			
		||||
@@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
 | 
			
		||||
 | 
			
		||||
// --- 任务队列测试 (无需更改) ---
 | 
			
		||||
 | 
			
		||||
func TestNewTaskQueue(t *testing.T) {
 | 
			
		||||
	tq := task.NewTaskQueue(testLogger)
 | 
			
		||||
func TestNewQueue(t *testing.T) {
 | 
			
		||||
	tq := task.NewQueue(testLogger)
 | 
			
		||||
	assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
 | 
			
		||||
	assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTaskQueue_AddTask(t *testing.T) {
 | 
			
		||||
	tq := task.NewTaskQueue(testLogger)
 | 
			
		||||
func TestQueue_AddTask(t *testing.T) {
 | 
			
		||||
	tq := task.NewQueue(testLogger)
 | 
			
		||||
	mockTask := &MockTask{id: "task1", priority: 1}
 | 
			
		||||
 | 
			
		||||
	tq.AddTask(mockTask)
 | 
			
		||||
@@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ... (其他任务队列测试保持不变)
 | 
			
		||||
func TestTaskQueue_GetNextTask(t *testing.T) {
 | 
			
		||||
func TestQueue_GetNextTask(t *testing.T) {
 | 
			
		||||
	t.Run("从空队列获取任务", func(t *testing.T) {
 | 
			
		||||
		tq := task.NewTaskQueue(testLogger)
 | 
			
		||||
		tq := task.NewQueue(testLogger)
 | 
			
		||||
		nextTask := tq.GetNextTask()
 | 
			
		||||
		assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("按优先级获取任务", func(t *testing.T) {
 | 
			
		||||
		tq := task.NewTaskQueue(testLogger)
 | 
			
		||||
		tq := task.NewQueue(testLogger)
 | 
			
		||||
		task1 := &MockTask{id: "task1", priority: 10}
 | 
			
		||||
		task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
 | 
			
		||||
		task3 := &MockTask{id: "task3", priority: 5}
 | 
			
		||||
@@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) {
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTaskQueue_Concurrency(t *testing.T) {
 | 
			
		||||
	tq := task.NewTaskQueue(testLogger)
 | 
			
		||||
func TestQueue_Concurrency(t *testing.T) {
 | 
			
		||||
	tq := task.NewQueue(testLogger)
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	taskCount := 100
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user