实现关闭计划

This commit is contained in:
2025-09-22 15:17:54 +08:00
parent bd2d1c6b63
commit 4096499d28
5 changed files with 138 additions and 3 deletions

View File

@@ -12,3 +12,5 @@
4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功
5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制
6. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能
已执行次数在停止后需要重置吗

View File

@@ -405,7 +405,21 @@ func (c *Controller) StartPlan(ctx *gin.Context) {
// @Success 200 {object} controller.Response "业务码为200代表成功停止计划"
// @Router /api/v1/plans/{id}/stop [post]
func (c *Controller) StopPlan(ctx *gin.Context) {
// 占位符:此处应调用服务层或仓库层来停止计划
c.logger.Infof("收到停止计划请求 (占位符)")
controller.SendResponse(ctx, controller.CodeSuccess, "停止计划接口占位符", nil)
// 1. 从 URL 路径中获取 ID
idStr := ctx.Param("id")
id, err := strconv.ParseUint(idStr, 10, 32)
if err != nil {
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "无效的计划ID格式")
return
}
// 2. 调用仓库层方法,该方法内部处理事务
if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
c.logger.Errorf("停止计划 #%d 失败: %v", id, err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error())
return
}
// 3. 发送成功响应
controller.SendResponse(ctx, controller.CodeSuccess, "计划已成功停止", nil)
}

View File

@@ -1,6 +1,8 @@
package repository
import (
"errors"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
@@ -24,6 +26,10 @@ type ExecutionLogRepository interface {
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error)
// FindInProgressPlanExecutionLogByPlanID 根据 PlanID 查找正在进行的计划执行日志
FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error)
// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志
FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error)
}
// gormExecutionLogRepository 是使用 GORM 的具体实现。
@@ -114,3 +120,26 @@ func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models
err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error
return logs, err
}
// FindInProgressPlanExecutionLogByPlanID 根据 PlanID 查找正在进行的计划执行日志
func (r *gormExecutionLogRepository) FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error) {
var log models.PlanExecutionLog
err := r.db.Where("plan_id = ? AND status = ?", planID, models.ExecutionStatusStarted).First(&log).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
// 未找到不是一个需要上报的错误,代表计划当前没有在运行
return nil, nil
}
// 其他数据库错误
return nil, err
}
return &log, nil
}
// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志
func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error) {
var logs []models.TaskExecutionLog
err := r.db.Where("plan_execution_log_id = ? AND (status = ? OR status = ?)",
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error
return logs, err
}

View File

@@ -29,6 +29,8 @@ type PendingTaskRepository interface {
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
// RequeueTask 安全地将一个任务重新放回队列。
RequeueTask(originalPendingTask *models.PendingTask) error
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error)
}
// gormPendingTaskRepository 是使用 GORM 的具体实现。
@@ -152,3 +154,13 @@ func (r *gormPendingTaskRepository) RequeueTask(originalPendingTask *models.Pend
return tx.Create(originalPendingTask).Error
})
}
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) {
if len(taskLogIDs) == 0 {
return []models.PendingTask{}, nil
}
var pendingTasks []models.PendingTask
err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
return pendingTasks, err
}

View File

@@ -56,6 +56,12 @@ type PlanRepository interface {
// FindPlansWithPendingTasks 查找所有正在执行的计划
FindPlansWithPendingTasks() ([]*models.Plan, error)
// DB 返回底层的数据库连接实例,用于服务层事务
DB() *gorm.DB
// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志
StopPlanTransactionally(planID uint) error
}
// gormPlanRepository 是 PlanRepository 的 GORM 实现
@@ -646,6 +652,78 @@ func (r *gormPlanRepository) FindPlansWithPendingTasks() ([]*models.Plan, error)
return plans, err
}
// DB 返回底层的数据库连接实例
func (r *gormPlanRepository) DB() *gorm.DB {
return r.db
}
// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志。
func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error {
return r.db.Transaction(func(tx *gorm.DB) error {
// 使用事务创建新的仓库实例,确保所有操作都在同一个事务中
planRepoTx := NewGormPlanRepository(tx)
executionLogRepoTx := NewGormExecutionLogRepository(tx)
pendingTaskRepoTx := NewGormPendingTaskRepository(tx)
// 1. 更新计划状态为“已停止”
if err := planRepoTx.UpdatePlanStatus(planID, models.PlanStatusDisabled); err != nil {
return fmt.Errorf("更新计划 #%d 状态为 '已停止' 失败: %w", planID, err)
}
// 2. 查找当前正在进行的计划执行日志
planLog, err := executionLogRepoTx.FindInProgressPlanExecutionLogByPlanID(planID)
if err != nil {
return fmt.Errorf("查找计划 #%d 正在进行的执行日志失败: %w", planID, err)
}
if planLog == nil {
// 没有正在进行的执行,直接返回成功
return nil
}
// 3. 查找所有需要被取消的任务执行日志
taskLogs, err := executionLogRepoTx.FindIncompleteTaskExecutionLogsByPlanLogID(planLog.ID)
if err != nil {
return fmt.Errorf("查找计划执行日志 #%d 下未完成的任务日志失败: %w", planLog.ID, err)
}
if len(taskLogs) > 0 {
var taskLogIDs []uint
for _, tl := range taskLogs {
taskLogIDs = append(taskLogIDs, tl.ID)
}
// 3.1 批量更新任务执行日志状态为“已取消”
if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil {
return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err)
}
// 3.2 查找并删除待执行队列中对应的任务
pendingTasks, err := pendingTaskRepoTx.FindPendingTasksByTaskLogIDs(taskLogIDs)
if err != nil {
return fmt.Errorf("查找计划执行日志 #%d 下对应的待执行任务失败: %w", planLog.ID, err)
}
if len(pendingTasks) > 0 {
var pendingTaskIDs []uint
for _, pt := range pendingTasks {
pendingTaskIDs = append(pendingTaskIDs, pt.ID)
}
if err := pendingTaskRepoTx.DeletePendingTasksByIDs(pendingTaskIDs); err != nil {
return fmt.Errorf("批量删除待执行任务失败: %w", err)
}
}
}
// 4. 更新计划执行历史的总状态为“失败”
if err := executionLogRepoTx.UpdatePlanExecutionLogStatus(planLog.ID, models.ExecutionStatusFailed); err != nil {
return fmt.Errorf("更新计划执行日志 #%d 状态为 '失败' 失败: %w", planLog.ID, err)
}
return nil
})
}
// UpdatePlanStatus 更新指定计划的状态
func (r *gormPlanRepository) UpdatePlanStatus(id uint, status models.PlanStatus) error {
result := r.db.Model(&models.Plan{}).Where("id = ?", id).Update("status", status)