410 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package plan
 | 
						||
 | 
						||
import (
 | 
						||
	"errors"
 | 
						||
	"fmt"
 | 
						||
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						||
	"gorm.io/gorm"
 | 
						||
)
 | 
						||
 | 
						||
var (
 | 
						||
	// ErrPlanNotFound 表示未找到计划
 | 
						||
	ErrPlanNotFound = errors.New("计划不存在")
 | 
						||
	// ErrPlanCannotBeModified 表示计划不允许修改
 | 
						||
	ErrPlanCannotBeModified = errors.New("系统计划不允许修改")
 | 
						||
	// ErrPlanCannotBeDeleted 表示计划不允许删除
 | 
						||
	ErrPlanCannotBeDeleted = errors.New("系统计划不允许删除")
 | 
						||
	// ErrPlanCannotBeStarted 表示计划不允许手动启动
 | 
						||
	ErrPlanCannotBeStarted = errors.New("系统计划不允许手动启动")
 | 
						||
	// ErrPlanAlreadyEnabled 表示计划已处于启动状态
 | 
						||
	ErrPlanAlreadyEnabled = errors.New("计划已处于启动状态,无需重复操作")
 | 
						||
	// ErrPlanNotEnabled 表示计划未处于启动状态
 | 
						||
	ErrPlanNotEnabled = errors.New("计划当前不是启用状态")
 | 
						||
	// ErrPlanCannotBeStopped 表示计划不允许停止
 | 
						||
	ErrPlanCannotBeStopped = errors.New("系统计划不允许停止")
 | 
						||
)
 | 
						||
 | 
						||
// Service 定义了计划领域服务的接口。
 | 
						||
type Service interface {
 | 
						||
	// Start 启动计划相关的后台服务,例如计划执行管理器。
 | 
						||
	Start()
 | 
						||
	// Stop 停止计划相关的后台服务,例如计划执行管理器。
 | 
						||
	Stop()
 | 
						||
	// RefreshPlanTriggers 刷新计划触发器,同步数据库中的计划状态和待执行队列中的触发器任务。
 | 
						||
	RefreshPlanTriggers() error
 | 
						||
 | 
						||
	// CreatePlan 创建一个新的计划
 | 
						||
	CreatePlan(plan *models.Plan) (*models.Plan, error)
 | 
						||
	// GetPlanByID 根据ID获取计划详情
 | 
						||
	GetPlanByID(id uint) (*models.Plan, error)
 | 
						||
	// ListPlans 获取计划列表,支持过滤和分页
 | 
						||
	ListPlans(opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error)
 | 
						||
	// UpdatePlan 更新计划
 | 
						||
	UpdatePlan(plan *models.Plan) (*models.Plan, error)
 | 
						||
	// DeletePlan 删除计划(软删除)
 | 
						||
	DeletePlan(id uint) error
 | 
						||
	// StartPlan 启动计划
 | 
						||
	StartPlan(id uint) error
 | 
						||
	// StopPlan 停止计划
 | 
						||
	StopPlan(id uint) error
 | 
						||
}
 | 
						||
 | 
						||
// planServiceImpl 是 Service 接口的具体实现。
 | 
						||
type planServiceImpl struct {
 | 
						||
	executionManager ExecutionManager
 | 
						||
	taskManager      AnalysisPlanTaskManager
 | 
						||
	planRepo         repository.PlanRepository
 | 
						||
	deviceRepo       repository.DeviceRepository
 | 
						||
	unitOfWork       repository.UnitOfWork
 | 
						||
	taskFactory      TaskFactory
 | 
						||
	logger           *logs.Logger
 | 
						||
}
 | 
						||
 | 
						||
// NewPlanService 创建一个新的 Service 实例。
 | 
						||
func NewPlanService(
 | 
						||
	executionManager ExecutionManager,
 | 
						||
	taskManager AnalysisPlanTaskManager,
 | 
						||
	planRepo repository.PlanRepository,
 | 
						||
	deviceRepo repository.DeviceRepository,
 | 
						||
	unitOfWork repository.UnitOfWork,
 | 
						||
	taskFactory TaskFactory,
 | 
						||
	logger *logs.Logger,
 | 
						||
) Service {
 | 
						||
	return &planServiceImpl{
 | 
						||
		executionManager: executionManager,
 | 
						||
		taskManager:      taskManager,
 | 
						||
		planRepo:         planRepo,
 | 
						||
		deviceRepo:       deviceRepo,
 | 
						||
		unitOfWork:       unitOfWork,
 | 
						||
		taskFactory:      taskFactory,
 | 
						||
		logger:           logger,
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// Start 启动计划相关的后台服务。
 | 
						||
func (s *planServiceImpl) Start() {
 | 
						||
	s.logger.Infof("PlanService 正在启动...")
 | 
						||
	s.executionManager.Start()
 | 
						||
}
 | 
						||
 | 
						||
// Stop 停止计划相关的后台服务。
 | 
						||
func (s *planServiceImpl) Stop() {
 | 
						||
	s.logger.Infof("PlanService 正在停止...")
 | 
						||
	s.executionManager.Stop()
 | 
						||
}
 | 
						||
 | 
						||
// RefreshPlanTriggers 刷新计划触发器。
 | 
						||
func (s *planServiceImpl) RefreshPlanTriggers() error {
 | 
						||
	s.logger.Infof("PlanService 正在刷新计划触发器...")
 | 
						||
	return s.taskManager.Refresh()
 | 
						||
}
 | 
						||
 | 
						||
// CreatePlan 创建一个新的计划
 | 
						||
func (s *planServiceImpl) CreatePlan(planToCreate *models.Plan) (*models.Plan, error) {
 | 
						||
	const actionType = "领域层:创建计划"
 | 
						||
 | 
						||
	// 1. 业务规则处理
 | 
						||
	// 用户创建的计划永远是自定义计划
 | 
						||
	planToCreate.PlanType = models.PlanTypeCustom
 | 
						||
 | 
						||
	// 自动判断 ContentType
 | 
						||
	if len(planToCreate.SubPlans) > 0 {
 | 
						||
		planToCreate.ContentType = models.PlanContentTypeSubPlans
 | 
						||
	} else {
 | 
						||
		planToCreate.ContentType = models.PlanContentTypeTasks
 | 
						||
	}
 | 
						||
 | 
						||
	// 2. 验证和重排顺序 (领域逻辑)
 | 
						||
	if err := planToCreate.ValidateExecutionOrder(); err != nil {
 | 
						||
		s.logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToCreate.ID, err)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	planToCreate.ReorderSteps()
 | 
						||
 | 
						||
	// 3. 在调用仓库前,准备好所有数据,包括设备关联
 | 
						||
	for i := range planToCreate.Tasks {
 | 
						||
		taskModel := &planToCreate.Tasks[i]
 | 
						||
		// 使用工厂创建临时领域对象
 | 
						||
		taskResolver, err := s.taskFactory.CreateTaskFromModel(taskModel)
 | 
						||
		if err != nil {
 | 
						||
			// 如果一个任务类型不支持,我们可以选择跳过或报错
 | 
						||
			s.logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		deviceIDs, err := taskResolver.ResolveDeviceIDs()
 | 
						||
		if err != nil {
 | 
						||
			// 在事务外解析失败,直接返回错误
 | 
						||
			return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
 | 
						||
		}
 | 
						||
		if len(deviceIDs) > 0 {
 | 
						||
			// 优化:无需查询完整的设备对象,只需构建包含ID的结构体即可建立关联
 | 
						||
			devices := make([]models.Device, len(deviceIDs))
 | 
						||
			for i, id := range deviceIDs {
 | 
						||
				devices[i] = models.Device{Model: gorm.Model{ID: id}}
 | 
						||
			}
 | 
						||
			taskModel.Devices = devices
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	// 4. 调用仓库方法创建计划,该方法内部会处理事务
 | 
						||
	err := s.planRepo.CreatePlan(planToCreate)
 | 
						||
	if err != nil {
 | 
						||
		s.logger.Errorf("%s: 数据库创建计划失败: %v", actionType, err)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	// 5. 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
 | 
						||
	if err := s.taskManager.EnsureAnalysisTaskDefinition(planToCreate.ID); err != nil {
 | 
						||
		// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
 | 
						||
		s.logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 计划创建成功, ID: %d", actionType, planToCreate.ID)
 | 
						||
	return planToCreate, nil
 | 
						||
}
 | 
						||
 | 
						||
// GetPlanByID 根据ID获取计划详情
 | 
						||
func (s *planServiceImpl) GetPlanByID(id uint) (*models.Plan, error) {
 | 
						||
	const actionType = "领域层:获取计划详情"
 | 
						||
 | 
						||
	plan, err := s.planRepo.GetPlanByID(id)
 | 
						||
	if err != nil {
 | 
						||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
						||
			s.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
						||
			return nil, ErrPlanNotFound
 | 
						||
		}
 | 
						||
		s.logger.Errorf("%s: 数据库查询失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 获取计划详情成功, ID: %d", actionType, id)
 | 
						||
	return plan, nil
 | 
						||
}
 | 
						||
 | 
						||
// ListPlans 获取计划列表,支持过滤和分页
 | 
						||
func (s *planServiceImpl) ListPlans(opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) {
 | 
						||
	const actionType = "领域层:获取计划列表"
 | 
						||
 | 
						||
	plans, total, err := s.planRepo.ListPlans(opts, page, pageSize)
 | 
						||
	if err != nil {
 | 
						||
		s.logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
 | 
						||
		return nil, 0, err
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(plans))
 | 
						||
	return plans, total, nil
 | 
						||
}
 | 
						||
 | 
						||
// UpdatePlan 更新计划
 | 
						||
func (s *planServiceImpl) UpdatePlan(planToUpdate *models.Plan) (*models.Plan, error) {
 | 
						||
	const actionType = "领域层:更新计划"
 | 
						||
 | 
						||
	existingPlan, err := s.planRepo.GetBasicPlanByID(planToUpdate.ID)
 | 
						||
	if err != nil {
 | 
						||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
						||
			s.logger.Warnf("%s: 计划不存在, ID: %d", actionType, planToUpdate.ID)
 | 
						||
			return nil, ErrPlanNotFound
 | 
						||
		}
 | 
						||
		s.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, planToUpdate.ID)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	// 系统计划不允许修改
 | 
						||
	if existingPlan.PlanType == models.PlanTypeSystem {
 | 
						||
		s.logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, planToUpdate.ID)
 | 
						||
		return nil, ErrPlanCannotBeModified
 | 
						||
	}
 | 
						||
 | 
						||
	// 自动判断 ContentType
 | 
						||
	if len(planToUpdate.SubPlans) > 0 {
 | 
						||
		planToUpdate.ContentType = models.PlanContentTypeSubPlans
 | 
						||
	} else {
 | 
						||
		planToUpdate.ContentType = models.PlanContentTypeTasks
 | 
						||
	}
 | 
						||
 | 
						||
	// 验证和重排顺序 (领域逻辑)
 | 
						||
	if err := planToUpdate.ValidateExecutionOrder(); err != nil {
 | 
						||
		s.logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToUpdate.ID, err)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	planToUpdate.ReorderSteps()
 | 
						||
 | 
						||
	// 只要是更新任务,就重置执行计数器
 | 
						||
	planToUpdate.ExecuteCount = 0
 | 
						||
	s.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
 | 
						||
 | 
						||
	// 在调用仓库前,准备好所有数据,包括设备关联
 | 
						||
	for i := range planToUpdate.Tasks {
 | 
						||
		taskModel := &planToUpdate.Tasks[i]
 | 
						||
		taskResolver, err := s.taskFactory.CreateTaskFromModel(taskModel)
 | 
						||
		if err != nil {
 | 
						||
			s.logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		deviceIDs, err := taskResolver.ResolveDeviceIDs()
 | 
						||
		if err != nil {
 | 
						||
			return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
 | 
						||
		}
 | 
						||
		if len(deviceIDs) > 0 {
 | 
						||
			// 优化:无需查询完整的设备对象,只需构建包含ID的结构体即可建立关联
 | 
						||
			devices := make([]models.Device, len(deviceIDs))
 | 
						||
			for i, id := range deviceIDs {
 | 
						||
				devices[i] = models.Device{Model: gorm.Model{ID: id}}
 | 
						||
			}
 | 
						||
			taskModel.Devices = devices
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	// 调用仓库方法更新计划,该方法内部会处理事务
 | 
						||
	err = s.planRepo.UpdatePlanMetadataAndStructure(planToUpdate)
 | 
						||
	if err != nil {
 | 
						||
		s.logger.Errorf("%s: 数据库更新计划失败: %v, Plan: %+v", actionType, err, planToUpdate)
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	if err := s.taskManager.EnsureAnalysisTaskDefinition(planToUpdate.ID); err != nil {
 | 
						||
		s.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	updatedPlan, err := s.planRepo.GetPlanByID(planToUpdate.ID)
 | 
						||
	if err != nil {
 | 
						||
		s.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, planToUpdate.ID)
 | 
						||
		return nil, errors.New("获取更新后计划详情时发生内部错误")
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
 | 
						||
	return updatedPlan, nil
 | 
						||
}
 | 
						||
 | 
						||
// DeletePlan 删除计划(软删除)
 | 
						||
func (s *planServiceImpl) DeletePlan(id uint) error {
 | 
						||
	const actionType = "领域层:删除计划"
 | 
						||
 | 
						||
	plan, err := s.planRepo.GetBasicPlanByID(id)
 | 
						||
	if err != nil {
 | 
						||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
						||
			s.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
						||
			return ErrPlanNotFound
 | 
						||
		}
 | 
						||
		s.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	// 系统计划不允许删除
 | 
						||
	if plan.PlanType == models.PlanTypeSystem {
 | 
						||
		s.logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id)
 | 
						||
		return ErrPlanCannotBeDeleted
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果计划处于启用状态,先停止它
 | 
						||
	if plan.Status == models.PlanStatusEnabled {
 | 
						||
		if err := s.planRepo.StopPlanTransactionally(id); err != nil {
 | 
						||
			s.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
						||
			return err
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	if err := s.planRepo.DeletePlan(id); err != nil {
 | 
						||
		s.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// StartPlan 启动计划
 | 
						||
func (s *planServiceImpl) StartPlan(id uint) error {
 | 
						||
	const actionType = "领域层:启动计划"
 | 
						||
 | 
						||
	plan, err := s.planRepo.GetBasicPlanByID(id)
 | 
						||
	if err != nil {
 | 
						||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
						||
			s.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
						||
			return ErrPlanNotFound
 | 
						||
		}
 | 
						||
		s.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	// 系统计划不允许手动启动
 | 
						||
	if plan.PlanType == models.PlanTypeSystem {
 | 
						||
		s.logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id)
 | 
						||
		return ErrPlanCannotBeStarted
 | 
						||
	}
 | 
						||
	// 计划已处于启动状态,无需重复操作
 | 
						||
	if plan.Status == models.PlanStatusEnabled {
 | 
						||
		s.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
 | 
						||
		return ErrPlanAlreadyEnabled
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果计划未处于启用状态
 | 
						||
	if plan.Status != models.PlanStatusEnabled {
 | 
						||
		// 如果执行计数器大于0,重置为0
 | 
						||
		if plan.ExecuteCount > 0 {
 | 
						||
			if err := s.planRepo.UpdateExecuteCount(plan.ID, 0); err != nil {
 | 
						||
				s.logger.Errorf("%s: 重置计划执行计数失败: %v, ID: %d", actionType, err, plan.ID)
 | 
						||
				return err
 | 
						||
			}
 | 
						||
			s.logger.Infof("计划 #%d 的执行计数器已重置为 0。", plan.ID)
 | 
						||
		}
 | 
						||
 | 
						||
		// 更新计划状态为启用
 | 
						||
		if err := s.planRepo.UpdatePlanStatus(plan.ID, models.PlanStatusEnabled); err != nil {
 | 
						||
			s.logger.Errorf("%s: 更新计划状态失败: %v, ID: %d", actionType, err, plan.ID)
 | 
						||
			return err
 | 
						||
		}
 | 
						||
		s.logger.Infof("已成功更新计划 #%d 的状态为 '已启动'。", plan.ID)
 | 
						||
	}
 | 
						||
 | 
						||
	// 创建或更新触发器
 | 
						||
	if err := s.taskManager.CreateOrUpdateTrigger(plan.ID); err != nil {
 | 
						||
		s.logger.Errorf("%s: 创建或更新触发器失败: %v, ID: %d", actionType, err, plan.ID)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 计划已成功启动, ID: %d", actionType, id)
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// StopPlan 停止计划
 | 
						||
func (s *planServiceImpl) StopPlan(id uint) error {
 | 
						||
	const actionType = "领域层:停止计划"
 | 
						||
 | 
						||
	plan, err := s.planRepo.GetBasicPlanByID(id)
 | 
						||
	if err != nil {
 | 
						||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
						||
			s.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
						||
			return ErrPlanNotFound
 | 
						||
		}
 | 
						||
		s.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	// 系统计划不允许停止
 | 
						||
	if plan.PlanType == models.PlanTypeSystem {
 | 
						||
		s.logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id)
 | 
						||
		return ErrPlanCannotBeStopped
 | 
						||
	}
 | 
						||
 | 
						||
	// 计划当前不是启用状态
 | 
						||
	if plan.Status != models.PlanStatusEnabled {
 | 
						||
		s.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
 | 
						||
		return ErrPlanNotEnabled
 | 
						||
	}
 | 
						||
 | 
						||
	// 停止计划事务性操作
 | 
						||
	if err := s.planRepo.StopPlanTransactionally(id); err != nil {
 | 
						||
		s.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	s.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
 | 
						||
	return nil
 | 
						||
}
 |