Files
pig-farm-controller/internal/app/service/task/analysis_plan_task_manager.go
huang 05e789b707 1. 函数改名
2. 删掉没用文件
2025-09-23 11:08:18 +08:00

317 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
)
// AnalysisPlanTaskManager 负责管理分析计划的触发器任务。
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
type AnalysisPlanTaskManager struct {
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
logger *logs.Logger
mu sync.Mutex
}
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
func NewAnalysisPlanTaskManager(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
logger *logs.Logger,
) *AnalysisPlanTaskManager {
return &AnalysisPlanTaskManager{
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
logger: logger,
}
}
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。
func (m *AnalysisPlanTaskManager) Refresh() error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Info("开始同步计划任务管理器...")
// 1. 一次性获取所有需要的数据
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
if err != nil {
return fmt.Errorf("获取刷新数据失败: %w", err)
}
// 2. 清理所有与失效计划相关的待执行任务
if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
// 仅记录错误,清理失败不应阻止新任务的添加
m.logger.Errorf("清理无效任务时出错: %v", err)
}
// 3. 添加或更新触发器
if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil {
return fmt.Errorf("添加或更新触发器时出错: %w", err)
}
m.logger.Info("计划任务管理器同步完成.")
return nil
}
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 如果触发器已存在,会根据计划类型更新其执行时间。
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
// 检查计划是否可执行
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("获取计划基本信息失败: %w", err)
}
if plan.Status != models.PlanStatusEnabled {
return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建或更新触发器", planID, plan.Status)
}
// 查找现有触发器
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
if err != nil {
return fmt.Errorf("查找现有触发器失败: %w", err)
}
// 如果触发器已存在,则根据计划类型更新其执行时间
if existingTrigger != nil {
var expectedExecuteAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
// 手动计划,如果再次触发,则立即执行
expectedExecuteAt = time.Now()
} else { // 自动计划
// 自动计划,根据 Cron 表达式计算下一次执行时间
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
return fmt.Errorf("解析 Cron 表达式失败: %w", err)
}
expectedExecuteAt = next
}
// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
return fmt.Errorf("更新触发器执行时间失败: %w", err)
}
} else {
m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
}
return nil // 触发器已存在且已处理更新,直接返回
}
// 如果触发器不存在,则创建新的触发器
m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(plan)
}
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
}
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
}
if analysisTask == nil {
m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
} else {
m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
}
return nil
}
// --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
runnablePlans, err = m.planRepo.FindRunnablePlans()
if err != nil {
m.logger.Errorf("获取可执行计划列表失败: %v", err)
return
}
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
if err != nil {
m.logger.Errorf("获取失效计划列表失败: %v", err)
return
}
invalidPlanIDs = make([]uint, len(invalidPlans))
for i, p := range invalidPlans {
invalidPlanIDs[i] = p.ID
}
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
if err != nil {
m.logger.Errorf("获取所有待执行任务失败: %v", err)
return
}
return
}
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
if len(invalidPlanIDs) == 0 {
return nil // 没有需要清理的计划
}
invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs))
for _, id := range invalidPlanIDs {
invalidPlanIDSet[id] = struct{}{}
}
var tasksToDeleteIDs []uint
var logsToCancelIDs []uint
for _, pt := range allPendingTasks {
if pt.Task == nil { // 防御性编程,确保 Task 被预加载
continue
}
if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid {
tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID)
logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID)
}
}
if len(tasksToDeleteIDs) == 0 {
return nil // 没有找到需要清理的任务
}
m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
// 批量删除待执行任务
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
return fmt.Errorf("批量删除待执行任务失败: %w", err)
}
// 批量更新相关执行日志状态为“已取消”
if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
// 这是一个非关键性错误,只记录日志
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
}
return nil
}
// addOrUpdateTriggers 检查、更新或创建触发器。
func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
// 创建一个映射,存放所有已在队列中的计划触发器
pendingTriggersMap := make(map[uint]models.PendingTask)
for _, pt := range allPendingTasks {
if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis {
pendingTriggersMap[pt.Task.PlanID] = pt
}
}
for _, plan := range runnablePlans {
existingTrigger, exists := pendingTriggersMap[plan.ID]
if exists {
// --- 新增逻辑:检查并更新现有触发器 ---
// 只对自动计划检查时间更新
if plan.ExecutionType == models.PlanExecutionTypeAutomatic {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败跳过更新: %v", plan.ID, err)
continue
}
// 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致则更新
if !existingTrigger.ExecuteAt.Equal(next) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
}
}
}
} else {
// --- 原有逻辑:为缺失的计划创建新触发器 ---
m.logger.Infof("发现应执行但队列中缺失的计划 #%d正在为其创建触发器...", plan.ID)
if err := m.createTriggerTask(plan); err != nil {
m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
// 继续处理下一个,不因单点失败而中断
}
}
}
return nil
}
// createTriggerTask 是创建触发器任务的内部核心逻辑。
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("查找计划分析任务失败: %w", err)
}
// --- 如果触发器任务定义不存在,则自动创建 ---
if analysisTask == nil {
m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan)
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
analysisTask = newAnalysisTask
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
}
var executeAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
executeAt = time.Now()
} else {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err)
}
executeAt = next
}
taskLog := &models.TaskExecutionLog{
TaskID: analysisTask.ID,
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
return fmt.Errorf("创建任务执行日志失败: %w", err)
}
pendingTask := &models.PendingTask{
TaskID: analysisTask.ID,
ExecuteAt: executeAt,
TaskExecutionLogID: taskLog.ID,
}
if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
return fmt.Errorf("创建待执行任务失败: %w", err)
}
m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
return nil
}