Files
pig-farm-controller/internal/infra/task/task.go

206 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"context"
"errors"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/panjf2000/ants/v2"
"gorm.io/gorm"
)
// Logger 定义了调度器期望的日志接口,方便替换为项目中的日志组件
type Logger interface {
Printf(format string, v ...interface{})
}
// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁
type ProgressTracker struct {
mu sync.Mutex
cond *sync.Cond // 用于实现阻塞锁
totalTasks map[uint]int // key: planExecutionLogID, value: total tasks
completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
}
func NewProgressTracker() *ProgressTracker {
t := &ProgressTracker{
totalTasks: make(map[uint]int),
completedTasks: make(map[uint]int),
runningPlans: make(map[uint]bool),
}
t.cond = sync.NewCond(&t.mu)
return t
}
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
func (t *ProgressTracker) TryLock(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
if t.runningPlans[planLogID] {
return false // 已被锁定
}
t.runningPlans[planLogID] = true
return true
}
// Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。
func (t *ProgressTracker) Lock(planLogID uint) {
t.mu.Lock()
// 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。
// 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。
for t.runningPlans[planLogID] {
t.cond.Wait()
}
// 获取到锁
t.runningPlans[planLogID] = true
t.mu.Unlock()
}
// Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。
func (t *ProgressTracker) Unlock(planLogID uint) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.runningPlans, planLogID)
// 唤醒所有在此条件上等待的协程
t.cond.Broadcast()
}
// GetRunningPlanIDs 获取当前所有正在执行的计划ID列表
func (t *ProgressTracker) GetRunningPlanIDs() []uint {
t.mu.Lock()
defer t.mu.Unlock()
ids := make([]uint, 0, len(t.runningPlans))
for id := range t.runningPlans {
ids = append(ids, id)
}
return ids
}
// Scheduler 是核心的、持久化的任务调度器
type Scheduler struct {
logger Logger
pollingInterval time.Duration
workers int
pendingTaskRepo repository.PendingTaskRepository
progressTracker *ProgressTracker
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// NewScheduler 创建一个新的调度器实例
func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, logger Logger, interval time.Duration, numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
pendingTaskRepo: pendingTaskRepo,
logger: logger,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),
ctx: ctx,
cancel: cancel,
}
}
// Start 启动调度器,包括初始化协程池和启动主轮询循环
func (s *Scheduler) Start() {
s.logger.Printf("任务调度器正在启动,工作协程数: %d...", s.workers)
pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) {
s.logger.Printf("[严重] 任务执行时发生 panic: %v", err)
}))
if err != nil {
panic("初始化协程池失败: " + err.Error())
}
s.pool = pool
s.wg.Add(1)
go s.run()
s.logger.Printf("任务调度器已成功启动")
}
// Stop 优雅地停止调度器
func (s *Scheduler) Stop() {
s.logger.Printf("正在停止任务调度器...")
s.cancel() // 1. 发出取消信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Printf("任务调度器已安全停止")
}
// run 是主轮询循环,负责从数据库认领任务并提交到协程池
func (s *Scheduler) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.pollingInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
go s.claimAndSubmit()
}
}
}
// claimAndSubmit 实现了“认领-锁定-执行 或 等待-放回”的健壮逻辑
func (s *Scheduler) claimAndSubmit() {
runningPlanIDs := s.progressTracker.GetRunningPlanIDs()
claimedLog, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
s.logger.Printf("认领任务时发生错误: %v", err)
}
// gorm.ErrRecordNotFound 说明没任务要执行
return
}
// 尝试获取内存执行锁
if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) {
// 成功获取锁,正常派发任务
err = s.pool.Submit(func() {
defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID)
s.processTask(claimedLog)
})
if err != nil {
s.logger.Printf("向协程池提交任务失败: %v", err)
s.progressTracker.Unlock(claimedLog.PlanExecutionLogID) // 提交失败,必须释放刚刚获取的锁
// 同样需要将任务安全放回
s.handleRequeue(claimedLog)
}
} else {
// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。
s.handleRequeue(claimedLog)
}
}
// handleRequeue 处理需要被安全放回队列的任务
func (s *Scheduler) handleRequeue(log *models.TaskExecutionLog) {
s.logger.Printf("计划 %d 正在执行,任务 %d 将等待并重新入队...", log.PlanExecutionLogID, log.ID)
// 1. 阻塞式地等待,直到可以获取到该计划的锁
s.progressTracker.Lock(log.PlanExecutionLogID)
// 2. 在持有锁的情况下,将任务安全地放回队列
// 增加一个小的延迟例如1秒以避免与主循环发生过于频繁的竞争
if err := s.pendingTaskRepo.RequeueTask(log); err != nil {
s.logger.Printf("任务重新入队失败, 日志ID: %d, 错误: %v", log.ID, err)
}
// 3. 释放锁,让其他等待的任务(或主循环)可以继续
s.progressTracker.Unlock(log.PlanExecutionLogID)
}
// processTask 处理单个任务的逻辑 (保持不变)
func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Printf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
time.Sleep(2 * time.Second) // 模拟任务执行
s.logger.Printf("完成任务, 日志ID: %d", claimedLog.ID)
}