让任务可以提供自身使用设备

This commit is contained in:
2025-11-02 20:47:25 +08:00
parent 4b0be88fca
commit 687c2f12ee
6 changed files with 119 additions and 33 deletions

View File

@@ -0,0 +1,34 @@
# 任务接口增加获取关联设备ID方法设计
## 1. 需求
为了在设备删除前进行验证需要为任务接口增加一个方法该方法能够直接返回指定任务配置中所有关联的设备ID列表。所有实现 `task` 接口的对象都必须实现此方法。
## 2. 新接口定义:`TaskDeviceIDResolver`
```go
// TaskDeviceIDResolver 定义了从任务配置中解析设备ID的方法
type TaskDeviceIDResolver interface {
// ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表
// 返回值: uint数组每个字符串代表一个设备ID
ResolveDeviceIDs() ([]uint, error)
}
```
## 3. `task` 接口更新
`task` 接口将嵌入 `TaskDeviceIDResolver` 接口。
```go
// Task 接口(示例,具体结构可能不同)
type Task interface {
// ... 其他现有方法 ...
// 嵌入 TaskDeviceIDResolver 接口
TaskDeviceIDResolver
}
```
## 4. 实现要求
所有当前及未来实现 `Task` 接口的类型,都必须实现 `TaskDeviceIDResolver` 接口中定义的所有方法,即 `ResolveDeviceIDs` 方法。

View File

@@ -0,0 +1,18 @@
# 需求
删除设备/设备模板/区域主控前进行校验
## issue
http://git.huangwc.com/pig/pig-farm-controller/issues/50
## 需求描述
1. 删除设备时检测是否被任务使用
2. 删除设备模板时检测是否被设备使用
3. 删除区域主控时检测是否被设备使用
# 实现
1. [重构计划领域](./plan_service_refactor.md)
2. [让任务可以提供自身使用设备](./add_get_device_id_configs_to_task.md)

View File

@@ -14,6 +14,15 @@ type Task interface {
// log: 任务执行的上下文。 // log: 任务执行的上下文。
// executeErr: 从 Execute 方法返回的原始错误。 // executeErr: 从 Execute 方法返回的原始错误。
OnFailure(executeErr error) OnFailure(executeErr error)
TaskDeviceIDResolver
}
// TaskDeviceIDResolver 定义了从任务配置中解析设备ID的方法
type TaskDeviceIDResolver interface {
// ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表
// 返回值: uint数组每个字符串代表一个设备ID
ResolveDeviceIDs() ([]uint, error)
} }
// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。 // TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。

View File

@@ -65,3 +65,7 @@ func (d *DelayTask) parseParameters() error {
func (d *DelayTask) OnFailure(executeErr error) { func (d *DelayTask) OnFailure(executeErr error) {
d.logger.Errorf("任务 %v: 执行失败: %v", d.executionTask.TaskID, executeErr) d.logger.Errorf("任务 %v: 执行失败: %v", d.executionTask.TaskID, executeErr)
} }
func (d *DelayTask) ResolveDeviceIDs() ([]uint, error) {
return []uint{}, nil
}

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
@@ -23,7 +24,7 @@ func NewFullCollectionTask(
deviceRepo repository.DeviceRepository, deviceRepo repository.DeviceRepository,
deviceService device.Service, deviceService device.Service,
logger *logs.Logger, logger *logs.Logger,
) *FullCollectionTask { ) plan.Task {
return &FullCollectionTask{ return &FullCollectionTask{
log: log, log: log,
deviceRepo: deviceRepo, deviceRepo: deviceRepo,
@@ -91,3 +92,9 @@ func (t *FullCollectionTask) OnFailure(executeErr error) {
"error", executeErr, "error", executeErr,
) )
} }
// ResolveDeviceIDs 获取当前任务需要使用的设备ID列表
func (t *FullCollectionTask) ResolveDeviceIDs() ([]uint, error) {
// 全量采集任务不和任何设备绑定, 每轮采集都会重新获取全量传感器
return []uint{}, nil
}

View File

@@ -3,6 +3,7 @@ package task
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
@@ -31,6 +32,9 @@ type ReleaseFeedWeightTask struct {
feedPort device.Service feedPort device.Service
// onceParse 保证解析参数只执行一次
onceParse sync.Once
logger *logs.Logger logger *logs.Logger
} }
@@ -117,45 +121,48 @@ func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) {
} }
func (r *ReleaseFeedWeightTask) parseParameters() error { func (r *ReleaseFeedWeightTask) parseParameters() error {
if r.claimedLog.Task.Parameters == nil { var err error
r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID) r.onceParse.Do(func() {
return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID) if r.claimedLog.Task.Parameters == nil {
} r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID)
err = fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID)
}
var params ReleaseFeedWeightTaskParams var params ReleaseFeedWeightTaskParams
err := r.claimedLog.Task.ParseParameters(&params) err := r.claimedLog.Task.ParseParameters(&params)
if err != nil { if err != nil {
r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) err = fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
} }
// 校验参数是否存在 // 校验参数是否存在
if params.ReleaseWeight == 0 { if params.ReleaseWeight == 0 {
r.logger.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) r.logger.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID)
return fmt.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) err = fmt.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID)
} }
if params.FeedPortDeviceID == 0 { if params.FeedPortDeviceID == 0 {
r.logger.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) r.logger.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID)
return fmt.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) err = fmt.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID)
} }
if params.MixingTankDeviceID == 0 { if params.MixingTankDeviceID == 0 {
r.logger.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) r.logger.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID)
return fmt.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) err = fmt.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID)
} }
r.releaseWeight = params.ReleaseWeight r.releaseWeight = params.ReleaseWeight
r.mixingTankDeviceID = params.MixingTankDeviceID r.mixingTankDeviceID = params.MixingTankDeviceID
r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID)
if err != nil { if err != nil {
r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) err = fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
} }
return nil })
return err
} }
func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) {
r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID) r.logger.Errorf("开始善后处理, 日志ID:%v; 错误信息: %v", r.claimedLog.ID, executeErr)
if r.feedPort != nil { if r.feedPort != nil {
err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop) err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop)
if err != nil { if err != nil {
@@ -166,3 +173,10 @@ func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) {
} }
r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID) r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID)
} }
func (r *ReleaseFeedWeightTask) ResolveDeviceIDs() ([]uint, error) {
if err := r.parseParameters(); err != nil {
return nil, err
}
return []uint{r.feedPortDevice.ID, r.mixingTankDeviceID}, nil
}