diff --git a/internal/core/application.go b/internal/core/application.go index 87a583d..753a44a 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -191,44 +191,17 @@ func (app *Application) initializePendingTasks( // 阶段二:清理所有待执行任务和相关日志 logger.Info("阶段二:开始清理所有待执行任务和相关日志...") - pendingTasks, err := pendingTaskRepo.FindAllPendingTasks() - if err != nil { - return fmt.Errorf("获取待执行任务失败: %w", err) + + // 直接调用新的方法来更新计划执行日志状态为失败 + if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) + // 这是一个非阻塞性错误,继续执行 } - var taskLogIDsToCancel []uint - var planLogIDsToFail []uint - - for _, pt := range pendingTasks { - // 确保 Task 和 TaskExecutionLog 已预加载 - if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载 - logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog,跳过处理。", pt.ID) - continue - } - - // 收集任务执行日志ID,所有未完成的任务都标记为取消 - taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID) - - // 收集计划执行日志ID - if pt.TaskExecutionLog.PlanExecutionLogID != 0 { - planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID) - } - } - - // 批量更新 TaskExecutionLog 状态为取消 - if len(taskLogIDsToCancel) > 0 { - if err := executionLogRepo.UpdateTaskExecutionLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { - logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - } - - // 批量更新 PlanExecutionLog 状态为失败 - if len(planLogIDsToFail) > 0 { - if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil { - logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } + // 直接调用新的方法来更新任务执行日志状态为取消 + if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) + // 这是一个非阻塞性错误,继续执行 } // 清空待执行列表 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 30838aa..701f680 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -2,6 +2,7 @@ package repository import ( "errors" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" @@ -30,6 +31,11 @@ type ExecutionLogRepository interface { FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error) // FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志 FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error) + + // FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed + FailAllIncompletePlanExecutionLogs() error + // CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled + CancelAllIncompleteTaskExecutionLogs() error } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -143,3 +149,17 @@ func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID( planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error return logs, err } + +// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed +func (r *gormExecutionLogRepository) FailAllIncompletePlanExecutionLogs() error { + return r.db.Model(&models.PlanExecutionLog{}). + Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). + Updates(map[string]interface{}{"status": models.ExecutionStatusFailed, "ended_at": time.Now(), "error": "系统中断"}).Error +} + +// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled +func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() error { + return r.db.Model(&models.TaskExecutionLog{}). + Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting). + Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error +}