From 05e789b707d503737d8d2cf3afb44fd53b076c75 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 23 Sep 2025 11:08:18 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E5=87=BD=E6=95=B0=E6=94=B9=E5=90=8D=202.?= =?UTF-8?q?=20=E5=88=A0=E6=8E=89=E6=B2=A1=E7=94=A8=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/analysis_plan_task_manager.go | 2 +- .../app/service/task/plan_analysis_task.go | 29 ------------------- internal/core/application.go | 2 +- .../repository/execution_log_repository.go | 10 +++---- internal/infra/repository/plan_repository.go | 2 +- 5 files changed, 8 insertions(+), 37 deletions(-) delete mode 100644 internal/app/service/task/plan_analysis_task.go diff --git a/internal/app/service/task/analysis_plan_task_manager.go b/internal/app/service/task/analysis_plan_task_manager.go index 4bac7ec..0e5b7b5 100644 --- a/internal/app/service/task/analysis_plan_task_manager.go +++ b/internal/app/service/task/analysis_plan_task_manager.go @@ -215,7 +215,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all } // 批量更新相关执行日志状态为“已取消” - if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil { + if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil { // 这是一个非关键性错误,只记录日志 m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err) } diff --git a/internal/app/service/task/plan_analysis_task.go b/internal/app/service/task/plan_analysis_task.go deleted file mode 100644 index e48e41f..0000000 --- a/internal/app/service/task/plan_analysis_task.go +++ /dev/null @@ -1,29 +0,0 @@ -package task - -import ( - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" -) - -// PlanAnalysisTask 用于在任务执行队列中触发一个plan的执行 -// 该任务会解析plan生成扁平化的待执行任务表, 并将任务列表插入任务执行队列 -// 该任务会预写入plan所有待执行任务的执行日志 -// 每个plan执行完毕时 或 创建plan时 都应该重新创建一个 PlanAnalysisTask 以便触发下次plan执行 -// 更新plan后应当更新对应 PlanAnalysisTask -type PlanAnalysisTask struct { -} - -func (p *PlanAnalysisTask) Execute() error { - //TODO implement me - panic("implement me") -} - -func (p *PlanAnalysisTask) ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error { - //TODO implement me - panic("implement me") -} - -func (p *PlanAnalysisTask) OnFailure(executeErr error) { - //TODO implement me - panic("implement me") -} diff --git a/internal/core/application.go b/internal/core/application.go index 174b434..87a583d 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -217,7 +217,7 @@ func (app *Application) initializePendingTasks( // 批量更新 TaskExecutionLog 状态为取消 if len(taskLogIDsToCancel) > 0 { - if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { + if err := executionLogRepo.UpdateTaskExecutionLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil { logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err) // 这是一个非阻塞性错误,继续执行 } diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index b3bbff5..30838aa 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -10,8 +10,8 @@ import ( // ExecutionLogRepository 定义了与执行日志交互的接口。 // 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。 type ExecutionLogRepository interface { - UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error - UpdateLogStatus(logID uint, status models.ExecutionStatus) error + UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error + UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error CreateTaskExecutionLog(log *models.TaskExecutionLog) error CreatePlanExecutionLog(log *models.PlanExecutionLog) error UpdatePlanExecutionLog(log *models.PlanExecutionLog) error @@ -43,7 +43,7 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository { return &gormExecutionLogRepository{db: db} } -func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error { +func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error { if len(logIDs) == 0 { return nil } @@ -52,7 +52,7 @@ func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status Update("status", status).Error } -func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error { +func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error { return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error } @@ -117,7 +117,7 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs [ // FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志 func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) { var logs []models.PlanExecutionLog - err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error + err := r.db.Where("status = ? OR status = ?", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).Find(&logs).Error return logs, err } diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 0016748..e3367bf 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -694,7 +694,7 @@ func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error { } // 3.1 批量更新任务执行日志状态为“已取消” - if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil { + if err := executionLogRepoTx.UpdateTaskExecutionLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil { return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err) }