Files
pig-farm-controller/internal/app/service/task/scheduler.go
2025-09-23 17:19:39 +08:00

430 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"encoding/json"
"errors"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/panjf2000/ants/v2"
"gorm.io/gorm"
)
// ProgressTracker 仅用于在内存中提供计划执行的并发锁
type ProgressTracker struct {
mu sync.Mutex
cond *sync.Cond // 用于实现阻塞锁
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
}
// NewProgressTracker 创建一个新的进度跟踪器
func NewProgressTracker() *ProgressTracker {
t := &ProgressTracker{
runningPlans: make(map[uint]bool),
}
t.cond = sync.NewCond(&t.mu)
return t
}
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
func (t *ProgressTracker) TryLock(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
if t.runningPlans[planLogID] {
return false // 已被锁定
}
t.runningPlans[planLogID] = true
return true
}
// Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。
func (t *ProgressTracker) Lock(planLogID uint) {
t.mu.Lock()
// 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。
// 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。
for t.runningPlans[planLogID] {
t.cond.Wait()
}
// 获取到锁
t.runningPlans[planLogID] = true
t.mu.Unlock()
}
// Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。
func (t *ProgressTracker) Unlock(planLogID uint) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.runningPlans, planLogID)
// 唤醒所有在此条件上等待的协程
t.cond.Broadcast()
}
// GetRunningPlanIDs 获取当前所有正在执行的计划ID列表
func (t *ProgressTracker) GetRunningPlanIDs() []uint {
t.mu.Lock()
defer t.mu.Unlock()
ids := make([]uint, 0, len(t.runningPlans))
for id := range t.runningPlans {
ids = append(ids, id)
}
return ids
}
// Scheduler 是核心的、持久化的任务调度器
type Scheduler struct {
logger *logs.Logger
pollingInterval time.Duration
workers int
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
planRepo repository.PlanRepository
analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager
progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
stopChan chan struct{} // 用于停止主循环的信号通道
}
// NewScheduler 创建一个新的调度器实例
func NewScheduler(
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
planRepo repository.PlanRepository,
analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager
taskFactory func(taskType models.TaskType) Task,
logger *logs.Logger,
interval time.Duration,
numWorkers int) *Scheduler {
return &Scheduler{
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
planRepo: planRepo,
analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager
logger: logger,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),
taskFactory: taskFactory,
stopChan: make(chan struct{}), // 初始化停止信号通道
}
}
// Start 启动调度器,包括初始化协程池和启动主轮询循环
func (s *Scheduler) Start() {
s.logger.Warnf("任务调度器正在启动,工作协程数: %d...", s.workers)
pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) {
s.logger.Errorf("[严重] 任务执行时发生 panic: %v", err)
}))
if err != nil {
panic("初始化协程池失败: " + err.Error())
}
s.pool = pool
s.wg.Add(1)
go s.run()
s.logger.Warnf("任务调度器已成功启动")
}
// Stop 优雅地停止调度器
func (s *Scheduler) Stop() {
s.logger.Warnf("正在停止任务调度器...")
close(s.stopChan) // 1. 发出停止信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Warnf("任务调度器已安全停止")
}
// run 是主轮询循环,负责从数据库认领任务并提交到协程池
func (s *Scheduler) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.pollingInterval)
defer ticker.Stop()
for {
select {
case <-s.stopChan:
// 收到停止信号,退出循环
return
case <-ticker.C:
// 定时触发任务认领和提交
go s.claimAndSubmit()
}
}
}
// claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑
func (s *Scheduler) claimAndSubmit() {
runningPlanIDs := s.progressTracker.GetRunningPlanIDs()
claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
s.logger.Errorf("认领任务时发生错误: %v", err)
}
// gorm.ErrRecordNotFound 说明没任务要执行
return
}
// 尝试获取内存执行锁
if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) {
// 成功获取锁,正常派发任务
err = s.pool.Submit(func() {
defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID)
s.processTask(claimedLog)
})
if err != nil {
s.logger.Errorf("向协程池提交任务失败: %v", err)
// 提交失败,必须释放刚刚获取的锁
s.progressTracker.Unlock(claimedLog.PlanExecutionLogID)
// 同样需要将任务安全放回
s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask)
}
} else {
// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。
s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask)
}
}
// handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。
func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) {
s.logger.Warnf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID)
// 1. 阻塞式地等待,直到可以获取到该计划的锁。
s.progressTracker.Lock(planExecutionLogID)
defer s.progressTracker.Unlock(planExecutionLogID)
// 2. 在持有锁的情况下,将任务安全地放回队列。
if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil {
s.logger.Errorf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err)
return
}
s.logger.Warnf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID)
}
// processTask 处理单个任务的逻辑
func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
claimedLog.StartedAt = time.Now()
claimedLog.Status = models.ExecutionStatusCompleted // 先乐观假定任务成功, 后续失败了再改
defer s.updateTaskExecutionLogStatus(claimedLog)
// 执行任务
err := s.runTask(claimedLog)
if err != nil {
claimedLog.Status = models.ExecutionStatusFailed
claimedLog.Output = err.Error()
// 任务失败时,调用统一的终止服务
s.handlePlanTermination(claimedLog.PlanExecutionLogID, "子任务执行失败: "+err.Error())
return
}
// 如果是计划分析任务,它的职责是解析和分发任务,到此即完成,不参与后续的计划完成度检查。
if claimedLog.Task.Type == models.TaskPlanAnalysis {
s.logger.Warnf("完成计划分析任务, 日志ID: %d", claimedLog.ID)
return
}
// --- 以下是常规任务的完成逻辑 ---
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
// 检查是否是最后一个任务
incompleteCount, err := s.executionLogRepo.CountIncompleteTasksByPlanLogID(claimedLog.PlanExecutionLogID)
if err != nil {
s.logger.Errorf("检查计划 %d 的未完成任务数时出错: %v", claimedLog.PlanExecutionLogID, err)
return
}
// 如果此计划执行中已没有其他“等待中”或“进行中”的任务,则认为计划已完成
if incompleteCount == 0 {
s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", claimedLog.PlanExecutionLogID)
// 通过 PlanExecutionLog 反查正确的顶层 PlanID
planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(claimedLog.PlanExecutionLogID)
if err != nil {
s.logger.Errorf("获取计划执行日志 %d 失败: %v", claimedLog.PlanExecutionLogID, err)
return
}
planID := planExecutionLog.PlanID // 这才是正确的顶层计划ID
// 获取计划的最新数据
plan, err := s.planRepo.GetPlanByID(planID)
if err != nil {
s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err)
return
}
// 更新计划的执行计数器
plan.ExecuteCount++
// 如果是自动计划且达到执行次数上限,则更新计划状态为已停止
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
plan.Status = models.PlanStatusStopeed
s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d状态更新为 '执行完毕'。", planID, plan.ExecuteNum)
}
// 保存更新后的计划状态和执行计数
if err := s.planRepo.UpdatePlan(plan); err != nil {
s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err)
return
}
// 更新计划执行日志状态为完成
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil {
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err)
}
// 调用共享的 Manager 来处理触发器更新逻辑
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
}
}
}
// runTask 用于执行具体任务
func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中
if claimedLog.Task.Type == models.TaskPlanAnalysis {
// 解析plan
err := s.analysisPlan(claimedLog)
if err != nil {
// TODO 这里要处理一下, 比如再插一个新的触发器回去
s.logger.Errorf("[严重] 计划解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
} else {
// 执行普通任务
task := s.taskFactory(claimedLog.Task.Type)
if err := task.ParseParams(s.logger, claimedLog); err != nil {
s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
task.OnFailure(err)
return err
}
}
return nil
}
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录
// 从任务的 Parameters 中解析出真实的 PlanID
var params struct {
PlanID uint `json:"plan_id"`
}
if err := json.Unmarshal(claimedLog.Task.Parameters, &params); err != nil {
s.logger.Errorf("解析任务参数中的计划ID失败日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
realPlanID := params.PlanID
planLog := &models.PlanExecutionLog{
PlanID: realPlanID, // 使用从参数中解析出的真实 PlanID
Status: models.ExecutionStatusStarted,
StartedAt: time.Now(),
}
if err := s.executionLogRepo.CreatePlanExecutionLog(planLog); err != nil {
s.logger.Errorf("[严重] 创建计划执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(realPlanID)
if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for i, task := range tasks {
taskLogs[i] = &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID,
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
}
}
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
if err != nil {
s.logger.Errorf("[严重] 写入执行历史, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks {
pendingTasks[i] = &models.PendingTask{
TaskID: task.ID,
TaskExecutionLogID: taskLogs[i].ID, // 使用正确的 TaskExecutionLogID
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
}
}
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil {
s.logger.Errorf("[严重] 写入待执行队列, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
return nil
}
// updateTaskExecutionLogStatus 修改任务历史中的执行状态
func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error {
claimedLog.EndedAt = time.Now()
if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil {
s.logger.Errorf("[严重] 更新任务执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
return nil
}
// handlePlanTermination 集中处理计划的终止逻辑(失败或取消)
func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) {
// 1. 从待执行队列中删除所有相关的子任务
if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil {
s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err)
}
// 2. 将父计划的执行日志标记为失败
if err := s.executionLogRepo.FailPlanExecution(planLogID, reason); err != nil {
s.logger.Errorf("标记计划执行日志 %d 为失败时出错: %v", planLogID, err)
}
// 3. 将所有未完成的子任务日志标记为已取消
if err := s.executionLogRepo.CancelIncompleteTasksByPlanLogID(planLogID, "父计划失败或被取消"); err != nil {
s.logger.Errorf("取消计划 %d 的后续任务日志时出错: %v", planLogID, err)
}
// 4. 将计划本身的状态更新为失败
planLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID)
if err != nil {
s.logger.Errorf("无法找到计划执行日志 %d 以更新父计划状态: %v", planLogID, err)
return
}
if err := s.planRepo.UpdatePlanStatus(planLog.PlanID, models.PlanStatusFailed); err != nil {
s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err)
}
}