初始化待执行队列
This commit is contained in:
@@ -25,6 +25,12 @@ type Application struct {
|
|||||||
Storage database.Storage
|
Storage database.Storage
|
||||||
Executor *task.Scheduler
|
Executor *task.Scheduler
|
||||||
API *api.API // 添加 API 对象
|
API *api.API // 添加 API 对象
|
||||||
|
|
||||||
|
// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
|
||||||
|
planRepo repository.PlanRepository
|
||||||
|
pendingTaskRepo repository.PendingTaskRepository
|
||||||
|
executionLogRepo repository.ExecutionLogRepository
|
||||||
|
analysisPlanTaskManager *task.AnalysisPlanTaskManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApplication 创建并初始化一个新的 Application 实例。
|
// NewApplication 创建并初始化一个新的 Application 实例。
|
||||||
@@ -82,6 +88,11 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
Storage: storage,
|
Storage: storage,
|
||||||
Executor: executor,
|
Executor: executor,
|
||||||
API: apiServer,
|
API: apiServer,
|
||||||
|
// 填充新增的字段
|
||||||
|
planRepo: planRepo,
|
||||||
|
pendingTaskRepo: pendingTaskRepo,
|
||||||
|
executionLogRepo: executionLogRepo,
|
||||||
|
analysisPlanTaskManager: analysisPlanTaskManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
return app, nil
|
return app, nil
|
||||||
@@ -91,6 +102,17 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
func (app *Application) Start() error {
|
func (app *Application) Start() error {
|
||||||
app.Logger.Info("应用启动中...")
|
app.Logger.Info("应用启动中...")
|
||||||
|
|
||||||
|
// --- 新增逻辑:初始化待执行任务列表 ---
|
||||||
|
if err := app.initializePendingTasks(
|
||||||
|
app.planRepo, // 传入 planRepo
|
||||||
|
app.pendingTaskRepo, // 传入 pendingTaskRepo
|
||||||
|
app.executionLogRepo, // 传入 executionLogRepo
|
||||||
|
app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
|
||||||
|
app.Logger, // 传入 logger
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// 启动任务执行器
|
// 启动任务执行器
|
||||||
app.Executor.Start()
|
app.Executor.Start()
|
||||||
|
|
||||||
@@ -128,6 +150,104 @@ func (app *Application) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
|
||||||
|
func (app *Application) initializePendingTasks(
|
||||||
|
planRepo repository.PlanRepository,
|
||||||
|
pendingTaskRepo repository.PendingTaskRepository,
|
||||||
|
executionLogRepo repository.ExecutionLogRepository,
|
||||||
|
analysisPlanTaskManager *task.AnalysisPlanTaskManager,
|
||||||
|
logger *logs.Logger,
|
||||||
|
) error {
|
||||||
|
logger.Info("开始初始化待执行任务列表...")
|
||||||
|
|
||||||
|
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
|
||||||
|
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
|
||||||
|
plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("查找需要修正的计划失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, plan := range plansToCorrect {
|
||||||
|
logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
|
||||||
|
|
||||||
|
// 更新计划的执行计数
|
||||||
|
plan.ExecuteCount++
|
||||||
|
logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
|
||||||
|
|
||||||
|
if plan.ExecutionType == models.PlanExecutionTypeManual ||
|
||||||
|
(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
|
||||||
|
// 更新计划状态为已停止
|
||||||
|
plan.Status = models.PlanStatusStopeed
|
||||||
|
logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
|
||||||
|
|
||||||
|
}
|
||||||
|
// 保存更新后的计划
|
||||||
|
if err := planRepo.UpdatePlan(plan); err != nil {
|
||||||
|
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
|
||||||
|
// 这是一个非阻塞性错误,继续处理其他计划
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.Info("阶段一:固定次数计划修正完成。")
|
||||||
|
|
||||||
|
// 阶段二:清理所有待执行任务和相关日志
|
||||||
|
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
|
||||||
|
pendingTasks, err := pendingTaskRepo.FindAllPendingTasks()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("获取待执行任务失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var taskLogIDsToCancel []uint
|
||||||
|
var planLogIDsToFail []uint
|
||||||
|
|
||||||
|
for _, pt := range pendingTasks {
|
||||||
|
// 确保 Task 和 TaskExecutionLog 已预加载
|
||||||
|
if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载
|
||||||
|
logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog,跳过处理。", pt.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 收集任务执行日志ID,所有未完成的任务都标记为取消
|
||||||
|
taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID)
|
||||||
|
|
||||||
|
// 收集计划执行日志ID
|
||||||
|
if pt.TaskExecutionLog.PlanExecutionLogID != 0 {
|
||||||
|
planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量更新 TaskExecutionLog 状态为取消
|
||||||
|
if len(taskLogIDsToCancel) > 0 {
|
||||||
|
if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil {
|
||||||
|
logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err)
|
||||||
|
// 这是一个非阻塞性错误,继续执行
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量更新 PlanExecutionLog 状态为失败
|
||||||
|
if len(planLogIDsToFail) > 0 {
|
||||||
|
if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil {
|
||||||
|
logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err)
|
||||||
|
// 这是一个非阻塞性错误,继续执行
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 清空待执行列表
|
||||||
|
if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
|
||||||
|
return fmt.Errorf("清空待执行任务列表失败: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info("阶段二:待执行任务和相关日志清理完成。")
|
||||||
|
|
||||||
|
// 阶段三:初始刷新
|
||||||
|
logger.Info("阶段三:开始刷新待执行列表...")
|
||||||
|
if err := analysisPlanTaskManager.Refresh(); err != nil {
|
||||||
|
return fmt.Errorf("刷新待执行任务列表失败: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info("阶段三:待执行任务列表初始化完成。")
|
||||||
|
|
||||||
|
logger.Info("待执行任务列表初始化完成。")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
|
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
|
||||||
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
|
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
|
||||||
// 创建存储实例
|
// 创建存储实例
|
||||||
|
|||||||
@@ -18,6 +18,12 @@ type ExecutionLogRepository interface {
|
|||||||
FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error)
|
FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error)
|
||||||
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
|
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
|
||||||
UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error
|
UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error
|
||||||
|
|
||||||
|
// UpdatePlanExecutionLogsStatusByIDs 批量更新计划执行日志的状态
|
||||||
|
UpdatePlanExecutionLogsStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
|
||||||
|
|
||||||
|
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
|
||||||
|
FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormExecutionLogRepository 是使用 GORM 的具体实现。
|
// gormExecutionLogRepository 是使用 GORM 的具体实现。
|
||||||
@@ -93,3 +99,18 @@ func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models.
|
|||||||
func (r *gormExecutionLogRepository) UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
|
func (r *gormExecutionLogRepository) UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
|
||||||
return r.db.Model(&models.PlanExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
|
return r.db.Model(&models.PlanExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdatePlanExecutionLogsStatusByIDs 批量更新计划执行日志的状态
|
||||||
|
func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
|
||||||
|
if len(logIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return r.db.Model(&models.PlanExecutionLog{}).Where("id IN ?", logIDs).Update("status", status).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
|
||||||
|
func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) {
|
||||||
|
var logs []models.PlanExecutionLog
|
||||||
|
err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error
|
||||||
|
return logs, err
|
||||||
|
}
|
||||||
|
|||||||
@@ -21,6 +21,9 @@ type PendingTaskRepository interface {
|
|||||||
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
|
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
|
||||||
UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error
|
UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error
|
||||||
|
|
||||||
|
// ClearAllPendingTasks 清空所有待执行任务
|
||||||
|
ClearAllPendingTasks() error
|
||||||
|
|
||||||
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
|
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
|
||||||
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
|
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
|
||||||
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
|
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
|
||||||
@@ -41,7 +44,8 @@ func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
|
|||||||
func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) {
|
func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) {
|
||||||
var tasks []models.PendingTask
|
var tasks []models.PendingTask
|
||||||
// 预加载 Task 以便后续访问 Task.PlanID
|
// 预加载 Task 以便后续访问 Task.PlanID
|
||||||
err := r.db.Preload("Task").Find(&tasks).Error
|
// 预加载 TaskExecutionLog 以便后续访问 PlanExecutionLogID
|
||||||
|
err := r.db.Preload("Task").Preload("TaskExecutionLog").Find(&tasks).Error
|
||||||
return tasks, err
|
return tasks, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,6 +86,11 @@ func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeA
|
|||||||
return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
|
return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearAllPendingTasks 清空所有待执行任务
|
||||||
|
func (r *gormPendingTaskRepository) ClearAllPendingTasks() error {
|
||||||
|
return r.db.Where("1 = 1").Delete(&models.PendingTask{}).Error
|
||||||
|
}
|
||||||
|
|
||||||
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
|
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
|
||||||
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
|
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
|
||||||
var log models.TaskExecutionLog
|
var log models.TaskExecutionLog
|
||||||
|
|||||||
@@ -51,6 +51,9 @@ type PlanRepository interface {
|
|||||||
|
|
||||||
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
|
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
|
||||||
CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error)
|
CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error)
|
||||||
|
|
||||||
|
// FindPlansWithPendingTasks 查找所有正在执行的计划
|
||||||
|
FindPlansWithPendingTasks() ([]*models.Plan, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormPlanRepository 是 PlanRepository 的 GORM 实现
|
// gormPlanRepository 是 PlanRepository 的 GORM 实现
|
||||||
@@ -625,3 +628,18 @@ func (r *gormPlanRepository) CreatePlanAnalysisTask(plan *models.Plan) (*models.
|
|||||||
})
|
})
|
||||||
return createdTask, err
|
return createdTask, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindPlansWithPendingTasks 查找所有正在执行的计划
|
||||||
|
func (r *gormPlanRepository) FindPlansWithPendingTasks() ([]*models.Plan, error) {
|
||||||
|
var plans []*models.Plan
|
||||||
|
|
||||||
|
// 关联 pending_tasks, task_execution_logs, tasks 表来查找符合条件的计划
|
||||||
|
err := r.db.Table("plans").
|
||||||
|
Joins("JOIN tasks ON plans.id = tasks.plan_id").
|
||||||
|
Joins("JOIN task_execution_logs ON tasks.id = task_execution_logs.task_id").
|
||||||
|
Joins("JOIN pending_tasks ON task_execution_logs.id = pending_tasks.task_execution_log_id").
|
||||||
|
Group("plans.id"). // 避免重复,因为一个计划可能有多个待执行任务
|
||||||
|
Find(&plans).Error
|
||||||
|
|
||||||
|
return plans, err
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user