1. 调整目录结构

2. 实现user_controller
This commit is contained in:
2025-09-11 23:48:06 +08:00
parent 1aab2e850a
commit 196e63f40d
16 changed files with 298 additions and 21 deletions

222
internal/infra/task/task.go Normal file
View File

@@ -0,0 +1,222 @@
// Package task 提供任务队列和执行框架
// 负责管理任务队列、调度和执行各种控制任务
package task
import (
"container/heap"
"context"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
)
// Task 代表一个任务接口
// 所有任务都需要实现此接口
type Task interface {
// Execute 执行任务
Execute() error
// GetID 获取任务ID
GetID() string
// GetPriority 获取任务优先级
GetPriority() int
// IsDone 检查任务是否已完成
IsDone() bool
}
// taskItem 任务队列中的元素
type taskItem struct {
task Task
priority int
index int
}
// TaskQueue 代表任务队列
type TaskQueue struct {
// queue 任务队列(按优先级排序)
queue *priorityQueue
// mutex 互斥锁
mutex sync.Mutex
// logger 日志记录器
logger *logs.Logger
}
// NewTaskQueue 创建并返回一个新的任务队列实例。
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &TaskQueue{
queue: &pq,
logger: logger,
}
}
// AddTask 向队列中添加任务
func (tq *TaskQueue) AddTask(task Task) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
item := &taskItem{
task: task,
priority: task.GetPriority(),
}
heap.Push(tq.queue, item)
tq.logger.Infow("任务已添加到队列", "taskID", task.GetID())
}
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
func (tq *TaskQueue) GetNextTask() Task {
tq.mutex.Lock()
defer tq.mutex.Unlock()
if tq.queue.Len() == 0 {
return nil
}
item := heap.Pop(tq.queue).(*taskItem)
tq.logger.Infow("从队列中获取任务", "taskID", item.task.GetID())
return item.task
}
// GetTaskCount 获取队列中的任务数量
func (tq *TaskQueue) GetTaskCount() int {
tq.mutex.Lock()
defer tq.mutex.Unlock()
return tq.queue.Len()
}
// priorityQueue 实现优先级队列
type priorityQueue []*taskItem
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*taskItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // 避免内存泄漏
item.index = -1 // 无效索引
*pq = old[0 : n-1]
return item
}
// Executor 代表任务执行器
type Executor struct {
// taskQueue 任务队列
taskQueue *TaskQueue
// workers 工作协程数量
workers int
// ctx 执行上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
// wg 等待组
wg sync.WaitGroup
// logger 日志记录器
logger *logs.Logger
}
// NewExecutor 创建并返回一个新的任务执行器实例。
func NewExecutor(workers int, logger *logs.Logger) *Executor {
ctx, cancel := context.WithCancel(context.Background())
return &Executor{
taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
workers: workers,
ctx: ctx,
cancel: cancel,
logger: logger,
}
}
// Start 启动任务执行器
func (e *Executor) Start() {
e.logger.Infow("正在启动任务执行器", "workers", e.workers)
// 启动工作协程
for i := 0; i < e.workers; i++ {
e.wg.Add(1)
go e.worker(i)
}
e.logger.Info("任务执行器启动成功")
}
// Stop 停止任务执行器
func (e *Executor) Stop() {
e.logger.Info("正在停止任务执行器")
// 取消上下文
e.cancel()
// 等待所有工作协程结束
e.wg.Wait()
e.logger.Info("任务执行器已停止")
}
// SubmitTask 提交任务到执行器
func (e *Executor) SubmitTask(task Task) {
e.taskQueue.AddTask(task)
e.logger.Infow("任务已提交", "taskID", task.GetID())
}
// worker 工作协程
func (e *Executor) worker(id int) {
defer e.wg.Done()
e.logger.Infow("工作协程已启动", "workerID", id)
for {
select {
case <-e.ctx.Done():
e.logger.Infow("工作协程已停止", "workerID", id)
return
default:
// 获取下一个任务
task := e.taskQueue.GetNextTask()
if task != nil {
e.logger.Infow("工作协程正在执行任务", "workerID", id, "taskID", task.GetID())
// 执行任务
if err := task.Execute(); err != nil {
e.logger.Errorw("任务执行失败", "workerID", id, "taskID", task.GetID(), "error", err)
} else {
e.logger.Infow("任务执行成功", "workerID", id, "taskID", task.GetID())
}
} else {
// 没有任务时短暂休眠
time.Sleep(100 * time.Millisecond)
}
}
}
}

View File

@@ -0,0 +1,304 @@
// Package task_test 包含对 task 包的单元测试
package task_test
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
"github.com/stretchr/testify/assert"
)
// testLogger 是一个用于所有测试用例的静默 logger 实例。
var testLogger *logs.Logger
func init() {
// 使用 "fatal" 级别来创建一个在测试期间不会产生任何输出的 logger。
// 这避免了在运行 `go test` 时被日志淹没。
cfg := config.LogConfig{Level: "fatal"}
testLogger = logs.NewLogger(cfg)
}
// MockTask 用于测试的模拟任务
type MockTask struct {
id string
priority int
isDone bool
execute func() error
executed int32 // 使用原子操作来跟踪执行次数
}
// Execute 实现了 Task 接口,并确保每次调用都增加执行计数
func (m *MockTask) Execute() error {
atomic.AddInt32(&m.executed, 1)
if m.execute != nil {
return m.execute()
}
return nil
}
func (m *MockTask) GetID() string {
return m.id
}
func (m *MockTask) GetPriority() int {
return m.priority
}
func (m *MockTask) IsDone() bool {
return m.isDone
}
// ExecutedCount 返回任务被执行的次数
func (m *MockTask) ExecutedCount() int32 {
return atomic.LoadInt32(&m.executed)
}
// --- Helper function for robust waiting ---
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
waitChan := make(chan struct{})
go func() {
defer close(waitChan)
wg.Wait()
}()
select {
case <-waitChan:
// Wait succeeded
case <-time.After(timeout):
t.Fatal("timed out waiting for tasks to complete")
}
}
// --- TaskQueue Tests (No changes needed) ---
func TestNewTaskQueue(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
assert.NotNil(t, tq, "新创建的任务队列不应为 nil")
assert.Equal(t, 0, tq.GetTaskCount(), "新创建的任务队列应为空")
}
func TestTaskQueue_AddTask(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
mockTask := &MockTask{id: "task1", priority: 1}
tq.AddTask(mockTask)
assert.Equal(t, 1, tq.GetTaskCount(), "添加任务后,队列中的任务数应为 1")
}
// ... (other TaskQueue tests remain the same)
func TestTaskQueue_GetNextTask(t *testing.T) {
t.Run("从空队列获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
nextTask := tq.GetNextTask()
assert.Nil(t, nextTask, "从空队列中获取任务应返回 nil")
})
t.Run("按优先级获取任务", func(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
task1 := &MockTask{id: "task1", priority: 10}
task2 := &MockTask{id: "task2", priority: 1} // 优先级更高
task3 := &MockTask{id: "task3", priority: 5}
tq.AddTask(task1)
tq.AddTask(task2)
tq.AddTask(task3)
assert.Equal(t, 3, tq.GetTaskCount(), "添加三个任务后,队列中的任务数应为 3")
nextTask := tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task2", nextTask.GetID(), "应首先获取优先级最高的任务 (task2)")
nextTask = tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task3", nextTask.GetID(), "应获取下一个优先级最高的任务 (task3)")
nextTask = tq.GetNextTask()
assert.NotNil(t, nextTask)
assert.Equal(t, "task1", nextTask.GetID(), "应最后获取优先级最低的任务 (task1)")
assert.Equal(t, 0, tq.GetTaskCount(), "获取所有任务后,队列应为空")
})
}
func TestTaskQueue_Concurrency(t *testing.T) {
tq := task.NewTaskQueue(testLogger)
var wg sync.WaitGroup
taskCount := 100
wg.Add(taskCount)
for i := 0; i < taskCount; i++ {
go func(i int) {
defer wg.Done()
tq.AddTask(&MockTask{id: fmt.Sprintf("task-%d", i), priority: i})
}(i)
}
wg.Wait()
assert.Equal(t, taskCount, tq.GetTaskCount(), "并发添加任务后,队列中的任务数应为 %d", taskCount)
wg.Add(taskCount)
for i := 0; i < taskCount; i++ {
go func() {
defer wg.Done()
task := tq.GetNextTask()
assert.NotNil(t, task)
}()
}
wg.Wait()
assert.Equal(t, 0, tq.GetTaskCount(), "并发获取所有任务后,队列应为空")
}
// --- Executor Tests (Refactored for reliability) ---
func TestNewExecutor(t *testing.T) {
executor := task.NewExecutor(5, testLogger)
assert.NotNil(t, executor, "新创建的执行器不应为 nil")
}
func TestExecutor_StartStop(t *testing.T) {
executor := task.NewExecutor(2, testLogger)
executor.Start()
// 确保立即停止不会导致死锁或竞争条件。
executor.Stop()
}
// TestExecutor_SubmitAndExecuteTask 测试提交并执行单个任务 (已重构,更可靠)
func TestExecutor_SubmitAndExecuteTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
executor := task.NewExecutor(1, testLogger)
mockTask := &MockTask{
id: "task1",
priority: 1,
execute: func() error {
wg.Done() // 任务完成时通知 WaitGroup
return nil
},
}
executor.Start()
executor.SubmitTask(mockTask)
// 等待任务完成,设置一个合理的超时时间
waitForWaitGroup(t, &wg, 2*time.Second)
executor.Stop()
assert.Equal(t, int32(1), mockTask.ExecutedCount(), "任务应该已被执行")
}
// TestExecutor_ExecuteMultipleTasks 测试执行多个任务 (已重构,更可靠)
func TestExecutor_ExecuteMultipleTasks(t *testing.T) {
taskCount := 10
var wg sync.WaitGroup
wg.Add(taskCount)
executor := task.NewExecutor(3, testLogger)
mockTasks := make([]*MockTask, taskCount)
for i := 0; i < taskCount; i++ {
mockTasks[i] = &MockTask{
id: fmt.Sprintf("task-%d", i),
priority: i,
execute: func() error {
wg.Done() // 每个任务完成时都通知 WaitGroup
return nil
},
}
}
executor.Start()
for _, task := range mockTasks {
executor.SubmitTask(task)
}
// 等待所有任务完成
waitForWaitGroup(t, &wg, 2*time.Second)
executor.Stop()
var totalExecuted int32
for _, task := range mockTasks {
totalExecuted += task.ExecutedCount()
}
assert.Equal(t, int32(taskCount), totalExecuted, "所有提交的任务都应该被执行")
}
// TestExecutor_TaskExecutionError 测试任务执行失败的场景 (已重构,更可靠)
func TestExecutor_TaskExecutionError(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2) // 我们期望两个任务都被执行
executor := task.NewExecutor(1, testLogger)
errorTask := &MockTask{
id: "errorTask",
priority: 1,
execute: func() error {
wg.Done()
return errors.New("执行失败")
},
}
successTask := &MockTask{
id: "successTask",
priority: 2, // 后执行
execute: func() error {
wg.Done()
return nil
},
}
executor.Start()
executor.SubmitTask(errorTask)
executor.SubmitTask(successTask)
waitForWaitGroup(t, &wg, 2*time.Second)
executor.Stop()
assert.Equal(t, int32(1), errorTask.ExecutedCount(), "失败的任务应该被执行一次")
assert.Equal(t, int32(1), successTask.ExecutedCount(), "成功的任务也应该被执行")
}
// TestExecutor_StopWithPendingTasks 测试停止执行器时仍有待处理任务 (已重构,更可靠)
func TestExecutor_StopWithPendingTasks(t *testing.T) {
executor := task.NewExecutor(1, testLogger)
task1Started := make(chan struct{})
task1 := &MockTask{
id: "task1",
priority: 1,
execute: func() error {
close(task1Started) // 发送信号,通知测试 task1 已开始执行
time.Sleep(200 * time.Millisecond) // 模拟耗时操作
return nil
},
}
task2 := &MockTask{id: "task2", priority: 2}
executor.Start()
executor.SubmitTask(task1)
executor.SubmitTask(task2)
// 等待 task1 开始执行的信号,而不是依赖不确定的 sleep
select {
case <-task1Started:
// task1 已开始,可以安全地停止执行器了
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for task1 to start")
}
executor.Stop()
assert.Equal(t, int32(1), task1.ExecutedCount(), "task1 应该在停止前开始执行")
assert.Equal(t, int32(0), task2.ExecutedCount(), "task2 不应该被执行,因为执行器已停止")
}