Merge pull request 'issue_25' (#26) from issue_25 into main

Reviewed-on: #26
This commit is contained in:
2025-09-30 00:33:33 +08:00
17 changed files with 878 additions and 781 deletions

View File

@@ -34,37 +34,36 @@ func NewController(repo repository.DeviceRepository, logger *logs.Logger) *Contr
// CreateDeviceRequest 定义了创建设备时需要传入的参数
type CreateDeviceRequest struct {
Name string `json:"name" binding:"required"`
Type models.DeviceType `json:"type" binding:"required"`
SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
Name string `json:"name" binding:"required"`
DeviceTemplateID uint `json:"device_template_id" binding:"required"`
AreaControllerID uint `json:"area_controller_id" binding:"required"`
Location string `json:"location,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
}
// UpdateDeviceRequest 定义了更新设备时需要传入的参数
type UpdateDeviceRequest struct {
Name string `json:"name" binding:"required"`
Type models.DeviceType `json:"type" binding:"required"`
SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
Name string `json:"name" binding:"required"`
DeviceTemplateID uint `json:"device_template_id" binding:"required"`
AreaControllerID uint `json:"area_controller_id" binding:"required"`
Location string `json:"location,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
}
// --- Response DTOs ---
// DeviceResponse 定义了返回给客户端的单个设备信息的结构
type DeviceResponse struct {
ID uint `json:"id"`
Name string `json:"name"`
Type models.DeviceType `json:"type"`
SubType models.DeviceSubType `json:"sub_type"`
ParentID *uint `json:"parent_id"`
Location string `json:"location"`
Properties map[string]interface{} `json:"properties"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
ID uint `json:"id"`
Name string `json:"name"`
DeviceTemplateID uint `json:"device_template_id"`
DeviceTemplateName string `json:"device_template_name"`
AreaControllerID uint `json:"area_controller_id"`
AreaControllerName string `json:"area_controller_name"`
Location string `json:"location"`
Properties map[string]interface{} `json:"properties"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// --- DTO 转换函数 ---
@@ -82,16 +81,28 @@ func newDeviceResponse(device *models.Device) (*DeviceResponse, error) {
}
}
// 确保 DeviceTemplate 和 AreaController 已预加载
deviceTemplateName := ""
if device.DeviceTemplate.ID != 0 {
deviceTemplateName = device.DeviceTemplate.Name
}
areaControllerName := ""
if device.AreaController.ID != 0 {
areaControllerName = device.AreaController.Name
}
return &DeviceResponse{
ID: device.ID,
Name: device.Name,
Type: device.Type,
SubType: device.SubType,
ParentID: device.ParentID,
Location: device.Location,
Properties: props,
CreatedAt: device.CreatedAt.Format(time.RFC3339),
UpdatedAt: device.UpdatedAt.Format(time.RFC3339),
ID: device.ID,
Name: device.Name,
DeviceTemplateID: device.DeviceTemplateID,
DeviceTemplateName: deviceTemplateName,
AreaControllerID: device.AreaControllerID,
AreaControllerName: areaControllerName,
Location: device.Location,
Properties: props,
CreatedAt: device.CreatedAt.Format(time.RFC3339),
UpdatedAt: device.UpdatedAt.Format(time.RFC3339),
}, nil
}
@@ -136,18 +147,21 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
}
device := &models.Device{
Name: req.Name,
Type: req.Type,
SubType: req.SubType,
ParentID: req.ParentID,
Location: req.Location,
Properties: propertiesJSON,
Name: req.Name,
DeviceTemplateID: req.DeviceTemplateID,
AreaControllerID: req.AreaControllerID,
Location: req.Location,
Properties: propertiesJSON,
}
// 在创建设备前进行自检
if !device.SelfCheck() {
c.logger.Errorf("%s: 设备属性自检失败: %v", actionType, device)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "设备属性不符合要求", actionType, "设备属性自检失败", device)
// 注意:这里的 SelfCheck 依赖于 DeviceTemplate 和 AreaController 字段,
// 但在创建时这些关联对象可能尚未完全加载。如果 SelfCheck 内部需要这些关联对象,
// 则需要在调用 SelfCheck 之前手动加载或调整 SelfCheck 逻辑。
// 目前假设 SelfCheck 仅检查 ID 和 Properties。
if err := device.SelfCheck(); err != nil {
c.logger.Errorf("%s: 设备属性自检失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "设备属性不符合要求: "+err.Error(), actionType, "设备属性自检失败", device)
return
}
@@ -157,10 +171,18 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
return
}
resp, err := newDeviceResponse(device)
// 为了在响应中包含 DeviceTemplateName 和 AreaControllerName需要重新从数据库加载设备并预加载关联。
createdDevice, err := c.repo.FindByID(device.ID)
if err != nil {
c.logger.Errorf("%s: 重新加载创建的设备失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备创建成功,但重新加载设备失败", actionType, "重新加载设备失败", device)
return
}
resp, err := newDeviceResponse(createdDevice)
if err != nil {
c.logger.Errorf("%s: 序列化响应失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备创建成功,但响应生成失败", actionType, "响应序列化失败", device)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备创建成功,但响应生成失败", actionType, "响应序列化失败", createdDevice)
return
}
@@ -186,6 +208,7 @@ func (c *Controller) GetDevice(ctx *gin.Context) {
return
}
// 假设 FindByIDString 方法会预加载 DeviceTemplate 和 AreaController
device, err := c.repo.FindByIDString(deviceID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
@@ -223,6 +246,7 @@ func (c *Controller) GetDevice(ctx *gin.Context) {
// @Router /api/v1/devices [get]
func (c *Controller) ListDevices(ctx *gin.Context) {
const actionType = "获取设备列表"
// 假设 ListAll 方法会预加载 DeviceTemplate 和 AreaController
devices, err := c.repo.ListAll()
if err != nil {
c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
@@ -256,6 +280,7 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
deviceID := ctx.Param("id")
// 1. 检查设备是否存在
// 假设 FindByIDString 方法会预加载 DeviceTemplate 和 AreaController
existingDevice, err := c.repo.FindByIDString(deviceID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
@@ -290,16 +315,19 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
// 3. 更新从数据库中查出的现有设备对象的字段
existingDevice.Name = req.Name
existingDevice.Type = req.Type
existingDevice.SubType = req.SubType
existingDevice.ParentID = req.ParentID
existingDevice.DeviceTemplateID = req.DeviceTemplateID
existingDevice.AreaControllerID = req.AreaControllerID
existingDevice.Location = req.Location
existingDevice.Properties = propertiesJSON
// 在更新设备前进行自检
if !existingDevice.SelfCheck() {
c.logger.Errorf("%s: 设备属性自检失败: %v", actionType, existingDevice)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "设备属性不符合要求", actionType, "设备属性自检失败", existingDevice)
// 注意:这里的 SelfCheck 依赖于 DeviceTemplate 和 AreaController 字段,
// 但在更新时这些关联对象可能尚未完全加载。如果 SelfCheck 内部需要这些关联对象,
// 则需要在调用 SelfCheck 之前手动加载或调整 SelfCheck 逻辑。
// 目前假设 SelfCheck 仅检查 ID 和 Properties。
if err := existingDevice.SelfCheck(); err != nil {
c.logger.Errorf("%s: 设备属性自检失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "设备属性不符合要求: "+err.Error(), actionType, "设备属性自检失败", existingDevice)
return
}
@@ -310,10 +338,18 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
return
}
resp, err := newDeviceResponse(existingDevice)
// 为了在响应中包含 DeviceTemplateName 和 AreaControllerName需要重新从数据库加载设备并预加载关联。
updatedDevice, err := c.repo.FindByID(existingDevice.ID)
if err != nil {
c.logger.Errorf("%s: 序列化响应失败: %v, Device: %+v", actionType, err, existingDevice)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备更新成功,但响应生成失败", actionType, "响应序列化失败", existingDevice)
c.logger.Errorf("%s: 重新加载更新的设备失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备更新成功,但重新加载设备失败", actionType, "重新加载设备失败", existingDevice)
return
}
resp, err := newDeviceResponse(updatedDevice)
if err != nil {
c.logger.Errorf("%s: 序列化响应失败: %v, Device: %+v", actionType, err, updatedDevice)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "设备更新成功,但响应生成失败", actionType, "响应序列化失败", updatedDevice)
return
}
@@ -346,7 +382,7 @@ func (c *Controller) DeleteDevice(ctx *gin.Context) {
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.logger.Warnf("%s: 设备不存在, ID: %s", actionType, deviceID)
controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "设备不存在", actionType, "设备不存在", deviceID)
controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "设备未找到", actionType, "设备不存在", deviceID)
return
}
c.logger.Errorf("%s: 查找设备失败: %v, ID: %s", actionType, err, deviceID)

View File

@@ -3,7 +3,6 @@ package device
import (
"errors"
"fmt"
"strconv"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto"
@@ -11,6 +10,8 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
"github.com/google/uuid"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
@@ -42,135 +43,156 @@ func NewGeneralDeviceService(
}
func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error {
// 校验设备参数及生成指令
if *device.ParentID == 0 {
return fmt.Errorf("设备 %v(id=%v) 的上级区域主控(id=%v) ID不合理, 无法执行指令", device.Name, device.ID, *device.ParentID)
// 1. 依赖模型自身的 SelfCheck 进行全面校验
if err := device.SelfCheck(); err != nil {
return fmt.Errorf("设备 %v(id=%v) 未通过自检: %w", device.Name, device.ID, err)
}
if err := device.DeviceTemplate.SelfCheck(); err != nil {
return fmt.Errorf("设备 %v(id=%v) 的模板未通过自检: %w", device.Name, device.ID, err)
}
if !device.SelfCheck() {
return fmt.Errorf("设备 %v(id=%v) 缺少必要信息, 无法发送指令", device.Name, device.ID)
// 2. 检查预加载的 AreaController 是否有效
areaController := &device.AreaController
if err := areaController.SelfCheck(); err != nil {
return fmt.Errorf("区域主控 %v(id=%v) 未通过自检: %w", areaController.Name, areaController.ID, err)
}
deviceInfo := make(map[string]interface{})
if err := device.ParseProperties(&deviceInfo); err != nil {
return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err)
// 3. 使用模型层预定义的 Bus485Properties 结构体解析设备属性
var deviceProps models.Bus485Properties
if err := device.ParseProperties(&deviceProps); err != nil {
return fmt.Errorf("解析设备 %v(id=%v) 的属性失败: %w", device.Name, device.ID, err)
}
// 已通过 SelfCheck 保证其为纯数字,此处仅进行类型转换
busNumber, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber]))
busAddress, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress]))
relayChannel, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel]))
// 4. 解析设备模板中的开关指令参数
var switchCmd models.SwitchCommands
if err := device.DeviceTemplate.ParseCommands(&switchCmd); err != nil {
return fmt.Errorf("解析设备 %v(id=%v) 的开关指令失败: %w", device.Name, device.ID, err)
}
data, err := anypb.New(&proto.Switch{
DeviceAction: string(action),
BusNumber: int32(busNumber),
BusAddress: int32(busAddress),
RelayChannel: int32(relayChannel),
})
// 5. 根据 action 生成 Modbus RTU 写入指令
onOffState := true // 默认为开启
if action == DeviceActionStop { // 如果是停止动作,则设置为关闭
onOffState = false
}
modbusCommandBytes, err := command_generater.GenerateModbusRTUSwitchCommand(
deviceProps.BusAddress,
switchCmd.ModbusStartAddress,
onOffState,
)
if err != nil {
return fmt.Errorf("创建指令失败: %v", err)
return fmt.Errorf("生成Modbus RTU写入指令失败: %w", err)
}
// 6. 构建 Protobuf Raw485Command包含总线号
raw485Cmd := &proto.Raw485Command{
BusNumber: int32(deviceProps.BusNumber), // 添加总线号
CommandBytes: modbusCommandBytes,
}
data, err := anypb.New(raw485Cmd)
if err != nil {
return fmt.Errorf("创建 Raw485Command Any Protobuf 失败: %w", err)
}
instruction := &proto.Instruction{
Method: proto.MethodType_SWITCH,
Method: proto.MethodType_INSTRUCTION, // 使用通用指令类型
Data: data,
}
// 获取自身LoRa设备ID, 因为可能变更, 所以每次都现获取
thisDevice, err := g.deviceRepo.FindByID(*device.ParentID)
if err != nil {
return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", *device.ParentID, err)
}
if !thisDevice.SelfCheck() {
return fmt.Errorf("区域主控 %v(id=%v) 缺少必要信息, 无法发送指令", thisDevice.Name, thisDevice.ID)
}
thisDeviceinfo := make(map[string]interface{})
if err := thisDevice.ParseProperties(&thisDeviceinfo); err != nil {
return fmt.Errorf("解析区域主控 %v(id=%v) 配置失败: %v", device.Name, device.ID, err)
}
loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress])
// 生成消息
message, err := gproto.Marshal(instruction)
if err != nil {
return fmt.Errorf("序列化指令失败: %v", err)
return fmt.Errorf("序列化指令失败: %w", err)
}
// 发送指令并获取 SendResult
sendResult, err := g.comm.Send(loraAddress, message)
// 7. 发送指令
networkID := areaController.NetworkID
sendResult, err := g.comm.Send(networkID, message)
if err != nil {
// 发送失败,直接返回错误
return fmt.Errorf("发送指令到设备 %s 失败: %w", loraAddress, err)
return fmt.Errorf("发送指令到 %s 失败: %w", networkID, err)
}
// 创建并保存命令日志
// 8. 创建并保存命令日志
logRecord := &models.DeviceCommandLog{
MessageID: sendResult.MessageID,
DeviceID: thisDevice.ID, // thisDevice 是我们查出来的区域主控
DeviceID: areaController.ID,
SentAt: time.Now(),
}
if err := g.deviceCommandLogRepo.Create(logRecord); err != nil {
// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
// 我们记录一个错误日志,然后成功返回。
g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err)
}
g.logger.Infof("成功发送指令到设备 %s 并创建日志 (MessageID: %s)", loraAddress, sendResult.MessageID)
g.logger.Infof("成功发送指令到 %s 并创建日志 (MessageID: %s)", networkID, sendResult.MessageID)
return nil
}
// Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。
// 它负责查找区域主控、生成关联ID、创建待处理记录、构建指令并最终发送。
func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error {
if regionalControllerID == 0 {
return errors.New("区域主控ID不能为空")
}
if len(devicesToCollect) == 0 {
// 如果没有要采集的设备,这不是一个错误,只是一个空操作。
g.logger.Info("待采集设备列表为空,无需执行采集任务。")
return nil
}
// 1. 查找并自检区域主控设备
regionalController, err := g.deviceRepo.FindByID(regionalControllerID)
if err != nil {
return fmt.Errorf("查找区域主控 (ID: %d) 失败: %w", regionalControllerID, err)
// 1. 从设备列表中获取预加载的区域主控,并进行校验
regionalController := &devicesToCollect[0].AreaController
if regionalController.ID != regionalControllerID {
return fmt.Errorf("设备列表与指定的区域主控ID (%d) 不匹配", regionalControllerID)
}
if !regionalController.SelfCheck() {
return fmt.Errorf("区域主控 (ID: %d) 未通过自检,缺少必要属性", regionalControllerID)
if err := regionalController.SelfCheck(); err != nil {
return fmt.Errorf("区域主控 (ID: %d) 未通过自检: %w", regionalControllerID, err)
}
// 2. 准备采集任务列表和数据库存根,并验证设备
// 2. 准备采集任务列表
var childDeviceIDs []uint
var collectTasks []*proto.CollectTask
for _, dev := range devicesToCollect {
// 验证设备是否属于指定的区域主控
if dev.ParentID == nil || *dev.ParentID != regionalControllerID {
return fmt.Errorf("设备 '%s' (ID: %d) 不属于指定的区域主控 (ID: %d)", dev.Name, dev.ID, regionalControllerID)
// 依赖模型自身的 SelfCheck 进行全面校验
if err := dev.SelfCheck(); err != nil {
g.logger.Warnf("跳过设备 %d因其未通过自检: %v", dev.ID, err)
continue
}
// 对每个待采集的设备执行自检
if !dev.SelfCheck() {
g.logger.Warnf("跳过设备 %d因其未通过自检", dev.ID)
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
g.logger.Warnf("跳过设备 %d因其设备模板未通过自检: %v", dev.ID, err)
continue
}
// 自检已通过,我们可以安全地解析属性
var props map[string]interface{}
// 此时 ParseProperties 不应失败
_ = dev.ParseProperties(&props)
// 使用模板的 ParseCommands 方法获取传感器指令参数
var sensorCmd models.SensorCommands
if err := dev.DeviceTemplate.ParseCommands(&sensorCmd); err != nil {
g.logger.Warnf("跳过设备 %d因其模板指令无法解析为 SensorCommands: %v", dev.ID, err)
continue
}
busNumber := props[models.BusNumber].(float64)
busAddress := props[models.BusAddress].(float64)
// 使用模型层预定义的 Bus485Properties 结构体解析设备属性
var deviceProps models.Bus485Properties
if err := dev.ParseProperties(&deviceProps); err != nil {
g.logger.Warnf("跳过设备 %d因其属性解析失败: %v", dev.ID, err)
continue
}
// 生成 Modbus RTU 读取指令
modbusCommandBytes, err := command_generater.GenerateModbusRTUReadCommand(
deviceProps.BusAddress,
sensorCmd.ModbusFunctionCode,
sensorCmd.ModbusStartAddress,
sensorCmd.ModbusQuantity,
)
if err != nil {
g.logger.Warnf("跳过设备 %d因生成Modbus RTU读取指令失败: %v", dev.ID, err)
continue
}
// 构建 Raw485Command包含总线号
raw485Cmd := &proto.Raw485Command{
BusNumber: int32(deviceProps.BusNumber), // 添加总线号
CommandBytes: modbusCommandBytes,
}
collectTasks = append(collectTasks, &proto.CollectTask{
DeviceAction: dev.Command,
BusNumber: int32(busNumber),
BusAddress: int32(busAddress),
Command: raw485Cmd,
})
childDeviceIDs = append(childDeviceIDs, dev.ID)
}
@@ -179,11 +201,8 @@ func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToColle
return errors.New("经过滤后,没有可通过自检的有效设备")
}
// 3. 从区域主控的属性中解析出 DevEui (loraAddress)
var rcProps map[string]interface{}
// SelfCheck 已保证属性可解析
_ = regionalController.ParseProperties(&rcProps)
loraAddress := rcProps[models.LoRaAddress].(string)
// 3. 构建并发送指令
networkID := regionalController.NetworkID
// 4. 创建待处理请求记录
correlationID := uuid.New().String()
@@ -211,7 +230,7 @@ func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToColle
return err
}
instruction := &proto.Instruction{
Method: proto.MethodType_COLLECT,
Method: proto.MethodType_COLLECT, // 使用 COLLECT 指令类型
Data: anyData,
}
payload, err := gproto.Marshal(instruction)
@@ -220,12 +239,11 @@ func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToColle
return err
}
// 6. 发送指令
if _, err := g.comm.Send(loraAddress, payload); err != nil {
if _, err := g.comm.Send(networkID, payload); err != nil {
g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致", correlationID, err)
return err
}
g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, loraAddress)
g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID)
return nil
}

View File

@@ -26,19 +26,19 @@ const (
type MethodType int32
const (
MethodType_SWITCH MethodType = 0 // 启停
MethodType_COLLECT MethodType = 1 // 采集
MethodType_INSTRUCTION MethodType = 0 // 下发指令
MethodType_COLLECT MethodType = 1 // 批量采集
)
// Enum value maps for MethodType.
var (
MethodType_name = map[int32]string{
0: "SWITCH",
0: "INSTRUCTION",
1: "COLLECT",
}
MethodType_value = map[string]int32{
"SWITCH": 0,
"COLLECT": 1,
"INSTRUCTION": 0,
"COLLECT": 1,
}
)
@@ -69,18 +69,72 @@ func (MethodType) EnumDescriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{0}
}
// 平台生成的原始485指令单片机直接发送到总线
type Raw485Command struct {
state protoimpl.MessageState `protogen:"open.v1"`
BusNumber int32 `protobuf:"varint,1,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号,用于指示单片机将指令发送到哪个总线
CommandBytes []byte `protobuf:"bytes,2,opt,name=command_bytes,json=commandBytes,proto3" json:"command_bytes,omitempty"` // 原始485指令的字节数组
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Raw485Command) Reset() {
*x = Raw485Command{}
mi := &file_device_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Raw485Command) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Raw485Command) ProtoMessage() {}
func (x *Raw485Command) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Raw485Command.ProtoReflect.Descriptor instead.
func (*Raw485Command) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{0}
}
func (x *Raw485Command) GetBusNumber() int32 {
if x != nil {
return x.BusNumber
}
return 0
}
func (x *Raw485Command) GetCommandBytes() []byte {
if x != nil {
return x.CommandBytes
}
return nil
}
// 指令 (所有空中数据都会被包装在这里面)
// data字段现在可以包含 Raw485Command表示平台生成的原始485指令。
type Instruction struct {
state protoimpl.MessageState `protogen:"open.v1"`
Method MethodType `protobuf:"varint,1,opt,name=method,proto3,enum=device.MethodType" json:"method,omitempty"`
Data *anypb.Any `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Data *anypb.Any `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // 可以是 Switch, Raw485Command 等
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Instruction) Reset() {
*x = Instruction{}
mi := &file_device_proto_msgTypes[0]
mi := &file_device_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -92,7 +146,7 @@ func (x *Instruction) String() string {
func (*Instruction) ProtoMessage() {}
func (x *Instruction) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[0]
mi := &file_device_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -105,14 +159,14 @@ func (x *Instruction) ProtoReflect() protoreflect.Message {
// Deprecated: Use Instruction.ProtoReflect.Descriptor instead.
func (*Instruction) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{0}
return file_device_proto_rawDescGZIP(), []int{1}
}
func (x *Instruction) GetMethod() MethodType {
if x != nil {
return x.Method
}
return MethodType_SWITCH
return MethodType_INSTRUCTION
}
func (x *Instruction) GetData() *anypb.Any {
@@ -122,75 +176,6 @@ func (x *Instruction) GetData() *anypb.Any {
return nil
}
// Switch 指令的载荷
type Switch struct {
state protoimpl.MessageState `protogen:"open.v1"`
DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令
BusNumber int32 `protobuf:"varint,2,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号
BusAddress int32 `protobuf:"varint,3,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址
RelayChannel int32 `protobuf:"varint,4,opt,name=relay_channel,json=relayChannel,proto3" json:"relay_channel,omitempty"` // 继电器通道号
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Switch) Reset() {
*x = Switch{}
mi := &file_device_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Switch) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Switch) ProtoMessage() {}
func (x *Switch) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Switch.ProtoReflect.Descriptor instead.
func (*Switch) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{1}
}
func (x *Switch) GetDeviceAction() string {
if x != nil {
return x.DeviceAction
}
return ""
}
func (x *Switch) GetBusNumber() int32 {
if x != nil {
return x.BusNumber
}
return 0
}
func (x *Switch) GetBusAddress() int32 {
if x != nil {
return x.BusAddress
}
return 0
}
func (x *Switch) GetRelayChannel() int32 {
if x != nil {
return x.RelayChannel
}
return 0
}
// BatchCollectCommand
// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。
// 这个消息本身不会被发送到设备。
@@ -247,12 +232,10 @@ func (x *BatchCollectCommand) GetTasks() []*CollectTask {
}
// CollectTask
// 定义了单个采集任务的“意图”。
// 定义了单个采集任务的“意图”。现在直接包含平台生成的原始485指令并带上总线号。
type CollectTask struct {
state protoimpl.MessageState `protogen:"open.v1"`
DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令
BusNumber int32 `protobuf:"varint,2,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号
BusAddress int32 `protobuf:"varint,3,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址
Command *Raw485Command `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` // 平台生成的原始485指令
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -287,25 +270,11 @@ func (*CollectTask) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{3}
}
func (x *CollectTask) GetDeviceAction() string {
func (x *CollectTask) GetCommand() *Raw485Command {
if x != nil {
return x.DeviceAction
return x.Command
}
return ""
}
func (x *CollectTask) GetBusNumber() int32 {
if x != nil {
return x.BusNumber
}
return 0
}
func (x *CollectTask) GetBusAddress() int32 {
if x != nil {
return x.BusAddress
}
return 0
return nil
}
// CollectResult
@@ -366,33 +335,25 @@ var File_device_proto protoreflect.FileDescriptor
const file_device_proto_rawDesc = "" +
"\n" +
"\fdevice.proto\x12\x06device\x1a\x19google/protobuf/any.proto\"c\n" +
"\fdevice.proto\x12\x06device\x1a\x19google/protobuf/any.proto\"S\n" +
"\rRaw485Command\x12\x1d\n" +
"\n" +
"bus_number\x18\x01 \x01(\x05R\tbusNumber\x12#\n" +
"\rcommand_bytes\x18\x02 \x01(\fR\fcommandBytes\"c\n" +
"\vInstruction\x12*\n" +
"\x06method\x18\x01 \x01(\x0e2\x12.device.MethodTypeR\x06method\x12(\n" +
"\x04data\x18\x02 \x01(\v2\x14.google.protobuf.AnyR\x04data\"\x92\x01\n" +
"\x06Switch\x12#\n" +
"\rdevice_action\x18\x01 \x01(\tR\fdeviceAction\x12\x1d\n" +
"\n" +
"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" +
"\vbus_address\x18\x03 \x01(\x05R\n" +
"busAddress\x12#\n" +
"\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"g\n" +
"\x04data\x18\x02 \x01(\v2\x14.google.protobuf.AnyR\x04data\"g\n" +
"\x13BatchCollectCommand\x12%\n" +
"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12)\n" +
"\x05tasks\x18\x02 \x03(\v2\x13.device.CollectTaskR\x05tasks\"r\n" +
"\vCollectTask\x12#\n" +
"\rdevice_action\x18\x01 \x01(\tR\fdeviceAction\x12\x1d\n" +
"\n" +
"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" +
"\vbus_address\x18\x03 \x01(\x05R\n" +
"busAddress\"N\n" +
"\x05tasks\x18\x02 \x03(\v2\x13.device.CollectTaskR\x05tasks\">\n" +
"\vCollectTask\x12/\n" +
"\acommand\x18\x02 \x01(\v2\x15.device.Raw485CommandR\acommand\"N\n" +
"\rCollectResult\x12%\n" +
"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12\x16\n" +
"\x06values\x18\x02 \x03(\x02R\x06values*%\n" +
"\x06values\x18\x02 \x03(\x02R\x06values**\n" +
"\n" +
"MethodType\x12\n" +
"\n" +
"\x06SWITCH\x10\x00\x12\v\n" +
"MethodType\x12\x0f\n" +
"\vINSTRUCTION\x10\x00\x12\v\n" +
"\aCOLLECT\x10\x01B#Z!internal/app/service/device/protob\x06proto3"
var (
@@ -411,8 +372,8 @@ var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_device_proto_goTypes = []any{
(MethodType)(0), // 0: device.MethodType
(*Instruction)(nil), // 1: device.Instruction
(*Switch)(nil), // 2: device.Switch
(*Raw485Command)(nil), // 1: device.Raw485Command
(*Instruction)(nil), // 2: device.Instruction
(*BatchCollectCommand)(nil), // 3: device.BatchCollectCommand
(*CollectTask)(nil), // 4: device.CollectTask
(*CollectResult)(nil), // 5: device.CollectResult
@@ -422,11 +383,12 @@ var file_device_proto_depIdxs = []int32{
0, // 0: device.Instruction.method:type_name -> device.MethodType
6, // 1: device.Instruction.data:type_name -> google.protobuf.Any
4, // 2: device.BatchCollectCommand.tasks:type_name -> device.CollectTask
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
1, // 3: device.CollectTask.command:type_name -> device.Raw485Command
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_device_proto_init() }

View File

@@ -10,46 +10,42 @@ option go_package = "internal/app/service/device/proto";
// 指令类型
enum MethodType {
SWITCH = 0; // 启停
COLLECT = 1; // 采集
INSTRUCTION = 0; // 下发指令
COLLECT = 1; // 批量采集
}
// 平台生成的原始485指令单片机直接发送到总线
message Raw485Command {
int32 bus_number = 1; // 总线号,用于指示单片机将指令发送到哪个总线
bytes command_bytes = 2; // 原始485指令的字节数组
}
// 指令 (所有空中数据都会被包装在这里面)
// data字段现在可以包含 Raw485Command表示平台生成的原始485指令。
message Instruction {
MethodType method = 1;
google.protobuf.Any data = 2;
google.protobuf.Any data = 2; // 可以是 Switch, Raw485Command 等
}
// Switch 指令的载荷
message Switch {
string device_action = 1; // 指令
int32 bus_number = 2; // 总线号
int32 bus_address = 3; // 总线地址
int32 relay_channel = 4; // 继电器通道号
}
// --- 批量采集相关结构 ---
// BatchCollectCommand
// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。
// 这个消息本身不会被发送到设备。
message BatchCollectCommand {
string correlation_id = 1; // 用于关联请求和响应的唯一ID
string correlation_id = 1; // 用于关联请求和响应的唯一ID
repeated CollectTask tasks = 2; // 采集任务列表
}
// CollectTask
// 定义了单个采集任务的“意图”。
// 定义了单个采集任务的“意图”。现在直接包含平台生成的原始485指令并带上总线号。
message CollectTask {
string device_action = 1; // 指令
int32 bus_number = 2; // 总线号
int32 bus_address = 3; // 总线地址
Raw485Command command = 2; // 平台生成的原始485指令
}
// CollectResult
// 这是设备响应的、极致精简的数据包。
message CollectResult {
string correlation_id = 1; // 从下行指令中原样返回的关联ID
string correlation_id = 1; // 从下行指令中原样返回的关联ID
repeated float values = 2; // 按预定顺序排列的采集值
}

View File

@@ -95,7 +95,7 @@ func (r *ReleaseFeedWeightTask) Execute() error {
// 获取当前搅拌罐重量
func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) {
sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorDataTypeWeight)
sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorTypeWeight)
if err != nil {
r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID)
return 0, err

View File

@@ -1,7 +1,7 @@
package transport
import (
"encoding/base64" // 新增导入
"encoding/base64"
"encoding/json"
"io"
"net/http"
@@ -32,8 +32,9 @@ type ChirpStackListener struct {
logger *logs.Logger
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
areaControllerRepo repository.AreaControllerRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
pendingCollectionRepo repository.PendingCollectionRepository // 新增
pendingCollectionRepo repository.PendingCollectionRepository
}
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
@@ -41,15 +42,17 @@ func NewChirpStackListener(
logger *logs.Logger,
sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
pendingCollectionRepo repository.PendingCollectionRepository, // 新增
pendingCollectionRepo repository.PendingCollectionRepository,
) ListenHandler { // 返回接口类型
return &ChirpStackListener{
logger: logger,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
areaControllerRepo: areaControllerRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
pendingCollectionRepo: pendingCollectionRepo, // 新增
pendingCollectionRepo: pendingCollectionRepo,
}
}
@@ -148,23 +151,21 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
// --- 业务处理函数 ---
// GenericSensorReading 表示单个传感器读数包含设备ID、类型和值。
type GenericSensorReading struct {
DeviceID uint `json:"device_id"` // 传感器设备的ID
Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType)
Value float64 `json:"value"` // 传感器读数
}
// handleUpEvent 处理上行数据事件
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
// 1. 查找区域主控设备
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui)
if err != nil {
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 依赖 SelfCheck 确保区域主控有效
if err := regionalController.SelfCheck(); err != nil {
c.logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err)
return
}
c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID)
// 2. 记录区域主控的信号强度 (如果存在)
@@ -178,7 +179,9 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
RssiDbm: rx.Rssi,
SnrDb: rx.Snr,
}
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
// 记录信号强度
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr)
} else {
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
@@ -210,7 +213,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
return
}
// 2.4 解包内层 CollectResult
// 3.4 解包内层 CollectResult
var collectResp proto.CollectResult
if err := instruction.Data.UnmarshalTo(&collectResp); err != nil {
c.logger.Errorf("解包数据信息失败: %v", err)
@@ -220,7 +223,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
correlationID := collectResp.CorrelationId
c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
// 3. 根据 CorrelationID 查找待处理请求
// 4. 根据 CorrelationID 查找待处理请求
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID)
if err != nil {
c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
@@ -233,12 +236,11 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
return
}
// 4. 匹配数据并存入数据库
// 5. 匹配数据并存入数据库
deviceIDs := pendingReq.CommandMetadata
values := collectResp.Values
if len(deviceIDs) != len(values) {
c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
// TODO 数量不匹配是否全改成失败
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time)
if err != nil {
@@ -248,37 +250,61 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
}
for i, deviceID := range deviceIDs {
value := values[i]
rawSensorValue := values[i] // 这是设备上报的原始值
// 5.1 获取设备及其模板
dev, err := c.deviceRepo.FindByID(deviceID)
if err != nil {
c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
continue
}
sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType]
if !ok {
c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType)
// 依赖 SelfCheck 确保设备和模板有效
if err := dev.SelfCheck(); err != nil {
c.logger.Warnf("跳过设备 %d因其未通过自检: %v", dev.ID, err)
continue
}
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
c.logger.Warnf("跳过设备 %d因其设备模板未通过自检: %v", dev.ID, err)
continue
}
var sensorData interface{}
switch sensorDataType {
case models.SensorDataTypeTemperature:
sensorData = models.TemperatureData{TemperatureCelsius: float64(value)}
case models.SensorDataTypeHumidity:
sensorData = models.HumidityData{HumidityPercent: float64(value)}
case models.SensorDataTypeWeight:
sensorData = models.WeightData{WeightKilograms: float64(value)}
// 5.2 从设备模板中解析 ValueDescriptor
var valueDescriptors []*models.ValueDescriptor
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
c.logger.Warnf("跳过设备 %d因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
continue
}
// 根据 DeviceTemplate.SelfCheck这里应该只有一个 ValueDescriptor
if len(valueDescriptors) == 0 {
c.logger.Warnf("跳过设备 %d因其设备模板缺少 ValueDescriptor 定义", dev.ID)
continue
}
valueDescriptor := valueDescriptors[0]
// 5.3 应用乘数和偏移量计算最终值
parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset
// 5.4 根据传感器类型构建具体的数据结构
var dataToRecord interface{}
switch valueDescriptor.Type {
case models.SensorTypeTemperature:
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
case models.SensorTypeHumidity:
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
case models.SensorTypeWeight:
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
default:
c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID)
continue
// TODO 未知传感器数据需要记录吗
c.logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
dataToRecord = map[string]float64{"value": parsedValue}
}
c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData)
c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value)
// 5.5 记录传感器数据
c.recordSensorData(regionalController.ID, dev.ID, event.Time, valueDescriptor.Type, dataToRecord)
c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
}
// 5. 更新请求状态为“已完成”
// 6. 更新请求状态为“已完成”
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil {
c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
} else {
@@ -290,28 +316,28 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
// 记录信号强度
signalMetrics := models.SignalMetrics{
MarginDb: event.Margin,
}
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
// 查找区域主控设备
regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui)
if err != nil {
c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 记录区域主控的信号强度
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
// 记录 电量
// 记录信号强度
signalMetrics := models.SignalMetrics{
MarginDb: event.Margin,
}
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
c.logger.Infof("已记录区域主控 (ID: %d) 的信号状态: %+v", regionalController.ID, signalMetrics)
// 记录电量
batteryLevel := models.BatteryLevel{
BatteryLevelRatio: event.BatteryLevel,
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
ExternalPower: event.ExternalPower,
}
// 记录区域主控的电池电量
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel)
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeBatteryLevel, batteryLevel)
c.logger.Infof("已记录区域主控 (ID: %d) 的电池状态: %+v", regionalController.ID, batteryLevel)
}
// handleAckEvent 处理下行确认事件
@@ -378,24 +404,26 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
// regionalControllerID: 区域主控设备的ID
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) {
// 2. 序列化数据结构体为 JSON
// sensorType: 传感器值的类型 (例如 models.SensorTypeTemperature)
// data: 具体的传感器数据结构体实例 (例如 models.TemperatureData)
func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) {
// 1. 将传入的结构体序列化为 JSON
jsonData, err := json.Marshal(data)
if err != nil {
c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
return
}
// 3. 构建 SensorData 模型
// 2. 构建 SensorData 模型
sensorData := &models.SensorData{
Time: eventTime,
DeviceID: sensorDeviceID,
RegionalControllerID: regionalControllerID,
SensorDataType: dataType,
SensorType: sensorType,
Data: datatypes.JSON(jsonData),
}
// 4. 调用仓库创建记录
// 3. 调用仓库创建记录
if err := c.sensorDataRepo.Create(sensorData); err != nil {
c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
}

View File

@@ -64,6 +64,9 @@ func NewApplication(configPath string) (*Application, error) {
// 初始化设备仓库
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
// 初始化区域主控仓库
areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB())
// 初始化计划仓库
planRepo := repository.NewGormPlanRepository(storage.GetDB())
@@ -89,7 +92,7 @@ func NewApplication(configPath string) (*Application, error) {
auditService := audit.NewService(userActionLogRepo, logger)
// 初始化设备上行监听器
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo, pendingCollectionRepo)
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo)
// 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)

View File

@@ -240,13 +240,13 @@ func (ps *PostgresStorage) creatingIndex() error {
ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
// 为 devices 表的 properties 字段创建 GIN 索引
ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引")
ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);"
if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil {
ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)")
//ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引")
//ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);"
//if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil {
// ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err)
// return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err)
//}
//ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)")
return nil
}

View File

@@ -3,101 +3,108 @@ package models
import (
"encoding/json"
"errors"
"strings"
"gorm.io/datatypes"
"gorm.io/gorm"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
)
// DeviceType 定义了设备的高级类别
type DeviceType string
const (
// DeviceTypeAreaController 区域主控,负责管理一个片区的设备
DeviceTypeAreaController DeviceType = "area_controller"
// DeviceTypeDevice 普通设备,如传感器、阀门等
DeviceTypeDevice DeviceType = "device"
)
// DeviceSubType 定义了普通设备的具体子类别
type DeviceSubType string
const (
// SubTypeNone 未指定或不适用的子类型
SubTypeNone DeviceSubType = ""
// SubTypeSensorTemp 温度传感器
SubTypeSensorTemp DeviceSubType = "temperature"
// SubTypeSensorHumidity 湿度传感器
SubTypeSensorHumidity DeviceSubType = "humidity"
// SubTypeSensorAmmonia 氨气传感器
SubTypeSensorAmmonia DeviceSubType = "ammonia"
// SubTypeSensorWeight 电子秤
SubTypeSensorWeight DeviceSubType = "weight"
// SubTypeValveFeed 下料阀门
SubTypeValveFeed DeviceSubType = "feed_valve"
// SubTypeFan 风机
SubTypeFan DeviceSubType = "fan"
// SubTypeWaterCurtain 水帘
SubTypeWaterCurtain DeviceSubType = "water_curtain"
)
// 设备属性名大全
var (
// 普通开关式设备
BusNumber = "bus_number" // 总线号
BusAddress = "bus_address" // 总线地址
RelayChannel = "relay_channel" // 继电器通道号
// 区域主控
LoRaAddress = "lora_address" // 区域主控 LoRa 地址, 如果使用LoRa网关也可能是LoRa网关记录的设备ID
)
// --- Properties 结构体定义 ---
// LoraProperties 定义了区域主控的特有属性
type LoraProperties struct {
LoraAddress string `json:"lora_address"` // LoRa 地址
// Bus485Properties 定义了总线设备的特有属性
type Bus485Properties struct {
BusNumber uint8 `json:"bus_number"` // 485 总线号
BusAddress uint8 `json:"bus_address"` // 485 总线地址
}
// BusProperties 定义了总线设备的特有属性
type BusProperties struct {
BusID int `json:"bus_id"` // 485 总线号
BusAddress int `json:"bus_address"` // 485 总线地址
// AreaController 是一个LoRa转总线(如485)的通信网关
type AreaController struct {
gorm.Model
// Name 是主控的业务名称,例如 "1号猪舍主控"
Name string `gorm:"not null;unique" json:"name"`
// NetworkID 是主控在通信网络中的唯一标识,例如 LoRaWAN 的 DevEUI。
// 这是 transport 层用来寻址的关键。
NetworkID string `gorm:"not null;unique;index" json:"network_id"`
// Location 描述了主控的物理安装位置。
Location string `gorm:"index" json:"location"`
// Status 表示主控的在线状态等,可以后续扩展。
Status string `gorm:"default:'unknown'" json:"status"`
// Properties 用于存储其他与主控相关的属性,例如硬件版本、固件版本等。
Properties datatypes.JSON `json:"properties"`
}
// Device 代表系统中的所有设备
// SelfCheck 对 AreaController 的关键字段进行业务逻辑验证。
func (ac *AreaController) SelfCheck() error {
if strings.TrimSpace(ac.NetworkID) == "" {
return errors.New("区域主控的 NetworkID 不能为空")
}
return nil
}
// TableName 自定义 GORM 使用的数据库表名
func (AreaController) TableName() string {
return "area_controllers"
}
// Device 代表系统中的所有普通设备
type Device struct {
// gorm.Model 内嵌了标准模型字段 (ID, CreatedAt, UpdatedAt, DeletedAt)
gorm.Model
// Name 是设备的业务名称,应清晰可读,例如 "1号猪舍温度传感器" 或 "做料车间主控"
// Name 是设备的业务名称,应清晰可读,例如 "1号猪舍温度传感器"
Name string `gorm:"not null" json:"name"`
// Type 是设备的高级类别,用于区分区域主控和普通设备。建立索引以优化按类型查询。
Type DeviceType `gorm:"not null;index" json:"type"`
// DeviceTemplateID 是设备模板的外键
DeviceTemplateID uint `gorm:"not null;index" json:"device_template_id"`
// SubType 是设备的子类别,用于描述普通设备的具体功能,例如 "temperature", "fan" 等。建立索引以优化按子类型查询。
SubType DeviceSubType `gorm:"index" json:"sub_type"`
// DeviceTemplate 是设备的模板,包含了设备的通用信息
DeviceTemplate DeviceTemplate `json:"device_template"`
// ParentID 指向其父级设备的ID。对于顶层设备如区域主控此值为 NULL。
// 使用指针类型 *uint 来允许 NULL 值,从而清晰地表示“无父级”,避免了使用 0 作为魔术数字的歧义。建立索引以优化层级查询。
ParentID *uint `gorm:"index" json:"parent_id"`
// AreaControllerID 是区域主控的外键
AreaControllerID uint `gorm:"not null;index" json:"area_controller_id"`
// AreaController 是设备所属的区域主控
AreaController AreaController `json:"area_controller"`
// Location 描述了设备的物理安装位置,例如 "1号猪舍东侧",方便运维。建立索引以优化按位置查询。
Location string `gorm:"index" json:"location"`
// Command 存储了与设备交互所需的具体指令。
// 例如,对于传感器,这里存储 Modbus 采集指令;对于开关和区域主控,这里可以为空。
Command string `gorm:"type:varchar(255)" json:"command"`
// Properties 用于存储特定类型设备的独有属性采用JSON格式。
// 建议在应用层为不同子类型的设备定义专用的属性结构体(如 LoraProperties, BusProperties,以保证数据一致性。
// 建议在应用层为不同子类型的设备定义专用的属性结构体,以保证数据一致性。
Properties datatypes.JSON `json:"properties"`
}
// SelfCheck 对 Device 的关键字段和属性进行业务逻辑验证。
func (d *Device) SelfCheck() error {
if d.AreaControllerID == 0 {
return errors.New("设备必须关联一个区域主控 (AreaControllerID不能为0)")
}
if d.DeviceTemplateID == 0 {
return errors.New("设备必须关联一个设备模板 (DeviceTemplateID不能为0)")
}
// 验证 Properties 是否包含必要的总线地址信息
if d.Properties == nil {
return errors.New("设备属性 (Properties) 不能为空")
}
var props Bus485Properties
if err := json.Unmarshal(d.Properties, &props); err != nil {
return errors.New("无法解析设备属性 (Properties)")
}
if props.BusAddress == 0 {
return errors.New("设备属性 (Properties) 中缺少总线地址 (bus_address)")
}
return nil
}
// TableName 自定义 GORM 使用的数据库表名
func (Device) TableName() string {
return "devices"
@@ -115,55 +122,3 @@ func (d *Device) ParseProperties(v interface{}) error {
}
return json.Unmarshal(d.Properties, v)
}
// SelfCheck 进行参数自检, 返回检测结果
// 方法会根据自身类型进行参数检查, 参数不全时返回false
func (d *Device) SelfCheck() bool {
// 使用清晰的 switch 结构,确保所有情况都被覆盖
switch d.Type {
case DeviceTypeAreaController:
props := make(map[string]interface{})
if err := d.ParseProperties(&props); err != nil {
return false
}
_, ok := props[LoRaAddress].(string)
return ok
case DeviceTypeDevice:
// 所有普通设备都必须有父级
if d.ParentID == nil || *d.ParentID == 0 {
return false
}
props := make(map[string]interface{})
if err := d.ParseProperties(&props); err != nil {
return false
}
// hasPureNumeric 检查一个key是否存在于map中并且其值是纯数字整数或可解析为整数的字符串
hasPureNumeric := func(key string) bool {
val, ok := props[key]
if !ok {
return false // Key不存在
}
return utils.IsPureNumeric(val)
}
// 根据子类型进行具体校验
switch d.SubType {
// 所有传感器类型都必须有 Command 和总线信息,且总线信息为纯数字
case SubTypeSensorTemp, SubTypeSensorHumidity, SubTypeSensorWeight, SubTypeSensorAmmonia:
return d.Command != "" && hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress)
// 所有开关类型都必须有继电器和总线信息,且都为纯数字
case SubTypeFan, SubTypeWaterCurtain, SubTypeValveFeed:
return hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) && hasPureNumeric(RelayChannel)
// 如果是未知的子类型,或者没有子类型,则认为自检失败
default:
return false
}
// 如果设备类型不是已知的任何一种,则自检失败
default:
return false
}
}

View File

@@ -0,0 +1,167 @@
package models
import (
"encoding/json"
"errors"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
"gorm.io/datatypes"
"gorm.io/gorm"
)
// DeviceCategory 定义了设备模板的宽泛类别
type DeviceCategory string
const (
// CategoryActuator 代表一个执行器,可以被控制(例如:风机、阀门)
CategoryActuator DeviceCategory = "actuator"
// CategorySensor 代表一个传感器,用于报告测量值(例如:温度计)
CategorySensor DeviceCategory = "sensor"
)
// ValueDescriptor 描述了传感器可以报告的单个数值。
// 它提供了必要的元数据,以便应用程序能够正确解释从设备读取的原始数据。
type ValueDescriptor struct {
Type SensorType `json:"type"`
Multiplier float64 `json:"multiplier"` // 乘数,用于原始数据转换
Offset float64 `json:"offset"` // 偏移量,用于原始数据转换
}
// --- 指令结构体 (Command Structs) ---
// SwitchCommands 定义了开关类指令所需的Modbus参数
type SwitchCommands struct {
// ModbusStartAddress 记录Modbus寄存器的起始地址用于生成指令。
ModbusStartAddress uint16 `json:"modbus_start_address"`
// ModbusQuantity 记录Modbus寄存器的数量对于开关通常为1。
ModbusQuantity uint16 `json:"modbus_quantity"`
}
// SelfCheck 校验开关指令参数的有效性
func (sc *SwitchCommands) SelfCheck() error {
// 对于开关数量通常为1
if sc.ModbusQuantity != 1 {
return errors.New("'switch' 指令集 ModbusQuantity 必须为1")
}
return nil
}
// SensorCommands 定义了传感器读取指令所需的Modbus参数
type SensorCommands struct {
// ModbusFunctionCode 记录Modbus功能码例如 ReadHoldingRegisters。
ModbusFunctionCode command_generater.ModbusFunctionCode `json:"modbus_function_code"`
// ModbusStartAddress 记录Modbus寄存器的起始地址用于生成指令。
ModbusStartAddress uint16 `json:"modbus_start_address"`
// ModbusQuantity 记录Modbus寄存器的数量用于生成指令。
ModbusQuantity uint16 `json:"modbus_quantity"`
}
// SelfCheck 校验读取指令参数的有效性
func (sc *SensorCommands) SelfCheck() error {
// 校验ModbusFunctionCode是否为读取类型
switch sc.ModbusFunctionCode {
case command_generater.ReadCoils, command_generater.ReadDiscreteInputs, command_generater.ReadHoldingRegisters, command_generater.ReadInputRegisters:
// 支持的读取功能码
default:
return fmt.Errorf("'sensor' 指令集 ModbusFunctionCode %X 无效或不是读取类型", sc.ModbusFunctionCode)
}
// 校验ModbusQuantity的合理性例如不能为0且在常见Modbus读取数量限制内
if sc.ModbusQuantity == 0 || sc.ModbusQuantity > 125 {
return fmt.Errorf("'sensor' 指令集 ModbusQuantity 无效: %d, 必须在1-125之间", sc.ModbusQuantity)
}
return nil
}
// DeviceTemplate 代表一种物理设备的类型。
type DeviceTemplate struct {
gorm.Model
// Name 是此模板的唯一名称, 例如 "FanModel-XYZ-2000" 或 "TempSensor-T1"
Name string `gorm:"not null;unique" json:"name"`
// Manufacturer 是设备的制造商。
Manufacturer string `json:"manufacturer"`
// Description 提供了关于此设备类型的更多详细信息。
Description string `json:"description"`
// Category 将模板分类为传感器、执行器
Category DeviceCategory `gorm:"not null;index" json:"category"`
// Commands 存储了生成Modbus指令所需的参数而不是原始指令字符串。
// 使用 JSON 格式,具有良好的可扩展性。
// 例如,对于执行器 (开关): {"modbus_start_address": 0, "modbus_quantity": 1}
// 例如,对于传感器: {"modbus_function_code": 3, "modbus_start_address": 0, "modbus_quantity": 1}
Commands datatypes.JSON `json:"commands"`
// Values 描述了传感器模板所能提供的数据点。
// 当 Category 是 "sensor" 时,此字段尤为重要。
// 它是一个 ValueDescriptor 对象的 JSON 数组。
Values datatypes.JSON `json:"values"`
}
// TableName 自定义 GORM 使用的数据库表名
func (DeviceTemplate) TableName() string {
return "device_templates"
}
// ParseCommands ...
func (dt *DeviceTemplate) ParseCommands(v interface{}) error {
if dt.Commands == nil {
return errors.New("设备模板的 Commands 属性为空,无法解析")
}
return json.Unmarshal(dt.Commands, v)
}
// ParseValues ...
func (dt *DeviceTemplate) ParseValues(v interface{}) error {
if dt.Values == nil {
return errors.New("设备模板的 Values 属性为空,无法解析")
}
return json.Unmarshal(dt.Values, v)
}
// SelfCheck 对 DeviceTemplate 进行彻底的、基于角色的校验
func (dt *DeviceTemplate) SelfCheck() error {
if dt.Commands == nil {
return errors.New("所有设备模板都必须有 Commands 定义")
}
switch dt.Category {
case CategoryActuator:
var cmd SwitchCommands
if err := dt.ParseCommands(&cmd); err != nil {
return errors.New("执行器模板的 Commands 无法被解析为 'switch' 指令集: " + err.Error())
}
if err := cmd.SelfCheck(); err != nil {
return err
}
case CategorySensor:
var cmd SensorCommands
if err := dt.ParseCommands(&cmd); err != nil {
return errors.New("传感器模板的 Commands 无法被解析为 'sensor' 指令集: " + err.Error())
}
if err := cmd.SelfCheck(); err != nil {
return err
}
if dt.Values == nil {
return errors.New("传感器类型的设备模板缺少 Values 定义")
}
// 这里应该解析为 ValueDescriptor 的切片,因为传感器可能提供多个数据点
var values []ValueDescriptor
if err := dt.ParseValues(&values); err != nil {
return errors.New("无法解析传感器模板的 Values 属性: " + err.Error())
}
if len(values) == 0 {
return errors.New("传感器类型的设备模板 Values 属性不能为空")
}
default:
return errors.New("未知的设备模板类别")
}
return nil
}

View File

@@ -23,6 +23,8 @@ func GetAllModels() []interface{} {
&SensorData{},
&DeviceCommandLog{},
&PendingCollection{},
&AreaController{},
&DeviceTemplate{},
}
}

View File

@@ -6,25 +6,17 @@ import (
"gorm.io/datatypes"
)
// SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型
type SensorDataType string
// SensorType 定义了 SensorData 记录中 Data 字段的整体类型
type SensorType string
const (
SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度
SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量
SensorDataTypeTemperature SensorDataType = "temperature" // 温度
SensorDataTypeHumidity SensorDataType = "humidity" // 湿度
SensorDataTypeWeight SensorDataType = "weight" // 重量
SensorTypeSignalMetrics SensorType = "signal_metrics" // 信号强度
SensorTypeBatteryLevel SensorType = "battery_level" // 电池电量
SensorTypeTemperature SensorType = "temperature" // 温度
SensorTypeHumidity SensorType = "humidity" // 湿度
SensorTypeWeight SensorType = "weight" // 重量
)
// DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射.
// 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询.
var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{
SubTypeSensorTemp: SensorDataTypeTemperature,
SubTypeSensorHumidity: SensorDataTypeHumidity,
SubTypeSensorWeight: SensorDataTypeWeight,
}
// SignalMetrics 存储信号强度数据
type SignalMetrics struct {
RssiDbm int `json:"rssi_dbm"` // 绝对信号强度dBm受距离、障碍物影响
@@ -66,8 +58,8 @@ type SensorData struct {
// RegionalControllerID 是上报此数据的区域主控的ID。
RegionalControllerID uint `json:"regional_controller_id"`
// SensorDataType 是传感数据的类型
SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"`
// SensorType 是传感数据的类型
SensorType SensorType `gorm:"not null;index" json:"sensor_type"`
// Data 存储一个或多个传感器读数,格式为 JSON。
Data datatypes.JSON `gorm:"type:jsonb" json:"data"`

View File

@@ -0,0 +1,40 @@
package repository
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// AreaControllerRepository 定义了对 AreaController 模型的数据库操作接口
type AreaControllerRepository interface {
FindByID(id uint) (*models.AreaController, error)
FindByNetworkID(networkID string) (*models.AreaController, error) // New method
}
// gormAreaControllerRepository 是 AreaControllerRepository 的 GORM 实现。
type gormAreaControllerRepository struct {
db *gorm.DB
}
// NewGormAreaControllerRepository 创建一个新的 AreaControllerRepository GORM 实现实例。
func NewGormAreaControllerRepository(db *gorm.DB) AreaControllerRepository {
return &gormAreaControllerRepository{db: db}
}
// FindByID 通过 ID 查找一个 AreaController。
func (r *gormAreaControllerRepository) FindByID(id uint) (*models.AreaController, error) {
var areaController models.AreaController
if err := r.db.First(&areaController, id).Error; err != nil {
return nil, err
}
return &areaController, nil
}
// FindByNetworkID 通过 NetworkID 查找一个 AreaController。
func (r *gormAreaControllerRepository) FindByNetworkID(networkID string) (*models.AreaController, error) {
var areaController models.AreaController
if err := r.db.Where("network_id = ?", networkID).First(&areaController).Error; err != nil {
return nil, err
}
return &areaController, nil
}

View File

@@ -23,9 +23,8 @@ type DeviceRepository interface {
// ListAll 获取所有设备的列表
ListAll() ([]*models.Device, error)
// ListByParentID 根据父级 ID 列出所有子设备。
// 如果 parentID 为 nil则列出所有顶层设备如区域主控
ListByParentID(parentID *uint) ([]*models.Device, error)
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备。
ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error)
// Update 更新一个已有的设备信息
Update(device *models.Device) error
@@ -33,11 +32,8 @@ type DeviceRepository interface {
// Delete 根据主键 ID 删除一个设备
Delete(id uint) error
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增)
FindByDevEui(devEui string) (*models.Device, error)
// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备
FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error)
// FindByAreaControllerAndPhysicalAddress 根据区域主控ID和物理地址(总线号、总线地址)查找设备
FindByAreaControllerAndPhysicalAddress(areaControllerID uint, busNumber int, busAddress int) (*models.Device, error)
}
// gormDeviceRepository 是 DeviceRepository 的 GORM 实现
@@ -58,7 +54,7 @@ func (r *gormDeviceRepository) Create(device *models.Device) error {
// FindByID 根据 ID 查找设备
func (r *gormDeviceRepository) FindByID(id uint) (*models.Device, error) {
var device models.Device
if err := r.db.First(&device, id).Error; err != nil {
if err := r.db.Preload("AreaController").Preload("DeviceTemplate").First(&device, id).Error; err != nil {
return nil, err
}
return &device, nil
@@ -79,24 +75,16 @@ func (r *gormDeviceRepository) FindByIDString(id string) (*models.Device, error)
// ListAll 获取所有设备的列表
func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) {
var devices []*models.Device
if err := r.db.Find(&devices).Error; err != nil {
if err := r.db.Preload("AreaController").Preload("DeviceTemplate").Find(&devices).Error; err != nil {
return nil, err
}
return devices, nil
}
// ListByParentID 根据父级 ID 列出所有子设备
func (r *gormDeviceRepository) ListByParentID(parentID *uint) ([]*models.Device, error) {
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备
func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) {
var devices []*models.Device
var err error
// 根据 parentID 是否为 nil构造不同的查询条件
if parentID == nil {
err = r.db.Where("parent_id IS NULL").Find(&devices).Error
} else {
err = r.db.Where("parent_id = ?", *parentID).Find(&devices).Error
}
err := r.db.Preload("AreaController").Preload("DeviceTemplate").Where("area_controller_id = ?", areaControllerID).Find(&devices).Error
if err != nil {
return nil, err
}
@@ -115,27 +103,17 @@ func (r *gormDeviceRepository) Delete(id uint) error {
return r.db.Delete(&models.Device{}, id).Error
}
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备
func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, error) {
// FindByAreaControllerAndPhysicalAddress 根据区域主控ID和物理地址(总线号、总线地址)查找设备
func (r *gormDeviceRepository) FindByAreaControllerAndPhysicalAddress(areaControllerID uint, busNumber int, busAddress int) (*models.Device, error) {
var device models.Device
// 使用 GORM 的 JSONB 查询语法: properties->>'lora_address'
if err := r.db.Where("properties->>'lora_address' = ?", devEui).First(&device).Error; err != nil {
return nil, err // 如果找不到或发生其他错误GORM 会返回错误
}
return &device, nil
}
// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备
func (r *gormDeviceRepository) FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) {
var device models.Device
// PostgreSQL 使用 ->> 操作符来查询 JSONB 字段的文本值
err := r.db.Where("parent_id = ?", parentID).
Where("properties->>'bus_number' = ?", strconv.Itoa(int(busNumber))).
Where("properties->>'bus_address' = ?", strconv.Itoa(int(busAddress))).
err := r.db.Preload("AreaController").Preload("DeviceTemplate").
Where("area_controller_id = ?", areaControllerID).
Where("properties->>'bus_number' = ?", strconv.Itoa(busNumber)).
Where("properties->>'bus_address' = ?", strconv.Itoa(busAddress)).
First(&device).Error
if err != nil {
return nil, fmt.Errorf("根据父设备ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", parentID, busNumber, busAddress, err)
return nil, fmt.Errorf("根据区域主控ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", areaControllerID, busNumber, busAddress, err)
}
return &device, nil
}

View File

@@ -1,262 +0,0 @@
package repository_test
import (
"encoding/json"
"strconv"
"testing"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
)
// createTestDevice 辅助函数,用于创建测试设备
func createTestDevice(t *testing.T, db *gorm.DB, name string, deviceType models.DeviceType, parentID *uint) *models.Device {
device := &models.Device{
Name: name,
Type: deviceType,
ParentID: parentID,
// 其他字段可以根据需要添加
}
err := db.Create(device).Error
assert.NoError(t, err)
return device
}
func TestRepoCreate(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
loraProps, _ := json.Marshal(models.LoraProperties{LoraAddress: "0xABCD"})
t.Run("成功创建区域主控", func(t *testing.T) {
device := &models.Device{
Name: "主控A",
Type: models.DeviceTypeAreaController,
Location: "猪舍1",
Properties: loraProps,
}
err := repo.Create(device)
assert.NoError(t, err)
assert.NotZero(t, device.ID, "创建后应获得一个非零ID")
assert.Nil(t, device.ParentID, "区域主控的 ParentID 应为 nil")
})
t.Run("成功创建子设备", func(t *testing.T) {
parent := createTestDevice(t, db, "父设备", models.DeviceTypeAreaController, nil)
child := &models.Device{
Name: "子设备A",
Type: models.DeviceTypeDevice,
ParentID: &parent.ID,
}
err := repo.Create(child)
assert.NoError(t, err)
assert.NotZero(t, child.ID)
assert.NotNil(t, child.ParentID)
assert.Equal(t, parent.ID, *child.ParentID)
})
}
func TestRepoFindByID(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
device := createTestDevice(t, db, "测试设备", models.DeviceTypeAreaController, nil)
t.Run("成功通过ID查找", func(t *testing.T) {
foundDevice, err := repo.FindByID(device.ID)
assert.NoError(t, err)
assert.NotNil(t, foundDevice)
assert.Equal(t, device.ID, foundDevice.ID)
assert.Equal(t, device.Name, foundDevice.Name)
})
t.Run("查找不存在的ID", func(t *testing.T) {
_, err := repo.FindByID(9999) // 不存在的ID
assert.Error(t, err)
assert.ErrorIs(t, err, gorm.ErrRecordNotFound)
})
t.Run("数据库查询失败", func(t *testing.T) {
// 模拟数据库连接关闭,强制查询失败
sqlDB, _ := db.DB()
sqlDB.Close()
_, err := repo.FindByID(device.ID)
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}
func TestRepoFindByIDString(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
device := createTestDevice(t, db, "测试设备", models.DeviceTypeAreaController, nil)
t.Run("成功通过字符串ID查找", func(t *testing.T) {
idStr := strconv.FormatUint(uint64(device.ID), 10)
foundDevice, err := repo.FindByIDString(idStr)
assert.NoError(t, err)
assert.NotNil(t, foundDevice)
assert.Equal(t, device.ID, foundDevice.ID)
})
t.Run("无效的字符串ID格式", func(t *testing.T) {
_, err := repo.FindByIDString("invalid-id")
assert.Error(t, err)
assert.Contains(t, err.Error(), "无效的设备ID格式")
})
t.Run("查找不存在的字符串ID", func(t *testing.T) {
idStr := strconv.FormatUint(uint64(9999), 10) // 不存在的ID
_, err := repo.FindByIDString(idStr)
assert.Error(t, err)
assert.ErrorIs(t, err, gorm.ErrRecordNotFound)
})
t.Run("数据库查询失败", func(t *testing.T) {
sqlDB, _ := db.DB()
sqlDB.Close()
idStr := strconv.FormatUint(uint64(device.ID), 10)
_, err := repo.FindByIDString(idStr)
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}
func TestRepoListAll(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
t.Run("成功获取空列表", func(t *testing.T) {
devices, err := repo.ListAll()
assert.NoError(t, err)
assert.Empty(t, devices)
})
t.Run("成功获取包含设备的列表", func(t *testing.T) {
createTestDevice(t, db, "设备1", models.DeviceTypeAreaController, nil)
createTestDevice(t, db, "设备2", models.DeviceTypeDevice, nil)
devices, err := repo.ListAll()
assert.NoError(t, err)
assert.Len(t, devices, 2)
assert.Equal(t, "设备1", devices[0].Name)
assert.Equal(t, "设备2", devices[1].Name)
})
t.Run("数据库查询失败", func(t *testing.T) {
sqlDB, _ := db.DB()
sqlDB.Close()
_, err := repo.ListAll()
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}
func TestRepoListByParentID(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
parent1 := createTestDevice(t, db, "父设备1", models.DeviceTypeAreaController, nil)
parent2 := createTestDevice(t, db, "父设备2", models.DeviceTypeAreaController, nil)
child1_1 := createTestDevice(t, db, "子设备1-1", models.DeviceTypeDevice, &parent1.ID)
child1_2 := createTestDevice(t, db, "子设备1-2", models.DeviceTypeDevice, &parent1.ID)
_ = createTestDevice(t, db, "子设备2-1", models.DeviceTypeDevice, &parent2.ID)
t.Run("成功通过父ID查找子设备", func(t *testing.T) {
children, err := repo.ListByParentID(&parent1.ID)
assert.NoError(t, err)
assert.Len(t, children, 2)
assert.Contains(t, []uint{child1_1.ID, child1_2.ID}, children[0].ID)
assert.Contains(t, []uint{child1_1.ID, child1_2.ID}, children[1].ID)
})
t.Run("成功通过nil父ID查找顶层设备", func(t *testing.T) {
parents, err := repo.ListByParentID(nil)
assert.NoError(t, err)
assert.Len(t, parents, 2)
assert.Contains(t, []uint{parent1.ID, parent2.ID}, parents[0].ID)
assert.Contains(t, []uint{parent1.ID, parent2.ID}, parents[1].ID)
})
t.Run("查找不存在的父ID", func(t *testing.T) {
nonExistentParentID := uint(9999)
children, err := repo.ListByParentID(&nonExistentParentID)
assert.NoError(t, err) // GORM 在未找到时返回空列表而不是错误
assert.Empty(t, children)
})
t.Run("数据库查询失败", func(t *testing.T) {
sqlDB, _ := db.DB()
sqlDB.Close()
_, err := repo.ListByParentID(&parent1.ID)
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}
func TestRepoUpdate(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
device := createTestDevice(t, db, "原始设备", models.DeviceTypeAreaController, nil)
t.Run("成功更新设备信息", func(t *testing.T) {
device.Name = "更新后的设备"
device.Location = "新地点"
err := repo.Update(device)
assert.NoError(t, err)
updatedDevice, err := repo.FindByID(device.ID)
assert.NoError(t, err)
assert.Equal(t, "更新后的设备", updatedDevice.Name)
assert.Equal(t, "新地点", updatedDevice.Location)
})
t.Run("数据库更新失败", func(t *testing.T) {
sqlDB, _ := db.DB()
sqlDB.Close()
device.Name = "更新失败的设备"
err := repo.Update(device)
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}
func TestRepoDelete(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewGormDeviceRepository(db)
device := createTestDevice(t, db, "待删除设备", models.DeviceTypeAreaController, nil)
t.Run("成功删除设备", func(t *testing.T) {
err := repo.Delete(device.ID)
assert.NoError(t, err)
// 验证设备已被软删除
_, err = repo.FindByID(device.ID)
assert.Error(t, err, "删除后应无法找到设备")
assert.ErrorIs(t, err, gorm.ErrRecordNotFound, "错误类型应为 RecordNotFound")
})
t.Run("删除不存在的设备", func(t *testing.T) {
err := repo.Delete(9999) // 不存在的ID
assert.NoError(t, err) // GORM 的 Delete 方法在删除不存在的记录时不会报错
})
t.Run("数据库删除失败", func(t *testing.T) {
sqlDB, _ := db.DB()
sqlDB.Close()
err := repo.Delete(device.ID)
assert.Error(t, err)
assert.Contains(t, err.Error(), "database is closed")
})
}

View File

@@ -10,7 +10,7 @@ import (
// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。
type SensorDataRepository interface {
Create(sensorData *models.SensorData) error
GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorDataType) (*models.SensorData, error)
GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorType) (*models.SensorData, error)
}
// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。
@@ -30,10 +30,10 @@ func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error {
}
// GetLatestSensorDataByDeviceIDAndSensorType 根据设备ID和传感器类型查询最新的传感器数据。
func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorDataType models.SensorDataType) (*models.SensorData, error) {
func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorType) (*models.SensorData, error) {
var sensorData models.SensorData
// 增加一个时间范围来缩小查询范围, 从而加快查找速度, 当使用时序数据库时时间范围可以让数据库忽略时间靠前的分片
err := r.db.Where("device_id = ? AND sensor_data_type = ? AND time >=?", deviceID, sensorDataType, time.Now().Add(-24*time.Hour)).
err := r.db.Where("device_id = ? AND sensor_type = ? AND time >=?", deviceID, sensorType, time.Now().Add(-24*time.Hour)).
Order("time DESC").
First(&sensorData).Error
return &sensorData, err

View File

@@ -0,0 +1,182 @@
package command_generater
import (
"encoding/binary"
"fmt"
)
// ModbusFunctionCode 定义Modbus功能码的枚举类型
type ModbusFunctionCode byte
// 定义常用的Modbus功能码常量及其应用场景
const (
// ReadCoils 读取线圈状态 (0x01)
// 场景: 用于读取数字量输出DO或内部标志位的当前状态这些状态通常是开关量。
ReadCoils ModbusFunctionCode = 0x01
// ReadDiscreteInputs 读取离散输入状态 (0x02)
// 场景: 用于读取数字量输入DI的当前状态这些状态通常是外部传感器的开关量信号。
ReadDiscreteInputs ModbusFunctionCode = 0x02
// ReadHoldingRegisters 读取保持寄存器 (0x03)
// 场景: 用于读取设备内部可读写的参数或数据,例如温度设定值、电机速度等模拟量或配置数据。
ReadHoldingRegisters ModbusFunctionCode = 0x03
// ReadInputRegisters 读取输入寄存器 (0x04)
// 场景: 用于读取设备的模拟量输入AI数据这些数据通常是只读的例如当前温度、压力、电压等实时测量值。
ReadInputRegisters ModbusFunctionCode = 0x04
// WriteSingleCoil 写入单个线圈 (0x05)
// 场景: 用于控制单个数字量输出DO例如打开或关闭一个继电器、指示灯等。
WriteSingleCoil ModbusFunctionCode = 0x05
// WriteSingleRegister 写入单个保持寄存器 (0x06)
// 场景: 用于修改设备内部的单个可写参数,例如设置一个温度控制器的目标温度、调整一个阀门的开度等。
WriteSingleRegister ModbusFunctionCode = 0x06
// WriteMultipleCoils 写入多个线圈 (0x0F)
// 场景: 用于批量控制多个数字量输出DO例如同时打开或关闭一组继电器。
WriteMultipleCoils ModbusFunctionCode = 0x0F
// WriteMultipleRegisters 写入多个保持寄存器 (0x10)
// 场景: 用于批量修改设备内部的多个可写参数,例如一次性更新多个配置参数或模拟量输出值。
WriteMultipleRegisters ModbusFunctionCode = 0x10
)
// GenerateModbusRTUReadCommand 生成Modbus RTU读取指令
// 该函数主要用于生成Modbus RTU的读取类指令 (如 0x01, 0x02, 0x03, 0x04)。
// 其PDU结构为: 功能码 + 起始地址 + 数量。
//
// 参数:
//
// slaveAddress: 从站地址 (1-247)。
// functionCode: 功能码,使用 ModbusFunctionCode 枚举类型 (例如: ReadHoldingRegisters)。
// 此函数仅支持读取类功能码。
// startAddress: 寄存器/线圈的起始地址 (0-65535)。
// quantity: 要读取的寄存器/线圈数量 (1-125)。
//
// 返回:
//
// []byte: 完整的Modbus RTU指令字节切片。
// error: 如果参数无效或生成过程中出现错误,则返回错误信息。
func GenerateModbusRTUReadCommand(slaveAddress uint8, functionCode ModbusFunctionCode, startAddress uint16, quantity uint16) ([]byte, error) {
// 1. 校验输入参数
if slaveAddress == 0 || slaveAddress > 247 {
return nil, fmt.Errorf("从站地址无效: %d, 必须在1-247之间", slaveAddress)
}
// 校验功能码是否为读取类型
switch functionCode {
case ReadCoils, ReadDiscreteInputs, ReadHoldingRegisters, ReadInputRegisters:
// 这些是支持的读取功能码
case WriteSingleCoil, WriteSingleRegister, WriteMultipleCoils, WriteMultipleRegisters:
return nil, fmt.Errorf("功能码 %X 是写入操作,请使用 GenerateModbusRTUWriteCoilCommand 或其他写入函数", functionCode)
default:
return nil, fmt.Errorf("不支持的功能码: %X", functionCode)
}
// 对于读取类功能码数量通常限制在1到125之间。
if quantity == 0 || quantity > 125 {
return nil, fmt.Errorf("功能码 %X (读取操作) 的数量无效: %d, 必须在1-125之间", functionCode, quantity)
}
// 2. 构建PDU (协议数据单元)
// PDU结构: 功能码 (1字节) + 起始地址 (2字节) + 数量 (2字节)
pdu := make([]byte, 5)
pdu[0] = byte(functionCode) // 将枚举类型转换为byte
// Modbus协议中地址和数量都是大端字节序 (高位在前)
binary.BigEndian.PutUint16(pdu[1:3], startAddress)
binary.BigEndian.PutUint16(pdu[3:5], quantity)
// 3. 构建ADU (应用数据单元)
// ADU结构: 从站地址 (1字节) + PDU
adu := make([]byte, 1+len(pdu))
adu[0] = slaveAddress
copy(adu[1:], pdu)
// 4. 计算CRC16校验码
crc := calculateCRC16(adu)
// 5. 组装完整的Modbus RTU指令
// 完整指令结构: ADU + CRC (2字节)
command := make([]byte, len(adu)+2)
copy(command, adu)
// Modbus RTU的CRC是低字节在前高字节在后 (小端字节序)
binary.LittleEndian.PutUint16(command[len(adu):], crc)
return command, nil
}
// GenerateModbusRTUSwitchCommand 生成Modbus RTU写入单个线圈的指令
// 该函数专门用于生成 Modbus RTU 的写入单个线圈 (0x05) 指令,用于控制开关。
//
// 参数:
//
// slaveAddress: 从站地址 (1-247)。
// coilAddress: 要写入的线圈地址 (0-65535)。
// onOffState: 开关状态true 表示开启 (ON, 0xFF00)false 表示关闭 (OFF, 0x0000)。
//
// 返回:
//
// []byte: 完整的Modbus RTU指令字节切片。
// error: 如果参数无效或生成过程中出现错误,则返回错误信息。
func GenerateModbusRTUSwitchCommand(slaveAddress uint8, coilAddress uint16, onOffState bool) ([]byte, error) {
// 1. 校验从站地址
if slaveAddress == 0 || slaveAddress > 247 {
return nil, fmt.Errorf("从站地址无效: %d, 必须在1-247之间", slaveAddress)
}
// 根据布尔值确定写入的Modbus值
var writeValue uint16
if onOffState {
writeValue = 0xFF00 // ON
} else {
writeValue = 0x0000 // OFF
}
// 2. 构建PDU (协议数据单元) for WriteSingleCoil (0x05)
// PDU结构: 功能码 (1字节) + 线圈地址 (2字节) + 写入值 (2字节)
pdu := make([]byte, 5)
pdu[0] = byte(WriteSingleCoil)
// Modbus协议中地址和值都是大端字节序 (高位在前)
binary.BigEndian.PutUint16(pdu[1:3], coilAddress)
binary.BigEndian.PutUint16(pdu[3:5], writeValue)
// 3. 构建ADU (应用数据单元)
// ADU结构: 从站地址 (1字节) + PDU
adu := make([]byte, 1+len(pdu))
adu[0] = slaveAddress
copy(adu[1:], pdu)
// 4. 计算CRC16校验码
crc := calculateCRC16(adu)
// 5. 组装完整的Modbus RTU指令
// 完整指令结构: ADU + CRC (2字节)
command := make([]byte, len(adu)+2)
copy(command, adu)
// Modbus RTU的CRC是低字节在前高字节在后 (小端字节序)
binary.LittleEndian.PutUint16(command[len(adu):], crc)
return command, nil
}
// calculateCRC16 计算Modbus RTU的CRC-16校验码
//
// 参数:
//
// data: 需要计算CRC的字节切片 (通常是ADU即从站地址+PDU)。
//
// 返回:
//
// uint16: 16位的CRC校验码。
func calculateCRC16(data []byte) uint16 {
var crc uint16 = 0xFFFF // CRC初始值
polynomial := uint16(0xA001) // Modbus RTU CRC-16多项式 (反向表示)
for _, b := range data {
crc ^= uint16(b) // 将数据字节与CRC寄存器异或
for i := 0; i < 8; i++ {
if (crc & 0x0001) != 0 { // 检查最低位是否为1
crc >>= 1 // 右移一位
crc ^= polynomial // 与多项式异或
} else {
crc >>= 1 // 否则只右移一位
}
}
}
return crc
}