对外暴露接口

This commit is contained in:
2025-11-02 18:27:40 +08:00
parent 408df2f09c
commit 6cd566bc30
4 changed files with 53 additions and 33 deletions

View File

@@ -11,10 +11,22 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
)
// AnalysisPlanTaskManager 负责管理分析计划的触发器任务
// AnalysisPlanTaskManager 定义了分析计划任务管理器的接口
type AnalysisPlanTaskManager interface {
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
Refresh() error
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 如果触发器已存在,会根据计划类型更新其执行时间。
CreateOrUpdateTrigger(planID uint) error
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
EnsureAnalysisTaskDefinition(planID uint) error
}
// analysisPlanTaskManagerImpl 负责管理分析计划的触发器任务。
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
type AnalysisPlanTaskManager struct {
type analysisPlanTaskManagerImpl struct {
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
@@ -22,14 +34,14 @@ type AnalysisPlanTaskManager struct {
mu sync.Mutex
}
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
// NewAnalysisPlanTaskManager 是 analysisPlanTaskManagerImpl 的构造函数。
func NewAnalysisPlanTaskManager(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
logger *logs.Logger,
) *AnalysisPlanTaskManager {
return &AnalysisPlanTaskManager{
) AnalysisPlanTaskManager {
return &analysisPlanTaskManagerImpl{
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
@@ -39,7 +51,7 @@ func NewAnalysisPlanTaskManager(
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。
func (m *AnalysisPlanTaskManager) Refresh() error {
func (m *analysisPlanTaskManagerImpl) Refresh() error {
m.mu.Lock()
defer m.mu.Unlock()
@@ -68,7 +80,7 @@ func (m *AnalysisPlanTaskManager) Refresh() error {
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 如果触发器已存在,会根据计划类型更新其执行时间。
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
@@ -123,7 +135,7 @@ func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
func (m *analysisPlanTaskManagerImpl) EnsureAnalysisTaskDefinition(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
@@ -154,7 +166,7 @@ func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) erro
// --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
func (m *analysisPlanTaskManagerImpl) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
runnablePlans, err = m.planRepo.FindRunnablePlans()
if err != nil {
m.logger.Errorf("获取可执行计划列表失败: %v", err)
@@ -180,7 +192,7 @@ func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan
}
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
func (m *analysisPlanTaskManagerImpl) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
if len(invalidPlanIDs) == 0 {
return nil // 没有需要清理的计划
}
@@ -224,7 +236,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all
}
// addOrUpdateTriggers 检查、更新或创建触发器。
func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
// 创建一个映射,存放所有已在队列中的计划触发器
pendingTriggersMap := make(map[uint]models.PendingTask)
for _, pt := range allPendingTasks {
@@ -266,7 +278,7 @@ func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Pl
}
// createTriggerTask 是创建触发器任务的内部核心逻辑。
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
func (m *analysisPlanTaskManagerImpl) createTriggerTask(plan *models.Plan) error {
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("查找计划分析任务失败: %w", err)