package plan import ( "context" "errors" "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "gorm.io/gorm" ) var ( // ErrPlanNotFound 表示未找到计划 ErrPlanNotFound = errors.New("计划不存在") // ErrPlanCannotBeModified 表示计划不允许修改 ErrPlanCannotBeModified = errors.New("系统计划不允许修改") // ErrPlanCannotBeDeleted 表示计划不允许删除 ErrPlanCannotBeDeleted = errors.New("系统计划不允许删除") // ErrPlanCannotBeStarted 表示计划不允许手动启动 ErrPlanCannotBeStarted = errors.New("系统计划不允许手动启动") // ErrPlanAlreadyEnabled 表示计划已处于启动状态 ErrPlanAlreadyEnabled = errors.New("计划已处于启动状态,无需重复操作") // ErrPlanNotEnabled 表示计划未处于启动状态 ErrPlanNotEnabled = errors.New("计划当前不是启用状态") // ErrPlanCannotBeStopped 表示计划不允许停止 ErrPlanCannotBeStopped = errors.New("系统计划不允许停止") ) // Service 定义了计划领域服务的接口。 type Service interface { // Start 启动计划相关的后台服务,例如计划执行管理器。 Start(ctx context.Context) // Stop 停止计划相关的后台服务,例如计划执行管理器。 Stop(ctx context.Context) // RefreshPlanTriggers 刷新计划触发器,同步数据库中的计划状态和待执行队列中的触发器任务。 RefreshPlanTriggers(ctx context.Context) error // CreatePlan 创建一个新的计划 CreatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error) // GetPlanByID 根据ID获取计划详情 GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) // ListPlans 获取计划列表,支持过滤和分页 ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) // UpdatePlan 更新计划 UpdatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error) // DeletePlan 删除计划(软删除) DeletePlan(ctx context.Context, id uint) error // StartPlan 启动计划 StartPlan(ctx context.Context, id uint) error // StopPlan 停止计划 StopPlan(ctx context.Context, id uint) error } // planServiceImpl 是 Service 接口的具体实现。 type planServiceImpl struct { ctx context.Context executionManager ExecutionManager taskManager AnalysisPlanTaskManager planRepo repository.PlanRepository deviceRepo repository.DeviceRepository unitOfWork repository.UnitOfWork taskFactory TaskFactory } // NewPlanService 创建一个新的 Service 实例。 func NewPlanService( ctx context.Context, executionManager ExecutionManager, taskManager AnalysisPlanTaskManager, planRepo repository.PlanRepository, deviceRepo repository.DeviceRepository, unitOfWork repository.UnitOfWork, taskFactory TaskFactory, ) Service { return &planServiceImpl{ ctx: ctx, executionManager: executionManager, taskManager: taskManager, planRepo: planRepo, deviceRepo: deviceRepo, unitOfWork: unitOfWork, taskFactory: taskFactory, } } // Start 启动计划相关的后台服务。 func (s *planServiceImpl) Start(ctx context.Context) { planCtx, logger := logs.Trace(ctx, s.ctx, "Start") logger.Infof("PlanService 正在启动...") s.executionManager.Start(planCtx) } // Stop 停止计划相关的后台服务。 func (s *planServiceImpl) Stop(ctx context.Context) { planCtx, logger := logs.Trace(ctx, s.ctx, "Stop") logger.Infof("PlanService 正在停止...") s.executionManager.Stop(planCtx) } // RefreshPlanTriggers 刷新计划触发器。 func (s *planServiceImpl) RefreshPlanTriggers(ctx context.Context) error { planCtx, logger := logs.Trace(ctx, s.ctx, "RefreshPlanTriggers") logger.Infof("PlanService 正在刷新计划触发器...") return s.taskManager.Refresh(planCtx) } // CreatePlan 创建一个新的计划 func (s *planServiceImpl) CreatePlan(ctx context.Context, planToCreate *models.Plan) (*models.Plan, error) { planCtx, logger := logs.Trace(ctx, s.ctx, "CreatePlan") const actionType = "领域层:创建计划" // 1. 业务规则处理 // 用户创建的计划永远是自定义计划 planToCreate.PlanType = models.PlanTypeCustom // 自动判断 ContentType if len(planToCreate.SubPlans) > 0 { planToCreate.ContentType = models.PlanContentTypeSubPlans } else { planToCreate.ContentType = models.PlanContentTypeTasks } // 2. 验证和重排顺序 (领域逻辑) if err := planToCreate.ValidateExecutionOrder(); err != nil { logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToCreate.ID, err) return nil, err } planToCreate.ReorderSteps() // 3. 在调用仓库前,准备好所有数据,包括设备关联 for i := range planToCreate.Tasks { taskModel := &planToCreate.Tasks[i] // 使用工厂创建临时领域对象 taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel) if err != nil { // 如果一个任务类型不支持,我们可以选择跳过或报错 logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err) continue } deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx) if err != nil { // 在事务外解析失败,直接返回错误 return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err) } if len(deviceIDs) > 0 { // 优化:无需查询完整的设备对象,只需构建包含ID的结构体即可建立关联 devices := make([]models.Device, len(deviceIDs)) for i, id := range deviceIDs { devices[i] = models.Device{Model: gorm.Model{ID: id}} } taskModel.Devices = devices } } // 4. 调用仓库方法创建计划,该方法内部会处理事务 err := s.planRepo.CreatePlan(planCtx, planToCreate) if err != nil { logger.Errorf("%s: 数据库创建计划失败: %v", actionType, err) return nil, err } // 5. 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列 if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToCreate.ID); err != nil { // 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功 logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err) } logger.Infof("%s: 计划创建成功, ID: %d", actionType, planToCreate.ID) return planToCreate, nil } // GetPlanByID 根据ID获取计划详情 func (s *planServiceImpl) GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) { planCtx, logger := logs.Trace(ctx, s.ctx, "GetPlanByID") const actionType = "领域层:获取计划详情" plan, err := s.planRepo.GetPlanByID(planCtx, id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) return nil, ErrPlanNotFound } logger.Errorf("%s: 数据库查询失败: %v, ID: %d", actionType, err, id) return nil, err } logger.Infof("%s: 获取计划详情成功, ID: %d", actionType, id) return plan, nil } // ListPlans 获取计划列表,支持过滤和分页 func (s *planServiceImpl) ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) { planCtx, logger := logs.Trace(ctx, s.ctx, "ListPlans") const actionType = "领域层:获取计划列表" plans, total, err := s.planRepo.ListPlans(planCtx, opts, page, pageSize) if err != nil { logger.Errorf("%s: 数据库查询失败: %v", actionType, err) return nil, 0, err } logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(plans)) return plans, total, nil } // UpdatePlan 更新计划 func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.Plan) (*models.Plan, error) { planCtx, logger := logs.Trace(ctx, s.ctx, "UpdatePlan") const actionType = "领域层:更新计划" existingPlan, err := s.planRepo.GetBasicPlanByID(planCtx, planToUpdate.ID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Warnf("%s: 计划不存在, ID: %d", actionType, planToUpdate.ID) return nil, ErrPlanNotFound } logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, planToUpdate.ID) return nil, err } // 系统计划不允许修改 if existingPlan.PlanType == models.PlanTypeSystem { logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, planToUpdate.ID) return nil, ErrPlanCannotBeModified } // 自动判断 ContentType if len(planToUpdate.SubPlans) > 0 { planToUpdate.ContentType = models.PlanContentTypeSubPlans } else { planToUpdate.ContentType = models.PlanContentTypeTasks } // 验证和重排顺序 (领域逻辑) if err := planToUpdate.ValidateExecutionOrder(); err != nil { logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToUpdate.ID, err) return nil, err } planToUpdate.ReorderSteps() // 只要是更新任务,就重置执行计数器 planToUpdate.ExecuteCount = 0 logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID) // 在调用仓库前,准备好所有数据,包括设备关联 for i := range planToUpdate.Tasks { taskModel := &planToUpdate.Tasks[i] taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel) if err != nil { logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err) continue } deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx) if err != nil { return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err) } if len(deviceIDs) > 0 { // 优化:无需查询完整的设备对象,只需构建包含ID的结构体即可建立关联 devices := make([]models.Device, len(deviceIDs)) for i, id := range deviceIDs { devices[i] = models.Device{Model: gorm.Model{ID: id}} } taskModel.Devices = devices } } // 调用仓库方法更新计划,该方法内部会处理事务 err = s.planRepo.UpdatePlanMetadataAndStructure(planCtx, planToUpdate) if err != nil { logger.Errorf("%s: 数据库更新计划失败: %v, Plan: %+v", actionType, err, planToUpdate) return nil, err } if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToUpdate.ID); err != nil { logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err) } updatedPlan, err := s.planRepo.GetPlanByID(planCtx, planToUpdate.ID) if err != nil { logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, planToUpdate.ID) return nil, errors.New("获取更新后计划详情时发生内部错误") } logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID) return updatedPlan, nil } // DeletePlan 删除计划(软删除) func (s *planServiceImpl) DeletePlan(ctx context.Context, id uint) error { planCtx, logger := logs.Trace(ctx, s.ctx, "DeletePlan") const actionType = "领域层:删除计划" plan, err := s.planRepo.GetBasicPlanByID(planCtx, id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) return ErrPlanNotFound } logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id) return err } // 系统计划不允许删除 if plan.PlanType == models.PlanTypeSystem { logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id) return ErrPlanCannotBeDeleted } // 如果计划处于启用状态,先停止它 if plan.Status == models.PlanStatusEnabled { if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil { logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) return err } } if err := s.planRepo.DeletePlan(planCtx, id); err != nil { logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id) return err } logger.Infof("%s: 计划删除成功, ID: %d", actionType, id) return nil } // StartPlan 启动计划 func (s *planServiceImpl) StartPlan(ctx context.Context, id uint) error { planCtx, logger := logs.Trace(ctx, s.ctx, "StartPlan") const actionType = "领域层:启动计划" plan, err := s.planRepo.GetBasicPlanByID(planCtx, id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) return ErrPlanNotFound } logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id) return err } // 系统计划不允许手动启动 if plan.PlanType == models.PlanTypeSystem { logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id) return ErrPlanCannotBeStarted } // 计划已处于启动状态,无需重复操作 if plan.Status == models.PlanStatusEnabled { logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id) return ErrPlanAlreadyEnabled } // 如果计划未处于启用状态 if plan.Status != models.PlanStatusEnabled { // 如果执行计数器大于0,重置为0 if plan.ExecuteCount > 0 { if err := s.planRepo.UpdateExecuteCount(planCtx, plan.ID, 0); err != nil { logger.Errorf("%s: 重置计划执行计数失败: %v, ID: %d", actionType, err, plan.ID) return err } logger.Infof("计划 #%d 的执行计数器已重置为 0。", plan.ID) } // 更新计划状态为启用 if err := s.planRepo.UpdatePlanStatus(planCtx, plan.ID, models.PlanStatusEnabled); err != nil { logger.Errorf("%s: 更新计划状态失败: %v, ID: %d", actionType, err, plan.ID) return err } logger.Infof("已成功更新计划 #%d 的状态为 '已启动'。", plan.ID) } // 创建或更新触发器 if err := s.taskManager.CreateOrUpdateTrigger(planCtx, plan.ID); err != nil { logger.Errorf("%s: 创建或更新触发器失败: %v, ID: %d", actionType, err, plan.ID) return err } logger.Infof("%s: 计划已成功启动, ID: %d", actionType, id) return nil } // StopPlan 停止计划 func (s *planServiceImpl) StopPlan(ctx context.Context, id uint) error { planCtx, logger := logs.Trace(ctx, s.ctx, "StopPlan") const actionType = "领域层:停止计划" plan, err := s.planRepo.GetBasicPlanByID(planCtx, id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) return ErrPlanNotFound } logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id) return err } // 系统计划不允许停止 if plan.PlanType == models.PlanTypeSystem { logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id) return ErrPlanCannotBeStopped } // 计划当前不是启用状态 if plan.Status != models.PlanStatusEnabled { logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status) return ErrPlanNotEnabled } // 停止计划事务性操作 if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil { logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) return err } logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id) return nil }