task.go 改名
This commit is contained in:
		| @@ -37,11 +37,6 @@ func (d *DelayTask) GetID() string { | |||||||
| 	return d.id | 	return d.id | ||||||
| } | } | ||||||
|  |  | ||||||
| // GetPriority 获取任务优先级 |  | ||||||
| func (d *DelayTask) GetPriority() int { |  | ||||||
| 	return d.priority |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // IsDone 检查任务是否已完成 | // IsDone 检查任务是否已完成 | ||||||
| func (d *DelayTask) IsDone() bool { | func (d *DelayTask) IsDone() bool { | ||||||
| 	return d.done | 	return d.done | ||||||
|   | |||||||
							
								
								
									
										209
									
								
								internal/infra/task/scheduler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										209
									
								
								internal/infra/task/scheduler.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,209 @@ | |||||||
|  | package task | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||||
|  | 	"github.com/panjf2000/ants/v2" | ||||||
|  | 	"gorm.io/gorm" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件 | ||||||
|  | type Logger interface { | ||||||
|  | 	Printf(format string, v ...interface{}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 | ||||||
|  | type ProgressTracker struct { | ||||||
|  | 	mu             sync.Mutex | ||||||
|  | 	cond           *sync.Cond    // 用于实现阻塞锁 | ||||||
|  | 	totalTasks     map[uint]int  // key: planExecutionLogID, value: total tasks | ||||||
|  | 	completedTasks map[uint]int  // key: planExecutionLogID, value: completed tasks | ||||||
|  | 	runningPlans   map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewProgressTracker 创建一个新的进度跟踪器 | ||||||
|  | func NewProgressTracker() *ProgressTracker { | ||||||
|  | 	t := &ProgressTracker{ | ||||||
|  | 		totalTasks:     make(map[uint]int), | ||||||
|  | 		completedTasks: make(map[uint]int), | ||||||
|  | 		runningPlans:   make(map[uint]bool), | ||||||
|  | 	} | ||||||
|  | 	t.cond = sync.NewCond(&t.mu) | ||||||
|  | 	return t | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 | ||||||
|  | func (t *ProgressTracker) TryLock(planLogID uint) bool { | ||||||
|  | 	t.mu.Lock() | ||||||
|  | 	defer t.mu.Unlock() | ||||||
|  | 	if t.runningPlans[planLogID] { | ||||||
|  | 		return false // 已被锁定 | ||||||
|  | 	} | ||||||
|  | 	t.runningPlans[planLogID] = true | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。 | ||||||
|  | func (t *ProgressTracker) Lock(planLogID uint) { | ||||||
|  | 	t.mu.Lock() | ||||||
|  | 	// 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。 | ||||||
|  | 	// 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。 | ||||||
|  | 	for t.runningPlans[planLogID] { | ||||||
|  | 		t.cond.Wait() | ||||||
|  | 	} | ||||||
|  | 	// 获取到锁 | ||||||
|  | 	t.runningPlans[planLogID] = true | ||||||
|  | 	t.mu.Unlock() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。 | ||||||
|  | func (t *ProgressTracker) Unlock(planLogID uint) { | ||||||
|  | 	t.mu.Lock() | ||||||
|  | 	defer t.mu.Unlock() | ||||||
|  | 	delete(t.runningPlans, planLogID) | ||||||
|  | 	// 唤醒所有在此条件上等待的协程 | ||||||
|  | 	t.cond.Broadcast() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // GetRunningPlanIDs 获取当前所有正在执行的计划ID列表 | ||||||
|  | func (t *ProgressTracker) GetRunningPlanIDs() []uint { | ||||||
|  | 	t.mu.Lock() | ||||||
|  | 	defer t.mu.Unlock() | ||||||
|  | 	ids := make([]uint, 0, len(t.runningPlans)) | ||||||
|  | 	for id := range t.runningPlans { | ||||||
|  | 		ids = append(ids, id) | ||||||
|  | 	} | ||||||
|  | 	return ids | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Scheduler 是核心的、持久化的任务调度器 | ||||||
|  | type Scheduler struct { | ||||||
|  | 	logger          Logger | ||||||
|  | 	pollingInterval time.Duration | ||||||
|  | 	workers         int | ||||||
|  | 	pendingTaskRepo repository.PendingTaskRepository | ||||||
|  | 	progressTracker *ProgressTracker | ||||||
|  |  | ||||||
|  | 	pool   *ants.Pool // 使用 ants 协程池来管理并发 | ||||||
|  | 	wg     sync.WaitGroup | ||||||
|  | 	ctx    context.Context | ||||||
|  | 	cancel context.CancelFunc | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewScheduler 创建一个新的调度器实例 | ||||||
|  | func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	return &Scheduler{ | ||||||
|  | 		pendingTaskRepo: pendingTaskRepo, | ||||||
|  | 		logger:          logger, | ||||||
|  | 		pollingInterval: interval, | ||||||
|  | 		workers:         numWorkers, | ||||||
|  | 		progressTracker: NewProgressTracker(), | ||||||
|  | 		ctx:             ctx, | ||||||
|  | 		cancel:          cancel, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Start 启动调度器,包括初始化协程池和启动主轮询循环 | ||||||
|  | func (s *Scheduler) Start() { | ||||||
|  | 	s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers) | ||||||
|  | 	pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { | ||||||
|  | 		s.logger.Printf("[严重] 任务执行时发生 panic: %v", err) | ||||||
|  | 	})) | ||||||
|  | 	if err != nil { | ||||||
|  | 		panic("初始化协程池失败: " + err.Error()) | ||||||
|  | 	} | ||||||
|  | 	s.pool = pool | ||||||
|  |  | ||||||
|  | 	s.wg.Add(1) | ||||||
|  | 	go s.run() | ||||||
|  | 	s.logger.Printf("任务调度器已成功启动") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Stop 优雅地停止调度器 | ||||||
|  | func (s *Scheduler) Stop() { | ||||||
|  | 	s.logger.Printf("正在停止任务调度器...") | ||||||
|  | 	s.cancel()       // 1. 发出取消信号,停止主循环 | ||||||
|  | 	s.wg.Wait()      // 2. 等待主循环完成 | ||||||
|  | 	s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) | ||||||
|  | 	s.logger.Printf("任务调度器已安全停止") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // run 是主轮询循环,负责从数据库认领任务并提交到协程池 | ||||||
|  | func (s *Scheduler) run() { | ||||||
|  | 	defer s.wg.Done() | ||||||
|  | 	ticker := time.NewTicker(s.pollingInterval) | ||||||
|  | 	defer ticker.Stop() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-s.ctx.Done(): | ||||||
|  | 			return | ||||||
|  | 		case <-ticker.C: | ||||||
|  | 			go s.claimAndSubmit() | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 | ||||||
|  | func (s *Scheduler) claimAndSubmit() { | ||||||
|  | 	runningPlanIDs := s.progressTracker.GetRunningPlanIDs() | ||||||
|  |  | ||||||
|  | 	claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if !errors.Is(err, gorm.ErrRecordNotFound) { | ||||||
|  | 			s.logger.Printf("认领任务时发生错误: %v", err) | ||||||
|  | 		} | ||||||
|  | 		// gorm.ErrRecordNotFound 说明没任务要执行 | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// 尝试获取内存执行锁 | ||||||
|  | 	if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) { | ||||||
|  | 		// 成功获取锁,正常派发任务 | ||||||
|  | 		err = s.pool.Submit(func() { | ||||||
|  | 			defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) | ||||||
|  | 			s.processTask(claimedLog) | ||||||
|  | 		}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			s.logger.Printf("向协程池提交任务失败: %v", err) | ||||||
|  | 			// 提交失败,必须释放刚刚获取的锁 | ||||||
|  | 			s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) | ||||||
|  | 			// 同样需要将任务安全放回 | ||||||
|  | 			s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 | ||||||
|  | 		s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 | ||||||
|  | func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { | ||||||
|  | 	s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) | ||||||
|  |  | ||||||
|  | 	// 1. 阻塞式地等待,直到可以获取到该计划的锁。 | ||||||
|  | 	s.progressTracker.Lock(planExecutionLogID) | ||||||
|  | 	defer s.progressTracker.Unlock(planExecutionLogID) | ||||||
|  |  | ||||||
|  | 	// 2. 在持有锁的情况下,将任务安全地放回队列。 | ||||||
|  | 	if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { | ||||||
|  | 		s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processTask 处理单个任务的逻辑 (当前为占位符) | ||||||
|  | func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { | ||||||
|  | 	s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", | ||||||
|  | 		claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) | ||||||
|  | 	time.Sleep(2 * time.Second) // 模拟任务执行 | ||||||
|  | 	s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID) | ||||||
|  | } | ||||||
| @@ -1,209 +1 @@ | |||||||
| package task | package task | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"errors" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" |  | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" |  | ||||||
| 	"github.com/panjf2000/ants/v2" |  | ||||||
| 	"gorm.io/gorm" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| // Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件 |  | ||||||
| type Logger interface { |  | ||||||
| 	Printf(format string, v ...interface{}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 |  | ||||||
| type ProgressTracker struct { |  | ||||||
| 	mu             sync.Mutex |  | ||||||
| 	cond           *sync.Cond    // 用于实现阻塞锁 |  | ||||||
| 	totalTasks     map[uint]int  // key: planExecutionLogID, value: total tasks |  | ||||||
| 	completedTasks map[uint]int  // key: planExecutionLogID, value: completed tasks |  | ||||||
| 	runningPlans   map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewProgressTracker 创建一个新的进度跟踪器 |  | ||||||
| func NewProgressTracker() *ProgressTracker { |  | ||||||
| 	t := &ProgressTracker{ |  | ||||||
| 		totalTasks:     make(map[uint]int), |  | ||||||
| 		completedTasks: make(map[uint]int), |  | ||||||
| 		runningPlans:   make(map[uint]bool), |  | ||||||
| 	} |  | ||||||
| 	t.cond = sync.NewCond(&t.mu) |  | ||||||
| 	return t |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 |  | ||||||
| func (t *ProgressTracker) TryLock(planLogID uint) bool { |  | ||||||
| 	t.mu.Lock() |  | ||||||
| 	defer t.mu.Unlock() |  | ||||||
| 	if t.runningPlans[planLogID] { |  | ||||||
| 		return false // 已被锁定 |  | ||||||
| 	} |  | ||||||
| 	t.runningPlans[planLogID] = true |  | ||||||
| 	return true |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。 |  | ||||||
| func (t *ProgressTracker) Lock(planLogID uint) { |  | ||||||
| 	t.mu.Lock() |  | ||||||
| 	// 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。 |  | ||||||
| 	// 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。 |  | ||||||
| 	for t.runningPlans[planLogID] { |  | ||||||
| 		t.cond.Wait() |  | ||||||
| 	} |  | ||||||
| 	// 获取到锁 |  | ||||||
| 	t.runningPlans[planLogID] = true |  | ||||||
| 	t.mu.Unlock() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。 |  | ||||||
| func (t *ProgressTracker) Unlock(planLogID uint) { |  | ||||||
| 	t.mu.Lock() |  | ||||||
| 	defer t.mu.Unlock() |  | ||||||
| 	delete(t.runningPlans, planLogID) |  | ||||||
| 	// 唤醒所有在此条件上等待的协程 |  | ||||||
| 	t.cond.Broadcast() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetRunningPlanIDs 获取当前所有正在执行的计划ID列表 |  | ||||||
| func (t *ProgressTracker) GetRunningPlanIDs() []uint { |  | ||||||
| 	t.mu.Lock() |  | ||||||
| 	defer t.mu.Unlock() |  | ||||||
| 	ids := make([]uint, 0, len(t.runningPlans)) |  | ||||||
| 	for id := range t.runningPlans { |  | ||||||
| 		ids = append(ids, id) |  | ||||||
| 	} |  | ||||||
| 	return ids |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Scheduler 是核心的、持久化的任务调度器 |  | ||||||
| type Scheduler struct { |  | ||||||
| 	logger          Logger |  | ||||||
| 	pollingInterval time.Duration |  | ||||||
| 	workers         int |  | ||||||
| 	pendingTaskRepo repository.PendingTaskRepository |  | ||||||
| 	progressTracker *ProgressTracker |  | ||||||
|  |  | ||||||
| 	pool   *ants.Pool // 使用 ants 协程池来管理并发 |  | ||||||
| 	wg     sync.WaitGroup |  | ||||||
| 	ctx    context.Context |  | ||||||
| 	cancel context.CancelFunc |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewScheduler 创建一个新的调度器实例 |  | ||||||
| func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler { |  | ||||||
| 	ctx, cancel := context.WithCancel(context.Background()) |  | ||||||
| 	return &Scheduler{ |  | ||||||
| 		pendingTaskRepo: pendingTaskRepo, |  | ||||||
| 		logger:          logger, |  | ||||||
| 		pollingInterval: interval, |  | ||||||
| 		workers:         numWorkers, |  | ||||||
| 		progressTracker: NewProgressTracker(), |  | ||||||
| 		ctx:             ctx, |  | ||||||
| 		cancel:          cancel, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Start 启动调度器,包括初始化协程池和启动主轮询循环 |  | ||||||
| func (s *Scheduler) Start() { |  | ||||||
| 	s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers) |  | ||||||
| 	pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) { |  | ||||||
| 		s.logger.Printf("[严重] 任务执行时发生 panic: %v", err) |  | ||||||
| 	})) |  | ||||||
| 	if err != nil { |  | ||||||
| 		panic("初始化协程池失败: " + err.Error()) |  | ||||||
| 	} |  | ||||||
| 	s.pool = pool |  | ||||||
|  |  | ||||||
| 	s.wg.Add(1) |  | ||||||
| 	go s.run() |  | ||||||
| 	s.logger.Printf("任务调度器已成功启动") |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Stop 优雅地停止调度器 |  | ||||||
| func (s *Scheduler) Stop() { |  | ||||||
| 	s.logger.Printf("正在停止任务调度器...") |  | ||||||
| 	s.cancel()       // 1. 发出取消信号,停止主循环 |  | ||||||
| 	s.wg.Wait()      // 2. 等待主循环完成 |  | ||||||
| 	s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) |  | ||||||
| 	s.logger.Printf("任务调度器已安全停止") |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // run 是主轮询循环,负责从数据库认领任务并提交到协程池 |  | ||||||
| func (s *Scheduler) run() { |  | ||||||
| 	defer s.wg.Done() |  | ||||||
| 	ticker := time.NewTicker(s.pollingInterval) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
|  |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-s.ctx.Done(): |  | ||||||
| 			return |  | ||||||
| 		case <-ticker.C: |  | ||||||
| 			go s.claimAndSubmit() |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑 |  | ||||||
| func (s *Scheduler) claimAndSubmit() { |  | ||||||
| 	runningPlanIDs := s.progressTracker.GetRunningPlanIDs() |  | ||||||
|  |  | ||||||
| 	claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs) |  | ||||||
| 	if err != nil { |  | ||||||
| 		if !errors.Is(err, gorm.ErrRecordNotFound) { |  | ||||||
| 			s.logger.Printf("认领任务时发生错误: %v", err) |  | ||||||
| 		} |  | ||||||
| 		// gorm.ErrRecordNotFound 说明没任务要执行 |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// 尝试获取内存执行锁 |  | ||||||
| 	if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) { |  | ||||||
| 		// 成功获取锁,正常派发任务 |  | ||||||
| 		err = s.pool.Submit(func() { |  | ||||||
| 			defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) |  | ||||||
| 			s.processTask(claimedLog) |  | ||||||
| 		}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			s.logger.Printf("向协程池提交任务失败: %v", err) |  | ||||||
| 			// 提交失败,必须释放刚刚获取的锁 |  | ||||||
| 			s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) |  | ||||||
| 			// 同样需要将任务安全放回 |  | ||||||
| 			s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) |  | ||||||
| 		} |  | ||||||
| 	} else { |  | ||||||
| 		// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。 |  | ||||||
| 		s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。 |  | ||||||
| func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) { |  | ||||||
| 	s.logger.Printf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID) |  | ||||||
|  |  | ||||||
| 	// 1. 阻塞式地等待,直到可以获取到该计划的锁。 |  | ||||||
| 	s.progressTracker.Lock(planExecutionLogID) |  | ||||||
| 	defer s.progressTracker.Unlock(planExecutionLogID) |  | ||||||
|  |  | ||||||
| 	// 2. 在持有锁的情况下,将任务安全地放回队列。 |  | ||||||
| 	if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil { |  | ||||||
| 		s.logger.Printf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	s.logger.Printf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // processTask 处理单个任务的逻辑 (当前为占位符) |  | ||||||
| func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { |  | ||||||
| 	s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", |  | ||||||
| 		claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) |  | ||||||
| 	time.Sleep(2 * time.Second) // 模拟任务执行 |  | ||||||
| 	s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID) |  | ||||||
| } |  | ||||||
|   | |||||||
| @@ -1,77 +0,0 @@ | |||||||
| // Package task_test 包含对 task 包的单元测试 |  | ||||||
| package task_test |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"sync" |  | ||||||
| 	"sync/atomic" |  | ||||||
| 	"testing" |  | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" |  | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| // testLogger 是一个用于所有测试用例的静默 logger 实例。 |  | ||||||
| var testLogger *logs.Logger |  | ||||||
|  |  | ||||||
| func init() { |  | ||||||
| 	// 使用 "fatal" 级别来创建一个在测试期间不会产生任何输出的 logger。 |  | ||||||
| 	// 这避免了在运行 `go test` 时被日志淹没。 |  | ||||||
| 	cfg := config.LogConfig{Level: "fatal"} |  | ||||||
| 	testLogger = logs.NewLogger(cfg) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // MockTask 用于测试的模拟任务 |  | ||||||
| type MockTask struct { |  | ||||||
| 	id       string |  | ||||||
| 	priority int |  | ||||||
| 	isDone   bool |  | ||||||
| 	execute  func() error |  | ||||||
| 	executed int32 // 使用原子操作来跟踪执行次数 |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Execute 实现了 Task 接口,并确保每次调用都增加执行计数 |  | ||||||
| func (m *MockTask) Execute() error { |  | ||||||
| 	atomic.AddInt32(&m.executed, 1) |  | ||||||
| 	if m.execute != nil { |  | ||||||
| 		return m.execute() |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (m *MockTask) GetID() string { |  | ||||||
| 	return m.id |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (m *MockTask) GetPriority() int { |  | ||||||
| 	return m.priority |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (m *MockTask) IsDone() bool { |  | ||||||
| 	return m.isDone |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ExecutedCount 返回任务被执行的次数 |  | ||||||
| func (m *MockTask) ExecutedCount() int32 { |  | ||||||
| 	return atomic.LoadInt32(&m.executed) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (m *MockTask) GetDescription() string { |  | ||||||
| 	return "Mock Task" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // --- 健壮等待的辅助函数 --- |  | ||||||
| func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { |  | ||||||
| 	waitChan := make(chan struct{}) |  | ||||||
| 	go func() { |  | ||||||
| 		defer close(waitChan) |  | ||||||
| 		wg.Wait() |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	select { |  | ||||||
| 	case <-waitChan: |  | ||||||
| 		// 等待成功 |  | ||||||
| 	case <-time.After(timeout): |  | ||||||
| 		t.Fatal("等待任务完成超时") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
		Reference in New Issue
	
	Block a user