317 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			317 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package plan
 | 
						||
 | 
						||
import (
 | 
						||
	"fmt"
 | 
						||
	"sync"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
 | 
						||
)
 | 
						||
 | 
						||
// AnalysisPlanTaskManager 负责管理分析计划的触发器任务。
 | 
						||
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。
 | 
						||
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
 | 
						||
type AnalysisPlanTaskManager struct {
 | 
						||
	planRepo         repository.PlanRepository
 | 
						||
	pendingTaskRepo  repository.PendingTaskRepository
 | 
						||
	executionLogRepo repository.ExecutionLogRepository
 | 
						||
	logger           *logs.Logger
 | 
						||
	mu               sync.Mutex
 | 
						||
}
 | 
						||
 | 
						||
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
 | 
						||
func NewAnalysisPlanTaskManager(
 | 
						||
	planRepo repository.PlanRepository,
 | 
						||
	pendingTaskRepo repository.PendingTaskRepository,
 | 
						||
	executionLogRepo repository.ExecutionLogRepository,
 | 
						||
	logger *logs.Logger,
 | 
						||
) *AnalysisPlanTaskManager {
 | 
						||
	return &AnalysisPlanTaskManager{
 | 
						||
		planRepo:         planRepo,
 | 
						||
		pendingTaskRepo:  pendingTaskRepo,
 | 
						||
		executionLogRepo: executionLogRepo,
 | 
						||
		logger:           logger,
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
 | 
						||
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。
 | 
						||
func (m *AnalysisPlanTaskManager) Refresh() error {
 | 
						||
	m.mu.Lock()
 | 
						||
	defer m.mu.Unlock()
 | 
						||
 | 
						||
	m.logger.Info("开始同步计划任务管理器...")
 | 
						||
 | 
						||
	// 1. 一次性获取所有需要的数据
 | 
						||
	runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("获取刷新数据失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 2. 清理所有与失效计划相关的待执行任务
 | 
						||
	if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
 | 
						||
		// 仅记录错误,清理失败不应阻止新任务的添加
 | 
						||
		m.logger.Errorf("清理无效任务时出错: %v", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 3. 添加或更新触发器
 | 
						||
	if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil {
 | 
						||
		return fmt.Errorf("添加或更新触发器时出错: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	m.logger.Info("计划任务管理器同步完成.")
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
 | 
						||
// 如果触发器已存在,会根据计划类型更新其执行时间。
 | 
						||
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
 | 
						||
	m.mu.Lock()
 | 
						||
	defer m.mu.Unlock()
 | 
						||
 | 
						||
	// 检查计划是否可执行
 | 
						||
	plan, err := m.planRepo.GetBasicPlanByID(planID)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("获取计划基本信息失败: %w", err)
 | 
						||
	}
 | 
						||
	if plan.Status != models.PlanStatusEnabled {
 | 
						||
		return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建或更新触发器", planID, plan.Status)
 | 
						||
	}
 | 
						||
 | 
						||
	// 查找现有触发器
 | 
						||
	existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("查找现有触发器失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果触发器已存在,则根据计划类型更新其执行时间
 | 
						||
	if existingTrigger != nil {
 | 
						||
		var expectedExecuteAt time.Time
 | 
						||
		if plan.ExecutionType == models.PlanExecutionTypeManual {
 | 
						||
			// 手动计划,如果再次触发,则立即执行
 | 
						||
			expectedExecuteAt = time.Now()
 | 
						||
		} else { // 自动计划
 | 
						||
			// 自动计划,根据 Cron 表达式计算下一次执行时间
 | 
						||
			next, err := utils.GetNextCronTime(plan.CronExpression)
 | 
						||
			if err != nil {
 | 
						||
				m.logger.Errorf("为计划 #%d 解析Cron表达式失败,无法更新触发器: %v", plan.ID, err)
 | 
						||
				return fmt.Errorf("解析 Cron 表达式失败: %w", err)
 | 
						||
			}
 | 
						||
			expectedExecuteAt = next
 | 
						||
		}
 | 
						||
 | 
						||
		// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
 | 
						||
		if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
 | 
						||
			m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
 | 
						||
			if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
 | 
						||
				m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
 | 
						||
				return fmt.Errorf("更新触发器执行时间失败: %w", err)
 | 
						||
			}
 | 
						||
		} else {
 | 
						||
			m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
 | 
						||
		}
 | 
						||
		return nil // 触发器已存在且已处理更新,直接返回
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果触发器不存在,则创建新的触发器
 | 
						||
	m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
 | 
						||
	return m.createTriggerTask(plan)
 | 
						||
}
 | 
						||
 | 
						||
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
 | 
						||
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
 | 
						||
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
 | 
						||
	m.mu.Lock()
 | 
						||
	defer m.mu.Unlock()
 | 
						||
 | 
						||
	plan, err := m.planRepo.GetBasicPlanByID(planID)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	if analysisTask == nil {
 | 
						||
		m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
 | 
						||
		_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
 | 
						||
		if err != nil {
 | 
						||
			return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
 | 
						||
		}
 | 
						||
		m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
 | 
						||
	} else {
 | 
						||
		m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// --- 内部私有方法 ---
 | 
						||
 | 
						||
// getRefreshData 从数据库获取刷新所需的所有数据。
 | 
						||
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
 | 
						||
	runnablePlans, err = m.planRepo.FindRunnablePlans()
 | 
						||
	if err != nil {
 | 
						||
		m.logger.Errorf("获取可执行计划列表失败: %v", err)
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	invalidPlans, err := m.planRepo.FindInactivePlans()
 | 
						||
	if err != nil {
 | 
						||
		m.logger.Errorf("获取失效计划列表失败: %v", err)
 | 
						||
		return
 | 
						||
	}
 | 
						||
	invalidPlanIDs = make([]uint, len(invalidPlans))
 | 
						||
	for i, p := range invalidPlans {
 | 
						||
		invalidPlanIDs[i] = p.ID
 | 
						||
	}
 | 
						||
 | 
						||
	pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
 | 
						||
	if err != nil {
 | 
						||
		m.logger.Errorf("获取所有待执行任务失败: %v", err)
 | 
						||
		return
 | 
						||
	}
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
 | 
						||
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
 | 
						||
	if len(invalidPlanIDs) == 0 {
 | 
						||
		return nil // 没有需要清理的计划
 | 
						||
	}
 | 
						||
 | 
						||
	invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs))
 | 
						||
	for _, id := range invalidPlanIDs {
 | 
						||
		invalidPlanIDSet[id] = struct{}{}
 | 
						||
	}
 | 
						||
 | 
						||
	var tasksToDeleteIDs []uint
 | 
						||
	var logsToCancelIDs []uint
 | 
						||
 | 
						||
	for _, pt := range allPendingTasks {
 | 
						||
		if pt.Task == nil { // 防御性编程,确保 Task 被预加载
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid {
 | 
						||
			tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID)
 | 
						||
			logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID)
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	if len(tasksToDeleteIDs) == 0 {
 | 
						||
		return nil // 没有找到需要清理的任务
 | 
						||
	}
 | 
						||
 | 
						||
	m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
 | 
						||
 | 
						||
	// 批量删除待执行任务
 | 
						||
	if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
 | 
						||
		return fmt.Errorf("批量删除待执行任务失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 批量更新相关执行日志状态为“已取消”
 | 
						||
	if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
 | 
						||
		// 这是一个非关键性错误,只记录日志
 | 
						||
		m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// addOrUpdateTriggers 检查、更新或创建触发器。
 | 
						||
func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
 | 
						||
	// 创建一个映射,存放所有已在队列中的计划触发器
 | 
						||
	pendingTriggersMap := make(map[uint]models.PendingTask)
 | 
						||
	for _, pt := range allPendingTasks {
 | 
						||
		if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis {
 | 
						||
			pendingTriggersMap[pt.Task.PlanID] = pt
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	for _, plan := range runnablePlans {
 | 
						||
		existingTrigger, exists := pendingTriggersMap[plan.ID]
 | 
						||
 | 
						||
		if exists {
 | 
						||
			// --- 新增逻辑:检查并更新现有触发器 ---
 | 
						||
			// 只对自动计划检查时间更新
 | 
						||
			if plan.ExecutionType == models.PlanExecutionTypeAutomatic {
 | 
						||
				next, err := utils.GetNextCronTime(plan.CronExpression)
 | 
						||
				if err != nil {
 | 
						||
					m.logger.Errorf("为计划 #%d 解析Cron表达式失败,跳过更新: %v", plan.ID, err)
 | 
						||
					continue
 | 
						||
				}
 | 
						||
				// 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致,则更新
 | 
						||
				if !existingTrigger.ExecuteAt.Equal(next) {
 | 
						||
					m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
 | 
						||
					if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil {
 | 
						||
						m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
 | 
						||
					}
 | 
						||
				}
 | 
						||
			}
 | 
						||
		} else {
 | 
						||
			// --- 原有逻辑:为缺失的计划创建新触发器 ---
 | 
						||
			m.logger.Infof("发现应执行但队列中缺失的计划 #%d,正在为其创建触发器...", plan.ID)
 | 
						||
			if err := m.createTriggerTask(plan); err != nil {
 | 
						||
				m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
 | 
						||
				// 继续处理下一个,不因单点失败而中断
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// createTriggerTask 是创建触发器任务的内部核心逻辑。
 | 
						||
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
 | 
						||
	analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("查找计划分析任务失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// --- 如果触发器任务定义不存在,则自动创建 ---
 | 
						||
	if analysisTask == nil {
 | 
						||
		m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
 | 
						||
		newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan)
 | 
						||
		if err != nil {
 | 
						||
			return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
 | 
						||
		}
 | 
						||
		analysisTask = newAnalysisTask
 | 
						||
		m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
 | 
						||
	}
 | 
						||
 | 
						||
	var executeAt time.Time
 | 
						||
	if plan.ExecutionType == models.PlanExecutionTypeManual {
 | 
						||
		executeAt = time.Now()
 | 
						||
	} else {
 | 
						||
		next, err := utils.GetNextCronTime(plan.CronExpression)
 | 
						||
		if err != nil {
 | 
						||
			return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err)
 | 
						||
		}
 | 
						||
		executeAt = next
 | 
						||
	}
 | 
						||
 | 
						||
	taskLog := &models.TaskExecutionLog{
 | 
						||
		TaskID: analysisTask.ID,
 | 
						||
		Status: models.ExecutionStatusWaiting,
 | 
						||
	}
 | 
						||
	if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
 | 
						||
		return fmt.Errorf("创建任务执行日志失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	pendingTask := &models.PendingTask{
 | 
						||
		TaskID:             analysisTask.ID,
 | 
						||
		ExecuteAt:          executeAt,
 | 
						||
		TaskExecutionLogID: taskLog.ID,
 | 
						||
	}
 | 
						||
	if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
 | 
						||
		return fmt.Errorf("创建待执行任务失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
 | 
						||
	return nil
 | 
						||
}
 |