issue_18 #19
@@ -50,6 +50,10 @@ chirp_stack:
|
|||||||
api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN>
|
api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN>
|
||||||
fport: 1
|
fport: 1
|
||||||
api_timeout: 10 # ChirpStack API请求超时时间(秒)
|
api_timeout: 10 # ChirpStack API请求超时时间(秒)
|
||||||
|
# 等待设备上行响应的超时时间(秒)。
|
||||||
|
# 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。
|
||||||
|
collection_request_timeout: 300
|
||||||
|
|
||||||
|
|
||||||
# 任务调度器配置
|
# 任务调度器配置
|
||||||
task:
|
task:
|
||||||
|
|||||||
@@ -53,8 +53,6 @@ func NewAPI(cfg config.ServerConfig,
|
|||||||
userRepo repository.UserRepository,
|
userRepo repository.UserRepository,
|
||||||
deviceRepository repository.DeviceRepository,
|
deviceRepository repository.DeviceRepository,
|
||||||
planRepository repository.PlanRepository,
|
planRepository repository.PlanRepository,
|
||||||
sensorDataRepository repository.SensorDataRepository,
|
|
||||||
executionLogRepository repository.ExecutionLogRepository,
|
|
||||||
tokenService token.TokenService,
|
tokenService token.TokenService,
|
||||||
listenHandler transport.ListenHandler,
|
listenHandler transport.ListenHandler,
|
||||||
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
|
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ func newDeviceResponse(device *models.Device) (*DeviceResponse, error) {
|
|||||||
|
|
||||||
var props map[string]interface{}
|
var props map[string]interface{}
|
||||||
if len(device.Properties) > 0 && string(device.Properties) != "null" {
|
if len(device.Properties) > 0 && string(device.Properties) != "null" {
|
||||||
if err := json.Unmarshal(device.Properties, &props); err != nil {
|
if err := device.ParseProperties(&props); err != nil {
|
||||||
return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err)
|
return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -143,6 +143,13 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
|
|||||||
Properties: propertiesJSON,
|
Properties: propertiesJSON,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 在创建设备前进行自检
|
||||||
|
if !device.SelfCheck() {
|
||||||
|
c.logger.Errorf("创建设备: 设备属性自检失败: %v", device)
|
||||||
|
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err := c.repo.Create(device); err != nil {
|
if err := c.repo.Create(device); err != nil {
|
||||||
c.logger.Errorf("创建设备: 数据库操作失败: %v", err)
|
c.logger.Errorf("创建设备: 数据库操作失败: %v", err)
|
||||||
controller.SendErrorResponse(ctx, controller.CodeInternalError, "创建设备失败")
|
controller.SendErrorResponse(ctx, controller.CodeInternalError, "创建设备失败")
|
||||||
@@ -272,6 +279,13 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
|
|||||||
existingDevice.Location = req.Location
|
existingDevice.Location = req.Location
|
||||||
existingDevice.Properties = propertiesJSON
|
existingDevice.Properties = propertiesJSON
|
||||||
|
|
||||||
|
// 在更新设备前进行自检
|
||||||
|
if !existingDevice.SelfCheck() {
|
||||||
|
c.logger.Errorf("更新设备: 设备属性自检失败: %v", existingDevice)
|
||||||
|
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 4. 将修改后的 existingDevice 对象保存回数据库
|
// 4. 将修改后的 existingDevice 对象保存回数据库
|
||||||
if err := c.repo.Update(existingDevice); err != nil {
|
if err := c.repo.Update(existingDevice); err != nil {
|
||||||
c.logger.Errorf("更新设备: 数据库操作失败: %v", err)
|
c.logger.Errorf("更新设备: 数据库操作失败: %v", err)
|
||||||
|
|||||||
@@ -189,7 +189,7 @@ func TaskToResponse(task *models.Task) (TaskResponse, error) {
|
|||||||
|
|
||||||
var params map[string]interface{}
|
var params map[string]interface{}
|
||||||
if len(task.Parameters) > 0 && string(task.Parameters) != "null" {
|
if len(task.Parameters) > 0 && string(task.Parameters) != "null" {
|
||||||
if err := json.Unmarshal(task.Parameters, ¶ms); err != nil {
|
if err := task.ParseParameters(¶ms); err != nil {
|
||||||
return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err)
|
return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,7 +21,10 @@ var (
|
|||||||
type Service interface {
|
type Service interface {
|
||||||
|
|
||||||
// Switch 用于切换指定设备的状态, 比如启动和停止
|
// Switch 用于切换指定设备的状态, 比如启动和停止
|
||||||
Switch(device models.Device, action DeviceAction) error
|
Switch(device *models.Device, action DeviceAction) error
|
||||||
|
|
||||||
|
// Collect 用于发起对指定区域主控下的多个设备的批量采集请求。
|
||||||
|
Collect(regionalControllerID uint, devicesToCollect []*models.Device) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设备操作指令通用结构(最外层)
|
// 设备操作指令通用结构(最外层)
|
||||||
|
|||||||
@@ -1,31 +1,43 @@
|
|||||||
package device
|
package device
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||||
|
"github.com/google/uuid"
|
||||||
gproto "google.golang.org/protobuf/proto"
|
gproto "google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GeneralDeviceService struct {
|
type GeneralDeviceService struct {
|
||||||
deviceRepo repository.DeviceRepository
|
deviceRepo repository.DeviceRepository
|
||||||
logger *logs.Logger
|
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||||
|
pendingCollectionRepo repository.PendingCollectionRepository
|
||||||
comm transport.Communicator
|
logger *logs.Logger
|
||||||
|
comm transport.Communicator
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGeneralDeviceService 创建一个通用设备服务
|
// NewGeneralDeviceService 创建一个通用设备服务
|
||||||
func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService {
|
func NewGeneralDeviceService(
|
||||||
|
deviceRepo repository.DeviceRepository,
|
||||||
|
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||||
|
pendingCollectionRepo repository.PendingCollectionRepository,
|
||||||
|
logger *logs.Logger,
|
||||||
|
comm transport.Communicator,
|
||||||
|
) Service {
|
||||||
return &GeneralDeviceService{
|
return &GeneralDeviceService{
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
logger: logger,
|
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||||
comm: comm,
|
pendingCollectionRepo: pendingCollectionRepo,
|
||||||
|
logger: logger,
|
||||||
|
comm: comm,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -45,18 +57,10 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction
|
|||||||
return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err)
|
return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
busNumber, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber]))
|
// 已通过 SelfCheck 保证其为纯数字,此处仅进行类型转换
|
||||||
if err != nil {
|
busNumber, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber]))
|
||||||
return fmt.Errorf("无效的总线号: %v", err)
|
busAddress, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress]))
|
||||||
}
|
relayChannel, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel]))
|
||||||
busAddress, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress]))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("无效的总线地址: %v", err)
|
|
||||||
}
|
|
||||||
relayChannel, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel]))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("无效的继电器通道: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := anypb.New(&proto.Switch{
|
data, err := anypb.New(&proto.Switch{
|
||||||
DeviceAction: string(action),
|
DeviceAction: string(action),
|
||||||
@@ -87,11 +91,141 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction
|
|||||||
}
|
}
|
||||||
loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress])
|
loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress])
|
||||||
|
|
||||||
// 生成消息并发送
|
// 生成消息
|
||||||
message, err := gproto.Marshal(instruction)
|
message, err := gproto.Marshal(instruction)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("序列化指令失败: %v", err)
|
return fmt.Errorf("序列化指令失败: %v", err)
|
||||||
}
|
}
|
||||||
return g.comm.Send(loraAddress, message)
|
|
||||||
|
|
||||||
|
// 发送指令并获取 SendResult
|
||||||
|
sendResult, err := g.comm.Send(loraAddress, message)
|
||||||
|
if err != nil {
|
||||||
|
// 发送失败,直接返回错误
|
||||||
|
return fmt.Errorf("发送指令到设备 %s 失败: %w", loraAddress, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建并保存命令日志
|
||||||
|
logRecord := &models.DeviceCommandLog{
|
||||||
|
MessageID: sendResult.MessageID,
|
||||||
|
DeviceID: thisDevice.ID, // thisDevice 是我们查出来的区域主控
|
||||||
|
SentAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.deviceCommandLogRepo.Create(logRecord); err != nil {
|
||||||
|
// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
|
||||||
|
// 我们记录一个错误日志,然后成功返回。
|
||||||
|
g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
g.logger.Infof("成功发送指令到设备 %s 并创建日志 (MessageID: %s)", loraAddress, sendResult.MessageID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。
|
||||||
|
// 它负责查找区域主控、生成关联ID、创建待处理记录、构建指令并最终发送。
|
||||||
|
func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error {
|
||||||
|
if regionalControllerID == 0 {
|
||||||
|
return errors.New("区域主控ID不能为空")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(devicesToCollect) == 0 {
|
||||||
|
// 如果没有要采集的设备,这不是一个错误,只是一个空操作。
|
||||||
|
g.logger.Info("待采集设备列表为空,无需执行采集任务。")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. 查找并自检区域主控设备
|
||||||
|
regionalController, err := g.deviceRepo.FindByID(regionalControllerID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("查找区域主控 (ID: %d) 失败: %w", regionalControllerID, err)
|
||||||
|
}
|
||||||
|
if !regionalController.SelfCheck() {
|
||||||
|
return fmt.Errorf("区域主控 (ID: %d) 未通过自检,缺少必要属性", regionalControllerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 准备采集任务列表和数据库存根,并验证设备
|
||||||
|
var childDeviceIDs []uint
|
||||||
|
var collectTasks []*proto.CollectTask
|
||||||
|
|
||||||
|
for _, dev := range devicesToCollect {
|
||||||
|
// 验证设备是否属于指定的区域主控
|
||||||
|
if dev.ParentID == nil || *dev.ParentID != regionalControllerID {
|
||||||
|
return fmt.Errorf("设备 '%s' (ID: %d) 不属于指定的区域主控 (ID: %d)", dev.Name, dev.ID, regionalControllerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 对每个待采集的设备执行自检
|
||||||
|
if !dev.SelfCheck() {
|
||||||
|
g.logger.Warnf("跳过设备 %d,因其未通过自检", dev.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 自检已通过,我们可以安全地解析属性
|
||||||
|
var props map[string]interface{}
|
||||||
|
// 此时 ParseProperties 不应失败
|
||||||
|
_ = dev.ParseProperties(&props)
|
||||||
|
|
||||||
|
busNumber := props[models.BusNumber].(float64)
|
||||||
|
busAddress := props[models.BusAddress].(float64)
|
||||||
|
|
||||||
|
collectTasks = append(collectTasks, &proto.CollectTask{
|
||||||
|
DeviceAction: dev.Command,
|
||||||
|
BusNumber: int32(busNumber),
|
||||||
|
BusAddress: int32(busAddress),
|
||||||
|
})
|
||||||
|
childDeviceIDs = append(childDeviceIDs, dev.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(childDeviceIDs) == 0 {
|
||||||
|
return errors.New("经过滤后,没有可通过自检的有效设备")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 从区域主控的属性中解析出 DevEui (loraAddress)
|
||||||
|
var rcProps map[string]interface{}
|
||||||
|
// SelfCheck 已保证属性可解析
|
||||||
|
_ = regionalController.ParseProperties(&rcProps)
|
||||||
|
loraAddress := rcProps[models.LoRaAddress].(string)
|
||||||
|
|
||||||
|
// 4. 创建待处理请求记录
|
||||||
|
correlationID := uuid.New().String()
|
||||||
|
pendingReq := &models.PendingCollection{
|
||||||
|
CorrelationID: correlationID,
|
||||||
|
DeviceID: regionalController.ID,
|
||||||
|
CommandMetadata: childDeviceIDs,
|
||||||
|
Status: models.PendingStatusPending,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
if err := g.pendingCollectionRepo.Create(pendingReq); err != nil {
|
||||||
|
g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID)
|
||||||
|
|
||||||
|
// 5. 构建最终的空中载荷
|
||||||
|
batchCmd := &proto.BatchCollectCommand{
|
||||||
|
CorrelationId: correlationID,
|
||||||
|
Tasks: collectTasks,
|
||||||
|
}
|
||||||
|
anyData, err := anypb.New(batchCmd)
|
||||||
|
if err != nil {
|
||||||
|
g.logger.Errorf("创建 Any Protobuf 失败 (CorrelationID: %s): %v", correlationID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
instruction := &proto.Instruction{
|
||||||
|
Method: proto.MethodType_COLLECT,
|
||||||
|
Data: anyData,
|
||||||
|
}
|
||||||
|
payload, err := gproto.Marshal(instruction)
|
||||||
|
if err != nil {
|
||||||
|
g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. 发送指令
|
||||||
|
if _, err := g.comm.Send(loraAddress, payload); err != nil {
|
||||||
|
g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, loraAddress)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ func (MethodType) EnumDescriptor() ([]byte, []int) {
|
|||||||
return file_device_proto_rawDescGZIP(), []int{0}
|
return file_device_proto_rawDescGZIP(), []int{0}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 指令
|
// 指令 (所有空中数据都会被包装在这里面)
|
||||||
type Instruction struct {
|
type Instruction struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
Method MethodType `protobuf:"varint,1,opt,name=method,proto3,enum=device.MethodType" json:"method,omitempty"`
|
Method MethodType `protobuf:"varint,1,opt,name=method,proto3,enum=device.MethodType" json:"method,omitempty"`
|
||||||
@@ -122,6 +122,7 @@ func (x *Instruction) GetData() *anypb.Any {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Switch 指令的载荷
|
||||||
type Switch struct {
|
type Switch struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令
|
DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令
|
||||||
@@ -190,29 +191,31 @@ func (x *Switch) GetRelayChannel() int32 {
|
|||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type Collect struct {
|
// BatchCollectCommand
|
||||||
|
// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。
|
||||||
|
// 这个消息本身不会被发送到设备。
|
||||||
|
type BatchCollectCommand struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
BusNumber int32 `protobuf:"varint,1,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号
|
CorrelationId string `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 用于关联请求和响应的唯一ID
|
||||||
BusAddress int32 `protobuf:"varint,2,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址
|
Tasks []*CollectTask `protobuf:"bytes,2,rep,name=tasks,proto3" json:"tasks,omitempty"` // 采集任务列表
|
||||||
Value float32 `protobuf:"fixed32,3,opt,name=value,proto3" json:"value,omitempty"` // 采集值
|
|
||||||
unknownFields protoimpl.UnknownFields
|
unknownFields protoimpl.UnknownFields
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Collect) Reset() {
|
func (x *BatchCollectCommand) Reset() {
|
||||||
*x = Collect{}
|
*x = BatchCollectCommand{}
|
||||||
mi := &file_device_proto_msgTypes[2]
|
mi := &file_device_proto_msgTypes[2]
|
||||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
ms.StoreMessageInfo(mi)
|
ms.StoreMessageInfo(mi)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Collect) String() string {
|
func (x *BatchCollectCommand) String() string {
|
||||||
return protoimpl.X.MessageStringOf(x)
|
return protoimpl.X.MessageStringOf(x)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*Collect) ProtoMessage() {}
|
func (*BatchCollectCommand) ProtoMessage() {}
|
||||||
|
|
||||||
func (x *Collect) ProtoReflect() protoreflect.Message {
|
func (x *BatchCollectCommand) ProtoReflect() protoreflect.Message {
|
||||||
mi := &file_device_proto_msgTypes[2]
|
mi := &file_device_proto_msgTypes[2]
|
||||||
if x != nil {
|
if x != nil {
|
||||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
@@ -224,30 +227,139 @@ func (x *Collect) ProtoReflect() protoreflect.Message {
|
|||||||
return mi.MessageOf(x)
|
return mi.MessageOf(x)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated: Use Collect.ProtoReflect.Descriptor instead.
|
// Deprecated: Use BatchCollectCommand.ProtoReflect.Descriptor instead.
|
||||||
func (*Collect) Descriptor() ([]byte, []int) {
|
func (*BatchCollectCommand) Descriptor() ([]byte, []int) {
|
||||||
return file_device_proto_rawDescGZIP(), []int{2}
|
return file_device_proto_rawDescGZIP(), []int{2}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Collect) GetBusNumber() int32 {
|
func (x *BatchCollectCommand) GetCorrelationId() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.CorrelationId
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *BatchCollectCommand) GetTasks() []*CollectTask {
|
||||||
|
if x != nil {
|
||||||
|
return x.Tasks
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectTask
|
||||||
|
// 定义了单个采集任务的“意图”。
|
||||||
|
type CollectTask struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令
|
||||||
|
BusNumber int32 `protobuf:"varint,2,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号
|
||||||
|
BusAddress int32 `protobuf:"varint,3,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectTask) Reset() {
|
||||||
|
*x = CollectTask{}
|
||||||
|
mi := &file_device_proto_msgTypes[3]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectTask) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*CollectTask) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *CollectTask) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_device_proto_msgTypes[3]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use CollectTask.ProtoReflect.Descriptor instead.
|
||||||
|
func (*CollectTask) Descriptor() ([]byte, []int) {
|
||||||
|
return file_device_proto_rawDescGZIP(), []int{3}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectTask) GetDeviceAction() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.DeviceAction
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectTask) GetBusNumber() int32 {
|
||||||
if x != nil {
|
if x != nil {
|
||||||
return x.BusNumber
|
return x.BusNumber
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Collect) GetBusAddress() int32 {
|
func (x *CollectTask) GetBusAddress() int32 {
|
||||||
if x != nil {
|
if x != nil {
|
||||||
return x.BusAddress
|
return x.BusAddress
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Collect) GetValue() float32 {
|
// CollectResult
|
||||||
|
// 这是设备响应的、极致精简的数据包。
|
||||||
|
type CollectResult struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
CorrelationId string `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 从下行指令中原样返回的关联ID
|
||||||
|
Values []float32 `protobuf:"fixed32,2,rep,packed,name=values,proto3" json:"values,omitempty"` // 按预定顺序排列的采集值
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectResult) Reset() {
|
||||||
|
*x = CollectResult{}
|
||||||
|
mi := &file_device_proto_msgTypes[4]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectResult) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*CollectResult) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *CollectResult) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_device_proto_msgTypes[4]
|
||||||
if x != nil {
|
if x != nil {
|
||||||
return x.Value
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
}
|
}
|
||||||
return 0
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use CollectResult.ProtoReflect.Descriptor instead.
|
||||||
|
func (*CollectResult) Descriptor() ([]byte, []int) {
|
||||||
|
return file_device_proto_rawDescGZIP(), []int{4}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectResult) GetCorrelationId() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.CorrelationId
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CollectResult) GetValues() []float32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.Values
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var File_device_proto protoreflect.FileDescriptor
|
var File_device_proto protoreflect.FileDescriptor
|
||||||
@@ -264,13 +376,19 @@ const file_device_proto_rawDesc = "" +
|
|||||||
"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" +
|
"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" +
|
||||||
"\vbus_address\x18\x03 \x01(\x05R\n" +
|
"\vbus_address\x18\x03 \x01(\x05R\n" +
|
||||||
"busAddress\x12#\n" +
|
"busAddress\x12#\n" +
|
||||||
"\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"_\n" +
|
"\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"g\n" +
|
||||||
"\aCollect\x12\x1d\n" +
|
"\x13BatchCollectCommand\x12%\n" +
|
||||||
|
"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12)\n" +
|
||||||
|
"\x05tasks\x18\x02 \x03(\v2\x13.device.CollectTaskR\x05tasks\"r\n" +
|
||||||
|
"\vCollectTask\x12#\n" +
|
||||||
|
"\rdevice_action\x18\x01 \x01(\tR\fdeviceAction\x12\x1d\n" +
|
||||||
"\n" +
|
"\n" +
|
||||||
"bus_number\x18\x01 \x01(\x05R\tbusNumber\x12\x1f\n" +
|
"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" +
|
||||||
"\vbus_address\x18\x02 \x01(\x05R\n" +
|
"\vbus_address\x18\x03 \x01(\x05R\n" +
|
||||||
"busAddress\x12\x14\n" +
|
"busAddress\"N\n" +
|
||||||
"\x05value\x18\x03 \x01(\x02R\x05value*%\n" +
|
"\rCollectResult\x12%\n" +
|
||||||
|
"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12\x16\n" +
|
||||||
|
"\x06values\x18\x02 \x03(\x02R\x06values*%\n" +
|
||||||
"\n" +
|
"\n" +
|
||||||
"MethodType\x12\n" +
|
"MethodType\x12\n" +
|
||||||
"\n" +
|
"\n" +
|
||||||
@@ -290,22 +408,25 @@ func file_device_proto_rawDescGZIP() []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
|
var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
|
||||||
var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
|
var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
|
||||||
var file_device_proto_goTypes = []any{
|
var file_device_proto_goTypes = []any{
|
||||||
(MethodType)(0), // 0: device.MethodType
|
(MethodType)(0), // 0: device.MethodType
|
||||||
(*Instruction)(nil), // 1: device.Instruction
|
(*Instruction)(nil), // 1: device.Instruction
|
||||||
(*Switch)(nil), // 2: device.Switch
|
(*Switch)(nil), // 2: device.Switch
|
||||||
(*Collect)(nil), // 3: device.Collect
|
(*BatchCollectCommand)(nil), // 3: device.BatchCollectCommand
|
||||||
(*anypb.Any)(nil), // 4: google.protobuf.Any
|
(*CollectTask)(nil), // 4: device.CollectTask
|
||||||
|
(*CollectResult)(nil), // 5: device.CollectResult
|
||||||
|
(*anypb.Any)(nil), // 6: google.protobuf.Any
|
||||||
}
|
}
|
||||||
var file_device_proto_depIdxs = []int32{
|
var file_device_proto_depIdxs = []int32{
|
||||||
0, // 0: device.Instruction.method:type_name -> device.MethodType
|
0, // 0: device.Instruction.method:type_name -> device.MethodType
|
||||||
4, // 1: device.Instruction.data:type_name -> google.protobuf.Any
|
6, // 1: device.Instruction.data:type_name -> google.protobuf.Any
|
||||||
2, // [2:2] is the sub-list for method output_type
|
4, // 2: device.BatchCollectCommand.tasks:type_name -> device.CollectTask
|
||||||
2, // [2:2] is the sub-list for method input_type
|
3, // [3:3] is the sub-list for method output_type
|
||||||
2, // [2:2] is the sub-list for extension type_name
|
3, // [3:3] is the sub-list for method input_type
|
||||||
2, // [2:2] is the sub-list for extension extendee
|
3, // [3:3] is the sub-list for extension type_name
|
||||||
0, // [0:2] is the sub-list for field type_name
|
3, // [3:3] is the sub-list for extension extendee
|
||||||
|
0, // [0:3] is the sub-list for field type_name
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() { file_device_proto_init() }
|
func init() { file_device_proto_init() }
|
||||||
@@ -319,7 +440,7 @@ func file_device_proto_init() {
|
|||||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||||
RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)),
|
RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)),
|
||||||
NumEnums: 1,
|
NumEnums: 1,
|
||||||
NumMessages: 3,
|
NumMessages: 5,
|
||||||
NumExtensions: 0,
|
NumExtensions: 0,
|
||||||
NumServices: 0,
|
NumServices: 0,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -6,27 +6,50 @@ import "google/protobuf/any.proto";
|
|||||||
|
|
||||||
option go_package = "internal/app/service/device/proto";
|
option go_package = "internal/app/service/device/proto";
|
||||||
|
|
||||||
|
// --- 通用指令结构 ---
|
||||||
|
|
||||||
// 指令类型
|
// 指令类型
|
||||||
enum MethodType{
|
enum MethodType {
|
||||||
SWITCH = 0; // 启停
|
SWITCH = 0; // 启停
|
||||||
COLLECT = 1; // 采集
|
COLLECT = 1; // 采集
|
||||||
}
|
}
|
||||||
|
|
||||||
// 指令
|
// 指令 (所有空中数据都会被包装在这里面)
|
||||||
message Instruction{
|
message Instruction {
|
||||||
MethodType method = 1;
|
MethodType method = 1;
|
||||||
google.protobuf.Any data = 2;
|
google.protobuf.Any data = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Switch{
|
// Switch 指令的载荷
|
||||||
|
message Switch {
|
||||||
string device_action = 1; // 指令
|
string device_action = 1; // 指令
|
||||||
int32 bus_number = 2; // 总线号
|
int32 bus_number = 2; // 总线号
|
||||||
int32 bus_address = 3; // 总线地址
|
int32 bus_address = 3; // 总线地址
|
||||||
int32 relay_channel = 4; // 继电器通道号
|
int32 relay_channel = 4; // 继电器通道号
|
||||||
}
|
}
|
||||||
|
|
||||||
message Collect{
|
|
||||||
int32 bus_number = 1; // 总线号
|
// --- 批量采集相关结构 ---
|
||||||
int32 bus_address = 2; // 总线地址
|
|
||||||
float value = 3; // 采集值
|
// BatchCollectCommand
|
||||||
|
// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。
|
||||||
|
// 这个消息本身不会被发送到设备。
|
||||||
|
message BatchCollectCommand {
|
||||||
|
string correlation_id = 1; // 用于关联请求和响应的唯一ID
|
||||||
|
repeated CollectTask tasks = 2; // 采集任务列表
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectTask
|
||||||
|
// 定义了单个采集任务的“意图”。
|
||||||
|
message CollectTask {
|
||||||
|
string device_action = 1; // 指令
|
||||||
|
int32 bus_number = 2; // 总线号
|
||||||
|
int32 bus_address = 3; // 总线地址
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectResult
|
||||||
|
// 这是设备响应的、极致精简的数据包。
|
||||||
|
message CollectResult {
|
||||||
|
string correlation_id = 1; // 从下行指令中原样返回的关联ID
|
||||||
|
repeated float values = 2; // 按预定顺序排列的采集值
|
||||||
}
|
}
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -46,7 +45,7 @@ func (d *DelayTask) parseParameters() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var params DelayTaskParams
|
var params DelayTaskParams
|
||||||
err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms)
|
err := d.executionTask.Task.ParseParameters(¶ms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
||||||
return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构
|
// ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构
|
||||||
@@ -25,23 +24,28 @@ type ReleaseFeedWeightTask struct {
|
|||||||
sensorDataRepo repository.SensorDataRepository
|
sensorDataRepo repository.SensorDataRepository
|
||||||
claimedLog *models.TaskExecutionLog
|
claimedLog *models.TaskExecutionLog
|
||||||
|
|
||||||
feedPortDevice *models.Device // 下料口基本信息
|
feedPortDevice *models.Device
|
||||||
releaseWeight float64 // 需要释放的重量
|
releaseWeight float64
|
||||||
mixingTankDeviceID uint // 搅拌罐称重传感器ID
|
mixingTankDeviceID uint
|
||||||
|
|
||||||
comm transport.Communicator
|
feedPort device.Service
|
||||||
feedPort *device.GeneralDeviceService // 下料口指令下发器
|
|
||||||
|
|
||||||
logger *logs.Logger
|
logger *logs.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例
|
// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例
|
||||||
func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task {
|
func NewReleaseFeedWeightTask(
|
||||||
|
claimedLog *models.TaskExecutionLog,
|
||||||
|
sensorDataRepo repository.SensorDataRepository,
|
||||||
|
deviceRepo repository.DeviceRepository,
|
||||||
|
deviceService device.Service,
|
||||||
|
logger *logs.Logger,
|
||||||
|
) Task {
|
||||||
return &ReleaseFeedWeightTask{
|
return &ReleaseFeedWeightTask{
|
||||||
claimedLog: claimedLog,
|
claimedLog: claimedLog,
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
sensorDataRepo: sensorDataRepo,
|
sensorDataRepo: sensorDataRepo,
|
||||||
comm: comm,
|
feedPort: deviceService, // 直接注入
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -118,7 +122,7 @@ func (r *ReleaseFeedWeightTask) parseParameters() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var params ReleaseFeedWeightTaskParams
|
var params ReleaseFeedWeightTaskParams
|
||||||
err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms)
|
err := r.claimedLog.Task.ParseParameters(¶ms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
|
r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
|
||||||
return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
|
return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err)
|
||||||
@@ -140,7 +144,6 @@ func (r *ReleaseFeedWeightTask) parseParameters() error {
|
|||||||
|
|
||||||
r.releaseWeight = params.ReleaseWeight
|
r.releaseWeight = params.ReleaseWeight
|
||||||
r.mixingTankDeviceID = params.MixingTankDeviceID
|
r.mixingTankDeviceID = params.MixingTankDeviceID
|
||||||
r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm)
|
|
||||||
r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID)
|
r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
|
r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err)
|
||||||
|
|||||||
@@ -1,15 +1,14 @@
|
|||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
@@ -84,9 +83,9 @@ type Scheduler struct {
|
|||||||
deviceRepo repository.DeviceRepository
|
deviceRepo repository.DeviceRepository
|
||||||
sensorDataRepo repository.SensorDataRepository
|
sensorDataRepo repository.SensorDataRepository
|
||||||
planRepo repository.PlanRepository
|
planRepo repository.PlanRepository
|
||||||
comm transport.Communicator
|
|
||||||
analysisPlanTaskManager *AnalysisPlanTaskManager
|
analysisPlanTaskManager *AnalysisPlanTaskManager
|
||||||
progressTracker *ProgressTracker
|
progressTracker *ProgressTracker
|
||||||
|
deviceService device.Service
|
||||||
|
|
||||||
pool *ants.Pool // 使用 ants 协程池来管理并发
|
pool *ants.Pool // 使用 ants 协程池来管理并发
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@@ -100,20 +99,21 @@ func NewScheduler(
|
|||||||
deviceRepo repository.DeviceRepository,
|
deviceRepo repository.DeviceRepository,
|
||||||
sensorDataRepo repository.SensorDataRepository,
|
sensorDataRepo repository.SensorDataRepository,
|
||||||
planRepo repository.PlanRepository,
|
planRepo repository.PlanRepository,
|
||||||
comm transport.Communicator,
|
|
||||||
analysisPlanTaskManager *AnalysisPlanTaskManager,
|
analysisPlanTaskManager *AnalysisPlanTaskManager,
|
||||||
logger *logs.Logger,
|
logger *logs.Logger,
|
||||||
|
deviceService device.Service,
|
||||||
interval time.Duration,
|
interval time.Duration,
|
||||||
numWorkers int) *Scheduler {
|
numWorkers int,
|
||||||
|
) *Scheduler {
|
||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
pendingTaskRepo: pendingTaskRepo,
|
pendingTaskRepo: pendingTaskRepo,
|
||||||
executionLogRepo: executionLogRepo,
|
executionLogRepo: executionLogRepo,
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
sensorDataRepo: sensorDataRepo,
|
sensorDataRepo: sensorDataRepo,
|
||||||
planRepo: planRepo,
|
planRepo: planRepo,
|
||||||
comm: comm,
|
|
||||||
analysisPlanTaskManager: analysisPlanTaskManager,
|
analysisPlanTaskManager: analysisPlanTaskManager,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
deviceService: deviceService,
|
||||||
pollingInterval: interval,
|
pollingInterval: interval,
|
||||||
workers: numWorkers,
|
workers: numWorkers,
|
||||||
progressTracker: NewProgressTracker(),
|
progressTracker: NewProgressTracker(),
|
||||||
@@ -289,7 +289,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
|
|||||||
case models.TaskTypeWaiting:
|
case models.TaskTypeWaiting:
|
||||||
return NewDelayTask(s.logger, claimedLog)
|
return NewDelayTask(s.logger, claimedLog)
|
||||||
case models.TaskTypeReleaseFeedWeight:
|
case models.TaskTypeReleaseFeedWeight:
|
||||||
return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger)
|
return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
|
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
|
||||||
@@ -304,7 +304,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
|
|||||||
var params struct {
|
var params struct {
|
||||||
PlanID uint `json:"plan_id"`
|
PlanID uint `json:"plan_id"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil {
|
if err := claimedLog.Task.ParseParameters(¶ms); err != nil {
|
||||||
s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err)
|
s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,9 +7,11 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
|
gproto "google.golang.org/protobuf/proto"
|
||||||
"gorm.io/datatypes"
|
"gorm.io/datatypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -27,23 +29,27 @@ const (
|
|||||||
|
|
||||||
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
||||||
type ChirpStackListener struct {
|
type ChirpStackListener struct {
|
||||||
logger *logs.Logger
|
logger *logs.Logger
|
||||||
sensorDataRepo repository.SensorDataRepository
|
sensorDataRepo repository.SensorDataRepository
|
||||||
deviceRepo repository.DeviceRepository
|
deviceRepo repository.DeviceRepository
|
||||||
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||||
|
pendingCollectionRepo repository.PendingCollectionRepository // 新增
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
|
||||||
func NewChirpStackListener(
|
func NewChirpStackListener(
|
||||||
logger *logs.Logger,
|
logger *logs.Logger,
|
||||||
sensorDataRepo repository.SensorDataRepository,
|
sensorDataRepo repository.SensorDataRepository,
|
||||||
deviceRepo repository.DeviceRepository,
|
deviceRepo repository.DeviceRepository,
|
||||||
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||||
) *ChirpStackListener {
|
pendingCollectionRepo repository.PendingCollectionRepository, // 新增
|
||||||
|
) ListenHandler { // 返回接口类型
|
||||||
return &ChirpStackListener{
|
return &ChirpStackListener{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
sensorDataRepo: sensorDataRepo,
|
sensorDataRepo: sensorDataRepo,
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
deviceCommandLogRepo: deviceCommandLogRepo,
|
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||||
|
pendingCollectionRepo: pendingCollectionRepo, // 新增
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,12 +157,20 @@ type GenericSensorReading struct {
|
|||||||
|
|
||||||
// handleUpEvent 处理上行数据事件
|
// handleUpEvent 处理上行数据事件
|
||||||
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
||||||
c.logger.Infof("处理 'up' 事件: %+v", event)
|
c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
|
|
||||||
// 记录信号强度
|
// 1. 查找区域主控设备
|
||||||
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
|
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
||||||
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
|
if err != nil {
|
||||||
|
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID)
|
||||||
|
|
||||||
|
// 2. 记录区域主控的信号强度 (如果存在)
|
||||||
if len(event.RxInfo) > 0 {
|
if len(event.RxInfo) > 0 {
|
||||||
|
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
|
||||||
|
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
|
||||||
rx := event.RxInfo[0] // 取第一个接收到的网关信息
|
rx := event.RxInfo[0] // 取第一个接收到的网关信息
|
||||||
|
|
||||||
// 构建 SignalMetrics 结构体
|
// 构建 SignalMetrics 结构体
|
||||||
@@ -164,73 +178,111 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
RssiDbm: rx.Rssi,
|
RssiDbm: rx.Rssi,
|
||||||
SnrDb: rx.Snr,
|
SnrDb: rx.Snr,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
|
|
||||||
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// 记录区域主控的信号强度
|
|
||||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
|
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
|
||||||
|
c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 解析并记录传感器数据 (温度、湿度、重量)
|
// 3. 处理上报的传感器数据
|
||||||
// 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串
|
if event.Data == "" {
|
||||||
if event.Data != "" {
|
c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3.1 Base64 解码
|
||||||
|
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3.2 解析外层 "信封"
|
||||||
|
var instruction proto.Instruction
|
||||||
|
if err := gproto.Unmarshal(decodedData, &instruction); err != nil {
|
||||||
|
c.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3.3 检查是否是采集响应
|
||||||
|
if instruction.Method != proto.MethodType_COLLECT {
|
||||||
|
c.logger.Infof("收到一个非采集响应的上行指令 (Method: %s),无需处理。", instruction.Method.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.4 解包内层 CollectResult
|
||||||
|
var collectResp proto.CollectResult
|
||||||
|
if err := instruction.Data.UnmarshalTo(&collectResp); err != nil {
|
||||||
|
c.logger.Errorf("解包数据信息失败: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
correlationID := collectResp.CorrelationId
|
||||||
|
c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
|
||||||
|
|
||||||
|
// 3. 根据 CorrelationID 查找待处理请求
|
||||||
|
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查状态,防止重复处理
|
||||||
|
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
||||||
|
c.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 匹配数据并存入数据库
|
||||||
|
deviceIDs := pendingReq.CommandMetadata
|
||||||
|
values := collectResp.Values
|
||||||
|
if len(deviceIDs) != len(values) {
|
||||||
|
c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
|
||||||
|
// TODO 数量不匹配是否全改成失败
|
||||||
|
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
|
||||||
|
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
c.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var readings []GenericSensorReading
|
for i, deviceID := range deviceIDs {
|
||||||
if err := json.Unmarshal(decodedData, &readings); err != nil {
|
value := values[i]
|
||||||
c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData))
|
dev, err := c.deviceRepo.FindByID(deviceID)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 查找区域主控设备,以便记录其ID
|
|
||||||
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, reading := range readings {
|
sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType]
|
||||||
// 根据类型构建具体的传感器数据结构体
|
if !ok {
|
||||||
var sensorData interface{}
|
c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType)
|
||||||
var sensorDataType models.SensorDataType
|
continue
|
||||||
|
|
||||||
switch reading.Type {
|
|
||||||
case models.SensorDataTypeTemperature: // 使用枚举常量
|
|
||||||
sensorData = models.TemperatureData{
|
|
||||||
TemperatureCelsius: reading.Value,
|
|
||||||
}
|
|
||||||
sensorDataType = models.SensorDataTypeTemperature
|
|
||||||
case models.SensorDataTypeHumidity: // 使用枚举常量
|
|
||||||
sensorData = models.HumidityData{
|
|
||||||
HumidityPercent: reading.Value,
|
|
||||||
}
|
|
||||||
sensorDataType = models.SensorDataTypeHumidity
|
|
||||||
case models.SensorDataTypeWeight: // 使用枚举常量
|
|
||||||
sensorData = models.WeightData{
|
|
||||||
WeightKilograms: reading.Value,
|
|
||||||
}
|
|
||||||
sensorDataType = models.SensorDataTypeWeight
|
|
||||||
default:
|
|
||||||
c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d",
|
|
||||||
reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID)
|
|
||||||
continue // 跳过未知类型
|
|
||||||
}
|
|
||||||
|
|
||||||
// 记录普通设备的传感器数据
|
|
||||||
c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var sensorData interface{}
|
||||||
|
switch sensorDataType {
|
||||||
|
case models.SensorDataTypeTemperature:
|
||||||
|
sensorData = models.TemperatureData{TemperatureCelsius: float64(value)}
|
||||||
|
case models.SensorDataTypeHumidity:
|
||||||
|
sensorData = models.HumidityData{HumidityPercent: float64(value)}
|
||||||
|
case models.SensorDataTypeWeight:
|
||||||
|
sensorData = models.WeightData{WeightKilograms: float64(value)}
|
||||||
|
default:
|
||||||
|
c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData)
|
||||||
|
c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 更新请求状态为“已完成”
|
||||||
|
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil {
|
||||||
|
c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui)
|
c.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
||||||
@@ -31,6 +32,7 @@ type Application struct {
|
|||||||
planRepo repository.PlanRepository
|
planRepo repository.PlanRepository
|
||||||
pendingTaskRepo repository.PendingTaskRepository
|
pendingTaskRepo repository.PendingTaskRepository
|
||||||
executionLogRepo repository.ExecutionLogRepository
|
executionLogRepo repository.ExecutionLogRepository
|
||||||
|
pendingCollectionRepo repository.PendingCollectionRepository
|
||||||
analysisPlanTaskManager *task.AnalysisPlanTaskManager
|
analysisPlanTaskManager *task.AnalysisPlanTaskManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,32 +78,64 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
// 初始化命令下发历史仓库
|
// 初始化命令下发历史仓库
|
||||||
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
|
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
|
||||||
|
|
||||||
|
// 初始化待采集请求仓库
|
||||||
|
pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB())
|
||||||
|
|
||||||
// 初始化设备上行监听器
|
// 初始化设备上行监听器
|
||||||
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo)
|
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo, pendingCollectionRepo)
|
||||||
|
|
||||||
// 初始化计划触发器管理器
|
// 初始化计划触发器管理器
|
||||||
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
||||||
|
|
||||||
// 初始化设备通信器
|
// 初始化设备通信器 (纯粹的通信客户端)
|
||||||
comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo)
|
comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger)
|
||||||
|
|
||||||
|
// 初始化通用设备服务
|
||||||
|
generalDeviceService := device.NewGeneralDeviceService(
|
||||||
|
deviceRepo,
|
||||||
|
deviceCommandLogRepo,
|
||||||
|
pendingCollectionRepo,
|
||||||
|
logger,
|
||||||
|
comm,
|
||||||
|
)
|
||||||
|
|
||||||
// 初始化任务执行器
|
// 初始化任务执行器
|
||||||
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
|
executor := task.NewScheduler(
|
||||||
|
pendingTaskRepo,
|
||||||
|
executionLogRepo,
|
||||||
|
deviceRepo,
|
||||||
|
sensorDataRepo,
|
||||||
|
planRepo,
|
||||||
|
analysisPlanTaskManager,
|
||||||
|
logger,
|
||||||
|
generalDeviceService,
|
||||||
|
time.Duration(cfg.Task.Interval)*time.Second,
|
||||||
|
cfg.Task.NumWorkers,
|
||||||
|
)
|
||||||
|
|
||||||
// 初始化 API 服务器
|
// 初始化 API 服务器
|
||||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)
|
apiServer := api.NewAPI(
|
||||||
|
cfg.Server,
|
||||||
|
logger,
|
||||||
|
userRepo,
|
||||||
|
deviceRepo,
|
||||||
|
planRepo,
|
||||||
|
tokenService,
|
||||||
|
listenHandler,
|
||||||
|
analysisPlanTaskManager,
|
||||||
|
)
|
||||||
|
|
||||||
// 组装 Application 对象
|
// 组装 Application 对象
|
||||||
app := &Application{
|
app := &Application{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
Executor: executor,
|
Executor: executor,
|
||||||
API: apiServer,
|
API: apiServer,
|
||||||
// 填充新增的字段
|
|
||||||
planRepo: planRepo,
|
planRepo: planRepo,
|
||||||
pendingTaskRepo: pendingTaskRepo,
|
pendingTaskRepo: pendingTaskRepo,
|
||||||
executionLogRepo: executionLogRepo,
|
executionLogRepo: executionLogRepo,
|
||||||
|
pendingCollectionRepo: pendingCollectionRepo,
|
||||||
analysisPlanTaskManager: analysisPlanTaskManager,
|
analysisPlanTaskManager: analysisPlanTaskManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,7 +146,13 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
func (app *Application) Start() error {
|
func (app *Application) Start() error {
|
||||||
app.Logger.Info("应用启动中...")
|
app.Logger.Info("应用启动中...")
|
||||||
|
|
||||||
// --- 新增逻辑:初始化待执行任务列表 ---
|
// --- 清理待采集任务 ---
|
||||||
|
if err := app.initializePendingCollections(); err != nil {
|
||||||
|
// 这是一个非致命错误,记录它,但应用应继续启动
|
||||||
|
app.Logger.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- 初始化待执行任务列表 ---
|
||||||
if err := app.initializePendingTasks(
|
if err := app.initializePendingTasks(
|
||||||
app.planRepo, // 传入 planRepo
|
app.planRepo, // 传入 planRepo
|
||||||
app.pendingTaskRepo, // 传入 pendingTaskRepo
|
app.pendingTaskRepo, // 传入 pendingTaskRepo
|
||||||
@@ -160,6 +200,25 @@ func (app *Application) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
|
||||||
|
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
|
||||||
|
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
|
||||||
|
func (app *Application) initializePendingCollections() error {
|
||||||
|
app.Logger.Info("开始清理所有未完成的采集请求...")
|
||||||
|
|
||||||
|
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
|
||||||
|
count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
|
||||||
|
} else if count > 0 {
|
||||||
|
app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
|
||||||
|
} else {
|
||||||
|
app.Logger.Info("没有需要清理的采集请求。")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
|
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
|
||||||
func (app *Application) initializePendingTasks(
|
func (app *Application) initializePendingTasks(
|
||||||
planRepo repository.PlanRepository,
|
planRepo repository.PlanRepository,
|
||||||
|
|||||||
@@ -117,10 +117,11 @@ type HeartbeatConfig struct {
|
|||||||
|
|
||||||
// ChirpStackConfig 代表 ChirpStack API 配置
|
// ChirpStackConfig 代表 ChirpStack API 配置
|
||||||
type ChirpStackConfig struct {
|
type ChirpStackConfig struct {
|
||||||
APIHost string `yaml:"api_host"`
|
APIHost string `yaml:"api_host"`
|
||||||
APIToken string `yaml:"api_token"`
|
APIToken string `yaml:"api_token"`
|
||||||
FPort int `yaml:"fport"`
|
FPort int `yaml:"fport"`
|
||||||
APITimeout int `yaml:"api_timeout"`
|
APITimeout int `yaml:"api_timeout"`
|
||||||
|
CollectionRequestTimeout int `yaml:"collection_request_timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskConfig 代表任务调度配置
|
// TaskConfig 代表任务调度配置
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import (
|
|||||||
|
|
||||||
"gorm.io/datatypes"
|
"gorm.io/datatypes"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DeviceType 定义了设备的高级类别
|
// DeviceType 定义了设备的高级类别
|
||||||
@@ -30,6 +32,8 @@ const (
|
|||||||
SubTypeSensorHumidity DeviceSubType = "humidity"
|
SubTypeSensorHumidity DeviceSubType = "humidity"
|
||||||
// SubTypeSensorAmmonia 氨气传感器
|
// SubTypeSensorAmmonia 氨气传感器
|
||||||
SubTypeSensorAmmonia DeviceSubType = "ammonia"
|
SubTypeSensorAmmonia DeviceSubType = "ammonia"
|
||||||
|
// SubTypeSensorWeight 电子秤
|
||||||
|
SubTypeSensorWeight DeviceSubType = "weight"
|
||||||
|
|
||||||
// SubTypeValveFeed 下料阀门
|
// SubTypeValveFeed 下料阀门
|
||||||
SubTypeValveFeed DeviceSubType = "feed_valve"
|
SubTypeValveFeed DeviceSubType = "feed_valve"
|
||||||
@@ -85,6 +89,10 @@ type Device struct {
|
|||||||
// Location 描述了设备的物理安装位置,例如 "1号猪舍东侧",方便运维。建立索引以优化按位置查询。
|
// Location 描述了设备的物理安装位置,例如 "1号猪舍东侧",方便运维。建立索引以优化按位置查询。
|
||||||
Location string `gorm:"index" json:"location"`
|
Location string `gorm:"index" json:"location"`
|
||||||
|
|
||||||
|
// Command 存储了与设备交互所需的具体指令。
|
||||||
|
// 例如,对于传感器,这里存储 Modbus 采集指令;对于开关和区域主控,这里可以为空。
|
||||||
|
Command string `gorm:"type:varchar(255)" json:"command"`
|
||||||
|
|
||||||
// Properties 用于存储特定类型设备的独有属性,采用JSON格式。
|
// Properties 用于存储特定类型设备的独有属性,采用JSON格式。
|
||||||
// 建议在应用层为不同子类型的设备定义专用的属性结构体(如 LoraProperties, BusProperties),以保证数据一致性。
|
// 建议在应用层为不同子类型的设备定义专用的属性结构体(如 LoraProperties, BusProperties),以保证数据一致性。
|
||||||
Properties datatypes.JSON `json:"properties"`
|
Properties datatypes.JSON `json:"properties"`
|
||||||
@@ -110,28 +118,52 @@ func (d *Device) ParseProperties(v interface{}) error {
|
|||||||
|
|
||||||
// SelfCheck 进行参数自检, 返回检测结果
|
// SelfCheck 进行参数自检, 返回检测结果
|
||||||
// 方法会根据自身类型进行参数检查, 参数不全时返回false
|
// 方法会根据自身类型进行参数检查, 参数不全时返回false
|
||||||
// TODO 没写单测
|
|
||||||
func (d *Device) SelfCheck() bool {
|
func (d *Device) SelfCheck() bool {
|
||||||
|
// 使用清晰的 switch 结构,确保所有情况都被覆盖
|
||||||
properties := make(map[string]interface{})
|
switch d.Type {
|
||||||
if err := d.ParseProperties(&properties); err != nil {
|
case DeviceTypeAreaController:
|
||||||
return false
|
props := make(map[string]interface{})
|
||||||
}
|
if err := d.ParseProperties(&props); err != nil {
|
||||||
|
|
||||||
has := func(key string) bool {
|
|
||||||
_, ok := properties[key]
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
switch d.SubType {
|
|
||||||
case SubTypeFan:
|
|
||||||
if !has(BusNumber) || !has(BusAddress) || !has(RelayChannel) {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
_, ok := props[LoRaAddress].(string)
|
||||||
|
return ok
|
||||||
|
|
||||||
|
case DeviceTypeDevice:
|
||||||
|
// 所有普通设备都必须有父级
|
||||||
|
if d.ParentID == nil || *d.ParentID == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
props := make(map[string]interface{})
|
||||||
|
if err := d.ParseProperties(&props); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasPureNumeric 检查一个key是否存在于map中,并且其值是纯数字(整数或可解析为整数的字符串)
|
||||||
|
hasPureNumeric := func(key string) bool {
|
||||||
|
val, ok := props[key]
|
||||||
|
if !ok {
|
||||||
|
return false // Key不存在
|
||||||
|
}
|
||||||
|
return utils.IsPureNumeric(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据子类型进行具体校验
|
||||||
|
switch d.SubType {
|
||||||
|
// 所有传感器类型都必须有 Command 和总线信息,且总线信息为纯数字
|
||||||
|
case SubTypeSensorTemp, SubTypeSensorHumidity, SubTypeSensorWeight, SubTypeSensorAmmonia:
|
||||||
|
return d.Command != "" && hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress)
|
||||||
|
// 所有开关类型都必须有继电器和总线信息,且都为纯数字
|
||||||
|
case SubTypeFan, SubTypeWaterCurtain, SubTypeValveFeed:
|
||||||
|
return hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) && hasPureNumeric(RelayChannel)
|
||||||
|
// 如果是未知的子类型,或者没有子类型,则认为自检失败
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果设备类型不是已知的任何一种,则自检失败
|
||||||
default:
|
default:
|
||||||
// 不应该有类型未知的设备
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,32 +0,0 @@
|
|||||||
package models
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DeviceCommandLog 记录下行任务的下发情况和设备确认状态
|
|
||||||
type DeviceCommandLog struct {
|
|
||||||
// MessageID 是下行消息的唯一标识符。
|
|
||||||
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
|
|
||||||
MessageID string `gorm:"primaryKey" json:"message_id"`
|
|
||||||
|
|
||||||
// DeviceID 是接收此下行任务的设备的ID。
|
|
||||||
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
|
||||||
DeviceID uint `gorm:"not null;index" json:"device_id"`
|
|
||||||
|
|
||||||
// SentAt 记录下行任务最初发送的时间。
|
|
||||||
SentAt time.Time `gorm:"not null" json:"sent_at"`
|
|
||||||
|
|
||||||
// AcknowledgedAt 记录设备确认收到下行消息的时间。
|
|
||||||
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
|
|
||||||
AcknowledgedAt *time.Time `json:"acknowledged_at"`
|
|
||||||
|
|
||||||
// ReceivedSuccess 表示设备是否成功接收到下行消息。
|
|
||||||
// true 表示设备已确认收到,false 表示设备未确认收到或下发失败。
|
|
||||||
ReceivedSuccess bool `gorm:"not null" json:"received_success"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TableName 自定义 GORM 使用的数据库表名
|
|
||||||
func (DeviceCommandLog) TableName() string {
|
|
||||||
return "device_command_log"
|
|
||||||
}
|
|
||||||
@@ -73,3 +73,70 @@ func (log *TaskExecutionLog) AfterFind(tx *gorm.DB) (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- 指令与采集 ---
|
||||||
|
|
||||||
|
// PendingCollectionStatus 定义了待采集请求的状态
|
||||||
|
type PendingCollectionStatus string
|
||||||
|
|
||||||
|
const (
|
||||||
|
PendingStatusPending PendingCollectionStatus = "pending" // 请求已发送,等待设备响应
|
||||||
|
PendingStatusFulfilled PendingCollectionStatus = "fulfilled" // 已收到设备响应并成功处理
|
||||||
|
PendingStatusTimedOut PendingCollectionStatus = "timed_out" // 请求超时,未收到设备响应
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeviceCommandLog 记录所有“发后即忘”的下行指令日志。
|
||||||
|
// 这张表主要用于追踪指令是否被网关成功发送 (ack)。
|
||||||
|
type DeviceCommandLog struct {
|
||||||
|
// MessageID 是下行消息的唯一标识符。
|
||||||
|
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
|
||||||
|
MessageID string `gorm:"primaryKey" json:"message_id"`
|
||||||
|
|
||||||
|
// DeviceID 是接收此下行任务的设备的ID。
|
||||||
|
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
||||||
|
DeviceID uint `gorm:"not null;index" json:"device_id"`
|
||||||
|
|
||||||
|
// SentAt 记录下行任务最初发送的时间。
|
||||||
|
SentAt time.Time `gorm:"not null" json:"sent_at"`
|
||||||
|
|
||||||
|
// AcknowledgedAt 记录设备确认收到下行消息的时间。
|
||||||
|
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
|
||||||
|
AcknowledgedAt *time.Time `json:"acknowledged_at"`
|
||||||
|
|
||||||
|
// ReceivedSuccess 表示设备是否成功接收到下行消息。
|
||||||
|
// true 表示设备已确认收到,false 表示设备未确认收到或下发失败。
|
||||||
|
ReceivedSuccess bool `gorm:"not null" json:"received_success"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TableName 自定义 GORM 使用的数据库表名
|
||||||
|
func (DeviceCommandLog) TableName() string {
|
||||||
|
return "device_command_log"
|
||||||
|
}
|
||||||
|
|
||||||
|
// PendingCollection 记录所有需要设备响应的“待采集请求”。
|
||||||
|
// 这是一张状态机表,追踪从请求发送到收到响应的整个生命周期。
|
||||||
|
type PendingCollection struct {
|
||||||
|
// CorrelationID 是由平台生成的、在请求和响应之间全局唯一的关联ID,作为主键。
|
||||||
|
CorrelationID string `gorm:"primaryKey"`
|
||||||
|
|
||||||
|
// DeviceID 是接收此任务的设备ID
|
||||||
|
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
||||||
|
DeviceID uint `gorm:"index"`
|
||||||
|
|
||||||
|
// CommandMetadata 存储了此次采集任务对应的设备ID列表,顺序与设备响应值的顺序一致。
|
||||||
|
CommandMetadata UintArray `gorm:"type:bigint[]"`
|
||||||
|
|
||||||
|
// Status 是该请求的当前状态,用于状态机管理和超时处理。
|
||||||
|
Status PendingCollectionStatus `gorm:"index"`
|
||||||
|
|
||||||
|
// FulfilledAt 是收到设备响应并成功处理的时间。使用指针以允许 NULL 值。
|
||||||
|
FulfilledAt *time.Time
|
||||||
|
|
||||||
|
// CreatedAt 是 GORM 的标准字段,记录请求创建时间。
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// TableName 自定义 GORM 使用的数据库表名
|
||||||
|
func (PendingCollection) TableName() string {
|
||||||
|
return "pending_collections"
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,5 +1,13 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql/driver"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
// GetAllModels 返回一个包含所有数据库模型实例的切片。
|
// GetAllModels 返回一个包含所有数据库模型实例的切片。
|
||||||
// 这个函数用于在数据库初始化时自动迁移所有的表结构。
|
// 这个函数用于在数据库初始化时自动迁移所有的表结构。
|
||||||
func GetAllModels() []interface{} {
|
func GetAllModels() []interface{} {
|
||||||
@@ -14,5 +22,70 @@ func GetAllModels() []interface{} {
|
|||||||
&PendingTask{},
|
&PendingTask{},
|
||||||
&SensorData{},
|
&SensorData{},
|
||||||
&DeviceCommandLog{},
|
&DeviceCommandLog{},
|
||||||
|
&PendingCollection{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UintArray 是一个自定义类型,代表 uint 的切片。
|
||||||
|
// 它实现了 gorm.Scanner 和 driver.Valuer 接口,
|
||||||
|
// 以便能与数据库的 bigint[] 类型进行原生映射。
|
||||||
|
type UintArray []uint
|
||||||
|
|
||||||
|
// Value 实现了 driver.Valuer 接口。
|
||||||
|
// 它告诉 GORM 如何将 UintArray ([]) 转换为数据库能够理解的格式。
|
||||||
|
func (a UintArray) Value() (driver.Value, error) {
|
||||||
|
if a == nil {
|
||||||
|
return "{}", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var b strings.Builder
|
||||||
|
b.WriteString("{")
|
||||||
|
for i, v := range a {
|
||||||
|
if i > 0 {
|
||||||
|
b.WriteString(",")
|
||||||
|
}
|
||||||
|
b.WriteString(strconv.FormatUint(uint64(v), 10))
|
||||||
|
}
|
||||||
|
b.WriteString("}")
|
||||||
|
return b.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan 实现了 gorm.Scanner 接口。
|
||||||
|
// 它告诉 GORM 如何将从数据库读取的数据转换为我们的 UintArray ([])。
|
||||||
|
func (a *UintArray) Scan(src interface{}) error {
|
||||||
|
if src == nil {
|
||||||
|
*a = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var srcStr string
|
||||||
|
switch v := src.(type) {
|
||||||
|
case []byte:
|
||||||
|
srcStr = string(v)
|
||||||
|
case string:
|
||||||
|
srcStr = v
|
||||||
|
default:
|
||||||
|
return errors.New("无法扫描非字符串或字节类型的源到 UintArray")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 去掉花括号
|
||||||
|
srcStr = strings.Trim(srcStr, "{}")
|
||||||
|
if srcStr == "" {
|
||||||
|
*a = []uint{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 按逗号分割
|
||||||
|
parts := strings.Split(srcStr, ",")
|
||||||
|
arr := make([]uint, len(parts))
|
||||||
|
for i, p := range parts {
|
||||||
|
val, err := strconv.ParseUint(p, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("解析 UintArray 元素失败: %w", err)
|
||||||
|
}
|
||||||
|
arr[i] = uint(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
*a = arr
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
@@ -172,3 +174,16 @@ type Task struct {
|
|||||||
func (Task) TableName() string {
|
func (Task) TableName() string {
|
||||||
return "tasks"
|
return "tasks"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseParameters 解析 JSON 属性到一个具体的结构体中。
|
||||||
|
// 调用方需要传入一个指向目标结构体实例的指针。
|
||||||
|
// 示例:
|
||||||
|
//
|
||||||
|
// var param LoraParameters
|
||||||
|
// if err := task.ParseParameters(¶m); err != nil { ... }
|
||||||
|
func (t Task) ParseParameters(v interface{}) error {
|
||||||
|
if t.Parameters == nil {
|
||||||
|
return errors.New("设备属性为空,无法解析")
|
||||||
|
}
|
||||||
|
return json.Unmarshal(t.Parameters, v)
|
||||||
|
}
|
||||||
|
|||||||
@@ -17,6 +17,14 @@ const (
|
|||||||
SensorDataTypeWeight SensorDataType = "weight" // 重量
|
SensorDataTypeWeight SensorDataType = "weight" // 重量
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射.
|
||||||
|
// 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询.
|
||||||
|
var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{
|
||||||
|
SubTypeSensorTemp: SensorDataTypeTemperature,
|
||||||
|
SubTypeSensorHumidity: SensorDataTypeHumidity,
|
||||||
|
SubTypeSensorWeight: SensorDataTypeWeight,
|
||||||
|
}
|
||||||
|
|
||||||
// SignalMetrics 存储信号强度数据
|
// SignalMetrics 存储信号强度数据
|
||||||
type SignalMetrics struct {
|
type SignalMetrics struct {
|
||||||
RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响
|
RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响
|
||||||
|
|||||||
@@ -35,6 +35,9 @@ type DeviceRepository interface {
|
|||||||
|
|
||||||
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增)
|
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增)
|
||||||
FindByDevEui(devEui string) (*models.Device, error)
|
FindByDevEui(devEui string) (*models.Device, error)
|
||||||
|
|
||||||
|
// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备
|
||||||
|
FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormDeviceRepository 是 DeviceRepository 的 GORM 实现
|
// gormDeviceRepository 是 DeviceRepository 的 GORM 实现
|
||||||
@@ -121,3 +124,18 @@ func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, erro
|
|||||||
}
|
}
|
||||||
return &device, nil
|
return &device, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备
|
||||||
|
func (r *gormDeviceRepository) FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) {
|
||||||
|
var device models.Device
|
||||||
|
// PostgreSQL 使用 ->> 操作符来查询 JSONB 字段的文本值
|
||||||
|
err := r.db.Where("parent_id = ?", parentID).
|
||||||
|
Where("properties->>'bus_number' = ?", strconv.Itoa(int(busNumber))).
|
||||||
|
Where("properties->>'bus_address' = ?", strconv.Itoa(int(busAddress))).
|
||||||
|
First(&device).Error
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("根据父设备ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", parentID, busNumber, busAddress, err)
|
||||||
|
}
|
||||||
|
return &device, nil
|
||||||
|
}
|
||||||
|
|||||||
67
internal/infra/repository/pending_collection_repository.go
Normal file
67
internal/infra/repository/pending_collection_repository.go
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PendingCollectionRepository 定义了与待采集请求相关的数据库操作接口。
|
||||||
|
type PendingCollectionRepository interface {
|
||||||
|
// Create 创建一个新的待采集请求。
|
||||||
|
Create(req *models.PendingCollection) error
|
||||||
|
|
||||||
|
// FindByCorrelationID 根据关联ID查找一个待采集请求。
|
||||||
|
FindByCorrelationID(correlationID string) (*models.PendingCollection, error)
|
||||||
|
|
||||||
|
// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。
|
||||||
|
UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error
|
||||||
|
|
||||||
|
// MarkAllPendingAsTimedOut 将所有“待处理”请求更新为“已超时”。
|
||||||
|
MarkAllPendingAsTimedOut() (int64, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// gormPendingCollectionRepository 是 PendingCollectionRepository 的 GORM 实现。
|
||||||
|
type gormPendingCollectionRepository struct {
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGormPendingCollectionRepository 创建一个新的 PendingCollectionRepository GORM 实现实例。
|
||||||
|
func NewGormPendingCollectionRepository(db *gorm.DB) PendingCollectionRepository {
|
||||||
|
return &gormPendingCollectionRepository{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create 创建一个新的待采集请求。
|
||||||
|
func (r *gormPendingCollectionRepository) Create(req *models.PendingCollection) error {
|
||||||
|
return r.db.Create(req).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindByCorrelationID 根据关联ID查找一个待采集请求。
|
||||||
|
func (r *gormPendingCollectionRepository) FindByCorrelationID(correlationID string) (*models.PendingCollection, error) {
|
||||||
|
var req models.PendingCollection
|
||||||
|
if err := r.db.First(&req, "correlation_id = ?", correlationID).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。
|
||||||
|
func (r *gormPendingCollectionRepository) UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error {
|
||||||
|
return r.db.Model(&models.PendingCollection{}).
|
||||||
|
Where("correlation_id = ?", correlationID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"status": models.PendingStatusFulfilled,
|
||||||
|
"fulfilled_at": &fulfilledAt,
|
||||||
|
}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkAllPendingAsTimedOut 将所有状态为 'pending' 的记录更新为 'timed_out'。
|
||||||
|
// 返回被更新的记录数量和错误。
|
||||||
|
func (r *gormPendingCollectionRepository) MarkAllPendingAsTimedOut() (int64, error) {
|
||||||
|
result := r.db.Model(&models.PendingCollection{}).
|
||||||
|
Where("status = ?", models.PendingStatusPending).
|
||||||
|
Update("status", models.PendingStatusTimedOut)
|
||||||
|
|
||||||
|
return result.RowsAffected, result.Error
|
||||||
|
}
|
||||||
@@ -1,12 +1,12 @@
|
|||||||
package lora
|
package lora
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service"
|
||||||
"github.com/go-openapi/runtime"
|
"github.com/go-openapi/runtime"
|
||||||
httptransport "github.com/go-openapi/runtime/client"
|
httptransport "github.com/go-openapi/runtime/client"
|
||||||
@@ -20,19 +20,13 @@ type ChirpStackTransport struct {
|
|||||||
client *client.ChirpStackRESTAPI
|
client *client.ChirpStackRESTAPI
|
||||||
authInfo runtime.ClientAuthInfoWriter
|
authInfo runtime.ClientAuthInfoWriter
|
||||||
config config.ChirpStackConfig
|
config config.ChirpStackConfig
|
||||||
|
logger *logs.Logger
|
||||||
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
|
||||||
deviceRepo repository.DeviceRepository
|
|
||||||
|
|
||||||
logger *logs.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
|
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
|
||||||
func NewChirpStackTransport(
|
func NewChirpStackTransport(
|
||||||
config config.ChirpStackConfig,
|
config config.ChirpStackConfig,
|
||||||
logger *logs.Logger,
|
logger *logs.Logger,
|
||||||
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
|
||||||
deviceRepo repository.DeviceRepository,
|
|
||||||
) *ChirpStackTransport {
|
) *ChirpStackTransport {
|
||||||
// 使用配置中的服务器地址创建一个 HTTP transport。
|
// 使用配置中的服务器地址创建一个 HTTP transport。
|
||||||
// 它会使用生成的客户端中定义的默认 base path 和 schemes。
|
// 它会使用生成的客户端中定义的默认 base path 和 schemes。
|
||||||
@@ -45,16 +39,14 @@ func NewChirpStackTransport(
|
|||||||
authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey())
|
authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey())
|
||||||
|
|
||||||
return &ChirpStackTransport{
|
return &ChirpStackTransport{
|
||||||
client: apiClient,
|
client: apiClient,
|
||||||
authInfo: authInfo,
|
authInfo: authInfo,
|
||||||
config: config,
|
config: config,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
deviceCommandLogRepo: deviceCommandLogRepo,
|
|
||||||
deviceRepo: deviceRepo,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ChirpStackTransport) Send(address string, payload []byte) error {
|
func (c *ChirpStackTransport) Send(address string, payload []byte) (*transport.SendResult, error) {
|
||||||
// 1. 构建 API 请求体。
|
// 1. 构建 API 请求体。
|
||||||
// - Confirmed: true 表示确认消息, 设为false将不保证消息送达(但可以节约下行容量)。
|
// - Confirmed: true 表示确认消息, 设为false将不保证消息送达(但可以节约下行容量)。
|
||||||
// - Data: 经过 Base64 编码的数据。
|
// - Data: 经过 Base64 编码的数据。
|
||||||
@@ -72,7 +64,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error {
|
|||||||
// - WithQueueItemDevEui 指定目标设备的 EUI。
|
// - WithQueueItemDevEui 指定目标设备的 EUI。
|
||||||
// - WithBody 设置请求体。
|
// - WithBody 设置请求体。
|
||||||
params := device_service.NewDeviceServiceEnqueueParams().
|
params := device_service.NewDeviceServiceEnqueueParams().
|
||||||
WithTimeout(10 * time.Second).
|
WithTimeout(time.Duration(c.config.APITimeout) * time.Second).
|
||||||
WithQueueItemDevEui(address).
|
WithQueueItemDevEui(address).
|
||||||
WithBody(body)
|
WithBody(body)
|
||||||
|
|
||||||
@@ -81,53 +73,23 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error {
|
|||||||
resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo)
|
resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err)
|
c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. 成功发送后,尝试记录下行任务
|
if resp == nil || resp.Payload == nil || resp.Payload.ID == "" {
|
||||||
messageID := ""
|
// 这是一个需要明确处理的错误情况,因为调用方依赖 MessageID。
|
||||||
if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID
|
errMsg := "ChirpStack Enqueue 响应未包含 MessageID (ID)"
|
||||||
messageID = resp.Payload.ID
|
c.logger.Errorf(errMsg)
|
||||||
} else {
|
return nil, errors.New(errMsg)
|
||||||
c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address)
|
|
||||||
// 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 调用私有方法记录下行任务
|
c.logger.Infof("成功将 payload 发送到设备 %s 的队列 (MessageID: %s)", address, resp.Payload.ID)
|
||||||
if err := c.recordDownlinkTask(address, messageID); err != nil {
|
|
||||||
// 记录失败不影响下行命令的发送成功
|
// 将 MessageID 包装在 SendResult 中返回
|
||||||
c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err)
|
result := &transport.SendResult{
|
||||||
return nil
|
MessageID: resp.Payload.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID)
|
return result, nil
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// recordDownlinkTask 记录下行任务到数据库
|
|
||||||
func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error {
|
|
||||||
// 获取区域主控的内部 DeviceID
|
|
||||||
regionalController, err := c.deviceRepo.FindByDevEui(devEui)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建 DeviceCommandLog
|
|
||||||
record := &models.DeviceCommandLog{
|
|
||||||
MessageID: messageID,
|
|
||||||
DeviceID: regionalController.ID,
|
|
||||||
SentAt: time.Now(),
|
|
||||||
AcknowledgedAt: nil, // 初始状态为未确认
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.deviceCommandLogRepo.Create(record); err != nil {
|
|
||||||
c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,5 +3,13 @@ package transport
|
|||||||
// Communicator 用于其他设备通信
|
// Communicator 用于其他设备通信
|
||||||
type Communicator interface {
|
type Communicator interface {
|
||||||
// Send 用于发送一条单向数据(不等待回信)
|
// Send 用于发送一条单向数据(不等待回信)
|
||||||
Send(address string, payload []byte) error
|
// 成功时,它返回一个包含 MessageID 的 SendResult,以便调用方追踪。
|
||||||
|
Send(address string, payload []byte) (*SendResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendResult 包含了 SendGo 方法成功执行后返回的结果。
|
||||||
|
type SendResult struct {
|
||||||
|
// MessageID 是通信服务为此次发送分配的唯一标识符。
|
||||||
|
// 调用方需要保存此 ID,以便后续关联 ACK 等事件。
|
||||||
|
MessageID string
|
||||||
}
|
}
|
||||||
|
|||||||
13
internal/infra/utils/validation.go
Normal file
13
internal/infra/utils/validation.go
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsPureNumeric 检查值是否纯数字(整数或可转换为整数的字符串)。
|
||||||
|
func IsPureNumeric(val interface{}) bool {
|
||||||
|
v := fmt.Sprintf("%v", val)
|
||||||
|
_, err := strconv.Atoi(v)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user