package task import ( "context" "fmt" "sync" "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // AreaThresholdCheckParams 定义了区域阈值检查任务的参数 type AreaThresholdCheckParams struct { AreaControllerID uint32 `json:"area_controller_id"` // 区域主控ID SensorType models.SensorType `json:"sensor_type"` // 传感器类型 Thresholds float32 `json:"thresholds"` // 阈值 Operator models.Operator `json:"operator"` // 操作符 Level models.SeverityLevel `json:"level"` // 告警级别 ExcludeDeviceIDs []uint32 `json:"exclude_device_ids"` // 排除的传感器ID } // AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查 type AreaThresholdCheckTask struct { ctx context.Context onceParse sync.Once taskLog *models.TaskExecutionLog params AreaThresholdCheckParams sensorDataRepo repository.SensorDataRepository deviceRepo repository.DeviceRepository alarmService alarm.AlarmService } func NewAreaThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmService alarm.AlarmService) plan.Task { return &AreaThresholdCheckTask{ ctx: ctx, taskLog: taskLog, sensorDataRepo: sensorDataRepo, deviceRepo: deviceRepo, alarmService: alarmService, } } // Execute 执行区域阈值检查任务 func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error { taskCtx, logger := logs.Trace(ctx, a.ctx, "Execute") err := a.parseParameters(taskCtx) if err != nil { logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) return err } // 1. 查询区域主控下所有设备 devices, err := a.deviceRepo.ListByAreaControllerID(taskCtx, a.params.AreaControllerID) if err != nil { logger.Errorf("任务 %v: 查询区域主控 %d 下设备失败: %v", a.taskLog.TaskID, a.params.AreaControllerID, err) return fmt.Errorf("查询区域主控 %d 下设备失败: %w", a.params.AreaControllerID, err) } // 构建忽略设备ID的map,方便快速查找 ignoredMap := make(map[uint32]struct{}) for _, id := range a.params.ExcludeDeviceIDs { ignoredMap[id] = struct{}{} } // 2. 遍历设备,排除忽略列表里的设备,并执行阈值检查 for _, device := range devices { if _, ignored := ignoredMap[device.ID]; ignored { logger.Debugf("任务 %v: 设备 %d 在忽略列表中,跳过检查。", a.taskLog.TaskID, device.ID) continue } task := a.taskLog.Task err = task.SaveParameters(DeviceThresholdCheckParams{ DeviceID: device.ID, SensorType: a.params.SensorType, Thresholds: a.params.Thresholds, Level: a.params.Level, Operator: a.params.Operator, }) if err != nil { logger.Errorf("任务 %v: 保存参数失败: %v", a.taskLog.TaskID, err) continue } // 创建一个临时的 DeviceThresholdCheckTask 实例来复用其核心逻辑 deviceCheckTask := NewDeviceThresholdCheckTask( taskCtx, &models.TaskExecutionLog{ // 为每个设备创建一个模拟的 TaskExecutionLog TaskID: a.taskLog.TaskID, Task: task, }, a.sensorDataRepo, a.alarmService, ).(*DeviceThresholdCheckTask) // 类型断言,以便访问内部参数 // 执行单设备的阈值检查 if err := deviceCheckTask.Execute(taskCtx); err != nil { logger.Errorf("任务 %v: 设备 %d 阈值检查失败: %v", a.taskLog.TaskID, device.ID, err) continue } } return nil } func (a *AreaThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) { logger := logs.TraceLogger(ctx, a.ctx, "OnFailure") logger.Errorf("区域阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", a.taskLog.TaskID, executeErr) } func (a *AreaThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { taskCtx := logs.AddFuncName(ctx, a.ctx, "ResolveDeviceIDs") if err := a.parseParameters(taskCtx); err != nil { return nil, err } // 排除列表也意味着关联 return a.params.ExcludeDeviceIDs, nil } // parseParameters 解析任务参数 func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error { logger := logs.TraceLogger(ctx, a.ctx, "parseParameters") var err error a.onceParse.Do(func() { if a.taskLog.Task.Parameters == nil { logger.Errorf("任务 %v: 缺少参数", a.taskLog.TaskID) err = fmt.Errorf("任务 %v: 参数不全", a.taskLog.TaskID) return } var params AreaThresholdCheckParams err = a.taskLog.Task.ParseParameters(¶ms) if err != nil { logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) err = fmt.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) return } if params.SensorType == "" { err = fmt.Errorf("任务 %v: 未配置传感器类型", a.taskLog.TaskID) } if params.Operator == "" { err = fmt.Errorf("任务 %v: 缺少操作符", a.taskLog.TaskID) } if params.Thresholds == 0 { err = fmt.Errorf("任务 %v: 未配置阈值", a.taskLog.TaskID) } if params.AreaControllerID == 0 { err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID) } if params.Level == "" { params.Level = models.WarnLevel } if params.ExcludeDeviceIDs == nil { params.ExcludeDeviceIDs = []uint32{} } a.params = params }) return err }