From 5050f76066fca1a0a988651e5b3d27f72664a669 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 16:37:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=85=A8=E9=87=8F=E9=87=87?= =?UTF-8?q?=E9=9B=86=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/domain/task/full_collection_task.go | 93 ++++++++++++++++++++ internal/domain/task/task.go | 2 + internal/infra/models/plan.go | 1 + 3 files changed, 96 insertions(+) create mode 100644 internal/domain/task/full_collection_task.go diff --git a/internal/domain/task/full_collection_task.go b/internal/domain/task/full_collection_task.go new file mode 100644 index 0000000..8802600 --- /dev/null +++ b/internal/domain/task/full_collection_task.go @@ -0,0 +1,93 @@ +package task + +import ( + "fmt" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集 +type FullCollectionTask struct { + log *models.TaskExecutionLog + deviceRepo repository.DeviceRepository + deviceService device.Service + logger *logs.Logger +} + +// NewFullCollectionTask 创建一个全量采集任务实例 +func NewFullCollectionTask( + log *models.TaskExecutionLog, + deviceRepo repository.DeviceRepository, + deviceService device.Service, + logger *logs.Logger, +) *FullCollectionTask { + return &FullCollectionTask{ + log: log, + deviceRepo: deviceRepo, + deviceService: deviceService, + logger: logger, + } +} + +// Execute 是任务的核心执行逻辑 +func (t *FullCollectionTask) Execute() error { + t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + + sensors, err := t.deviceRepo.ListAllSensors() + if err != nil { + return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err) + } + + if len(sensors) == 0 { + t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + return nil + } + + sensorsByController := make(map[uint][]*models.Device) + for _, sensor := range sensors { + sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) + } + + var firstError error + for controllerID, controllerSensors := range sensorsByController { + t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "controller_id", controllerID, + "sensor_count", len(controllerSensors), + ) + if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil { + t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "controller_id", controllerID, + "error", err, + ) + if firstError == nil { + firstError = err // 保存第一个错误 + } + } + } + + if firstError != nil { + return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError) + } + + t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + return nil +} + +// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 +func (t *FullCollectionTask) OnFailure(executeErr error) { + t.logger.Errorw("全量采集任务执行失败", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "error", executeErr, + ) +} diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 6a5dba4..54d23e1 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -35,6 +35,8 @@ func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler. return NewDelayTask(t.logger, claimedLog) case models.TaskTypeReleaseFeedWeight: return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger) + case models.TaskTypeFullCollection: + return NewFullCollectionTask(claimedLog, t.deviceRepo, t.deviceService, t.logger) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 33a6cb7..87bbca9 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -34,6 +34,7 @@ const ( TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 TaskTypeWaiting TaskType = "等待" // 等待任务 TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 + TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 ) // -- Task Parameters --