Files
pig-farm-controller/internal/domain/plan/plan_service.go
2025-11-05 21:40:19 +08:00

422 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package plan
import (
"context"
"errors"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"gorm.io/gorm"
)
var (
// ErrPlanNotFound 表示未找到计划
ErrPlanNotFound = errors.New("计划不存在")
// ErrPlanCannotBeModified 表示计划不允许修改
ErrPlanCannotBeModified = errors.New("系统计划不允许修改")
// ErrPlanCannotBeDeleted 表示计划不允许删除
ErrPlanCannotBeDeleted = errors.New("系统计划不允许删除")
// ErrPlanCannotBeStarted 表示计划不允许手动启动
ErrPlanCannotBeStarted = errors.New("系统计划不允许手动启动")
// ErrPlanAlreadyEnabled 表示计划已处于启动状态
ErrPlanAlreadyEnabled = errors.New("计划已处于启动状态,无需重复操作")
// ErrPlanNotEnabled 表示计划未处于启动状态
ErrPlanNotEnabled = errors.New("计划当前不是启用状态")
// ErrPlanCannotBeStopped 表示计划不允许停止
ErrPlanCannotBeStopped = errors.New("系统计划不允许停止")
)
// Service 定义了计划领域服务的接口。
type Service interface {
// Start 启动计划相关的后台服务,例如计划执行管理器。
Start(ctx context.Context)
// Stop 停止计划相关的后台服务,例如计划执行管理器。
Stop(ctx context.Context)
// RefreshPlanTriggers 刷新计划触发器,同步数据库中的计划状态和待执行队列中的触发器任务。
RefreshPlanTriggers(ctx context.Context) error
// CreatePlan 创建一个新的计划
CreatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error)
// GetPlanByID 根据ID获取计划详情
GetPlanByID(ctx context.Context, id uint) (*models.Plan, error)
// ListPlans 获取计划列表,支持过滤和分页
ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error)
// UpdatePlan 更新计划
UpdatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error)
// DeletePlan 删除计划(软删除)
DeletePlan(ctx context.Context, id uint) error
// StartPlan 启动计划
StartPlan(ctx context.Context, id uint) error
// StopPlan 停止计划
StopPlan(ctx context.Context, id uint) error
}
// planServiceImpl 是 Service 接口的具体实现。
type planServiceImpl struct {
ctx context.Context
executionManager ExecutionManager
taskManager AnalysisPlanTaskManager
planRepo repository.PlanRepository
deviceRepo repository.DeviceRepository
unitOfWork repository.UnitOfWork
taskFactory TaskFactory
}
// NewPlanService 创建一个新的 Service 实例。
func NewPlanService(
ctx context.Context,
executionManager ExecutionManager,
taskManager AnalysisPlanTaskManager,
planRepo repository.PlanRepository,
deviceRepo repository.DeviceRepository,
unitOfWork repository.UnitOfWork,
taskFactory TaskFactory,
) Service {
return &planServiceImpl{
ctx: ctx,
executionManager: executionManager,
taskManager: taskManager,
planRepo: planRepo,
deviceRepo: deviceRepo,
unitOfWork: unitOfWork,
taskFactory: taskFactory,
}
}
// Start 启动计划相关的后台服务。
func (s *planServiceImpl) Start(ctx context.Context) {
planCtx, logger := logs.Trace(ctx, s.ctx, "Start")
logger.Infof("PlanService 正在启动...")
s.executionManager.Start(planCtx)
}
// Stop 停止计划相关的后台服务。
func (s *planServiceImpl) Stop(ctx context.Context) {
planCtx, logger := logs.Trace(ctx, s.ctx, "Stop")
logger.Infof("PlanService 正在停止...")
s.executionManager.Stop(planCtx)
}
// RefreshPlanTriggers 刷新计划触发器。
func (s *planServiceImpl) RefreshPlanTriggers(ctx context.Context) error {
planCtx, logger := logs.Trace(ctx, s.ctx, "RefreshPlanTriggers")
logger.Infof("PlanService 正在刷新计划触发器...")
return s.taskManager.Refresh(planCtx)
}
// CreatePlan 创建一个新的计划
func (s *planServiceImpl) CreatePlan(ctx context.Context, planToCreate *models.Plan) (*models.Plan, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "CreatePlan")
const actionType = "领域层:创建计划"
// 1. 业务规则处理
// 用户创建的计划永远是自定义计划
planToCreate.PlanType = models.PlanTypeCustom
// 自动判断 ContentType
if len(planToCreate.SubPlans) > 0 {
planToCreate.ContentType = models.PlanContentTypeSubPlans
} else {
planToCreate.ContentType = models.PlanContentTypeTasks
}
// 2. 验证和重排顺序 (领域逻辑)
if err := planToCreate.ValidateExecutionOrder(); err != nil {
logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToCreate.ID, err)
return nil, err
}
planToCreate.ReorderSteps()
// 3. 在调用仓库前,准备好所有数据,包括设备关联
for i := range planToCreate.Tasks {
taskModel := &planToCreate.Tasks[i]
// 使用工厂创建临时领域对象
taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel)
if err != nil {
// 如果一个任务类型不支持,我们可以选择跳过或报错
logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
continue
}
deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx)
if err != nil {
// 在事务外解析失败,直接返回错误
return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
}
if len(deviceIDs) > 0 {
// 优化无需查询完整的设备对象只需构建包含ID的结构体即可建立关联
devices := make([]models.Device, len(deviceIDs))
for i, id := range deviceIDs {
devices[i] = models.Device{Model: gorm.Model{ID: id}}
}
taskModel.Devices = devices
}
}
// 4. 调用仓库方法创建计划,该方法内部会处理事务
err := s.planRepo.CreatePlan(planCtx, planToCreate)
if err != nil {
logger.Errorf("%s: 数据库创建计划失败: %v", actionType, err)
return nil, err
}
// 5. 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToCreate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
}
logger.Infof("%s: 计划创建成功, ID: %d", actionType, planToCreate.ID)
return planToCreate, nil
}
// GetPlanByID 根据ID获取计划详情
func (s *planServiceImpl) GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "GetPlanByID")
const actionType = "领域层:获取计划详情"
plan, err := s.planRepo.GetPlanByID(planCtx, id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
return nil, ErrPlanNotFound
}
logger.Errorf("%s: 数据库查询失败: %v, ID: %d", actionType, err, id)
return nil, err
}
logger.Infof("%s: 获取计划详情成功, ID: %d", actionType, id)
return plan, nil
}
// ListPlans 获取计划列表,支持过滤和分页
func (s *planServiceImpl) ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "ListPlans")
const actionType = "领域层:获取计划列表"
plans, total, err := s.planRepo.ListPlans(planCtx, opts, page, pageSize)
if err != nil {
logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
return nil, 0, err
}
logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(plans))
return plans, total, nil
}
// UpdatePlan 更新计划
func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.Plan) (*models.Plan, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "UpdatePlan")
const actionType = "领域层:更新计划"
existingPlan, err := s.planRepo.GetBasicPlanByID(planCtx, planToUpdate.ID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf("%s: 计划不存在, ID: %d", actionType, planToUpdate.ID)
return nil, ErrPlanNotFound
}
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, planToUpdate.ID)
return nil, err
}
// 系统计划不允许修改
if existingPlan.PlanType == models.PlanTypeSystem {
logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, planToUpdate.ID)
return nil, ErrPlanCannotBeModified
}
// 自动判断 ContentType
if len(planToUpdate.SubPlans) > 0 {
planToUpdate.ContentType = models.PlanContentTypeSubPlans
} else {
planToUpdate.ContentType = models.PlanContentTypeTasks
}
// 验证和重排顺序 (领域逻辑)
if err := planToUpdate.ValidateExecutionOrder(); err != nil {
logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToUpdate.ID, err)
return nil, err
}
planToUpdate.ReorderSteps()
// 只要是更新任务,就重置执行计数器
planToUpdate.ExecuteCount = 0
logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
// 在调用仓库前,准备好所有数据,包括设备关联
for i := range planToUpdate.Tasks {
taskModel := &planToUpdate.Tasks[i]
taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel)
if err != nil {
logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
continue
}
deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx)
if err != nil {
return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
}
if len(deviceIDs) > 0 {
// 优化无需查询完整的设备对象只需构建包含ID的结构体即可建立关联
devices := make([]models.Device, len(deviceIDs))
for i, id := range deviceIDs {
devices[i] = models.Device{Model: gorm.Model{ID: id}}
}
taskModel.Devices = devices
}
}
// 调用仓库方法更新计划,该方法内部会处理事务
err = s.planRepo.UpdatePlanMetadataAndStructure(planCtx, planToUpdate)
if err != nil {
logger.Errorf("%s: 数据库更新计划失败: %v, Plan: %+v", actionType, err, planToUpdate)
return nil, err
}
if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToUpdate.ID); err != nil {
logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
}
updatedPlan, err := s.planRepo.GetPlanByID(planCtx, planToUpdate.ID)
if err != nil {
logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, planToUpdate.ID)
return nil, errors.New("获取更新后计划详情时发生内部错误")
}
logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
return updatedPlan, nil
}
// DeletePlan 删除计划(软删除)
func (s *planServiceImpl) DeletePlan(ctx context.Context, id uint) error {
planCtx, logger := logs.Trace(ctx, s.ctx, "DeletePlan")
const actionType = "领域层:删除计划"
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
return ErrPlanNotFound
}
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
return err
}
// 系统计划不允许删除
if plan.PlanType == models.PlanTypeSystem {
logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id)
return ErrPlanCannotBeDeleted
}
// 如果计划处于启用状态,先停止它
if plan.Status == models.PlanStatusEnabled {
if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil {
logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
return err
}
}
if err := s.planRepo.DeletePlan(planCtx, id); err != nil {
logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
return err
}
logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
return nil
}
// StartPlan 启动计划
func (s *planServiceImpl) StartPlan(ctx context.Context, id uint) error {
planCtx, logger := logs.Trace(ctx, s.ctx, "StartPlan")
const actionType = "领域层:启动计划"
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
return ErrPlanNotFound
}
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
return err
}
// 系统计划不允许手动启动
if plan.PlanType == models.PlanTypeSystem {
logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id)
return ErrPlanCannotBeStarted
}
// 计划已处于启动状态,无需重复操作
if plan.Status == models.PlanStatusEnabled {
logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
return ErrPlanAlreadyEnabled
}
// 如果计划未处于启用状态
if plan.Status != models.PlanStatusEnabled {
// 如果执行计数器大于0重置为0
if plan.ExecuteCount > 0 {
if err := s.planRepo.UpdateExecuteCount(planCtx, plan.ID, 0); err != nil {
logger.Errorf("%s: 重置计划执行计数失败: %v, ID: %d", actionType, err, plan.ID)
return err
}
logger.Infof("计划 #%d 的执行计数器已重置为 0。", plan.ID)
}
// 更新计划状态为启用
if err := s.planRepo.UpdatePlanStatus(planCtx, plan.ID, models.PlanStatusEnabled); err != nil {
logger.Errorf("%s: 更新计划状态失败: %v, ID: %d", actionType, err, plan.ID)
return err
}
logger.Infof("已成功更新计划 #%d 的状态为 '已启动'。", plan.ID)
}
// 创建或更新触发器
if err := s.taskManager.CreateOrUpdateTrigger(planCtx, plan.ID); err != nil {
logger.Errorf("%s: 创建或更新触发器失败: %v, ID: %d", actionType, err, plan.ID)
return err
}
logger.Infof("%s: 计划已成功启动, ID: %d", actionType, id)
return nil
}
// StopPlan 停止计划
func (s *planServiceImpl) StopPlan(ctx context.Context, id uint) error {
planCtx, logger := logs.Trace(ctx, s.ctx, "StopPlan")
const actionType = "领域层:停止计划"
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
return ErrPlanNotFound
}
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
return err
}
// 系统计划不允许停止
if plan.PlanType == models.PlanTypeSystem {
logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id)
return ErrPlanCannotBeStopped
}
// 计划当前不是启用状态
if plan.Status != models.PlanStatusEnabled {
logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
return ErrPlanNotEnabled
}
// 停止计划事务性操作
if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil {
logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
return err
}
logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
return nil
}