From 50aac8d7e550894f37212303abf1c10bdad80fca Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Thu, 25 Sep 2025 11:17:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0ReleaseFeedWeightTask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/device/general_device_service.go | 2 +- internal/app/service/task/delay_task.go | 1 + .../service/task/release_feed_weight_task.go | 137 ++++++++++++++++-- internal/app/service/task/scheduler.go | 2 +- internal/infra/models/plan.go | 1 + internal/infra/models/sensor_data.go | 4 - .../repository/sensor_data_repository.go | 13 ++ 7 files changed, 142 insertions(+), 18 deletions(-) diff --git a/internal/app/service/device/general_device_service.go b/internal/app/service/device/general_device_service.go index 6e09a8e..473ff03 100644 --- a/internal/app/service/device/general_device_service.go +++ b/internal/app/service/device/general_device_service.go @@ -29,7 +29,7 @@ func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *log } } -func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) error { +func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error { // 校验设备参数及生成指令 if *device.ParentID == 0 { diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index eae2e00..ee62e64 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -34,6 +34,7 @@ func (d *DelayTask) Execute() error { return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) } + // TODO 定义成结构体放model包中 var params map[string]interface{} if err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms); err != nil { d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) diff --git a/internal/app/service/task/release_feed_weight_task.go b/internal/app/service/task/release_feed_weight_task.go index 02350e6..868dfc0 100644 --- a/internal/app/service/task/release_feed_weight_task.go +++ b/internal/app/service/task/release_feed_weight_task.go @@ -1,36 +1,149 @@ package task import ( + "encoding/json" + "fmt" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" ) +const ( + // 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录释放的重量字段的key + ParamsReleaseWeight = "release_weight" + // 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录下料口ID字段的key + ParamsFeedPortDeviceID = "feed_port_device_id" + // 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录称重传感器ID字段的key + ParamsMixingTankDeviceID = "mixing_tank_device_id" +) + // ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 type ReleaseFeedWeightTask struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository + claimedLog *models.TaskExecutionLog - comm transport.Communicator + feedPortDevice *models.Device // 下料口基本信息 + releaseWeight float64 // 需要释放的重量 + mixingTankDeviceID uint // 搅拌罐称重传感器ID + + comm transport.Communicator + feedPort *device.GeneralDeviceService // 下料口指令下发器 logger *logs.Logger } -func (r *ReleaseFeedWeightTask) Execute() error { - //TODO implement me - panic("implement me") -} - -func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { - //TODO implement me - panic("implement me") -} - // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 -func NewReleaseFeedWeightTask(deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator) Task { +func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task { return &ReleaseFeedWeightTask{ + claimedLog: claimedLog, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, comm: comm, + logger: logger, } } + +func (r *ReleaseFeedWeightTask) Execute() error { + r.logger.Infof("任务 %v: 开始执行, 日志ID: %v", r.claimedLog.TaskID, r.claimedLog.ID) + if err := r.parseParameters(); err != nil { + return err + } + + weight, err := r.getNowWeight() + if err != nil { + return err + } + + if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStart); err != nil { + r.logger.Errorf("启动下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) + return err + } + + targetWeight := weight - r.releaseWeight + errCount := 1 + + // TODO 这个判断有延迟, 尤其是LoRa通信本身延迟较高, 可以考虑根据信号质量或其他指标提前发送停止命令 + for targetWeight <= weight { + weight, err = r.getNowWeight() + if err != nil { + errCount++ + if errCount > 3 { // 如果连续三次没成功采集到重量数据,则认为计划执行失败 + r.logger.Errorf("获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v, 任务结束", r.claimedLog.ID, err) + return err + } + r.logger.Warnf("第%v次尝试获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v", errCount, r.claimedLog.ID, err) + continue + } + time.Sleep(100 * time.Millisecond) + } + + if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop); err != nil { + r.logger.Errorf("关闭下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) + return err + } + + r.logger.Infof("完成计划执行日志(id=%v)的当前计划, 完成下料 %vkg, 搅拌罐剩余重量 %vkg", r.claimedLog.ID, r.releaseWeight, weight) + return nil +} + +// 获取当前搅拌罐重量 +func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) { + sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorDataTypeWeight) + if err != nil { + r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) + return 0, err + } + + wg := &models.WeightData{} + err = json.Unmarshal(sensorData.Data, wg) + if err != nil { + r.logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) + return 0, err + } + + return wg.WeightKilograms, nil +} + +func (r *ReleaseFeedWeightTask) parseParameters() error { + if r.claimedLog.Task.Parameters == nil { + r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID) + return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID) + } + + // TODO 定义成结构体放model包中 + var params map[string]interface{} + err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms) + if err != nil { + r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) + return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) + } + + r.releaseWeight = params[ParamsReleaseWeight].(float64) + r.mixingTankDeviceID = params[ParamsMixingTankDeviceID].(uint) + r.feedPortDevice, err = r.deviceRepo.FindByID(params[ParamsFeedPortDeviceID].(uint)) + if err != nil { + r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) + return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) + } + r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm) + + return nil +} + +func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { + r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID) + if r.feedPort != nil { + err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop) + if err != nil { + r.logger.Errorf("[严重] 下料口停止失败, 日志ID: %v, 错误: %v", r.claimedLog.ID, err) + } + } else { + r.logger.Warnf("[警告] 下料口通信器尚未初始化, 不进行任何操作, 日志ID: %v", r.claimedLog.ID) + } + r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID) +} diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 6f546f6..53fe5ae 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -289,7 +289,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { case models.TaskTypeWaiting: return NewDelayTask(s.logger, claimedLog) case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(s.deviceRepo, s.sensorDataRepo, s.comm) + return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 5dee1e1..bbb2315 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -34,6 +34,7 @@ const ( TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务 ) +// -- Task Parameters -- const ( // 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key ParamsPlanID = "plan_id" diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index a667f65..0e1a6a9 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -50,22 +50,18 @@ type WeightData struct { // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。 - // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 Time time.Time `gorm:"primaryKey" json:"time"` // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 - // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 DeviceID uint `gorm:"primaryKey" json:"device_id"` // RegionalControllerID 是上报此数据的区域主控的ID。 - // 我们为其添加了数据库索引以优化按区域查询的性能。 RegionalControllerID uint `json:"regional_controller_id"` // SensorDataType 是传感数据的类型 SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` // Data 存储一个或多个传感器读数,格式为 JSON。 - // GORM 会使用 'jsonb' 类型来创建此列。 Data datatypes.JSON `gorm:"type:jsonb" json:"data"` } diff --git a/internal/infra/repository/sensor_data_repository.go b/internal/infra/repository/sensor_data_repository.go index b9b80a9..94f6f81 100644 --- a/internal/infra/repository/sensor_data_repository.go +++ b/internal/infra/repository/sensor_data_repository.go @@ -1,6 +1,8 @@ package repository import ( + "time" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" ) @@ -8,6 +10,7 @@ import ( // SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 type SensorDataRepository interface { Create(sensorData *models.SensorData) error + GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorDataType) (*models.SensorData, error) } // gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 @@ -25,3 +28,13 @@ func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository { func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { return r.db.Create(sensorData).Error } + +// GetLatestSensorDataByDeviceIDAndSensorType 根据设备ID和传感器类型查询最新的传感器数据。 +func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorDataType models.SensorDataType) (*models.SensorData, error) { + var sensorData models.SensorData + // 增加一个时间范围来缩小查询范围, 从而加快查找速度, 当使用时序数据库时时间范围可以让数据库忽略时间靠前的分片 + err := r.db.Where("device_id = ? AND sensor_data_type = ? AND time >=?", deviceID, sensorDataType, time.Now().Add(-24*time.Hour)). + Order("time DESC"). + First(&sensorData).Error + return &sensorData, err +}