233 lines
7.9 KiB
Go
233 lines
7.9 KiB
Go
package task
|
||
|
||
import (
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
|
||
)
|
||
|
||
// AnalysisPlanTaskManager 负责管理分析计划的触发器任务。
|
||
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。
|
||
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
|
||
type AnalysisPlanTaskManager struct {
|
||
planRepo repository.PlanRepository
|
||
pendingTaskRepo repository.PendingTaskRepository
|
||
executionLogRepo repository.ExecutionLogRepository
|
||
logger *logs.Logger
|
||
mu sync.Mutex
|
||
}
|
||
|
||
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
|
||
func NewAnalysisPlanTaskManager(
|
||
planRepo repository.PlanRepository,
|
||
pendingTaskRepo repository.PendingTaskRepository,
|
||
executionLogRepo repository.ExecutionLogRepository,
|
||
logger *logs.Logger,
|
||
) *AnalysisPlanTaskManager {
|
||
return &AnalysisPlanTaskManager{
|
||
planRepo: planRepo,
|
||
pendingTaskRepo: pendingTaskRepo,
|
||
executionLogRepo: executionLogRepo,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
|
||
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。
|
||
func (m *AnalysisPlanTaskManager) Refresh() error {
|
||
m.mu.Lock()
|
||
defer m.mu.Unlock()
|
||
|
||
m.logger.Info("开始同步计划任务管理器...")
|
||
|
||
// 1. 一次性获取所有需要的数据
|
||
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
|
||
if err != nil {
|
||
return fmt.Errorf("获取刷新数据失败: %w", err)
|
||
}
|
||
|
||
// 2. 清理所有与失效计划相关的待执行任务
|
||
if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
|
||
// 仅记录错误,清理失败不应阻止新任务的添加
|
||
m.logger.Errorf("清理无效任务时出错: %v", err)
|
||
}
|
||
|
||
// 3. 为应执行但缺失的计划添加新触发器
|
||
if err := m.addNewTriggers(runnablePlans, pendingTasks); err != nil {
|
||
return fmt.Errorf("添加新触发器时出错: %w", err)
|
||
}
|
||
|
||
m.logger.Info("计划任务管理器同步完成.")
|
||
return nil
|
||
}
|
||
|
||
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
|
||
// 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。
|
||
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
|
||
m.mu.Lock()
|
||
defer m.mu.Unlock()
|
||
|
||
// 检查计划是否可执行
|
||
plan, err := m.planRepo.GetBasicPlanByID(planID)
|
||
if err != nil {
|
||
return fmt.Errorf("获取计划基本信息失败: %w", err)
|
||
}
|
||
if plan.Status != models.PlanStatusEnabled {
|
||
return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建触发器", planID, plan.Status)
|
||
}
|
||
|
||
// 幂等性检查:如果触发器已存在,则直接返回
|
||
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
|
||
if err != nil {
|
||
return fmt.Errorf("查找现有触发器失败: %w", err)
|
||
}
|
||
if existingTrigger != nil {
|
||
m.logger.Infof("计划 #%d 的触发器已存在于待执行队列中,无需重复创建。", planID)
|
||
return nil
|
||
}
|
||
|
||
m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
|
||
return m.createTriggerTask(plan)
|
||
}
|
||
|
||
// --- 内部私有方法 ---
|
||
|
||
// getRefreshData 从数据库获取刷新所需的所有数据。
|
||
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
|
||
runnablePlans, err = m.planRepo.FindRunnablePlans()
|
||
if err != nil {
|
||
m.logger.Errorf("获取可执行计划列表失败: %v", err)
|
||
return
|
||
}
|
||
|
||
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
|
||
if err != nil {
|
||
m.logger.Errorf("获取失效计划列表失败: %v", err)
|
||
return
|
||
}
|
||
invalidPlanIDs = make([]uint, len(invalidPlans))
|
||
for i, p := range invalidPlans {
|
||
invalidPlanIDs[i] = p.ID
|
||
}
|
||
|
||
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
|
||
if err != nil {
|
||
m.logger.Errorf("获取所有待执行任务失败: %v", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
|
||
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
|
||
if len(invalidPlanIDs) == 0 {
|
||
return nil // 没有需要清理的计划
|
||
}
|
||
|
||
invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs))
|
||
for _, id := range invalidPlanIDs {
|
||
invalidPlanIDSet[id] = struct{}{}
|
||
}
|
||
|
||
var tasksToDeleteIDs []uint
|
||
var logsToCancelIDs []uint
|
||
|
||
for _, pt := range allPendingTasks {
|
||
if pt.Task == nil { // 防御性编程,确保 Task 被预加载
|
||
continue
|
||
}
|
||
if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid {
|
||
tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID)
|
||
logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID)
|
||
}
|
||
}
|
||
|
||
if len(tasksToDeleteIDs) == 0 {
|
||
return nil // 没有找到需要清理的任务
|
||
}
|
||
|
||
m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
|
||
|
||
// 批量删除待执行任务
|
||
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
|
||
return fmt.Errorf("批量删除待执行任务失败: %w", err)
|
||
}
|
||
|
||
// 批量更新相关执行日志状态为“已取消”
|
||
if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
|
||
// 这是一个非关键性错误,只记录日志
|
||
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// addNewTriggers 检查并为应执行但缺失的计划添加新触发器。
|
||
func (m *AnalysisPlanTaskManager) addNewTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
|
||
// 创建一个集合,存放所有已在队列中的计划触发器
|
||
pendingTriggerPlanIDs := make(map[uint]struct{})
|
||
for _, pt := range allPendingTasks {
|
||
if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis {
|
||
pendingTriggerPlanIDs[pt.Task.PlanID] = struct{}{}
|
||
}
|
||
}
|
||
|
||
for _, plan := range runnablePlans {
|
||
if _, exists := pendingTriggerPlanIDs[plan.ID]; !exists {
|
||
m.logger.Infof("发现应执行但队列中缺失的计划 #%d,正在为其创建触发器...", plan.ID)
|
||
if err := m.createTriggerTask(plan); err != nil {
|
||
m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
|
||
// 继续处理下一个,不因单点失败而中断
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// createTriggerTask 是创建触发器任务的内部核心逻辑。
|
||
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
|
||
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
|
||
if err != nil {
|
||
return fmt.Errorf("查找计划分析任务失败: %w", err)
|
||
}
|
||
if analysisTask == nil {
|
||
return fmt.Errorf("未找到计划 #%d 关联的 'plan_analysis' 任务", plan.ID)
|
||
}
|
||
|
||
var executeAt time.Time
|
||
if plan.ExecutionType == models.PlanExecutionTypeManual {
|
||
executeAt = time.Now()
|
||
} else {
|
||
next, err := utils.GetNextCronTime(plan.CronExpression)
|
||
if err != nil {
|
||
return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err)
|
||
}
|
||
executeAt = next
|
||
}
|
||
|
||
taskLog := &models.TaskExecutionLog{
|
||
TaskID: analysisTask.ID,
|
||
Status: models.ExecutionStatusWaiting,
|
||
}
|
||
if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
|
||
return fmt.Errorf("创建任务执行日志失败: %w", err)
|
||
}
|
||
|
||
pendingTask := &models.PendingTask{
|
||
TaskID: analysisTask.ID,
|
||
ExecuteAt: executeAt,
|
||
TaskExecutionLogID: taskLog.ID,
|
||
}
|
||
if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
|
||
return fmt.Errorf("创建待执行任务失败: %w", err)
|
||
}
|
||
|
||
m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
|
||
return nil
|
||
}
|