Compare commits
2 Commits
6797db914a
...
4035172a4b
| Author | SHA1 | Date | |
|---|---|---|---|
| 4035172a4b | |||
| 2593097989 |
53
internal/infra/task/delay_task.go
Normal file
53
internal/infra/task/delay_task.go
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
package task
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DelayTask 是一个用于模拟延迟的 Task 实现
|
||||||
|
type DelayTask struct {
|
||||||
|
id string
|
||||||
|
duration time.Duration
|
||||||
|
priority int
|
||||||
|
done bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDelayTask 创建一个新的 DelayTask 实例
|
||||||
|
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
|
||||||
|
return &DelayTask{
|
||||||
|
id: id,
|
||||||
|
duration: duration,
|
||||||
|
priority: priority,
|
||||||
|
done: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute 执行延迟任务,等待指定的时间
|
||||||
|
func (d *DelayTask) Execute() error {
|
||||||
|
fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
|
||||||
|
time.Sleep(d.duration)
|
||||||
|
fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
|
||||||
|
d.done = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetID 获取任务ID
|
||||||
|
func (d *DelayTask) GetID() string {
|
||||||
|
return d.id
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPriority 获取任务优先级
|
||||||
|
func (d *DelayTask) GetPriority() int {
|
||||||
|
return d.priority
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsDone 检查任务是否已完成
|
||||||
|
func (d *DelayTask) IsDone() bool {
|
||||||
|
return d.done
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDescription 获取任务说明,根据任务ID和延迟时间生成
|
||||||
|
func (d *DelayTask) GetDescription() string {
|
||||||
|
return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration)
|
||||||
|
}
|
||||||
61
internal/infra/task/delay_task_test.go
Normal file
61
internal/infra/task/delay_task_test.go
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
package task_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewDelayTask(t *testing.T) {
|
||||||
|
id := "test-delay-task-1"
|
||||||
|
duration := 100 * time.Millisecond
|
||||||
|
priority := 1
|
||||||
|
|
||||||
|
dt := task.NewDelayTask(id, duration, priority)
|
||||||
|
|
||||||
|
if dt.GetID() != id {
|
||||||
|
t.Errorf("期望任务ID为 %s, 实际为 %s", id, dt.GetID())
|
||||||
|
}
|
||||||
|
if dt.GetPriority() != priority {
|
||||||
|
t.Errorf("期望任务优先级为 %d, 实际为 %d", priority, dt.GetPriority())
|
||||||
|
}
|
||||||
|
if dt.IsDone() != false {
|
||||||
|
t.Error("任务初始状态不应为已完成")
|
||||||
|
}
|
||||||
|
// 动态生成的描述,需要匹配 GetDescription 的实现
|
||||||
|
expectedDesc := fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", id, duration)
|
||||||
|
if dt.GetDescription() != expectedDesc {
|
||||||
|
t.Errorf("期望任务描述为 %s, 实际为 %s", expectedDesc, dt.GetDescription())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDelayTaskExecute(t *testing.T) {
|
||||||
|
id := "test-delay-task-execute"
|
||||||
|
duration := 50 * time.Millisecond // 使用较短的延迟以加快测试速度
|
||||||
|
priority := 1
|
||||||
|
|
||||||
|
dt := task.NewDelayTask(id, duration, priority)
|
||||||
|
|
||||||
|
if dt.IsDone() {
|
||||||
|
t.Error("任务执行前不应为已完成状态")
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime := time.Now()
|
||||||
|
err := dt.Execute()
|
||||||
|
endTime := time.Now()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Execute 方法返回错误: %v", err)
|
||||||
|
}
|
||||||
|
if !dt.IsDone() {
|
||||||
|
t.Error("任务执行后应为已完成状态")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 验证延迟时间大致正确,允许一些误差
|
||||||
|
elapsed := endTime.Sub(startTime)
|
||||||
|
if elapsed < duration || elapsed > duration*2 {
|
||||||
|
t.Errorf("期望执行时间在 %v 左右, 但实际耗时 %v", duration, elapsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,3 @@
|
|||||||
// Package task 提供任务队列和执行框架
|
|
||||||
// 负责管理任务队列、调度和执行各种控制任务
|
|
||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -25,6 +23,9 @@ type Task interface {
|
|||||||
|
|
||||||
// IsDone 检查任务是否已完成
|
// IsDone 检查任务是否已完成
|
||||||
IsDone() bool
|
IsDone() bool
|
||||||
|
|
||||||
|
// GetDescription 获取任务说明
|
||||||
|
GetDescription() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// taskItem 任务队列中的元素
|
// taskItem 任务队列中的元素
|
||||||
@@ -34,8 +35,8 @@ type taskItem struct {
|
|||||||
index int
|
index int
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskQueue 代表任务队列
|
// Queue 代表任务队列
|
||||||
type TaskQueue struct {
|
type Queue struct {
|
||||||
// queue 任务队列(按优先级排序)
|
// queue 任务队列(按优先级排序)
|
||||||
queue *priorityQueue
|
queue *priorityQueue
|
||||||
|
|
||||||
@@ -46,50 +47,50 @@ type TaskQueue struct {
|
|||||||
logger *logs.Logger
|
logger *logs.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTaskQueue 创建并返回一个新的任务队列实例。
|
// NewQueue 创建并返回一个新的任务队列实例。
|
||||||
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
|
func NewQueue(logger *logs.Logger) *Queue {
|
||||||
pq := make(priorityQueue, 0)
|
pq := make(priorityQueue, 0)
|
||||||
heap.Init(&pq)
|
heap.Init(&pq)
|
||||||
|
|
||||||
return &TaskQueue{
|
return &Queue{
|
||||||
queue: &pq,
|
queue: &pq,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddTask 向队列中添加任务
|
// AddTask 向队列中添加任务
|
||||||
func (tq *TaskQueue) AddTask(task Task) {
|
func (q *Queue) AddTask(task Task) {
|
||||||
tq.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer tq.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
|
|
||||||
item := &taskItem{
|
item := &taskItem{
|
||||||
task: task,
|
task: task,
|
||||||
priority: task.GetPriority(),
|
priority: task.GetPriority(),
|
||||||
}
|
}
|
||||||
heap.Push(tq.queue, item)
|
heap.Push(q.queue, item)
|
||||||
tq.logger.Infow("任务已添加到队列", "任务ID", task.GetID())
|
q.logger.Infow("任务已添加到队列", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
||||||
func (tq *TaskQueue) GetNextTask() Task {
|
func (q *Queue) GetNextTask() Task {
|
||||||
tq.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer tq.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
|
|
||||||
if tq.queue.Len() == 0 {
|
if q.queue.Len() == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
item := heap.Pop(tq.queue).(*taskItem)
|
item := heap.Pop(q.queue).(*taskItem)
|
||||||
tq.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID())
|
q.logger.Infow("从队列中获取任务", "任务ID", item.task.GetID(), "任务描述", item.task.GetDescription())
|
||||||
return item.task
|
return item.task
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTaskCount 获取队列中的任务数量
|
// GetTaskCount 获取队列中的任务数量
|
||||||
func (tq *TaskQueue) GetTaskCount() int {
|
func (q *Queue) GetTaskCount() int {
|
||||||
tq.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer tq.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
|
|
||||||
return tq.queue.Len()
|
return q.queue.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// priorityQueue 实现优先级队列
|
// priorityQueue 实现优先级队列
|
||||||
@@ -126,8 +127,8 @@ func (pq *priorityQueue) Pop() interface{} {
|
|||||||
|
|
||||||
// Executor 代表任务执行器
|
// Executor 代表任务执行器
|
||||||
type Executor struct {
|
type Executor struct {
|
||||||
// taskQueue 任务队列
|
// queue 任务队列
|
||||||
taskQueue *TaskQueue
|
queue *Queue
|
||||||
|
|
||||||
// workers 工作协程数量
|
// workers 工作协程数量
|
||||||
workers int
|
workers int
|
||||||
@@ -150,11 +151,11 @@ func NewExecutor(workers int, logger *logs.Logger) *Executor {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
return &Executor{
|
return &Executor{
|
||||||
taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
|
queue: NewQueue(logger), // 将 logger 传递给 Queue
|
||||||
workers: workers,
|
workers: workers,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -186,8 +187,8 @@ func (e *Executor) Stop() {
|
|||||||
|
|
||||||
// SubmitTask 提交任务到执行器
|
// SubmitTask 提交任务到执行器
|
||||||
func (e *Executor) SubmitTask(task Task) {
|
func (e *Executor) SubmitTask(task Task) {
|
||||||
e.taskQueue.AddTask(task)
|
e.queue.AddTask(task)
|
||||||
e.logger.Infow("任务已提交", "任务ID", task.GetID())
|
e.logger.Infow("任务已提交", "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||||
}
|
}
|
||||||
|
|
||||||
// worker 工作协程
|
// worker 工作协程
|
||||||
@@ -203,15 +204,15 @@ func (e *Executor) worker(id int) {
|
|||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// 获取下一个任务
|
// 获取下一个任务
|
||||||
task := e.taskQueue.GetNextTask()
|
task := e.queue.GetNextTask()
|
||||||
if task != nil {
|
if task != nil {
|
||||||
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID())
|
e.logger.Infow("工作协程正在执行任务", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
if err := task.Execute(); err != nil {
|
if err := task.Execute(); err != nil {
|
||||||
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "错误", err)
|
e.logger.Errorw("任务执行失败", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription(), "错误", err)
|
||||||
} else {
|
} else {
|
||||||
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID())
|
e.logger.Infow("任务执行成功", "工作协程ID", id, "任务ID", task.GetID(), "任务描述", task.GetDescription())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 没有任务时短暂休眠
|
// 没有任务时短暂休眠
|
||||||
|
|||||||
@@ -60,6 +60,10 @@ func (m *MockTask) ExecutedCount() int32 {
|
|||||||
return atomic.LoadInt32(&m.executed)
|
return atomic.LoadInt32(&m.executed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockTask) GetDescription() string {
|
||||||
|
return "Mock Task"
|
||||||
|
}
|
||||||
|
|
||||||
// --- 健壮等待的辅助函数 ---
|
// --- 健壮等待的辅助函数 ---
|
||||||
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
|
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
|
||||||
waitChan := make(chan struct{})
|
waitChan := make(chan struct{})
|
||||||
@@ -78,14 +82,14 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
|
|||||||
|
|
||||||
// --- 任务队列测试 (无需更改) ---
|
// --- 任务队列测试 (无需更改) ---
|
||||||
|
|
||||||
func TestNewTaskQueue(t *testing.T) {
|
func TestNewQueue(t *testing.T) {
|
||||||
tq := task.NewTaskQueue(testLogger)
|
tq := task.NewQueue(testLogger)
|
||||||
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
|
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
|
||||||
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
|
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskQueue_AddTask(t *testing.T) {
|
func TestQueue_AddTask(t *testing.T) {
|
||||||
tq := task.NewTaskQueue(testLogger)
|
tq := task.NewQueue(testLogger)
|
||||||
mockTask := &MockTask{id: "task1", priority: 1}
|
mockTask := &MockTask{id: "task1", priority: 1}
|
||||||
|
|
||||||
tq.AddTask(mockTask)
|
tq.AddTask(mockTask)
|
||||||
@@ -93,15 +97,15 @@ func TestTaskQueue_AddTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ... (其他任务队列测试保持不变)
|
// ... (其他任务队列测试保持不变)
|
||||||
func TestTaskQueue_GetNextTask(t *testing.T) {
|
func TestQueue_GetNextTask(t *testing.T) {
|
||||||
t.Run("从空队列获取任务", func(t *testing.T) {
|
t.Run("从空队列获取任务", func(t *testing.T) {
|
||||||
tq := task.NewTaskQueue(testLogger)
|
tq := task.NewQueue(testLogger)
|
||||||
nextTask := tq.GetNextTask()
|
nextTask := tq.GetNextTask()
|
||||||
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
|
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("按优先级获取任务", func(t *testing.T) {
|
t.Run("按优先级获取任务", func(t *testing.T) {
|
||||||
tq := task.NewTaskQueue(testLogger)
|
tq := task.NewQueue(testLogger)
|
||||||
task1 := &MockTask{id: "task1", priority: 10}
|
task1 := &MockTask{id: "task1", priority: 10}
|
||||||
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
|
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
|
||||||
task3 := &MockTask{id: "task3", priority: 5}
|
task3 := &MockTask{id: "task3", priority: 5}
|
||||||
@@ -128,8 +132,8 @@ func TestTaskQueue_GetNextTask(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskQueue_Concurrency(t *testing.T) {
|
func TestQueue_Concurrency(t *testing.T) {
|
||||||
tq := task.NewTaskQueue(testLogger)
|
tq := task.NewQueue(testLogger)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
taskCount := 100
|
taskCount := 100
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user