From 4096499d28a064e89721b6e5c373ea35e45ac533 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Mon, 22 Sep 2025 15:17:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=85=B3=E9=97=AD=E8=AE=A1?= =?UTF-8?q?=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO-List.txt | 2 + .../app/controller/plan/plan_controller.go | 20 ++++- .../repository/execution_log_repository.go | 29 +++++++ .../repository/pending_task_repository.go | 12 +++ internal/infra/repository/plan_repository.go | 78 +++++++++++++++++++ 5 files changed, 138 insertions(+), 3 deletions(-) diff --git a/TODO-List.txt b/TODO-List.txt index ad8f35d..a8c5d1c 100644 --- a/TODO-List.txt +++ b/TODO-List.txt @@ -12,3 +12,5 @@ 4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功 5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制 6. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能 + +已执行次数在停止后需要重置吗 \ No newline at end of file diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index 615587e..0dec13f 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -405,7 +405,21 @@ func (c *Controller) StartPlan(ctx *gin.Context) { // @Success 200 {object} controller.Response "业务码为200代表成功停止计划" // @Router /api/v1/plans/{id}/stop [post] func (c *Controller) StopPlan(ctx *gin.Context) { - // 占位符:此处应调用服务层或仓库层来停止计划 - c.logger.Infof("收到停止计划请求 (占位符)") - controller.SendResponse(ctx, controller.CodeSuccess, "停止计划接口占位符", nil) + // 1. 从 URL 路径中获取 ID + idStr := ctx.Param("id") + id, err := strconv.ParseUint(idStr, 10, 32) + if err != nil { + controller.SendErrorResponse(ctx, controller.CodeBadRequest, "无效的计划ID格式") + return + } + + // 2. 调用仓库层方法,该方法内部处理事务 + if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil { + c.logger.Errorf("停止计划 #%d 失败: %v", id, err) + controller.SendErrorResponse(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error()) + return + } + + // 3. 发送成功响应 + controller.SendResponse(ctx, controller.CodeSuccess, "计划已成功停止", nil) } diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 5a1cdd6..b3bbff5 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -1,6 +1,8 @@ package repository import ( + "errors" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" ) @@ -24,6 +26,10 @@ type ExecutionLogRepository interface { // FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志 FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) + // FindInProgressPlanExecutionLogByPlanID 根据 PlanID 查找正在进行的计划执行日志 + FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error) + // FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志 + FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error) } // gormExecutionLogRepository 是使用 GORM 的具体实现。 @@ -114,3 +120,26 @@ func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error return logs, err } + +// FindInProgressPlanExecutionLogByPlanID 根据 PlanID 查找正在进行的计划执行日志 +func (r *gormExecutionLogRepository) FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error) { + var log models.PlanExecutionLog + err := r.db.Where("plan_id = ? AND status = ?", planID, models.ExecutionStatusStarted).First(&log).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 未找到不是一个需要上报的错误,代表计划当前没有在运行 + return nil, nil + } + // 其他数据库错误 + return nil, err + } + return &log, nil +} + +// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志 +func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error) { + var logs []models.TaskExecutionLog + err := r.db.Where("plan_execution_log_id = ? AND (status = ? OR status = ?)", + planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error + return logs, err +} diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index eb3d944..ecc954c 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -29,6 +29,8 @@ type PendingTaskRepository interface { ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) // RequeueTask 安全地将一个任务重新放回队列。 RequeueTask(originalPendingTask *models.PendingTask) error + // FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务 + FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) } // gormPendingTaskRepository 是使用 GORM 的具体实现。 @@ -152,3 +154,13 @@ func (r *gormPendingTaskRepository) RequeueTask(originalPendingTask *models.Pend return tx.Create(originalPendingTask).Error }) } + +// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务 +func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) { + if len(taskLogIDs) == 0 { + return []models.PendingTask{}, nil + } + var pendingTasks []models.PendingTask + err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error + return pendingTasks, err +} diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index d3977d2..0016748 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -56,6 +56,12 @@ type PlanRepository interface { // FindPlansWithPendingTasks 查找所有正在执行的计划 FindPlansWithPendingTasks() ([]*models.Plan, error) + + // DB 返回底层的数据库连接实例,用于服务层事务 + DB() *gorm.DB + + // StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志 + StopPlanTransactionally(planID uint) error } // gormPlanRepository 是 PlanRepository 的 GORM 实现 @@ -646,6 +652,78 @@ func (r *gormPlanRepository) FindPlansWithPendingTasks() ([]*models.Plan, error) return plans, err } +// DB 返回底层的数据库连接实例 +func (r *gormPlanRepository) DB() *gorm.DB { + return r.db +} + +// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志。 +func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error { + return r.db.Transaction(func(tx *gorm.DB) error { + // 使用事务创建新的仓库实例,确保所有操作都在同一个事务中 + planRepoTx := NewGormPlanRepository(tx) + executionLogRepoTx := NewGormExecutionLogRepository(tx) + pendingTaskRepoTx := NewGormPendingTaskRepository(tx) + + // 1. 更新计划状态为“已停止” + if err := planRepoTx.UpdatePlanStatus(planID, models.PlanStatusDisabled); err != nil { + return fmt.Errorf("更新计划 #%d 状态为 '已停止' 失败: %w", planID, err) + } + + // 2. 查找当前正在进行的计划执行日志 + planLog, err := executionLogRepoTx.FindInProgressPlanExecutionLogByPlanID(planID) + if err != nil { + return fmt.Errorf("查找计划 #%d 正在进行的执行日志失败: %w", planID, err) + } + + if planLog == nil { + // 没有正在进行的执行,直接返回成功 + return nil + } + + // 3. 查找所有需要被取消的任务执行日志 + taskLogs, err := executionLogRepoTx.FindIncompleteTaskExecutionLogsByPlanLogID(planLog.ID) + if err != nil { + return fmt.Errorf("查找计划执行日志 #%d 下未完成的任务日志失败: %w", planLog.ID, err) + } + + if len(taskLogs) > 0 { + var taskLogIDs []uint + for _, tl := range taskLogs { + taskLogIDs = append(taskLogIDs, tl.ID) + } + + // 3.1 批量更新任务执行日志状态为“已取消” + if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil { + return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err) + } + + // 3.2 查找并删除待执行队列中对应的任务 + pendingTasks, err := pendingTaskRepoTx.FindPendingTasksByTaskLogIDs(taskLogIDs) + if err != nil { + return fmt.Errorf("查找计划执行日志 #%d 下对应的待执行任务失败: %w", planLog.ID, err) + } + + if len(pendingTasks) > 0 { + var pendingTaskIDs []uint + for _, pt := range pendingTasks { + pendingTaskIDs = append(pendingTaskIDs, pt.ID) + } + if err := pendingTaskRepoTx.DeletePendingTasksByIDs(pendingTaskIDs); err != nil { + return fmt.Errorf("批量删除待执行任务失败: %w", err) + } + } + } + + // 4. 更新计划执行历史的总状态为“失败” + if err := executionLogRepoTx.UpdatePlanExecutionLogStatus(planLog.ID, models.ExecutionStatusFailed); err != nil { + return fmt.Errorf("更新计划执行日志 #%d 状态为 '失败' 失败: %w", planLog.ID, err) + } + + return nil + }) +} + // UpdatePlanStatus 更新指定计划的状态 func (r *gormPlanRepository) UpdatePlanStatus(id uint, status models.PlanStatus) error { result := r.db.Model(&models.Plan{}).Where("id = ?", id).Update("status", status)