package core import ( "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) const ( // PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称 PlanNameTimedFullDataCollection = "定时全量数据采集" ) // initializeState 在应用启动时准备其初始数据状态。 // 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 func (app *Application) initializeState() error { // 初始化预定义系统计划 (致命错误) if err := app.initializeSystemPlans(); err != nil { return fmt.Errorf("初始化预定义系统计划失败: %w", err) } // 清理待采集任务 (非致命错误) if err := app.initializePendingCollections(); err != nil { app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) } // 初始化待执行任务列表 (致命错误) if err := app.initializePendingTasks(); err != nil { return fmt.Errorf("初始化待执行任务列表失败: %w", err) } return nil } // initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。 func (app *Application) initializeSystemPlans() error { app.Logger.Info("开始检查并更新预定义的系统计划...") // 动态构建预定义计划列表 predefinedSystemPlans := app.getPredefinedSystemPlans() // 1. 获取所有已存在的系统计划 existingPlans, _, err := app.Infra.Repos.PlanRepo.ListPlans(repository.ListPlansOptions{ PlanType: repository.PlanTypeFilterSystem, }, 1, 99999) // 使用一个较大的 pageSize 来获取所有系统计划 if err != nil { return fmt.Errorf("获取现有系统计划失败: %w", err) } // 2. 为了方便查找, 将现有计划名放入一个 map existingPlanMap := make(map[string]*models.Plan) for i := range existingPlans { existingPlanMap[existingPlans[i].Name] = &existingPlans[i] } // 3. 遍历预定义的计划列表 for i := range predefinedSystemPlans { predefinedPlan := &predefinedSystemPlans[i] // 获取可修改的指针 if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { // 如果计划存在,则进行无差别更新 app.Logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) // 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划 predefinedPlan.ID = foundExistingPlan.ID predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount predefinedPlan.Status = foundExistingPlan.Status if err := app.Infra.Repos.PlanRepo.UpdatePlan(predefinedPlan); err != nil { return fmt.Errorf("更新预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) } else { app.Logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) } } else { // 如果计划不存在, 则创建 app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) if err := app.Infra.Repos.PlanRepo.CreatePlan(predefinedPlan); err != nil { return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) } else { app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) } } } app.Logger.Info("预定义系统计划检查完成。") return nil } // getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。 func (app *Application) getPredefinedSystemPlans() []models.Plan { // 根据配置创建定时全量采集计划 interval := app.Config.Collection.Interval if interval <= 0 { interval = 1 // 确保间隔至少为1分钟 } cronExpression := fmt.Sprintf("*/%d * * * *", interval) timedCollectionPlan := models.Plan{ Name: PlanNameTimedFullDataCollection, Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集。", app.Config.Collection.Interval), PlanType: models.PlanTypeSystem, ExecutionType: models.PlanExecutionTypeAutomatic, CronExpression: cronExpression, Status: models.PlanStatusEnabled, ContentType: models.PlanContentTypeTasks, Tasks: []models.Task{ { Name: "全量采集", Description: "触发一次全量数据采集", ExecutionOrder: 1, Type: models.TaskTypeFullCollection, }, }, } return []models.Plan{timedCollectionPlan} } // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。 func (app *Application) initializePendingCollections() error { app.Logger.Info("开始清理所有未完成的采集请求...") // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() if err != nil { return fmt.Errorf("清理未完成的采集请求失败: %v", err) } else if count > 0 { app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) } else { app.Logger.Info("没有需要清理的采集请求。") } return nil } // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 func (app *Application) initializePendingTasks() error { logger := app.Logger planRepo := app.Infra.Repos.PlanRepo pendingTaskRepo := app.Infra.Repos.PendingTaskRepo executionLogRepo := app.Infra.Repos.ExecutionLogRepo analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager logger.Info("开始初始化待执行任务列表...") // 阶段一:修正因崩溃导致状态不一致的固定次数计划 logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") plansToCorrect, err := planRepo.FindPlansWithPendingTasks() if err != nil { return fmt.Errorf("查找需要修正的计划失败: %w", err) } for _, plan := range plansToCorrect { logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) // 更新计划的执行计数 plan.ExecuteCount++ logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) if plan.ExecutionType == models.PlanExecutionTypeManual || (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { // 更新计划状态为已停止 plan.Status = models.PlanStatusStopped logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) } // 保存更新后的计划 if err := planRepo.UpdatePlan(plan); err != nil { logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) // 这是一个非阻塞性错误,继续处理其他计划 } } logger.Info("阶段一:固定次数计划修正完成。") // 阶段二:清理所有待执行任务和相关日志 logger.Info("阶段二:开始清理所有待执行任务和相关日志...") // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() if err != nil { return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) } // 2. 收集所有受影响的唯一 PlanID affectedPlanIDs := make(map[uint]struct{}) for _, log := range incompletePlanLogs { affectedPlanIDs[log.PlanID] = struct{}{} } // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed for planID := range affectedPlanIDs { logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) // 这是一个非阻塞性错误,继续处理其他计划 } } logger.Info("阶段二:计划主表状态修正完成。") // 直接调用新的方法来更新计划执行日志状态为失败 if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) // 这是一个非阻塞性错误,继续执行 } // 直接调用新的方法来更新任务执行日志状态为取消 if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) // 这是一个非阻塞性错误,继续执行 } // 清空待执行列表 if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { return fmt.Errorf("清空待执行任务列表失败: %w", err) } logger.Info("阶段二:待执行任务和相关日志清理完成。") // 阶段三:初始刷新 logger.Info("阶段三:开始刷新待执行列表...") if err := analysisPlanTaskManager.Refresh(); err != nil { return fmt.Errorf("刷新待执行任务列表失败: %w", err) } logger.Info("阶段三:待执行任务列表初始化完成。") logger.Info("待执行任务列表初始化完成。") return nil }