diff --git a/internal/domain/task/alarm_notification_task.go b/internal/domain/task/alarm_notification_task.go index 14ec7d7..2b03a0d 100644 --- a/internal/domain/task/alarm_notification_task.go +++ b/internal/domain/task/alarm_notification_task.go @@ -6,9 +6,12 @@ import ( "sync" "time" + notify_domain "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // AlarmNotificationTaskParams 定义了 AlarmNotificationTask 的参数结构 @@ -23,21 +26,22 @@ type AlarmNotificationTask struct { ctx context.Context taskLog *models.TaskExecutionLog - // notificationIntervals 告警通知的发送间隔时间,键为告警等级,值为 time.Duration - notificationIntervals map[models.SeverityLevel]time.Duration + // alarmNotificationTaskParams 是任务配置 + alarmNotificationTaskParams AlarmNotificationTaskParams onceParse sync.Once // 保证解析参数只执行一次 - // TODO: 根据实际需求添加告警通知相关的依赖,例如: - // notificationService notification.Service - // alarmRepository repository.AlarmRepository + notificationService notify_domain.Service + alarmRepository repository.AlarmRepository } // NewAlarmNotificationTask 创建一个新的告警通知发送任务实例 -func NewAlarmNotificationTask(ctx context.Context, taskLog *models.TaskExecutionLog) plan.Task { +func NewAlarmNotificationTask(ctx context.Context, taskLog *models.TaskExecutionLog, service notify_domain.Service, alarmRepository repository.AlarmRepository) plan.Task { return &AlarmNotificationTask{ - ctx: ctx, - taskLog: taskLog, + ctx: ctx, + taskLog: taskLog, + alarmRepository: alarmRepository, + notificationService: service, } } @@ -50,7 +54,47 @@ func (t *AlarmNotificationTask) Execute(ctx context.Context) error { return err } - // TODO: 实现告警通知发送逻辑,可以使用 t.notificationIntervals 来获取不同等级的发送间隔 + // 获取是否有待发送告警通知, 用于优化性能 + alarmsCount, err := t.alarmRepository.CountAlarmsForNotification(taskCtx, t.alarmNotificationTaskParams.NotificationIntervals) + if err != nil { + logger.Errorf("任务 %v: 获取告警数量失败: %v", t.taskLog.TaskID, err) + return err + } + if alarmsCount == 0 { + logger.Debugf("没有待发送的告警通知, 跳过任务, 任务ID: %d", t.taskLog.TaskID) + return nil + } + + // 获取所有待发送的告警通知 + alarms, err := t.alarmRepository.ListAlarmsForNotification(taskCtx, t.alarmNotificationTaskParams.NotificationIntervals) + if err != nil { + logger.Errorf("任务 %v: 获取告警列表失败: %v", t.taskLog.TaskID, err) + return err + } + + // 发送通知 + for _, alarm := range alarms { + // TODO 因为还没做权限管理, 所以暂时通过广播形式发给所有用户 + err = t.notificationService.BroadcastAlarm(taskCtx, notify.AlarmContent{ + Title: alarm.AlarmSummary, + Message: alarm.AlarmDetails, + Level: alarm.Level, + Timestamp: time.Now(), + }) + + if err != nil { + // 非致命错误 + logger.Errorf("任务 %v: 发送告警通知失败: %v", t.taskLog.TaskID, err) + continue + } + + // 能发送通知的告警要么是忽略期已过且到达触发时间, 要么是不忽略且到达触发时间, 二者都应该取消忽略并刷新最后一次发送时间 + err = t.alarmRepository.UpdateAlarmNotificationStatus(taskCtx, alarm.ID, time.Now(), false, nil) + if err != nil { + // 非致命错误, 没有必要因为更新失败影响后续消息发送 + logger.Errorf("任务 %v: 更新告警通知状态失败: %v", t.taskLog.TaskID, err) + } + } logger.Infof("告警通知发送任务执行完成, 任务ID: %d", t.taskLog.TaskID) return nil @@ -60,8 +104,6 @@ func (t *AlarmNotificationTask) Execute(ctx context.Context) error { func (t *AlarmNotificationTask) OnFailure(ctx context.Context, executeErr error) { logger := logs.TraceLogger(ctx, t.ctx, "OnFailure") logger.Errorf("告警通知发送任务执行失败, 任务ID: %d, 错误: %v", t.taskLog.TaskID, executeErr) - - // TODO: 实现告警通知发送失败时的回滚或清理逻辑 } // ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表 @@ -89,13 +131,7 @@ func (t *AlarmNotificationTask) parseParameters(ctx context.Context) error { return } - // 初始化 notificationIntervals - t.notificationIntervals = make(map[models.SeverityLevel]time.Duration) - - // 将 uint 类型的秒数转换为 time.Duration - for level, seconds := range params.NotificationIntervals { - t.notificationIntervals[level] = time.Duration(seconds) * time.Minute - } + t.alarmNotificationTaskParams = params }) return err diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index a66570e..7ee9f09 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -5,6 +5,7 @@ import ( "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" @@ -19,23 +20,31 @@ const ( ) type taskFactory struct { - ctx context.Context + ctx context.Context + sensorDataRepo repository.SensorDataRepository deviceRepo repository.DeviceRepository - deviceService device.Service + alarmRepo repository.AlarmRepository + + deviceService device.Service + notificationService notify.Service } func NewTaskFactory( ctx context.Context, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, + alarmRepo repository.AlarmRepository, deviceService device.Service, + notifyService notify.Service, ) plan.TaskFactory { return &taskFactory{ - ctx: ctx, - sensorDataRepo: sensorDataRepo, - deviceRepo: deviceRepo, - deviceService: deviceService, + ctx: ctx, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + alarmRepo: alarmRepo, + deviceService: deviceService, + notificationService: notifyService, } } @@ -50,7 +59,7 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe case models.TaskTypeFullCollection: return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService) case models.TaskTypeAlarmNotification: - return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog) + return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) @@ -79,7 +88,7 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models case models.TaskTypeFullCollection: return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil case models.TaskTypeAlarmNotification: - return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog), nil + return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil default: return nil, fmt.Errorf("不支持为类型 '%s' 的任务创建模型实例", taskModel.Type) } diff --git a/internal/infra/repository/alarm_repository.go b/internal/infra/repository/alarm_repository.go index 35b6a99..8408ab5 100644 --- a/internal/infra/repository/alarm_repository.go +++ b/internal/infra/repository/alarm_repository.go @@ -29,14 +29,20 @@ type AlarmRepository interface { // 返回活跃告警列表、总记录数和错误。 ListActiveAlarms(ctx context.Context, opts ActiveAlarmListOptions, page, pageSize int) ([]models.ActiveAlarm, int64, error) + // UpdateAlarmNotificationStatus 显式更新告警的通知相关状态字段。 + // lastNotifiedAt: 传入具体的发送时间。 + // isIgnored: 告警新的忽略状态。 + // ignoredUntil: 告警新的忽略截止时间 (nil 表示没有忽略截止时间/已取消忽略)。 + UpdateAlarmNotificationStatus(ctx context.Context, alarmID uint, lastNotifiedAt time.Time, isIgnored bool, ignoredUntil *time.Time) error + // <-- 下列两个方法是为了性能做出的架构妥协, 业务逻辑入侵仓库层带来的收益远大于通过业务层进行数据筛选 --> // ListAlarmsForNotification 查询满足发送告警消息条件的活跃告警列表。 // 返回活跃告警列表和错误。 // intervalByLevel: key=SeverityLevel, value=interval_in_minutes - ListAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]int) ([]models.ActiveAlarm, error) + ListAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]uint) ([]models.ActiveAlarm, error) // 查询满足发送告警消息条件的记录总数 - CountAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]int) (int64, error) + CountAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]uint) (int64, error) } // gormAlarmRepository 是 AlarmRepository 的 GORM 实现。 @@ -105,8 +111,31 @@ func (r *gormAlarmRepository) ListActiveAlarms(ctx context.Context, opts ActiveA return results, total, err } +func (r *gormAlarmRepository) UpdateAlarmNotificationStatus(ctx context.Context, alarmID uint, lastNotifiedAt time.Time, isIgnored bool, ignoredUntil *time.Time) error { + repoCtx := logs.AddFuncName(ctx, r.ctx, "UpdateAlarmNotificationStatus") + + // 1. 内部安全地构造 map,将强类型参数转换为 GORM 需要的格式 + // GORM 的 Updates 方法会正确处理 *time.Time (nil -> DB NULL) + updates := map[string]interface{}{ + "last_notified_at": lastNotifiedAt, // time.Time 会被 GORM 视为非空时间 + "is_ignored": isIgnored, + "ignored_until": ignoredUntil, // *time.Time (nil) 会被 GORM 写入 NULL + } + + // 2. 执行更新 + result := r.db.WithContext(repoCtx). + Model(&models.ActiveAlarm{}). + Where("id = ?", alarmID). + Updates(updates) // 仅更新 updates map 中指定的三个字段 + + if result.Error != nil { + return result.Error + } + return nil +} + // CountAlarmsForNotification 查询满足发送告警消息条件的记录总数 -func (r *gormAlarmRepository) CountAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]int) (int64, error) { +func (r *gormAlarmRepository) CountAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]uint) (int64, error) { repoCtx := logs.AddFuncName(ctx, r.ctx, "CountAlarmsForNotification") var total int64 @@ -127,7 +156,7 @@ func (r *gormAlarmRepository) CountAlarmsForNotification(ctx context.Context, in } // ListAlarmsForNotification 查询满足发送告警消息条件的活跃告警列表 -func (r *gormAlarmRepository) ListAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]int) ([]models.ActiveAlarm, error) { +func (r *gormAlarmRepository) ListAlarmsForNotification(ctx context.Context, intervalByLevel map[models.SeverityLevel]uint) ([]models.ActiveAlarm, error) { repoCtx := logs.AddFuncName(ctx, r.ctx, "ListAlarmsForNotification") var results []models.ActiveAlarm @@ -148,7 +177,7 @@ func (r *gormAlarmRepository) ListAlarmsForNotification(ctx context.Context, int } // buildNotificationBaseQuery 负责组合 Group A 和 Group B 的逻辑 -func (r *gormAlarmRepository) buildNotificationBaseQuery(tx *gorm.DB, intervalByLevel map[models.SeverityLevel]int) *gorm.DB { +func (r *gormAlarmRepository) buildNotificationBaseQuery(tx *gorm.DB, intervalByLevel map[models.SeverityLevel]uint) *gorm.DB { // 1. 获取所有配置的 Level 列表 configuredLevels := make([]models.SeverityLevel, 0, len(intervalByLevel)) @@ -208,7 +237,7 @@ func (r *gormAlarmRepository) buildGroupAClause(tx *gorm.DB, configuredLevels [] // buildGroupBClause 构造 Group B 的 WHERE 语句和参数列表。 // 针对 Level 存在配置的告警,使用“间隔发送”逻辑。 -func (r *gormAlarmRepository) buildGroupBClause(tx *gorm.DB, intervalByLevel map[models.SeverityLevel]int, configuredLevels []models.SeverityLevel) *gorm.DB { +func (r *gormAlarmRepository) buildGroupBClause(tx *gorm.DB, intervalByLevel map[models.SeverityLevel]uint, configuredLevels []models.SeverityLevel) *gorm.DB { now := time.Now() // B.1. 构造 Level IN 子句