diff --git a/internal/core/application.go b/internal/core/application.go index 0c36568..63c8db1 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -25,6 +25,12 @@ type Application struct { Storage database.Storage Executor *task.Scheduler API *api.API // 添加 API 对象 + + // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 + planRepo repository.PlanRepository + pendingTaskRepo repository.PendingTaskRepository + executionLogRepo repository.ExecutionLogRepository + analysisPlanTaskManager *task.AnalysisPlanTaskManager } // NewApplication 创建并初始化一个新的 Application 实例。 @@ -82,6 +88,11 @@ func NewApplication(configPath string) (*Application, error) { Storage: storage, Executor: executor, API: apiServer, + // 填充新增的字段 + planRepo: planRepo, + pendingTaskRepo: pendingTaskRepo, + executionLogRepo: executionLogRepo, + analysisPlanTaskManager: analysisPlanTaskManager, } return app, nil @@ -91,6 +102,17 @@ func NewApplication(configPath string) (*Application, error) { func (app *Application) Start() error { app.Logger.Info("应用启动中...") + // --- 新增逻辑:初始化待执行任务列表 --- + if err := app.initializePendingTasks( + app.planRepo, // 传入 planRepo + app.pendingTaskRepo, // 传入 pendingTaskRepo + app.executionLogRepo, // 传入 executionLogRepo + app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager + app.Logger, // 传入 logger + ); err != nil { + return fmt.Errorf("初始化待执行任务列表失败: %w", err) + } + // 启动任务执行器 app.Executor.Start() @@ -128,6 +150,104 @@ func (app *Application) Stop() error { return nil } +// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 +func (app *Application) initializePendingTasks( + planRepo repository.PlanRepository, + pendingTaskRepo repository.PendingTaskRepository, + executionLogRepo repository.ExecutionLogRepository, + analysisPlanTaskManager *task.AnalysisPlanTaskManager, + logger *logs.Logger, +) error { + logger.Info("开始初始化待执行任务列表...") + + // 阶段一:修正因崩溃导致状态不一致的固定次数计划 + logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") + plansToCorrect, err := planRepo.FindPlansWithPendingTasks() + if err != nil { + return fmt.Errorf("查找需要修正的计划失败: %w", err) + } + + for _, plan := range plansToCorrect { + logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) + + // 更新计划的执行计数 + plan.ExecuteCount++ + logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) + + if plan.ExecutionType == models.PlanExecutionTypeManual || + (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { + // 更新计划状态为已停止 + plan.Status = models.PlanStatusStopeed + logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) + + } + // 保存更新后的计划 + if err := planRepo.UpdatePlan(plan); err != nil { + logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) + // 这是一个非阻塞性错误,继续处理其他计划 + } + } + logger.Info("阶段一:固定次数计划修正完成。") + + // 阶段二:清理所有待执行任务和相关日志 + logger.Info("阶段二:开始清理所有待执行任务和相关日志...") + pendingTasks, err := pendingTaskRepo.FindAllPendingTasks() + if err != nil { + return fmt.Errorf("获取待执行任务失败: %w", err) + } + + var taskLogIDsToCancel []uint + var planLogIDsToFail []uint + + for _, pt := range pendingTasks { + // 确保 Task 和 TaskExecutionLog 已预加载 + if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载 + logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog,跳过处理。", pt.ID) + continue + } + + // 收集任务执行日志ID,所有未完成的任务都标记为取消 + taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID) + + // 收集计划执行日志ID + if pt.TaskExecutionLog.PlanExecutionLogID != 0 { + planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID) + } + } + + // 批量更新 TaskExecutionLog 状态为取消 + if len(taskLogIDsToCancel) > 0 { + if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { + logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + } + + // 批量更新 PlanExecutionLog 状态为失败 + if len(planLogIDsToFail) > 0 { + if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil { + logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + } + + // 清空待执行列表 + if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { + return fmt.Errorf("清空待执行任务列表失败: %w", err) + } + logger.Info("阶段二:待执行任务和相关日志清理完成。") + + // 阶段三:初始刷新 + logger.Info("阶段三:开始刷新待执行列表...") + if err := analysisPlanTaskManager.Refresh(); err != nil { + return fmt.Errorf("刷新待执行任务列表失败: %w", err) + } + logger.Info("阶段三:待执行任务列表初始化完成。") + + logger.Info("待执行任务列表初始化完成。") + return nil +} + // initStorage 封装了数据库的初始化、连接和迁移逻辑。 func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { // 创建存储实例 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 4111901..5a1cdd6 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -18,6 +18,12 @@ type ExecutionLogRepository interface { FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) // UpdatePlanExecutionLogStatus 更新计划执行日志的状态 UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error + + // UpdatePlanExecutionLogsStatusByIDs 批量更新计划执行日志的状态 + UpdatePlanExecutionLogsStatusByIDs(logIDs []uint, status models.ExecutionStatus) error + + // FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志 + FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -93,3 +99,18 @@ func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models. func (r *gormExecutionLogRepository) UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error { return r.db.Model(&models.PlanExecutionLog{}).Where("id = ?", logID).Update("status", status).Error } + +// UpdatePlanExecutionLogsStatusByIDs 批量更新计划执行日志的状态 +func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs []uint, status models.ExecutionStatus) error { + if len(logIDs) == 0 { + return nil + } + return r.db.Model(&models.PlanExecutionLog{}).Where("id IN ?", logIDs).Update("status", status).Error +} + +// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志 +func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) { + var logs []models.PlanExecutionLog + err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error + return logs, err +} diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index cbce386..eb3d944 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -21,6 +21,9 @@ type PendingTaskRepository interface { // UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间 UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error + // ClearAllPendingTasks 清空所有待执行任务 + ClearAllPendingTasks() error + // ClaimNextAvailableTask 原子地认领下一个可用的任务。 // 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。 ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) @@ -41,7 +44,8 @@ func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository { func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) { var tasks []models.PendingTask // 预加载 Task 以便后续访问 Task.PlanID - err := r.db.Preload("Task").Find(&tasks).Error + // 预加载 TaskExecutionLog 以便后续访问 PlanExecutionLogID + err := r.db.Preload("Task").Preload("TaskExecutionLog").Find(&tasks).Error return tasks, err } @@ -82,6 +86,11 @@ func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeA return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error } +// ClearAllPendingTasks 清空所有待执行任务 +func (r *gormPendingTaskRepository) ClearAllPendingTasks() error { + return r.db.Where("1 = 1").Delete(&models.PendingTask{}).Error +} + // ClaimNextAvailableTask 以原子方式认领下一个可用的任务。 func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { var log models.TaskExecutionLog diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index ee2e760..424ee21 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -51,6 +51,9 @@ type PlanRepository interface { // CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它 CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error) + + // FindPlansWithPendingTasks 查找所有正在执行的计划 + FindPlansWithPendingTasks() ([]*models.Plan, error) } // gormPlanRepository 是 PlanRepository 的 GORM 实现 @@ -625,3 +628,18 @@ func (r *gormPlanRepository) CreatePlanAnalysisTask(plan *models.Plan) (*models. }) return createdTask, err } + +// FindPlansWithPendingTasks 查找所有正在执行的计划 +func (r *gormPlanRepository) FindPlansWithPendingTasks() ([]*models.Plan, error) { + var plans []*models.Plan + + // 关联 pending_tasks, task_execution_logs, tasks 表来查找符合条件的计划 + err := r.db.Table("plans"). + Joins("JOIN tasks ON plans.id = tasks.plan_id"). + Joins("JOIN task_execution_logs ON tasks.id = task_execution_logs.task_id"). + Joins("JOIN pending_tasks ON task_execution_logs.id = pending_tasks.task_execution_log_id"). + Group("plans.id"). // 避免重复,因为一个计划可能有多个待执行任务 + Find(&plans).Error + + return plans, err +}