package task import ( "context" "fmt" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // FullCollectionTask 实现了 plan.Task 接口,用于执行一次全量的设备数据采集 type FullCollectionTask struct { ctx context.Context log *models.TaskExecutionLog deviceRepo repository.DeviceRepository deviceService device.Service } // NewFullCollectionTask 创建一个全量采集任务实例 func NewFullCollectionTask( ctx context.Context, log *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, deviceService device.Service, ) plan.Task { return &FullCollectionTask{ ctx: ctx, log: log, deviceRepo: deviceRepo, deviceService: deviceService, } } // Execute 是任务的核心执行逻辑 func (t *FullCollectionTask) Execute(ctx context.Context) error { taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute") logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) sensors, err := t.deviceRepo.ListAllSensors(taskCtx) if err != nil { return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err) } if len(sensors) == 0 { logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) return nil } sensorsByController := make(map[uint][]*models.Device) for _, sensor := range sensors { sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) } var firstError error for controllerID, controllerSensors := range sensorsByController { logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "controller_id", controllerID, "sensor_count", len(controllerSensors), ) if err := t.deviceService.Collect(taskCtx, controllerID, controllerSensors); err != nil { logger.Errorw("全量采集任务: 为区域主控下发采集指令失败", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "controller_id", controllerID, "error", err, ) if firstError == nil { firstError = err // 保存第一个错误 } } } if firstError != nil { return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError) } logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) return nil } // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 func (t *FullCollectionTask) OnFailure(ctx context.Context, executeErr error) { logger := logs.TraceLogger(ctx, t.ctx, "OnFailure") logger.Errorw("全量采集任务执行失败", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID, "error", executeErr, ) } // ResolveDeviceIDs 获取当前任务需要使用的设备ID列表 func (t *FullCollectionTask) ResolveDeviceIDs(ctx context.Context) ([]uint, error) { // 全量采集任务不和任何设备绑定, 每轮采集都会重新获取全量传感器 return []uint{}, nil }