package task import ( "context" "fmt" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) type DeviceThresholdCheckParams struct { DeviceID uint32 `json:"device_id"` // 设备ID SensorType models.SensorType `json:"sensor_type"` // 传感器类型 Thresholds float32 `json:"thresholds"` // 阈值 Operator models.Operator `json:"operator"` // 操作符 Level models.SeverityLevel `json:"level"` // 告警等级 } // DeviceThresholdCheckTask 是一个任务,用于检查设备传感器数据是否满足阈值条件。 type DeviceThresholdCheckTask struct { ctx context.Context onceParse sync.Once taskLog *models.TaskExecutionLog params DeviceThresholdCheckParams sensorDataRepo repository.SensorDataRepository alarmService alarm.AlarmService } func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, alarmService alarm.AlarmService) plan.Task { return &DeviceThresholdCheckTask{ ctx: ctx, taskLog: taskLog, sensorDataRepo: sensorDataRepo, alarmService: alarmService, } } func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error { taskCtx, logger := logs.Trace(ctx, d.ctx, "Execute") err := d.parseParameters(taskCtx) if err != nil { return err } sensorData, err := d.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(taskCtx, d.params.DeviceID, d.params.SensorType) if err != nil { logger.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err) return fmt.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err) } var currentValue float32 var alarmCode models.AlarmCode switch d.params.SensorType { case models.SensorTypeTemperature: var data models.TemperatureData if err := sensorData.ParseData(&data); err != nil { return fmt.Errorf("任务 %v: 解析温度数据失败: %v", d.taskLog.TaskID, err) } currentValue = data.TemperatureCelsius alarmCode = models.AlarmCodeTemperature case models.SensorTypeHumidity: var data models.HumidityData if err := sensorData.ParseData(&data); err != nil { return fmt.Errorf("任务 %v: 解析湿度数据失败: %v", d.taskLog.TaskID, err) } currentValue = data.HumidityPercent alarmCode = models.AlarmCodeHumidity case models.SensorTypeWeight: var data models.WeightData if err := sensorData.ParseData(&data); err != nil { return fmt.Errorf("任务 %v: 解析重量数据失败: %v", d.taskLog.TaskID, err) } currentValue = data.WeightKilograms alarmCode = models.AlarmCodeWeight default: return fmt.Errorf("任务 %v: 不支持的传感器类型: %v", d.taskLog.TaskID, d.params.SensorType) } // 阈值检查未通过 isExceeded := !d.shouldTriggerAlarm(currentValue, d.params.Operator, d.params.Thresholds) if isExceeded { // 状态一:检查未通过,确保告警开启 summary := fmt.Sprintf("设备 %d(%s) 触发阈值告警条件 (%s %.2f)", d.params.DeviceID, d.params.SensorType, d.params.Operator, d.params.Thresholds) details := fmt.Sprintf("当前检测值: %.2f", currentValue) logger.Infof("任务 %v: %s。%s", d.taskLog.TaskID, summary, details) newAlarm := &models.ActiveAlarm{ SourceType: models.AlarmSourceTypeDevice, SourceID: d.params.DeviceID, AlarmCode: alarmCode, AlarmSummary: summary, AlarmDetails: details, Level: d.params.Level, TriggerTime: time.Now(), } if err := d.alarmService.CreateAlarmIfNotExists(taskCtx, newAlarm); err != nil { logger.Errorf("任务 %v: 创建告警失败: %v", d.taskLog.TaskID, err) // 根据策略决定是否需要返回错误,这里选择不中断任务执行 } } else { // 状态二:检查已通过,确保告警关闭 resolveMethod := "系统自动解决:阈值恢复正常" logger.Infof("任务 %v: 设备 %d 的 %s 已恢复正常,正在尝试关闭告警。", d.taskLog.TaskID, d.params.DeviceID, d.params.SensorType) if err := d.alarmService.CloseAlarm(taskCtx, models.AlarmSourceTypeDevice, d.params.DeviceID, alarmCode, resolveMethod, nil); err != nil { logger.Errorf("任务 %v: 关闭告警失败: %v", d.taskLog.TaskID, err) // 根据策略决定是否需要返回错误,这里选择不中断任务执行 } } return nil } // shouldTriggerAlarm 判断当前值是否触发告警条件 func (d *DeviceThresholdCheckTask) shouldTriggerAlarm(currentValue float32, operator models.Operator, threshold float32) bool { switch operator { case models.OperatorLessThan: return currentValue >= threshold case models.OperatorLessThanOrEqualTo: return currentValue > threshold case models.OperatorGreaterThan: return currentValue <= threshold case models.OperatorGreaterThanOrEqualTo: return currentValue < threshold case models.OperatorEqualTo: return currentValue != threshold case models.OperatorNotEqualTo: return currentValue == threshold default: return false // 默认不触发告警 } } // parseParameters 解析任务参数 func (d *DeviceThresholdCheckTask) parseParameters(ctx context.Context) error { logger := logs.TraceLogger(ctx, d.ctx, "parseParameters") var err error d.onceParse.Do(func() { if d.taskLog.Task.Parameters == nil { logger.Errorf("任务 %v: 缺少参数", d.taskLog.TaskID) err = fmt.Errorf("任务 %v: 参数不全", d.taskLog.TaskID) return } var params DeviceThresholdCheckParams err = d.taskLog.Task.ParseParameters(¶ms) if err != nil { logger.Errorf("任务 %v: 解析参数失败: %v", d.taskLog.TaskID, err) err = fmt.Errorf("任务 %v: 解析参数失败: %v", d.taskLog.TaskID, err) return } if params.SensorType == "" { err = fmt.Errorf("任务 %v: 未配置传感器类型", d.taskLog.TaskID) } if params.Operator == "" { err = fmt.Errorf("任务 %v: 缺少操作符", d.taskLog.TaskID) } if params.Thresholds == 0 { err = fmt.Errorf("任务 %v: 未配置阈值", d.taskLog.TaskID) } if params.DeviceID == 0 { err = fmt.Errorf("任务 %v: 未配置设备ID", d.taskLog.TaskID) } if params.Level == "" { params.Level = models.WarnLevel } d.params = params }) return err } func (d *DeviceThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) { logger := logs.TraceLogger(ctx, d.ctx, "OnFailure") logger.Errorf("设备阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", d.taskLog.TaskID, executeErr) } func (d *DeviceThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { taskCtx := logs.AddFuncName(ctx, d.ctx, "ResolveDeviceIDs") if err := d.parseParameters(taskCtx); err != nil { return nil, err } return []uint32{d.params.DeviceID}, nil }