package task import ( "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集 type FullCollectionTask struct { log *models.TaskExecutionLog deviceRepo repository.DeviceRepository deviceService device.Service logger *logs.Logger } // NewFullCollectionTask 创建一个全量采集任务实例 func NewFullCollectionTask( log *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, deviceService device.Service, logger *logs.Logger, ) *FullCollectionTask { return &FullCollectionTask{ log: log, deviceRepo: deviceRepo, deviceService: deviceService, logger: logger, } } // Execute 是任务的核心执行逻辑 func (t *FullCollectionTask) Execute() error { t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) sensors, err := t.deviceRepo.ListAllSensors() if err != nil { return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err) } if len(sensors) == 0 { t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) return nil } sensorsByController := make(map[uint][]*models.Device) for _, sensor := range sensors { sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) } var firstError error for controllerID, controllerSensors := range sensorsByController { t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "controller_id", controllerID, "sensor_count", len(controllerSensors), ) if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil { t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "controller_id", controllerID, "error", err, ) if firstError == nil { firstError = err // 保存第一个错误 } } } if firstError != nil { return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError) } t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) return nil } // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 func (t *FullCollectionTask) OnFailure(executeErr error) { t.logger.Errorw("全量采集任务执行失败", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "error", executeErr, ) }