package task import ( "encoding/json" "fmt" "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 type ReleaseFeedWeightTaskParams struct { ReleaseWeight float64 `json:"release_weight"` // 需要释放的重量 FeedPortDeviceID uint `json:"feed_port_device_id"` // 下料口ID MixingTankDeviceID uint `json:"mixing_tank_device_id"` // 称重传感器ID } // ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 type ReleaseFeedWeightTask struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository claimedLog *models.TaskExecutionLog feedPortDevice *models.Device releaseWeight float64 mixingTankDeviceID uint feedPort device.Service logger *logs.Logger } // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 func NewReleaseFeedWeightTask( claimedLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, deviceService device.Service, logger *logs.Logger, ) Task { return &ReleaseFeedWeightTask{ claimedLog: claimedLog, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, feedPort: deviceService, // 直接注入 logger: logger, } } func (r *ReleaseFeedWeightTask) Execute() error { r.logger.Infof("任务 %v: 开始执行, 日志ID: %v", r.claimedLog.TaskID, r.claimedLog.ID) if err := r.parseParameters(); err != nil { return err } weight, err := r.getNowWeight() if err != nil { return err } if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStart); err != nil { r.logger.Errorf("启动下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) return err } targetWeight := weight - r.releaseWeight errCount := 1 // TODO 这个判断有延迟, 尤其是LoRa通信本身延迟较高, 可以考虑根据信号质量或其他指标提前发送停止命令 for targetWeight <= weight { weight, err = r.getNowWeight() if err != nil { errCount++ if errCount > 3 { // 如果连续三次没成功采集到重量数据,则认为计划执行失败 r.logger.Errorf("获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v, 任务结束", r.claimedLog.ID, err) return err } r.logger.Warnf("第%v次尝试获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v", errCount, r.claimedLog.ID, err) continue } time.Sleep(100 * time.Millisecond) } if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop); err != nil { r.logger.Errorf("关闭下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) return err } r.logger.Infof("完成计划执行日志(id=%v)的当前计划, 完成下料 %vkg, 搅拌罐剩余重量 %vkg", r.claimedLog.ID, r.releaseWeight, weight) return nil } // 获取当前搅拌罐重量 func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) { sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorTypeWeight) if err != nil { r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) return 0, err } if sensorData == nil { return 0, fmt.Errorf("未找到设备 %v 的最新重量传感器数据", r.mixingTankDeviceID) } wg := &models.WeightData{} err = json.Unmarshal(sensorData.Data, wg) if err != nil { r.logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) return 0, err } return wg.WeightKilograms, nil } func (r *ReleaseFeedWeightTask) parseParameters() error { if r.claimedLog.Task.Parameters == nil { r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID) return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID) } var params ReleaseFeedWeightTaskParams err := r.claimedLog.Task.ParseParameters(¶ms) if err != nil { r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) } // 校验参数是否存在 if params.ReleaseWeight == 0 { r.logger.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) return fmt.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) } if params.FeedPortDeviceID == 0 { r.logger.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) return fmt.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) } if params.MixingTankDeviceID == 0 { r.logger.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) return fmt.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) } r.releaseWeight = params.ReleaseWeight r.mixingTankDeviceID = params.MixingTankDeviceID r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) if err != nil { r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) } return nil } func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID) if r.feedPort != nil { err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop) if err != nil { r.logger.Errorf("[严重] 下料口停止失败, 日志ID: %v, 错误: %v", r.claimedLog.ID, err) } } else { r.logger.Warnf("[警告] 下料口通信器尚未初始化, 不进行任何操作, 日志ID: %v", r.claimedLog.ID) } r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID) }