创建PlanNameAlarmNotification 计划

This commit is contained in:
2025-11-08 18:33:07 +08:00
parent 362ed5ad9d
commit 2ad1ae4f40
4 changed files with 119 additions and 47 deletions

View File

@@ -131,3 +131,4 @@
1. 定义告警表和告警历史表
2. 重构部分枚举, 让models包不依赖其他项目中的包
3. 创建仓库层对象(不包含方法)
4. 实现告警发送任务

View File

@@ -160,7 +160,12 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
)
// 任务工厂
taskFactory := task.NewTaskFactory(logs.AddCompName(baseCtx, "TaskFactory"), infra.repos.sensorDataRepo, infra.repos.deviceRepo, generalDeviceService)
taskFactory := task.NewTaskFactory(logs.AddCompName(baseCtx, "TaskFactory"),
infra.repos.sensorDataRepo,
infra.repos.deviceRepo,
infra.repos.alarmRepo,
generalDeviceService,
infra.notifyService)
// 计划任务管理器
analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(logs.AddCompName(baseCtx, "AnalysisPlanTaskManager"), infra.repos.planRepo, infra.repos.pendingTaskRepo, infra.repos.executionLogRepo)

View File

@@ -10,8 +10,10 @@ import (
)
const (
// PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称
PlanNameTimedFullDataCollection = "定时全量数据采集"
// PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称
PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查"
// PlanNameAlarmNotification 是告警通知发送计划的名称
PlanNameAlarmNotification = "告警通知发送"
)
// initializeState 在应用启动时准备其初始数据状态。
@@ -48,13 +50,11 @@ func (app *Application) initializeState(ctx context.Context) error {
}
// initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。
// 它通过调用各个独立的计划初始化方法来完成此操作。
func (app *Application) initializeSystemPlans(ctx context.Context) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeSystemPlans")
logger.Info("开始检查并更新预定义的系统计划...")
// 动态构建预定义计划列表
predefinedSystemPlans := app.getPredefinedSystemPlans()
// 1. 获取所有已存在的系统计划
existingPlans, _, err := app.Infra.repos.planRepo.ListPlans(appCtx, repository.ListPlansOptions{
PlanType: repository.PlanTypeFilterSystem,
@@ -69,49 +69,23 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error {
existingPlanMap[existingPlans[i].Name] = &existingPlans[i]
}
// 3. 遍历预定义的计划列表
for i := range predefinedSystemPlans {
predefinedPlan := &predefinedSystemPlans[i] // 获取可修改的指针
// 3. 调用独立的初始化方法来处理每个系统计划
if err := app.initializePeriodicSystemHealthCheckPlan(appCtx, existingPlanMap); err != nil {
return err // 如果任何一个计划初始化失败,则立即返回错误
}
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
// 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
// 1. 使用 UpdatePlanMetadataAndStructure 来更新计划的元数据和关联任务
// 这会处理 Name, Description, ExecutionType, ExecuteNum, CronExpression, ContentType
// 并且最重要的是,它会正确处理 Tasks 的增删改,确保任务列表与 predefinedPlan.Tasks 完全同步
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
// 2. 接着使用 UpdatePlan 来更新所有顶层字段,包括 PlanType 和 Status
// 由于任务已经在上一步正确同步,此步不会导致任务冗余
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
if err := app.initializeAlarmNotificationPlan(appCtx, existingPlanMap); err != nil {
return err
}
logger.Info("预定义系统计划检查完成。")
return nil
}
// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表
func (app *Application) getPredefinedSystemPlans() []models.Plan {
// initializePeriodicSystemHealthCheckPlan 负责初始化 "周期性系统健康检查" 计划
// 它会根据当前配置动态构建计划,并决定是创建新计划还是更新现有计划。
func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Context, existingPlanMap map[string]*models.Plan) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "initializePeriodicSystemHealthCheckPlan")
// 根据配置创建定时全量采集计划
interval := app.Config.Collection.Interval
@@ -119,9 +93,9 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan {
interval = 1 // 确保间隔至少为1分钟
}
cronExpression := fmt.Sprintf("*/%d * * * *", interval)
timedCollectionPlan := models.Plan{
Name: PlanNameTimedFullDataCollection,
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集。", app.Config.Collection.Interval),
predefinedPlan := &models.Plan{
Name: PlanNamePeriodicSystemHealthCheck,
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval),
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: cronExpression,
@@ -137,7 +111,89 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan {
},
}
return []models.Plan{timedCollectionPlan}
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
// 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
// 1. 使用 UpdatePlanMetadataAndStructure 来更新计划的元数据和关联任务
// 这会处理 Name, Description, ExecutionType, ExecuteNum, CronExpression, ContentType
// 并且最重要的是,它会正确处理 Tasks 的增删改,确保任务列表与 predefinedPlan.Tasks 完全同步
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
// 2. 接着使用 UpdatePlan 来更新所有顶层字段,包括 PlanType 和 Status
// 由于任务已经在上一步正确同步,此步不会导致任务冗余
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
return nil
}
// initializeAlarmNotificationPlan 负责初始化 "告警通知发送" 计划。
// 它确保系统中存在一个每分钟执行的、用于发送告警通知的预定义计划。
func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, existingPlanMap map[string]*models.Plan) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeAlarmNotificationPlan")
predefinedPlan := &models.Plan{
Name: PlanNameAlarmNotification,
Description: "这是一个系统预定义的计划, 每分钟自动触发一次告警通知发送。",
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: "*/1 * * * *", // 每分钟执行一次
Status: models.PlanStatusEnabled,
ContentType: models.PlanContentTypeTasks,
Tasks: []models.Task{
{
Name: "告警通知发送",
Description: "发送所有待处理的告警通知",
ExecutionOrder: 1,
Type: models.TaskTypeAlarmNotification,
},
},
}
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。

View File

@@ -230,6 +230,16 @@ type AlarmNotificationConfig struct {
NotificationIntervals NotificationIntervalsConfig `yaml:"notification_intervals"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
// 默认值可以在这里设置,但我们优先使用配置文件中的值
return &Config{
Collection: CollectionConfig{
Interval: 1, // 默认为1分钟
},
}
}
// Load 从指定路径加载配置文件
func (c *Config) Load(path string) error {
// 读取配置文件