重构AnalysisPlanTaskManager

This commit is contained in:
2025-09-20 22:32:00 +08:00
parent 6711f55fba
commit 056279bdc2
3 changed files with 116 additions and 35 deletions

View File

@@ -138,10 +138,10 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
return return
} }
// 创建成功后,调用 manager 创建或更新触发器 // 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil { if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToCreate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功 // 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err) c.logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
} }
// 使用已有的转换函数将创建后的模型转换为响应对象 // 使用已有的转换函数将创建后的模型转换为响应对象
@@ -287,10 +287,10 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return return
} }
// 更新成功后,调用 manager 创建或更新触发器 // 更新成功后,调用 manager 确保触发器任务定义存在
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil { if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToUpdate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志 // 这是一个非阻塞性错误,我们只记录日志
c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err) c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
} }
// 6. 获取更新后的完整计划用于响应 // 6. 获取更新后的完整计划用于响应

View File

@@ -68,6 +68,7 @@ func (m *AnalysisPlanTaskManager) Refresh() error {
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。 // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。 // 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。
// 关键修改:如果触发器已存在,会根据计划类型更新其执行时间。
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error { func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@@ -78,23 +79,79 @@ func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
return fmt.Errorf("获取计划基本信息失败: %w", err) return fmt.Errorf("获取计划基本信息失败: %w", err)
} }
if plan.Status != models.PlanStatusEnabled { if plan.Status != models.PlanStatusEnabled {
return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建触发器", planID, plan.Status) return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建或更新触发器", planID, plan.Status)
} }
// 幂等性检查:如果触发器已存在,则直接返回 // 查找现有触发器
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID) existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
if err != nil { if err != nil {
return fmt.Errorf("查找现有触发器失败: %w", err) return fmt.Errorf("查找现有触发器失败: %w", err)
} }
// 如果触发器已存在,则根据计划类型更新其执行时间
if existingTrigger != nil { if existingTrigger != nil {
m.logger.Infof("计划 #%d 的触发器已存在于待执行队列中,无需重复创建。", planID) var expectedExecuteAt time.Time
return nil if plan.ExecutionType == models.PlanExecutionTypeManual {
// 手动计划,如果再次触发,则立即执行
expectedExecuteAt = time.Now()
} else { // 自动计划
// 自动计划,根据 Cron 表达式计算下一次执行时间
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
return fmt.Errorf("解析 Cron 表达式失败: %w", err)
}
expectedExecuteAt = next
}
// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
return fmt.Errorf("更新触发器执行时间失败: %w", err)
}
} else {
m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
}
return nil // 触发器已存在且已处理更新,直接返回
} }
// 如果触发器不存在,则创建新的触发器
m.logger.Infof("为计划 #%d 创建新的触发器...", planID) m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(plan) return m.createTriggerTask(plan)
} }
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
}
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
}
if analysisTask == nil {
m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
} else {
m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
}
return nil
}
// --- 内部私有方法 --- // --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。 // getRefreshData 从数据库获取刷新所需的所有数据。

View File

@@ -1,7 +1,7 @@
package task package task
import ( import (
"context" "encoding/json"
"errors" "errors"
"sync" "sync"
"time" "time"
@@ -112,10 +112,9 @@ type Scheduler struct {
progressTracker *ProgressTracker progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
pool *ants.Pool // 使用 ants 协程池来管理并发 pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context stopChan chan struct{} // 用于停止主循环的信号通道
cancel context.CancelFunc
} }
// NewScheduler 创建一个新的调度器实例 // NewScheduler 创建一个新的调度器实例
@@ -128,8 +127,6 @@ func NewScheduler(
logger *logs.Logger, logger *logs.Logger,
interval time.Duration, interval time.Duration,
numWorkers int) *Scheduler { numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{ return &Scheduler{
pendingTaskRepo: pendingTaskRepo, pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo, executionLogRepo: executionLogRepo,
@@ -140,8 +137,7 @@ func NewScheduler(
workers: numWorkers, workers: numWorkers,
progressTracker: NewProgressTracker(), progressTracker: NewProgressTracker(),
taskFactory: taskFactory, taskFactory: taskFactory,
ctx: ctx, stopChan: make(chan struct{}), // 初始化停止信号通道
cancel: cancel,
} }
} }
@@ -164,9 +160,9 @@ func (s *Scheduler) Start() {
// Stop 优雅地停止调度器 // Stop 优雅地停止调度器
func (s *Scheduler) Stop() { func (s *Scheduler) Stop() {
s.logger.Warnf("正在停止任务调度器...") s.logger.Warnf("正在停止任务调度器...")
s.cancel() // 1. 发出取消信号,停止主循环 close(s.stopChan) // 1. 发出停止信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成 s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Warnf("任务调度器已安全停止") s.logger.Warnf("任务调度器已安全停止")
} }
@@ -178,9 +174,11 @@ func (s *Scheduler) run() {
for { for {
select { select {
case <-s.ctx.Done(): case <-s.stopChan:
// 收到停止信号,退出循环
return return
case <-ticker.C: case <-ticker.C:
// 定时触发任务认领和提交
go s.claimAndSubmit() go s.claimAndSubmit()
} }
} }
@@ -256,10 +254,26 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
// 调用共享的 Manager 来处理触发器更新逻辑 // --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 ---
err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID) planID := claimedLog.Task.PlanID
// 获取计划的最新数据
plan, err := s.planRepo.GetBasicPlanByID(planID)
if err != nil { if err != nil {
s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err) s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err)
return
}
// 更新计划的执行计数器
plan.ExecuteCount++
if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象
s.logger.Errorf("更新计划 %d 的执行计数失败: %v", planID, err)
return
}
// 调用共享的 Manager 来处理触发器更新逻辑
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
} }
} }
@@ -301,8 +315,18 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录 // 创建Plan执行记录
// 从任务的 Parameters 中解析出真实的 PlanID
var params struct {
PlanID uint `json:"plan_id"`
}
if err := json.Unmarshal(claimedLog.Task.Parameters, &params); err != nil {
s.logger.Errorf("解析任务参数中的计划ID失败日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
realPlanID := params.PlanID
planLog := &models.PlanExecutionLog{ planLog := &models.PlanExecutionLog{
PlanID: claimedLog.Task.PlanID, PlanID: realPlanID, // 使用从参数中解析出的真实 PlanID
Status: models.ExecutionStatusStarted, Status: models.ExecutionStatusStarted,
StartedAt: time.Now(), StartedAt: time.Now(),
} }
@@ -312,7 +336,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
} }
// 解析出Task列表 // 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID) tasks, err := s.planRepo.FlattenPlanTasks(realPlanID)
if err != nil { if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err return err
@@ -320,12 +344,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入执行历史 // 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks)) taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for _, task := range tasks { for i, task := range tasks {
taskLogs = append(taskLogs, &models.TaskExecutionLog{ taskLogs[i] = &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID, PlanExecutionLogID: planLog.ID,
TaskID: task.ID, TaskID: task.ID,
Status: models.ExecutionStatusWaiting, Status: models.ExecutionStatusWaiting,
}) }
} }
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs) err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
@@ -337,13 +361,13 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入待执行队列 // 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks)) pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks { for i, task := range tasks {
pendingTasks = append(pendingTasks, &models.PendingTask{ pendingTasks[i] = &models.PendingTask{
TaskID: task.ID, TaskID: task.ID,
TaskExecutionLogID: pendingTasks[i].ID, TaskExecutionLogID: taskLogs[i].ID, // 使用正确的 TaskExecutionLogID
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发 // 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second), ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
}) }
} }
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks) err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil { if err != nil {
@@ -352,7 +376,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
} }
// 将Task列表加入待执行队列中 // 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks)) s.progressTracker.AddNewPlan(planLog.ID, len(tasks))
return nil return nil
} }