From 05e789b707d503737d8d2cf3afb44fd53b076c75 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 11:08:18 +0800 Subject: [PATCH 01/11] =?UTF-8?q?1.=20=E5=87=BD=E6=95=B0=E6=94=B9=E5=90=8D?= =?UTF-8?q?=202.=20=E5=88=A0=E6=8E=89=E6=B2=A1=E7=94=A8=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/analysis_plan_task_manager.go | 2 +- .../app/service/task/plan_analysis_task.go | 29 ------------------- internal/core/application.go | 2 +- .../repository/execution_log_repository.go | 10 +++---- internal/infra/repository/plan_repository.go | 2 +- 5 files changed, 8 insertions(+), 37 deletions(-) delete mode 100644 internal/app/service/task/plan_analysis_task.go diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go index 4bac7ec..0e5b7b5 100644 --- a/internal/app/service/task/analysis_plan_task_manager.go +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -215,7 +215,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all } // 批量更新相关执行日志状态为“已取消” - if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil { + if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil { // 这是一个非关键性错误,只记录日志 m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err) } diff --git a/internal/app/service/task/plan_analysis_task.go b/internal/app/service/task/plan_analysis_task.go deleted file mode 100644 index e48e41f..0000000 --- a/internal/app/service/task/plan_analysis_task.go +++ /dev/null @@ -1,29 +0,0 @@ -package task - -import ( - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" -) - -// PlanAnalysisTask 用于在任务执行队列中触发一个plan的执行 -// 该任务会解析plan生成扁平化的待执行任务表, 并将任务列表插入任务执行队列 -// 该任务会预写入plan所有待执行任务的执行日志 -// 每个plan执行完毕时 或 创建plan时 都应该重新创建一个 PlanAnalysisTask 以便触发下次plan执行 -// 更新plan后应当更新对应 PlanAnalysisTask -type PlanAnalysisTask struct { -} - -func (p *PlanAnalysisTask) Execute() error { - //TODO implement me - panic("implement me") -} - -func (p *PlanAnalysisTask) ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error { - //TODO implement me - panic("implement me") -} - -func (p *PlanAnalysisTask) OnFailure(executeErr error) { - //TODO implement me - panic("implement me") -} diff --git a/internal/core/application.go b/internal/core/application.go index 174b434..87a583d 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -217,7 +217,7 @@ func (app *Application) initializePendingTasks( // 批量更新 TaskExecutionLog 状态为取消 if len(taskLogIDsToCancel) > 0 { - if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { + if err := executionLogRepo.UpdateTaskExecutionLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err) // 这是一个非阻塞性错误,继续执行 } diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index b3bbff5..30838aa 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -10,8 +10,8 @@ import ( // ExecutionLogRepository 定义了与执行日志交互的接口。 // 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。 type ExecutionLogRepository interface { - UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error - UpdateLogStatus(logID uint, status models.ExecutionStatus) error + UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error + UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error CreateTaskExecutionLog(log *models.TaskExecutionLog) error CreatePlanExecutionLog(log *models.PlanExecutionLog) error UpdatePlanExecutionLog(log *models.PlanExecutionLog) error @@ -43,7 +43,7 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository { return &gormExecutionLogRepository{db: db} } -func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error { +func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error { if len(logIDs) == 0 { return nil } @@ -52,7 +52,7 @@ func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status Update("status", status).Error } -func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error { +func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error { return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error } @@ -117,7 +117,7 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs [ // FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志 func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) { var logs []models.PlanExecutionLog - err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error + err := r.db.Where("status = ? OR status = ?", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).Find(&logs).Error return logs, err } diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 0016748..e3367bf 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -694,7 +694,7 @@ func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error { } // 3.1 批量更新任务执行日志状态为“已取消” - if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil { + if err := executionLogRepoTx.UpdateTaskExecutionLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil { return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err) } From 83db3b2278c84ac930006f310485e2d2874744f5 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 11:39:47 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E5=90=AF=E5=8A=A8=E6=97=B6=E6=89=A7=E8=A1=8C=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E6=B2=A1=E6=9C=89=E6=AD=A3=E7=A1=AE=E5=88=B7=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/application.go | 45 ++++--------------- .../repository/execution_log_repository.go | 20 +++++++++ 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/internal/core/application.go b/internal/core/application.go index 87a583d..753a44a 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -191,44 +191,17 @@ func (app *Application) initializePendingTasks( // 阶段二:清理所有待执行任务和相关日志 logger.Info("阶段二:开始清理所有待执行任务和相关日志...") - pendingTasks, err := pendingTaskRepo.FindAllPendingTasks() - if err != nil { - return fmt.Errorf("获取待执行任务失败: %w", err) + + // 直接调用新的方法来更新计划执行日志状态为失败 + if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) + // 这是一个非阻塞性错误,继续执行 } - var taskLogIDsToCancel []uint - var planLogIDsToFail []uint - - for _, pt := range pendingTasks { - // 确保 Task 和 TaskExecutionLog 已预加载 - if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载 - logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog,跳过处理。", pt.ID) - continue - } - - // 收集任务执行日志ID,所有未完成的任务都标记为取消 - taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID) - - // 收集计划执行日志ID - if pt.TaskExecutionLog.PlanExecutionLogID != 0 { - planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID) - } - } - - // 批量更新 TaskExecutionLog 状态为取消 - if len(taskLogIDsToCancel) > 0 { - if err := executionLogRepo.UpdateTaskExecutionLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { - logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - } - - // 批量更新 PlanExecutionLog 状态为失败 - if len(planLogIDsToFail) > 0 { - if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil { - logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } + // 直接调用新的方法来更新任务执行日志状态为取消 + if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) + // 这是一个非阻塞性错误,继续执行 } // 清空待执行列表 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 30838aa..701f680 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -2,6 +2,7 @@ package repository import ( "errors" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" @@ -30,6 +31,11 @@ type ExecutionLogRepository interface { FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error) // FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志 FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error) + + // FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed + FailAllIncompletePlanExecutionLogs() error + // CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled + CancelAllIncompleteTaskExecutionLogs() error } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -143,3 +149,17 @@ func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID( planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error return logs, err } + +// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed +func (r *gormExecutionLogRepository) FailAllIncompletePlanExecutionLogs() error { + return r.db.Model(&models.PlanExecutionLog{}). + Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). + Updates(map[string]interface{}{"status": models.ExecutionStatusFailed, "ended_at": time.Now(), "error": "系统中断"}).Error +} + +// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled +func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() error { + return r.db.Model(&models.TaskExecutionLog{}). + Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). + Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error +} From db42560654f21b8db995ff8f4b4f74b314563813 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:11:31 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 提供仓库层api --- internal/infra/models/plan.go | 1 + .../repository/execution_log_repository.go | 55 +++++++++++++++++++ .../repository/pending_task_repository.go | 12 ++++ internal/infra/repository/plan_repository.go | 8 +-- 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 38d98fc..1d46c4c 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -44,6 +44,7 @@ const ( PlanStatusDisabled PlanStatus = 0 // 禁用计划 PlanStatusEnabled PlanStatus = 1 // 启用计划 PlanStatusStopeed PlanStatus = 2 // 执行完毕 + PlanStatusFailed PlanStatus = 3 // 执行失败 ) // Plan 代表系统中的一个计划,可以包含子计划或任务 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 701f680..d804247 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -36,6 +36,18 @@ type ExecutionLogRepository interface { FailAllIncompletePlanExecutionLogs() error // CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled CancelAllIncompleteTaskExecutionLogs() error + + // FindPlanExecutionLogByID 根据ID查找计划执行日志 + FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) + + // CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量 + CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) + + // FailPlanExecution 将指定的计划执行标记为失败 + FailPlanExecution(planLogID uint, errorMessage string) error + + // CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务 + CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -163,3 +175,46 @@ func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() erro Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error } + +// FindPlanExecutionLogByID 根据ID查找计划执行日志 +func (r *gormExecutionLogRepository) FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) { + var log models.PlanExecutionLog + err := r.db.First(&log, id).Error + if err != nil { + return nil, err + } + return &log, nil +} + +// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量 +func (r *gormExecutionLogRepository) CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) { + var count int64 + err := r.db.Model(&models.TaskExecutionLog{}). + Where("plan_execution_log_id = ? AND status IN (?, ?)", + planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted). + Count(&count).Error + return count, err +} + +// FailPlanExecution 将指定的计划执行标记为失败 +func (r *gormExecutionLogRepository) FailPlanExecution(planLogID uint, errorMessage string) error { + return r.db.Model(&models.PlanExecutionLog{}). + Where("id = ?", planLogID). + Updates(map[string]interface{}{ + "status": models.ExecutionStatusFailed, + "error": errorMessage, + "ended_at": time.Now(), + }).Error +} + +// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务 +func (r *gormExecutionLogRepository) CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error { + return r.db.Model(&models.TaskExecutionLog{}). + Where("plan_execution_log_id = ? AND status IN (?, ?)", + planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted). + Updates(map[string]interface{}{ + "status": models.ExecutionStatusCancelled, + "output": reason, + "ended_at": time.Now(), + }).Error +} diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index ecc954c..786c361 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -31,6 +31,9 @@ type PendingTaskRepository interface { RequeueTask(originalPendingTask *models.PendingTask) error // FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务 FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) + + // DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务 + DeletePendingTasksByPlanLogID(planLogID uint) error } // gormPendingTaskRepository 是使用 GORM 的具体实现。 @@ -164,3 +167,12 @@ func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []ui err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error return pendingTasks, err } + +// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务 +func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(planLogID uint) error { + // 使用子查询找到所有与 planLogID 相关的 task_execution_log_id + subQuery := r.db.Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID) + + // 使用子查询的结果来删除待执行任务 + return r.db.Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error +} diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index e3367bf..6dbe320 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -46,8 +46,8 @@ type PlanRepository interface { FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) // FindRunnablePlans 获取所有应执行的计划 FindRunnablePlans() ([]*models.Plan, error) - // FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划 - FindDisabledAndStoppedPlans() ([]*models.Plan, error) + // FindInactivePlans 获取所有已禁用或已停止的计划 + FindInactivePlans() ([]*models.Plan, error) // FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务 FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) @@ -600,10 +600,10 @@ func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) { return plans, err } -func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) { +func (r *gormPlanRepository) FindInactivePlans() ([]*models.Plan, error) { var plans []*models.Plan err := r.db. - Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed). + Where("status != ?", models.PlanStatusEnabled). Find(&plans).Error return plans, err } From 06f327518aaaa248a725ff7b8e989034421b29b7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:19:39 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 调整任务调度器 --- internal/app/service/task/scheduler.go | 122 +++++++++++++------------ 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index cea91ee..0076f83 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -13,49 +13,22 @@ import ( "gorm.io/gorm" ) -// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁 +// ProgressTracker 仅用于在内存中提供计划执行的并发锁 type ProgressTracker struct { - mu sync.Mutex - cond *sync.Cond // 用于实现阻塞锁 - totalTasks map[uint]int // key: planExecutionLogID, value: total tasks - completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks - runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) + mu sync.Mutex + cond *sync.Cond // 用于实现阻塞锁 + runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁) } // NewProgressTracker 创建一个新的进度跟踪器 func NewProgressTracker() *ProgressTracker { t := &ProgressTracker{ - totalTasks: make(map[uint]int), - completedTasks: make(map[uint]int), - runningPlans: make(map[uint]bool), + runningPlans: make(map[uint]bool), } t.cond = sync.NewCond(&t.mu) return t } -// AddNewPlan 添加一个新的计划,并初始化进度跟踪器 -func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) { - t.mu.Lock() - t.totalTasks[planLogID] = totalTasks - t.completedTasks[planLogID] = 0 - t.mu.Unlock() -} - -// CompletedTask 通知计数器一个任务被完成 -func (t *ProgressTracker) CompletedTask(planLogID uint) { - t.mu.Lock() - t.completedTasks[planLogID]++ - t.mu.Unlock() -} - -// IsPlanOver 检查计划是否完成 -func (t *ProgressTracker) IsPlanOver(planLogID uint) bool { - t.mu.Lock() - defer t.mu.Unlock() - - return t.completedTasks[planLogID] >= t.totalTasks[planLogID] -} - // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 func (t *ProgressTracker) TryLock(planLogID uint) bool { t.mu.Lock() @@ -248,32 +221,44 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { if err != nil { claimedLog.Status = models.ExecutionStatusFailed claimedLog.Output = err.Error() + + // 任务失败时,调用统一的终止服务 + s.handlePlanTermination(claimedLog.PlanExecutionLogID, "子任务执行失败: "+err.Error()) return } + + // 如果是计划分析任务,它的职责是解析和分发任务,到此即完成,不参与后续的计划完成度检查。 + if claimedLog.Task.Type == models.TaskPlanAnalysis { + s.logger.Warnf("完成计划分析任务, 日志ID: %d", claimedLog.ID) + return + } + + // --- 以下是常规任务的完成逻辑 --- s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID) - // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 - if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { - // --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 --- - var planID uint - // 根据任务类型获取正确的 PlanID - if claimedLog.Task.Type == models.TaskPlanAnalysis { - var params struct { - PlanID uint `json:"plan_id"` - } - if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil { - s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err) - return - } - planID = params.PlanID - } else { - planID = claimedLog.Task.PlanID + // 检查是否是最后一个任务 + incompleteCount, err := s.executionLogRepo.CountIncompleteTasksByPlanLogID(claimedLog.PlanExecutionLogID) + if err != nil { + s.logger.Errorf("检查计划 %d 的未完成任务数时出错: %v", claimedLog.PlanExecutionLogID, err) + return + } + + // 如果此计划执行中已没有其他“等待中”或“进行中”的任务,则认为计划已完成 + if incompleteCount == 0 { + s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", claimedLog.PlanExecutionLogID) + + // 通过 PlanExecutionLog 反查正确的顶层 PlanID + planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(claimedLog.PlanExecutionLogID) + if err != nil { + s.logger.Errorf("获取计划执行日志 %d 失败: %v", claimedLog.PlanExecutionLogID, err) + return } + planID := planExecutionLog.PlanID // 这才是正确的顶层计划ID // 获取计划的最新数据 - plan, err := s.planRepo.GetPlanByID(planID) // Changed to GetPlanByID to include sub-plans + plan, err := s.planRepo.GetPlanByID(planID) if err != nil { - s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) // Updated error message + s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) return } @@ -287,7 +272,7 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { } // 保存更新后的计划状态和执行计数 - if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象 + if err := s.planRepo.UpdatePlan(plan); err != nil { s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err) return } @@ -295,15 +280,13 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { // 更新计划执行日志状态为完成 if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil { s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err) - // 这是一个非阻塞性错误,不中断后续流程 } - // 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器) + // 调用共享的 Manager 来处理触发器更新逻辑 if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil { s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err) } } - } // runTask 用于执行具体任务 @@ -402,9 +385,6 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { return err } - // 将Task列表加入待执行队列中 - s.progressTracker.AddNewPlan(planLog.ID, len(tasks)) - return nil } @@ -419,3 +399,31 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio return nil } + +// handlePlanTermination 集中处理计划的终止逻辑(失败或取消) +func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) { + // 1. 从待执行队列中删除所有相关的子任务 + if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil { + s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err) + } + + // 2. 将父计划的执行日志标记为失败 + if err := s.executionLogRepo.FailPlanExecution(planLogID, reason); err != nil { + s.logger.Errorf("标记计划执行日志 %d 为失败时出错: %v", planLogID, err) + } + + // 3. 将所有未完成的子任务日志标记为已取消 + if err := s.executionLogRepo.CancelIncompleteTasksByPlanLogID(planLogID, "父计划失败或被取消"); err != nil { + s.logger.Errorf("取消计划 %d 的后续任务日志时出错: %v", planLogID, err) + } + + // 4. 将计划本身的状态更新为失败 + planLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID) + if err != nil { + s.logger.Errorf("无法找到计划执行日志 %d 以更新父计划状态: %v", planLogID, err) + return + } + if err := s.planRepo.UpdatePlanStatus(planLog.PlanID, models.PlanStatusFailed); err != nil { + s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err) + } +} From 557a0d5d3ea895218e9079a9a1275f56714a02e6 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:21:14 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修bug --- internal/app/service/task/analysis_plan_task_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go index 0e5b7b5..2a77f16 100644 --- a/internal/app/service/task/analysis_plan_task_manager.go +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -161,7 +161,7 @@ func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan return } - invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans() + invalidPlans, err := m.planRepo.FindInactivePlans() if err != nil { m.logger.Errorf("获取失效计划列表失败: %v", err) return From eda5c8dedb1a99052437f937d68b1b7a62ba261b Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:28:33 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修bug --- internal/app/service/task/scheduler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 0076f83..80a5f62 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -243,8 +243,9 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { return } - // 如果此计划执行中已没有其他“等待中”或“进行中”的任务,则认为计划已完成 - if incompleteCount == 0 { + // 如果此计划执行中,未完成的任务只剩下当前这一个(因为当前任务的状态此时在数据库中仍为 'started'), + // 则认为整个计划已完成。 + if incompleteCount == 1 { s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", claimedLog.PlanExecutionLogID) // 通过 PlanExecutionLog 反查正确的顶层 PlanID From 9e129a1ac0a0cbd7f49a12e04c019cceeed4f5f3 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 17:55:33 +0800 Subject: [PATCH 07/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E8=B7=9F=E8=B8=AA=E5=99=A8,=20=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=9B=E5=BA=A6:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修bug --- internal/app/service/task/scheduler.go | 25 ++++++++++---------- internal/infra/repository/plan_repository.go | 10 ++++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 80a5f62..de821ee 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -256,25 +256,26 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { } planID := planExecutionLog.PlanID // 这才是正确的顶层计划ID - // 获取计划的最新数据 - plan, err := s.planRepo.GetPlanByID(planID) + // 获取计划的最新数据,这里我们只需要基本信息来判断执行类型和次数 + plan, err := s.planRepo.GetBasicPlanByID(planID) if err != nil { - s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) + s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err) return } - // 更新计划的执行计数器 - plan.ExecuteCount++ + // 在内存中计算新的计数值和状态 + newExecuteCount := plan.ExecuteCount + 1 + newStatus := plan.Status // 默认为当前状态 - // 如果是自动计划且达到执行次数上限,则更新计划状态为已停止 - if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual { - plan.Status = models.PlanStatusStopeed - s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d,状态更新为 '执行完毕'。", planID, plan.ExecuteNum) + // 如果是自动计划且达到执行次数上限,或计划是手动类型,则更新计划状态为已停止 + if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && newExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual { + newStatus = models.PlanStatusStopeed + s.logger.Infof("计划 %d 已完成执行,状态更新为 '执行完毕'。", planID) } - // 保存更新后的计划状态和执行计数 - if err := s.planRepo.UpdatePlan(plan); err != nil { - s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err) + // 使用新的、专门的方法来原子性地更新计数值和状态 + if err := s.planRepo.UpdatePlanStateAfterExecution(planID, newExecuteCount, newStatus); err != nil { + s.logger.Errorf("更新计划 %d 的执行后状态失败: %v", planID, err) return } diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 6dbe320..9df675c 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -62,6 +62,9 @@ type PlanRepository interface { // StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志 StopPlanTransactionally(planID uint) error + + // UpdatePlanStateAfterExecution 更新计划执行后的状态(计数和状态) + UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error } // gormPlanRepository 是 PlanRepository 的 GORM 实现 @@ -735,3 +738,10 @@ func (r *gormPlanRepository) UpdatePlanStatus(id uint, status models.PlanStatus) } return nil } + +func (r *gormPlanRepository) UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error { + return r.db.Model(&models.Plan{}).Where("id = ?", planID).Updates(map[string]interface{}{ + "execute_count": newCount, + "status": newStatus, + }).Error +} From b6a872b3b81277145c40d8ad773cc83fe7c03784 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 18:11:21 +0800 Subject: [PATCH 08/11] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=A4=E6=96=ADconte?= =?UTF-8?q?nt=5Ftype?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/docs.go | 31 ++++++------------- docs/swagger.json | 31 ++++++------------- docs/swagger.yaml | 17 ++++------ .../app/controller/plan/plan_controller.go | 18 +++++++++-- 4 files changed, 40 insertions(+), 57 deletions(-) diff --git a/docs/docs.go b/docs/docs.go index 68e498f..d06a31d 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -741,22 +741,26 @@ const docTemplate = `{ "enum": [ 0, 1, - 2 + 2, + 3 ], "x-enum-comments": { "PlanStatusDisabled": "禁用计划", "PlanStatusEnabled": "启用计划", + "PlanStatusFailed": "执行失败", "PlanStatusStopeed": "执行完毕" }, "x-enum-descriptions": [ - "启用计划", "禁用计划", - "执行完毕" + "启用计划", + "执行完毕", + "执行失败" ], "x-enum-varnames": [ - "PlanStatusEnabled", "PlanStatusDisabled", - "PlanStatusStopeed" + "PlanStatusEnabled", + "PlanStatusStopeed", + "PlanStatusFailed" ] }, "models.TaskType": { @@ -781,19 +785,10 @@ const docTemplate = `{ "plan.CreatePlanRequest": { "type": "object", "required": [ - "content_type", "execution_type", "name" ], "properties": { - "content_type": { - "allOf": [ - { - "$ref": "#/definitions/models.PlanContentType" - } - ], - "example": "tasks" - }, "cron_expression": { "type": "string", "example": "0 0 6 * * *" @@ -1005,14 +1000,6 @@ const docTemplate = `{ "plan.UpdatePlanRequest": { "type": "object", "properties": { - "content_type": { - "allOf": [ - { - "$ref": "#/definitions/models.PlanContentType" - } - ], - "example": "tasks" - }, "cron_expression": { "type": "string", "example": "0 0 6 * * *" diff --git a/docs/swagger.json b/docs/swagger.json index 4adabab..ba89445 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -730,22 +730,26 @@ "enum": [ 0, 1, - 2 + 2, + 3 ], "x-enum-comments": { "PlanStatusDisabled": "禁用计划", "PlanStatusEnabled": "启用计划", + "PlanStatusFailed": "执行失败", "PlanStatusStopeed": "执行完毕" }, "x-enum-descriptions": [ - "启用计划", "禁用计划", - "执行完毕" + "启用计划", + "执行完毕", + "执行失败" ], "x-enum-varnames": [ - "PlanStatusEnabled", "PlanStatusDisabled", - "PlanStatusStopeed" + "PlanStatusEnabled", + "PlanStatusStopeed", + "PlanStatusFailed" ] }, "models.TaskType": { @@ -770,19 +774,10 @@ "plan.CreatePlanRequest": { "type": "object", "required": [ - "content_type", "execution_type", "name" ], "properties": { - "content_type": { - "allOf": [ - { - "$ref": "#/definitions/models.PlanContentType" - } - ], - "example": "tasks" - }, "cron_expression": { "type": "string", "example": "0 0 6 * * *" @@ -994,14 +989,6 @@ "plan.UpdatePlanRequest": { "type": "object", "properties": { - "content_type": { - "allOf": [ - { - "$ref": "#/definitions/models.PlanContentType" - } - ], - "example": "tasks" - }, "cron_expression": { "type": "string", "example": "0 0 6 * * *" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index dcb61c8..a759ecd 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -129,20 +129,24 @@ definitions: - 0 - 1 - 2 + - 3 format: int32 type: integer x-enum-comments: PlanStatusDisabled: 禁用计划 PlanStatusEnabled: 启用计划 + PlanStatusFailed: 执行失败 PlanStatusStopeed: 执行完毕 x-enum-descriptions: - - 启用计划 - 禁用计划 + - 启用计划 - 执行完毕 + - 执行失败 x-enum-varnames: - - PlanStatusEnabled - PlanStatusDisabled + - PlanStatusEnabled - PlanStatusStopeed + - PlanStatusFailed models.TaskType: enum: - plan_analysis @@ -159,10 +163,6 @@ definitions: - TaskTypeWaiting plan.CreatePlanRequest: properties: - content_type: - allOf: - - $ref: '#/definitions/models.PlanContentType' - example: tasks cron_expression: example: 0 0 6 * * * type: string @@ -188,7 +188,6 @@ definitions: $ref: '#/definitions/plan.TaskRequest' type: array required: - - content_type - execution_type - name type: object @@ -306,10 +305,6 @@ definitions: type: object plan.UpdatePlanRequest: properties: - content_type: - allOf: - - $ref: '#/definitions/models.PlanContentType' - example: tasks cron_expression: example: 0 0 6 * * * type: string diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index 0dec13f..c250477 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -22,7 +22,6 @@ type CreatePlanRequest struct { ExecutionType models.PlanExecutionType `json:"execution_type" binding:"required" example:"automatic"` ExecuteNum uint `json:"execute_num,omitempty" example:"10"` CronExpression string `json:"cron_expression" example:"0 0 6 * * *"` - ContentType models.PlanContentType `json:"content_type" binding:"required" example:"tasks"` SubPlanIDs []uint `json:"sub_plan_ids,omitempty"` Tasks []TaskRequest `json:"tasks,omitempty"` } @@ -55,7 +54,6 @@ type UpdatePlanRequest struct { ExecutionType models.PlanExecutionType `json:"execution_type" example:"automatic"` ExecuteNum uint `json:"execute_num,omitempty" example:"10"` CronExpression string `json:"cron_expression" example:"0 0 6 * * *"` - ContentType models.PlanContentType `json:"content_type" example:"tasks"` SubPlanIDs []uint `json:"sub_plan_ids,omitempty"` Tasks []TaskRequest `json:"tasks,omitempty"` } @@ -132,6 +130,14 @@ func (c *Controller) CreatePlan(ctx *gin.Context) { return } + // --- 自动判断 ContentType --- + if len(req.SubPlanIDs) > 0 { + planToCreate.ContentType = models.PlanContentTypeSubPlans + } else { + // 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供) + planToCreate.ContentType = models.PlanContentTypeTasks + } + // 调用仓库方法创建计划 if err := c.planRepo.CreatePlan(planToCreate); err != nil { controller.SendErrorResponse(ctx, controller.CodeBadRequest, "创建计划失败: "+err.Error()) @@ -269,6 +275,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { } planToUpdate.ID = uint(id) // 确保ID被设置 + // --- 自动判断 ContentType --- + if len(req.SubPlanIDs) > 0 { + planToUpdate.ContentType = models.PlanContentTypeSubPlans + } else { + // 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供) + planToUpdate.ContentType = models.PlanContentTypeTasks + } + // 4. 检查计划是否存在 _, err = c.planRepo.GetBasicPlanByID(uint(id)) if err != nil { From e711db94c0e12a89e0461f8299a20ff6f800d6e5 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 19:28:43 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=A4=E6=96=ADconte?= =?UTF-8?q?nt=5Ftype?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/controller/plan/converter.go | 32 +++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/internal/app/controller/plan/converter.go b/internal/app/controller/plan/converter.go index 6ace996..1727c14 100644 --- a/internal/app/controller/plan/converter.go +++ b/internal/app/controller/plan/converter.go @@ -64,13 +64,14 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) { ExecutionType: req.ExecutionType, ExecuteNum: req.ExecuteNum, CronExpression: req.CronExpression, - ContentType: req.ContentType, + // ContentType 在控制器中设置,此处不再处理 } // 处理子计划 (通过ID引用) - if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil { - plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs)) - for i, childPlanID := range req.SubPlanIDs { + if req.SubPlanIDs != nil { + subPlanSlice := req.SubPlanIDs + plan.SubPlans = make([]models.SubPlan, len(subPlanSlice)) + for i, childPlanID := range subPlanSlice { plan.SubPlans[i] = models.SubPlan{ ChildPlanID: childPlanID, ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认 @@ -79,9 +80,10 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) { } // 处理任务 - if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil { - plan.Tasks = make([]models.Task, len(req.Tasks)) - for i, taskReq := range req.Tasks { + if req.Tasks != nil { + taskSlice := req.Tasks + plan.Tasks = make([]models.Task, len(taskSlice)) + for i, taskReq := range taskSlice { task, err := TaskFromRequest(&taskReq) if err != nil { return nil, err @@ -114,13 +116,14 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) { ExecutionType: req.ExecutionType, ExecuteNum: req.ExecuteNum, CronExpression: req.CronExpression, - ContentType: req.ContentType, + // ContentType 在控制器中设置,此处不再处理 } // 处理子计划 (通过ID引用) - if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil { - plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs)) - for i, childPlanID := range req.SubPlanIDs { + if req.SubPlanIDs != nil { + subPlanSlice := req.SubPlanIDs + plan.SubPlans = make([]models.SubPlan, len(subPlanSlice)) + for i, childPlanID := range subPlanSlice { plan.SubPlans[i] = models.SubPlan{ ChildPlanID: childPlanID, ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认 @@ -129,9 +132,10 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) { } // 处理任务 - if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil { - plan.Tasks = make([]models.Task, len(req.Tasks)) - for i, taskReq := range req.Tasks { + if req.Tasks != nil { + taskSlice := req.Tasks + plan.Tasks = make([]models.Task, len(taskSlice)) + for i, taskReq := range taskSlice { task, err := TaskFromRequest(&taskReq) if err != nil { return nil, err From 08e326d56da5edf8a8a70fbc4808673fb67f0306 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 21:14:37 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A9=BA=E8=AE=A1?= =?UTF-8?q?=E5=88=92=E4=B8=8D=E4=BC=9A=E5=8F=8D=E5=A4=8D=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/task/scheduler.go | 96 +++++++++++++++----------- 1 file changed, 54 insertions(+), 42 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index de821ee..6e93590 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -246,48 +246,7 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { // 如果此计划执行中,未完成的任务只剩下当前这一个(因为当前任务的状态此时在数据库中仍为 'started'), // 则认为整个计划已完成。 if incompleteCount == 1 { - s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", claimedLog.PlanExecutionLogID) - - // 通过 PlanExecutionLog 反查正确的顶层 PlanID - planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(claimedLog.PlanExecutionLogID) - if err != nil { - s.logger.Errorf("获取计划执行日志 %d 失败: %v", claimedLog.PlanExecutionLogID, err) - return - } - planID := planExecutionLog.PlanID // 这才是正确的顶层计划ID - - // 获取计划的最新数据,这里我们只需要基本信息来判断执行类型和次数 - plan, err := s.planRepo.GetBasicPlanByID(planID) - if err != nil { - s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err) - return - } - - // 在内存中计算新的计数值和状态 - newExecuteCount := plan.ExecuteCount + 1 - newStatus := plan.Status // 默认为当前状态 - - // 如果是自动计划且达到执行次数上限,或计划是手动类型,则更新计划状态为已停止 - if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && newExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual { - newStatus = models.PlanStatusStopeed - s.logger.Infof("计划 %d 已完成执行,状态更新为 '执行完毕'。", planID) - } - - // 使用新的、专门的方法来原子性地更新计数值和状态 - if err := s.planRepo.UpdatePlanStateAfterExecution(planID, newExecuteCount, newStatus); err != nil { - s.logger.Errorf("更新计划 %d 的执行后状态失败: %v", planID, err) - return - } - - // 更新计划执行日志状态为完成 - if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil { - s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err) - } - - // 调用共享的 Manager 来处理触发器更新逻辑 - if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil { - s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err) - } + s.handlePlanCompletion(claimedLog.PlanExecutionLogID) } } @@ -387,6 +346,13 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { return err } + // --- 处理空计划的边缘情况 --- + // 如果一个计划被解析后,发现其任务列表为空, + // 那么它实际上已经“执行”完毕了,我们需要在这里手动为它创建下一次的触发器。 + if len(tasks) == 0 { + s.handlePlanCompletion(planLog.ID) + } + return nil } @@ -429,3 +395,49 @@ func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) { s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err) } } + +// handlePlanCompletion 集中处理计划成功完成后的所有逻辑 +func (s *Scheduler) handlePlanCompletion(planLogID uint) { + s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", planLogID) + + // 1. 通过 PlanExecutionLog 反查正确的顶层 PlanID + planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID) + if err != nil { + s.logger.Errorf("获取计划执行日志 %d 失败: %v", planLogID, err) + return + } + topLevelPlanID := planExecutionLog.PlanID // 这才是正确的顶层计划ID + + // 2. 获取计划的最新数据,这里我们只需要基本信息来判断执行类型和次数 + plan, err := s.planRepo.GetBasicPlanByID(topLevelPlanID) + if err != nil { + s.logger.Errorf("获取计划 %d 的基本信息失败: %v", topLevelPlanID, err) + return + } + + // 3. 在内存中计算新的计数值和状态 + newExecuteCount := plan.ExecuteCount + 1 + newStatus := plan.Status // 默认为当前状态 + + // 如果是自动计划且达到执行次数上限,或计划是手动类型,则更新计划状态为已停止 + if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && newExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual { + newStatus = models.PlanStatusStopeed + s.logger.Infof("计划 %d 已完成执行,状态更新为 '执行完毕'。", topLevelPlanID) + } + + // 4. 使用专门的方法来原子性地更新计数值和状态 + if err := s.planRepo.UpdatePlanStateAfterExecution(topLevelPlanID, newExecuteCount, newStatus); err != nil { + s.logger.Errorf("更新计划 %d 的执行后状态失败: %v", topLevelPlanID, err) + return + } + + // 5. 更新计划执行日志状态为完成 + if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(planLogID, models.ExecutionStatusCompleted); err != nil { + s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", planLogID, err) + } + + // 6. 调用共享的 Manager 来处理触发器更新逻辑 + if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(topLevelPlanID); err != nil { + s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", topLevelPlanID, err) + } +} From 25d6855b38608cef7422be919285b9bbc47d8ba3 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 21:26:45 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A9=BA=E8=AE=A1?= =?UTF-8?q?=E5=88=92=E4=B8=8D=E4=BC=9A=E5=8F=8D=E5=A4=8D=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/task/scheduler.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 6e93590..fc9506e 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -437,7 +437,12 @@ func (s *Scheduler) handlePlanCompletion(planLogID uint) { } // 6. 调用共享的 Manager 来处理触发器更新逻辑 - if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(topLevelPlanID); err != nil { - s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", topLevelPlanID, err) + // 只有当计划在本次执行后仍然是 Enabled 状态时,才需要创建下一次的触发器。 + if newStatus == models.PlanStatusEnabled { + if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(topLevelPlanID); err != nil { + s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", topLevelPlanID, err) + } else { + s.logger.Infof("计划 %d 状态为 '%d',无需创建下一次触发器。", topLevelPlanID, newStatus) + } } }