实现ReleaseFeedWeightTask

This commit is contained in:
2025-09-25 11:17:13 +08:00
parent f6941fe002
commit 50aac8d7e5
7 changed files with 142 additions and 18 deletions

View File

@@ -29,7 +29,7 @@ func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *log
} }
} }
func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) error { func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error {
// 校验设备参数及生成指令 // 校验设备参数及生成指令
if *device.ParentID == 0 { if *device.ParentID == 0 {

View File

@@ -34,6 +34,7 @@ func (d *DelayTask) Execute() error {
return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID)
} }
// TODO 定义成结构体放model包中
var params map[string]interface{} var params map[string]interface{}
if err := json.Unmarshal(d.executionTask.Task.Parameters, &params); err != nil { if err := json.Unmarshal(d.executionTask.Task.Parameters, &params); err != nil {
d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)

View File

@@ -1,36 +1,149 @@
package task package task
import ( import (
"encoding/json"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
) )
const (
// 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录释放的重量字段的key
ParamsReleaseWeight = "release_weight"
// 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录下料口ID字段的key
ParamsFeedPortDeviceID = "feed_port_device_id"
// 这个参数是 TaskTaskReleaseFeedWeight 类型的 Task Parameters 中用于记录称重传感器ID字段的key
ParamsMixingTankDeviceID = "mixing_tank_device_id"
)
// ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 // ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务
type ReleaseFeedWeightTask struct { type ReleaseFeedWeightTask struct {
deviceRepo repository.DeviceRepository deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository sensorDataRepo repository.SensorDataRepository
claimedLog *models.TaskExecutionLog
feedPortDevice *models.Device // 下料口基本信息
releaseWeight float64 // 需要释放的重量
mixingTankDeviceID uint // 搅拌罐称重传感器ID
comm transport.Communicator comm transport.Communicator
feedPort *device.GeneralDeviceService // 下料口指令下发器
logger *logs.Logger logger *logs.Logger
} }
func (r *ReleaseFeedWeightTask) Execute() error {
//TODO implement me
panic("implement me")
}
func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) {
//TODO implement me
panic("implement me")
}
// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例
func NewReleaseFeedWeightTask(deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator) Task { func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task {
return &ReleaseFeedWeightTask{ return &ReleaseFeedWeightTask{
claimedLog: claimedLog,
deviceRepo: deviceRepo, deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo, sensorDataRepo: sensorDataRepo,
comm: comm, comm: comm,
logger: logger,
} }
} }
func (r *ReleaseFeedWeightTask) Execute() error {
r.logger.Infof("任务 %v: 开始执行, 日志ID: %v", r.claimedLog.TaskID, r.claimedLog.ID)
if err := r.parseParameters(); err != nil {
return err
}
weight, err := r.getNowWeight()
if err != nil {
return err
}
if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStart); err != nil {
r.logger.Errorf("启动下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID)
return err
}
targetWeight := weight - r.releaseWeight
errCount := 1
// TODO 这个判断有延迟, 尤其是LoRa通信本身延迟较高, 可以考虑根据信号质量或其他指标提前发送停止命令
for targetWeight <= weight {
weight, err = r.getNowWeight()
if err != nil {
errCount++
if errCount > 3 { // 如果连续三次没成功采集到重量数据,则认为计划执行失败
r.logger.Errorf("获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v, 任务结束", r.claimedLog.ID, err)
return err
}
r.logger.Warnf("第%v次尝试获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v", errCount, r.claimedLog.ID, err)
continue
}
time.Sleep(100 * time.Millisecond)
}
if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop); err != nil {
r.logger.Errorf("关闭下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID)
return err
}
r.logger.Infof("完成计划执行日志(id=%v)的当前计划, 完成下料 %vkg, 搅拌罐剩余重量 %vkg", r.claimedLog.ID, r.releaseWeight, weight)
return nil
}
// 获取当前搅拌罐重量
func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) {
sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorDataTypeWeight)
if err != nil {
r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID)
return 0, err
}
wg := &models.WeightData{}
err = json.Unmarshal(sensorData.Data, wg)
if err != nil {
r.logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID)
return 0, err
}
return wg.WeightKilograms, nil
}
func (r *ReleaseFeedWeightTask) parseParameters() error {
if r.claimedLog.Task.Parameters == nil {
r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID)
return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID)
}
// TODO 定义成结构体放model包中
var params map[string]interface{}
err := json.Unmarshal(r.claimedLog.Task.Parameters, &params)
if err != nil {
r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
}
r.releaseWeight = params[ParamsReleaseWeight].(float64)
r.mixingTankDeviceID = params[ParamsMixingTankDeviceID].(uint)
r.feedPortDevice, err = r.deviceRepo.FindByID(params[ParamsFeedPortDeviceID].(uint))
if err != nil {
r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
}
r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm)
return nil
}
func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) {
r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID)
if r.feedPort != nil {
err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop)
if err != nil {
r.logger.Errorf("[严重] 下料口停止失败, 日志ID: %v, 错误: %v", r.claimedLog.ID, err)
}
} else {
r.logger.Warnf("[警告] 下料口通信器尚未初始化, 不进行任何操作, 日志ID: %v", r.claimedLog.ID)
}
r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID)
}

View File

@@ -289,7 +289,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
case models.TaskTypeWaiting: case models.TaskTypeWaiting:
return NewDelayTask(s.logger, claimedLog) return NewDelayTask(s.logger, claimedLog)
case models.TaskTypeReleaseFeedWeight: case models.TaskTypeReleaseFeedWeight:
return NewReleaseFeedWeightTask(s.deviceRepo, s.sensorDataRepo, s.comm) return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger)
default: default:
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型

View File

@@ -34,6 +34,7 @@ const (
TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务 TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务
) )
// -- Task Parameters --
const ( const (
// 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key // 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key
ParamsPlanID = "plan_id" ParamsPlanID = "plan_id"

View File

@@ -50,22 +50,18 @@ type WeightData struct {
// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。
type SensorData struct { type SensorData struct {
// Time 是数据记录的时间戳,作为复合主键的一部分。 // Time 是数据记录的时间戳,作为复合主键的一部分。
// GORM 会将其映射到 'time' TIMESTAMPTZ 列。
Time time.Time `gorm:"primaryKey" json:"time"` Time time.Time `gorm:"primaryKey" json:"time"`
// DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。
// GORM 会将其映射到 'device_id' VARCHAR(50) 列。
DeviceID uint `gorm:"primaryKey" json:"device_id"` DeviceID uint `gorm:"primaryKey" json:"device_id"`
// RegionalControllerID 是上报此数据的区域主控的ID。 // RegionalControllerID 是上报此数据的区域主控的ID。
// 我们为其添加了数据库索引以优化按区域查询的性能。
RegionalControllerID uint `json:"regional_controller_id"` RegionalControllerID uint `json:"regional_controller_id"`
// SensorDataType 是传感数据的类型 // SensorDataType 是传感数据的类型
SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"`
// Data 存储一个或多个传感器读数,格式为 JSON。 // Data 存储一个或多个传感器读数,格式为 JSON。
// GORM 会使用 'jsonb' 类型来创建此列。
Data datatypes.JSON `gorm:"type:jsonb" json:"data"` Data datatypes.JSON `gorm:"type:jsonb" json:"data"`
} }

View File

@@ -1,6 +1,8 @@
package repository package repository
import ( import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -8,6 +10,7 @@ import (
// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 // SensorDataRepository 定义了与传感器数据相关的数据库操作接口。
type SensorDataRepository interface { type SensorDataRepository interface {
Create(sensorData *models.SensorData) error Create(sensorData *models.SensorData) error
GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorDataType) (*models.SensorData, error)
} }
// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 // gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。
@@ -25,3 +28,13 @@ func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository {
func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error {
return r.db.Create(sensorData).Error return r.db.Create(sensorData).Error
} }
// GetLatestSensorDataByDeviceIDAndSensorType 根据设备ID和传感器类型查询最新的传感器数据。
func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorDataType models.SensorDataType) (*models.SensorData, error) {
var sensorData models.SensorData
// 增加一个时间范围来缩小查询范围, 从而加快查找速度, 当使用时序数据库时时间范围可以让数据库忽略时间靠前的分片
err := r.db.Where("device_id = ? AND sensor_data_type = ? AND time >=?", deviceID, sensorDataType, time.Now().Add(-24*time.Hour)).
Order("time DESC").
First(&sensorData).Error
return &sensorData, err
}