实现全量采集系统计划
This commit is contained in:
		| @@ -9,7 +9,6 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/api" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| ) | ||||
|  | ||||
| // Application 是整个应用的核心,封装了所有组件和生命周期。 | ||||
| @@ -133,133 +132,3 @@ func (app *Application) Stop() error { | ||||
| 	app.Logger.Info("应用已成功关闭") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // initializeState 在应用启动时准备其初始数据状态。 | ||||
| // 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 | ||||
| func (app *Application) initializeState() error { | ||||
| 	// 清理待采集任务 (非致命错误) | ||||
| 	if err := app.initializePendingCollections(); err != nil { | ||||
| 		app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) | ||||
| 	} | ||||
|  | ||||
| 	// 初始化待执行任务列表 (致命错误) | ||||
| 	if err := app.initializePendingTasks(); err != nil { | ||||
| 		return fmt.Errorf("初始化待执行任务列表失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 | ||||
| // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 | ||||
| // 这保证了系统在每次启动时都处于一个干净、确定的状态。 | ||||
| func (app *Application) initializePendingCollections() error { | ||||
| 	app.Logger.Info("开始清理所有未完成的采集请求...") | ||||
|  | ||||
| 	// 直接将所有 'pending' 状态的请求更新为 'timed_out'。 | ||||
| 	count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("清理未完成的采集请求失败: %v", err) | ||||
| 	} else if count > 0 { | ||||
| 		app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) | ||||
| 	} else { | ||||
| 		app.Logger.Info("没有需要清理的采集请求。") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 | ||||
| func (app *Application) initializePendingTasks() error { | ||||
| 	logger := app.Logger | ||||
| 	planRepo := app.Infra.Repos.PlanRepo | ||||
| 	pendingTaskRepo := app.Infra.Repos.PendingTaskRepo | ||||
| 	executionLogRepo := app.Infra.Repos.ExecutionLogRepo | ||||
| 	analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager | ||||
|  | ||||
| 	logger.Info("开始初始化待执行任务列表...") | ||||
|  | ||||
| 	// 阶段一:修正因崩溃导致状态不一致的固定次数计划 | ||||
| 	logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") | ||||
| 	plansToCorrect, err := planRepo.FindPlansWithPendingTasks() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("查找需要修正的计划失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	for _, plan := range plansToCorrect { | ||||
| 		logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) | ||||
|  | ||||
| 		// 更新计划的执行计数 | ||||
| 		plan.ExecuteCount++ | ||||
| 		logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) | ||||
|  | ||||
| 		if plan.ExecutionType == models.PlanExecutionTypeManual || | ||||
| 			(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { | ||||
| 			// 更新计划状态为已停止 | ||||
| 			plan.Status = models.PlanStatusStopped | ||||
| 			logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) | ||||
|  | ||||
| 		} | ||||
| 		// 保存更新后的计划 | ||||
| 		if err := planRepo.UpdatePlan(plan); err != nil { | ||||
| 			logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) | ||||
| 			// 这是一个非阻塞性错误,继续处理其他计划 | ||||
| 		} | ||||
| 	} | ||||
| 	logger.Info("阶段一:固定次数计划修正完成。") | ||||
|  | ||||
| 	// 阶段二:清理所有待执行任务和相关日志 | ||||
| 	logger.Info("阶段二:开始清理所有待执行任务和相关日志...") | ||||
|  | ||||
| 	// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- | ||||
| 	// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) | ||||
| 	incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// 2. 收集所有受影响的唯一 PlanID | ||||
| 	affectedPlanIDs := make(map[uint]struct{}) | ||||
| 	for _, log := range incompletePlanLogs { | ||||
| 		affectedPlanIDs[log.PlanID] = struct{}{} | ||||
| 	} | ||||
|  | ||||
| 	// 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed | ||||
| 	for planID := range affectedPlanIDs { | ||||
| 		logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) | ||||
| 		// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 | ||||
| 		if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { | ||||
| 			logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) | ||||
| 			// 这是一个非阻塞性错误,继续处理其他计划 | ||||
| 		} | ||||
| 	} | ||||
| 	logger.Info("阶段二:计划主表状态修正完成。") | ||||
|  | ||||
| 	// 直接调用新的方法来更新计划执行日志状态为失败 | ||||
| 	if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { | ||||
| 		logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) | ||||
| 		// 这是一个非阻塞性错误,继续执行 | ||||
| 	} | ||||
|  | ||||
| 	// 直接调用新的方法来更新任务执行日志状态为取消 | ||||
| 	if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { | ||||
| 		logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) | ||||
| 		// 这是一个非阻塞性错误,继续执行 | ||||
| 	} | ||||
|  | ||||
| 	// 清空待执行列表 | ||||
| 	if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { | ||||
| 		return fmt.Errorf("清空待执行任务列表失败: %w", err) | ||||
| 	} | ||||
| 	logger.Info("阶段二:待执行任务和相关日志清理完成。") | ||||
|  | ||||
| 	// 阶段三:初始刷新 | ||||
| 	logger.Info("阶段三:开始刷新待执行列表...") | ||||
| 	if err := analysisPlanTaskManager.Refresh(); err != nil { | ||||
| 		return fmt.Errorf("刷新待执行任务列表失败: %w", err) | ||||
| 	} | ||||
| 	logger.Info("阶段三:待执行任务列表初始化完成。") | ||||
|  | ||||
| 	logger.Info("待执行任务列表初始化完成。") | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user