diff --git a/design/exceeding-threshold-alarm/index.md b/design/exceeding-threshold-alarm/index.md index 7398c59..bf7f650 100644 --- a/design/exceeding-threshold-alarm/index.md +++ b/design/exceeding-threshold-alarm/index.md @@ -130,4 +130,5 @@ 1. 定义告警表和告警历史表 2. 重构部分枚举, 让models包不依赖其他项目中的包 -3. 创建仓库层对象(不包含方法) \ No newline at end of file +3. 创建仓库层对象(不包含方法) +4. 实现告警发送任务 \ No newline at end of file diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index 79316bb..25ff3fb 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -160,7 +160,12 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr ) // 任务工厂 - taskFactory := task.NewTaskFactory(logs.AddCompName(baseCtx, "TaskFactory"), infra.repos.sensorDataRepo, infra.repos.deviceRepo, generalDeviceService) + taskFactory := task.NewTaskFactory(logs.AddCompName(baseCtx, "TaskFactory"), + infra.repos.sensorDataRepo, + infra.repos.deviceRepo, + infra.repos.alarmRepo, + generalDeviceService, + infra.notifyService) // 计划任务管理器 analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(logs.AddCompName(baseCtx, "AnalysisPlanTaskManager"), infra.repos.planRepo, infra.repos.pendingTaskRepo, infra.repos.executionLogRepo) diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index 5a4e93d..c5ffe06 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -10,8 +10,10 @@ import ( ) const ( - // PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称 - PlanNameTimedFullDataCollection = "定时全量数据采集" + // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 + PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查" + // PlanNameAlarmNotification 是告警通知发送计划的名称 + PlanNameAlarmNotification = "告警通知发送" ) // initializeState 在应用启动时准备其初始数据状态。 @@ -48,13 +50,11 @@ func (app *Application) initializeState(ctx context.Context) error { } // initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。 +// 它通过调用各个独立的计划初始化方法来完成此操作。 func (app *Application) initializeSystemPlans(ctx context.Context) error { appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeSystemPlans") logger.Info("开始检查并更新预定义的系统计划...") - // 动态构建预定义计划列表 - predefinedSystemPlans := app.getPredefinedSystemPlans() - // 1. 获取所有已存在的系统计划 existingPlans, _, err := app.Infra.repos.planRepo.ListPlans(appCtx, repository.ListPlansOptions{ PlanType: repository.PlanTypeFilterSystem, @@ -69,49 +69,23 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error { existingPlanMap[existingPlans[i].Name] = &existingPlans[i] } - // 3. 遍历预定义的计划列表 - for i := range predefinedSystemPlans { - predefinedPlan := &predefinedSystemPlans[i] // 获取可修改的指针 + // 3. 调用独立的初始化方法来处理每个系统计划 + if err := app.initializePeriodicSystemHealthCheckPlan(appCtx, existingPlanMap); err != nil { + return err // 如果任何一个计划初始化失败,则立即返回错误 + } - if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { - // 如果计划存在,则进行无差别更新 - logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) - - // 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划 - predefinedPlan.ID = foundExistingPlan.ID - predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount - - // 1. 使用 UpdatePlanMetadataAndStructure 来更新计划的元数据和关联任务 - // 这会处理 Name, Description, ExecutionType, ExecuteNum, CronExpression, ContentType - // 并且最重要的是,它会正确处理 Tasks 的增删改,确保任务列表与 predefinedPlan.Tasks 完全同步 - if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil { - return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err) - } - - // 2. 接着使用 UpdatePlan 来更新所有顶层字段,包括 PlanType 和 Status - // 由于任务已经在上一步正确同步,此步不会导致任务冗余 - if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil { - return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err) - } - - logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) - } else { - // 如果计划不存在, 则创建 - logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) - if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil { - return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) - } else { - logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) - } - } + if err := app.initializeAlarmNotificationPlan(appCtx, existingPlanMap); err != nil { + return err } logger.Info("预定义系统计划检查完成。") return nil } -// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。 -func (app *Application) getPredefinedSystemPlans() []models.Plan { +// initializePeriodicSystemHealthCheckPlan 负责初始化 "周期性系统健康检查" 计划。 +// 它会根据当前配置动态构建计划,并决定是创建新计划还是更新现有计划。 +func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Context, existingPlanMap map[string]*models.Plan) error { + appCtx, logger := logs.Trace(ctx, app.Ctx, "initializePeriodicSystemHealthCheckPlan") // 根据配置创建定时全量采集计划 interval := app.Config.Collection.Interval @@ -119,9 +93,9 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan { interval = 1 // 确保间隔至少为1分钟 } cronExpression := fmt.Sprintf("*/%d * * * *", interval) - timedCollectionPlan := models.Plan{ - Name: PlanNameTimedFullDataCollection, - Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集。", app.Config.Collection.Interval), + predefinedPlan := &models.Plan{ + Name: PlanNamePeriodicSystemHealthCheck, + Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval), PlanType: models.PlanTypeSystem, ExecutionType: models.PlanExecutionTypeAutomatic, CronExpression: cronExpression, @@ -137,7 +111,89 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan { }, } - return []models.Plan{timedCollectionPlan} + if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { + // 如果计划存在,则进行无差别更新 + logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) + + // 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划 + predefinedPlan.ID = foundExistingPlan.ID + predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount + + // 1. 使用 UpdatePlanMetadataAndStructure 来更新计划的元数据和关联任务 + // 这会处理 Name, Description, ExecutionType, ExecuteNum, CronExpression, ContentType + // 并且最重要的是,它会正确处理 Tasks 的增删改,确保任务列表与 predefinedPlan.Tasks 完全同步 + if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err) + } + + // 2. 接着使用 UpdatePlan 来更新所有顶层字段,包括 PlanType 和 Status + // 由于任务已经在上一步正确同步,此步不会导致任务冗余 + if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err) + } + + logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) + } else { + // 如果计划不存在, 则创建 + logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) + if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) + } else { + logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) + } + } + return nil +} + +// initializeAlarmNotificationPlan 负责初始化 "告警通知发送" 计划。 +// 它确保系统中存在一个每分钟执行的、用于发送告警通知的预定义计划。 +func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, existingPlanMap map[string]*models.Plan) error { + appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeAlarmNotificationPlan") + + predefinedPlan := &models.Plan{ + Name: PlanNameAlarmNotification, + Description: "这是一个系统预定义的计划, 每分钟自动触发一次告警通知发送。", + PlanType: models.PlanTypeSystem, + ExecutionType: models.PlanExecutionTypeAutomatic, + CronExpression: "*/1 * * * *", // 每分钟执行一次 + Status: models.PlanStatusEnabled, + ContentType: models.PlanContentTypeTasks, + Tasks: []models.Task{ + { + Name: "告警通知发送", + Description: "发送所有待处理的告警通知", + ExecutionOrder: 1, + Type: models.TaskTypeAlarmNotification, + }, + }, + } + + if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { + // 如果计划存在,则进行无差别更新 + logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) + + predefinedPlan.ID = foundExistingPlan.ID + predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount + + if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err) + } + + if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err) + } + + logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) + } else { + // 如果计划不存在, 则创建 + logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) + if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) + } else { + logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) + } + } + return nil } // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 2579f3f..7700d44 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -230,6 +230,16 @@ type AlarmNotificationConfig struct { NotificationIntervals NotificationIntervalsConfig `yaml:"notification_intervals"` } +// NewConfig 创建并返回一个新的配置实例 +func NewConfig() *Config { + // 默认值可以在这里设置,但我们优先使用配置文件中的值 + return &Config{ + Collection: CollectionConfig{ + Interval: 1, // 默认为1分钟 + }, + } +} + // Load 从指定路径加载配置文件 func (c *Config) Load(path string) error { // 读取配置文件