diff --git a/internal/app/service/device/proto/device.pb.go b/internal/app/service/device/proto/device.pb.go index a775c64..f77e8c3 100644 --- a/internal/app/service/device/proto/device.pb.go +++ b/internal/app/service/device/proto/device.pb.go @@ -250,6 +250,51 @@ func (x *Collect) GetValue() float32 { return 0 } +// 用于批量上报的顶层消息 +type UplinkPayload struct { + state protoimpl.MessageState `protogen:"open.v1"` + Readings []*Collect `protobuf:"bytes,1,rep,name=readings,proto3" json:"readings,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UplinkPayload) Reset() { + *x = UplinkPayload{} + mi := &file_device_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UplinkPayload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UplinkPayload) ProtoMessage() {} + +func (x *UplinkPayload) ProtoReflect() protoreflect.Message { + mi := &file_device_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UplinkPayload.ProtoReflect.Descriptor instead. +func (*UplinkPayload) Descriptor() ([]byte, []int) { + return file_device_proto_rawDescGZIP(), []int{3} +} + +func (x *UplinkPayload) GetReadings() []*Collect { + if x != nil { + return x.Readings + } + return nil +} + var File_device_proto protoreflect.FileDescriptor const file_device_proto_rawDesc = "" + @@ -270,7 +315,9 @@ const file_device_proto_rawDesc = "" + "bus_number\x18\x01 \x01(\x05R\tbusNumber\x12\x1f\n" + "\vbus_address\x18\x02 \x01(\x05R\n" + "busAddress\x12\x14\n" + - "\x05value\x18\x03 \x01(\x02R\x05value*%\n" + + "\x05value\x18\x03 \x01(\x02R\x05value\"<\n" + + "\rUplinkPayload\x12+\n" + + "\breadings\x18\x01 \x03(\v2\x0f.device.CollectR\breadings*%\n" + "\n" + "MethodType\x12\n" + "\n" + @@ -290,22 +337,24 @@ func file_device_proto_rawDescGZIP() []byte { } var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_device_proto_goTypes = []any{ - (MethodType)(0), // 0: device.MethodType - (*Instruction)(nil), // 1: device.Instruction - (*Switch)(nil), // 2: device.Switch - (*Collect)(nil), // 3: device.Collect - (*anypb.Any)(nil), // 4: google.protobuf.Any + (MethodType)(0), // 0: device.MethodType + (*Instruction)(nil), // 1: device.Instruction + (*Switch)(nil), // 2: device.Switch + (*Collect)(nil), // 3: device.Collect + (*UplinkPayload)(nil), // 4: device.UplinkPayload + (*anypb.Any)(nil), // 5: google.protobuf.Any } var file_device_proto_depIdxs = []int32{ 0, // 0: device.Instruction.method:type_name -> device.MethodType - 4, // 1: device.Instruction.data:type_name -> google.protobuf.Any - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 5, // 1: device.Instruction.data:type_name -> google.protobuf.Any + 3, // 2: device.UplinkPayload.readings:type_name -> device.Collect + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_device_proto_init() } @@ -319,7 +368,7 @@ func file_device_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)), NumEnums: 1, - NumMessages: 3, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/app/service/device/proto/device.proto b/internal/app/service/device/proto/device.proto index 8a98e32..11f6b9d 100644 --- a/internal/app/service/device/proto/device.proto +++ b/internal/app/service/device/proto/device.proto @@ -29,4 +29,9 @@ message Collect{ int32 bus_number = 1; // 总线号 int32 bus_address = 2; // 总线地址 float value = 3; // 采集值 +} + +// 用于批量上报的顶层消息 +message UplinkPayload { + repeated Collect readings = 1; } \ No newline at end of file diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index a3ec41f..35c17a4 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -7,9 +7,11 @@ import ( "net/http" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + gproto "google.golang.org/protobuf/proto" "gorm.io/datatypes" ) @@ -151,12 +153,20 @@ type GenericSensorReading struct { // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { - c.logger.Infof("处理 'up' 事件: %+v", event) + c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) - // 记录信号强度 - // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 - // 因此,我们只取第一个 RxInfo 中的信号数据即可。 + // 1. 查找区域主控设备 + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) + + // 2. 记录区域主控的信号强度 (如果存在) if len(event.RxInfo) > 0 { + // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 + // 因此,我们只取第一个 RxInfo 中的信号数据即可。 rx := event.RxInfo[0] // 取第一个接收到的网关信息 // 构建 SignalMetrics 结构体 @@ -164,73 +174,73 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { RssiDbm: rx.Rssi, SnrDb: rx.Snr, } - - // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) - if err != nil { - c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) - return - } - // 记录区域主控的信号强度 c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) } - // 解析并记录传感器数据 (温度、湿度、重量) - // 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 - if event.Data != "" { - decodedData, err := base64.StdEncoding.DecodeString(event.Data) + // 3. 处理上报的传感器数据 + if event.Data == "" { + c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui) + return + } + + // 3.1 Base64 解码 + decodedData, err := base64.StdEncoding.DecodeString(event.Data) + if err != nil { + c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) + return + } + + // 3.2 Protobuf 反序列化 + var payload proto.UplinkPayload + if err := gproto.Unmarshal(decodedData, &payload); err != nil { + c.logger.Errorf("Protobuf 反序列化 'up' 事件的解码后 Data 失败: %v, Decoded Data: %x", err, decodedData) + return + } + c.logger.Infof("成功解析 Protobuf 数据, 包含 %d 条读数", len(payload.Readings)) + + // 3.3 遍历处理每一条读数 + for _, reading := range payload.Readings { + // 3.3.1 根据物理地址查找对应的传感器设备 + sensorDevice, err := c.deviceRepo.FindByParentAndPhysicalAddress(regionalController.ID, reading.BusNumber, reading.BusAddress) if err != nil { - c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) - return + c.logger.Errorf("查找传感器设备失败: %v", err) + continue // 继续处理下一条读数 } - var readings []GenericSensorReading - if err := json.Unmarshal(decodedData, &readings); err != nil { - c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) - return + // ✨ 核心修正: 直接从 models 包的公开 map 中查找转换关系 ✨ + sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[sensorDevice.SubType] + if !ok { + // 如果一个设备子类型不在 map 中, 说明它不是一个需要记录数据的传感器, 这属于正常情况, 无需记录日志. + continue } - // 查找区域主控设备,以便记录其ID - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) - if err != nil { - c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) - return - } - - for _, reading := range readings { - // 根据类型构建具体的传感器数据结构体 - var sensorData interface{} - var sensorDataType models.SensorDataType - - switch reading.Type { - case models.SensorDataTypeTemperature: // 使用枚举常量 - sensorData = models.TemperatureData{ - TemperatureCelsius: reading.Value, - } - sensorDataType = models.SensorDataTypeTemperature - case models.SensorDataTypeHumidity: // 使用枚举常量 - sensorData = models.HumidityData{ - HumidityPercent: reading.Value, - } - sensorDataType = models.SensorDataTypeHumidity - case models.SensorDataTypeWeight: // 使用枚举常量 - sensorData = models.WeightData{ - WeightKilograms: reading.Value, - } - sensorDataType = models.SensorDataTypeWeight - default: - c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", - reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) - continue // 跳过未知类型 + // 3.3.2 根据转换后的 sensorDataType 构建具体的数据结构 + var sensorData interface{} + switch sensorDataType { + case models.SensorDataTypeTemperature: + sensorData = models.TemperatureData{ + TemperatureCelsius: float64(reading.Value), } - - // 记录普通设备的传感器数据 - c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) + case models.SensorDataTypeHumidity: + sensorData = models.HumidityData{ + HumidityPercent: float64(reading.Value), + } + case models.SensorDataTypeWeight: + sensorData = models.WeightData{ + WeightKilograms: float64(reading.Value), + } + default: + // 这个 case 理论上不会被触发 + c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, sensorDevice.ID) + continue } - } else { - c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) + + // 3.3.3 记录传感器数据 + c.recordSensorData(regionalController.ID, sensorDevice.ID, event.Time, sensorDataType, sensorData) + c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", sensorDevice.ID, sensorDataType, reading.Value) } } diff --git a/internal/infra/models/device.go b/internal/infra/models/device.go index 469129e..a6b6ea6 100644 --- a/internal/infra/models/device.go +++ b/internal/infra/models/device.go @@ -30,6 +30,8 @@ const ( SubTypeSensorHumidity DeviceSubType = "humidity" // SubTypeSensorAmmonia 氨气传感器 SubTypeSensorAmmonia DeviceSubType = "ammonia" + // SubTypeSensorWeight 电子秤 + SubTypeSensorWeight DeviceSubType = "weight" // SubTypeValveFeed 下料阀门 SubTypeValveFeed DeviceSubType = "feed_valve" diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 0e1a6a9..31713af 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -17,6 +17,14 @@ const ( SensorDataTypeWeight SensorDataType = "weight" // 重量 ) +// DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射. +// 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询. +var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{ + SubTypeSensorTemp: SensorDataTypeTemperature, + SubTypeSensorHumidity: SensorDataTypeHumidity, + SubTypeSensorWeight: SensorDataTypeWeight, +} + // SignalMetrics 存储信号强度数据 type SignalMetrics struct { RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响 diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index 5cc3ee8..1005a1c 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -35,6 +35,9 @@ type DeviceRepository interface { // FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增) FindByDevEui(devEui string) (*models.Device, error) + + // FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 + FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) } // gormDeviceRepository 是 DeviceRepository 的 GORM 实现 @@ -121,3 +124,18 @@ func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, erro } return &device, nil } + +// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 +func (r *gormDeviceRepository) FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) { + var device models.Device + // PostgreSQL 使用 ->> 操作符来查询 JSONB 字段的文本值 + err := r.db.Where("parent_id = ?", parentID). + Where("properties->>'bus_number' = ?", strconv.Itoa(int(busNumber))). + Where("properties->>'bus_address' = ?", strconv.Itoa(int(busAddress))). + First(&device).Error + + if err != nil { + return nil, fmt.Errorf("根据父设备ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", parentID, busNumber, busAddress, err) + } + return &device, nil +}