diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 38d98fc..1d46c4c 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -44,6 +44,7 @@ const ( PlanStatusDisabled PlanStatus = 0 // 禁用计划 PlanStatusEnabled PlanStatus = 1 // 启用计划 PlanStatusStopeed PlanStatus = 2 // 执行完毕 + PlanStatusFailed PlanStatus = 3 // 执行失败 ) // Plan 代表系统中的一个计划,可以包含子计划或任务 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 701f680..d804247 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -36,6 +36,18 @@ type ExecutionLogRepository interface { FailAllIncompletePlanExecutionLogs() error // CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled CancelAllIncompleteTaskExecutionLogs() error + + // FindPlanExecutionLogByID 根据ID查找计划执行日志 + FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) + + // CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量 + CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) + + // FailPlanExecution 将指定的计划执行标记为失败 + FailPlanExecution(planLogID uint, errorMessage string) error + + // CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务 + CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -163,3 +175,46 @@ func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() erro Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error } + +// FindPlanExecutionLogByID 根据ID查找计划执行日志 +func (r *gormExecutionLogRepository) FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) { + var log models.PlanExecutionLog + err := r.db.First(&log, id).Error + if err != nil { + return nil, err + } + return &log, nil +} + +// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量 +func (r *gormExecutionLogRepository) CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) { + var count int64 + err := r.db.Model(&models.TaskExecutionLog{}). + Where("plan_execution_log_id = ? AND status IN (?, ?)", + planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted). + Count(&count).Error + return count, err +} + +// FailPlanExecution 将指定的计划执行标记为失败 +func (r *gormExecutionLogRepository) FailPlanExecution(planLogID uint, errorMessage string) error { + return r.db.Model(&models.PlanExecutionLog{}). + Where("id = ?", planLogID). + Updates(map[string]interface{}{ + "status": models.ExecutionStatusFailed, + "error": errorMessage, + "ended_at": time.Now(), + }).Error +} + +// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务 +func (r *gormExecutionLogRepository) CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error { + return r.db.Model(&models.TaskExecutionLog{}). + Where("plan_execution_log_id = ? AND status IN (?, ?)", + planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted). + Updates(map[string]interface{}{ + "status": models.ExecutionStatusCancelled, + "output": reason, + "ended_at": time.Now(), + }).Error +} diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index ecc954c..786c361 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -31,6 +31,9 @@ type PendingTaskRepository interface { RequeueTask(originalPendingTask *models.PendingTask) error // FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务 FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) + + // DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务 + DeletePendingTasksByPlanLogID(planLogID uint) error } // gormPendingTaskRepository 是使用 GORM 的具体实现。 @@ -164,3 +167,12 @@ func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []ui err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error return pendingTasks, err } + +// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务 +func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(planLogID uint) error { + // 使用子查询找到所有与 planLogID 相关的 task_execution_log_id + subQuery := r.db.Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID) + + // 使用子查询的结果来删除待执行任务 + return r.db.Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error +} diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index e3367bf..6dbe320 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -46,8 +46,8 @@ type PlanRepository interface { FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) // FindRunnablePlans 获取所有应执行的计划 FindRunnablePlans() ([]*models.Plan, error) - // FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划 - FindDisabledAndStoppedPlans() ([]*models.Plan, error) + // FindInactivePlans 获取所有已禁用或已停止的计划 + FindInactivePlans() ([]*models.Plan, error) // FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务 FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) @@ -600,10 +600,10 @@ func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) { return plans, err } -func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) { +func (r *gormPlanRepository) FindInactivePlans() ([]*models.Plan, error) { var plans []*models.Plan err := r.db. - Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed). + Where("status != ?", models.PlanStatusEnabled). Find(&plans).Error return plans, err }