diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 2e8ed77..2573429 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,7 +1,7 @@ package transport import ( - "encoding/base64" // 新增导入 + "encoding/base64" "encoding/json" "io" "net/http" @@ -32,8 +32,9 @@ type ChirpStackListener struct { logger *logs.Logger sensorDataRepo repository.SensorDataRepository deviceRepo repository.DeviceRepository + areaControllerRepo repository.AreaControllerRepository // 新增 deviceCommandLogRepo repository.DeviceCommandLogRepository - pendingCollectionRepo repository.PendingCollectionRepository // 新增 + pendingCollectionRepo repository.PendingCollectionRepository } // NewChirpStackListener 创建一个新的 ChirpStackListener 实例 @@ -41,15 +42,17 @@ func NewChirpStackListener( logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, + areaControllerRepo repository.AreaControllerRepository, // 新增 deviceCommandLogRepo repository.DeviceCommandLogRepository, - pendingCollectionRepo repository.PendingCollectionRepository, // 新增 + pendingCollectionRepo repository.PendingCollectionRepository, ) ListenHandler { // 返回接口类型 return &ChirpStackListener{ logger: logger, sensorDataRepo: sensorDataRepo, deviceRepo: deviceRepo, + areaControllerRepo: areaControllerRepo, // 新增 deviceCommandLogRepo: deviceCommandLogRepo, - pendingCollectionRepo: pendingCollectionRepo, // 新增 + pendingCollectionRepo: pendingCollectionRepo, } } @@ -149,10 +152,11 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { // --- 业务处理函数 --- // GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 +// 此结构体已不再使用,但保留以供参考或兼容旧代码。 type GenericSensorReading struct { - DeviceID uint `json:"device_id"` // 传感器设备的ID - Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType) - Value float64 `json:"value"` // 传感器读数 + DeviceID uint `json:"device_id"` // 传感器设备的ID + Type string `json:"type"` // 传感器类型 (现在直接使用 ValueDescriptor.Name) + Value float64 `json:"value"` // 传感器读数 } // handleUpEvent 处理上行数据事件 @@ -160,11 +164,16 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) // 1. 查找区域主控设备 - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui) if err != nil { c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) return } + // 依赖 SelfCheck 确保区域主控有效 + if err := regionalController.SelfCheck(); err != nil { + c.logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err) + return + } c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) // 2. 记录区域主控的信号强度 (如果存在) @@ -178,7 +187,9 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { RssiDbm: rx.Rssi, SnrDb: rx.Snr, } - c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + // 记录信号强度,使用 ValueDescriptor 的 Name 作为 sensorName + // 简化处理,只记录 RSSI,如果需要记录 SNR,可以再添加一个 ValueDescriptor + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "signal_strength", float64(signalMetrics.RssiDbm)) c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) @@ -210,7 +221,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { return } - // 2.4 解包内层 CollectResult + // 3.4 解包内层 CollectResult var collectResp proto.CollectResult if err := instruction.Data.UnmarshalTo(&collectResp); err != nil { c.logger.Errorf("解包数据信息失败: %v", err) @@ -220,7 +231,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { correlationID := collectResp.CorrelationId c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) - // 3. 根据 CorrelationID 查找待处理请求 + // 4. 根据 CorrelationID 查找待处理请求 pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID) if err != nil { c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) @@ -233,12 +244,11 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { return } - // 4. 匹配数据并存入数据库 + // 5. 匹配数据并存入数据库 deviceIDs := pendingReq.CommandMetadata values := collectResp.Values if len(deviceIDs) != len(values) { c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) - // TODO 数量不匹配是否全改成失败 // 即使数量不匹配,也更新状态为完成,以防止请求永远 pending err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time) if err != nil { @@ -248,37 +258,46 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { } for i, deviceID := range deviceIDs { - value := values[i] + rawSensorValue := values[i] // 这是设备上报的原始值 + + // 5.1 获取设备及其模板 dev, err := c.deviceRepo.FindByID(deviceID) if err != nil { c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) continue } - - sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType] - if !ok { - c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType) + // 依赖 SelfCheck 确保设备和模板有效 + if err := dev.SelfCheck(); err != nil { + c.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err) + continue + } + if err := dev.DeviceTemplate.SelfCheck(); err != nil { + c.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err) continue } - var sensorData interface{} - switch sensorDataType { - case models.SensorDataTypeTemperature: - sensorData = models.TemperatureData{TemperatureCelsius: float64(value)} - case models.SensorDataTypeHumidity: - sensorData = models.HumidityData{HumidityPercent: float64(value)} - case models.SensorDataTypeWeight: - sensorData = models.WeightData{WeightKilograms: float64(value)} - default: - c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID) + // 5.2 从设备模板中解析 ValueDescriptor + var valueDescriptors []*models.ValueDescriptor + if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil { + c.logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err) continue } + // 根据 DeviceTemplate.SelfCheck,这里应该只有一个 ValueDescriptor + if len(valueDescriptors) == 0 { + c.logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID) + continue + } + valueDescriptor := valueDescriptors[0] - c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData) - c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value) + // 5.3 应用乘数和偏移量计算最终值 + parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset + + // 5.4 记录传感器数据 + c.recordSensorData(regionalController.ID, dev.ID, event.Time, valueDescriptor.Name, parsedValue) + c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%d, 解析值=%.2f", dev.ID, valueDescriptor.Name, rawSensorValue, parsedValue) } - // 5. 更新请求状态为“已完成” + // 6. 更新请求状态为“已完成” if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil { c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) } else { @@ -295,13 +314,13 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { MarginDb: event.Margin, } // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui) if err != nil { c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) return } // 记录区域主控的信号强度 - c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "signal_metrics", float64(signalMetrics.RssiDbm)) // 记录 电量 batteryLevel := models.BatteryLevel{ @@ -310,7 +329,7 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { ExternalPower: event.ExternalPower, } // 记录区域主控的电池电量 - c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "battery_level", float64(batteryLevel.BatteryLevelRatio)) } @@ -378,24 +397,29 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { // recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 // regionalControllerID: 区域主控设备的ID // sensorDeviceID: 实际产生传感器数据的普通设备的ID -func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { - // 2. 序列化数据结构体为 JSON - jsonData, err := json.Marshal(data) +// sensorName: 传感器值的名称 (例如 "temperature", "weight") +// parsedValue: 已经解析并应用了乘数和偏移量的浮点数值 +func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorName string, parsedValue float64) { + // 1. 将解析后的值封装成一个简单的 JSON 对象 + dataToMarshal := map[string]float64{ + "value": parsedValue, + } + jsonData, err := json.Marshal(dataToMarshal) if err != nil { c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) return } - // 3. 构建 SensorData 模型 + // 2. 构建 SensorData 模型 sensorData := &models.SensorData{ Time: eventTime, DeviceID: sensorDeviceID, RegionalControllerID: regionalControllerID, - SensorDataType: dataType, + SensorDataType: sensorName, // 直接使用 sensorName 作为数据类型 Data: datatypes.JSON(jsonData), } - // 4. 调用仓库创建记录 + // 3. 调用仓库创建记录 if err := c.sensorDataRepo.Create(sensorData); err != nil { c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) } diff --git a/internal/infra/models/device_template.go b/internal/infra/models/device_template.go index b91405e..9ba1517 100644 --- a/internal/infra/models/device_template.go +++ b/internal/infra/models/device_template.go @@ -73,7 +73,7 @@ type DeviceTemplate struct { // Description 提供了关于此设备类型的更多详细信息。 Description string `json:"description"` - // Category 将模板分类为传感器、执行器或复合设备。 + // Category 将模板分类为传感器、执行器 Category DeviceCategory `gorm:"not null;index" json:"category"` // Commands 存储了一个从“动作名称”到“原始指令”的映射。 diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 31713af..86c9234 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -7,23 +7,9 @@ import ( ) // SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型 -type SensorDataType string - -const ( - SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度 - SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量 - SensorDataTypeTemperature SensorDataType = "temperature" // 温度 - SensorDataTypeHumidity SensorDataType = "humidity" // 湿度 - SensorDataTypeWeight SensorDataType = "weight" // 重量 -) // DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射. // 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询. -var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{ - SubTypeSensorTemp: SensorDataTypeTemperature, - SubTypeSensorHumidity: SensorDataTypeHumidity, - SubTypeSensorWeight: SensorDataTypeWeight, -} // SignalMetrics 存储信号强度数据 type SignalMetrics struct { @@ -67,7 +53,7 @@ type SensorData struct { RegionalControllerID uint `json:"regional_controller_id"` // SensorDataType 是传感数据的类型 - SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` + SensorDataType string `gorm:"not null;index" json:"sensor_data_type"` // Data 存储一个或多个传感器读数,格式为 JSON。 Data datatypes.JSON `gorm:"type:jsonb" json:"data"` diff --git a/internal/infra/repository/area_controller_repository.go b/internal/infra/repository/area_controller_repository.go new file mode 100644 index 0000000..15cb7c5 --- /dev/null +++ b/internal/infra/repository/area_controller_repository.go @@ -0,0 +1,40 @@ +package repository + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// AreaControllerRepository 定义了对 AreaController 模型的数据库操作接口 +type AreaControllerRepository interface { + FindByID(id uint) (*models.AreaController, error) + FindByNetworkID(networkID string) (*models.AreaController, error) // New method +} + +// gormAreaControllerRepository 是 AreaControllerRepository 的 GORM 实现。 +type gormAreaControllerRepository struct { + db *gorm.DB +} + +// NewGormAreaControllerRepository 创建一个新的 AreaControllerRepository GORM 实现实例。 +func NewGormAreaControllerRepository(db *gorm.DB) AreaControllerRepository { + return &gormAreaControllerRepository{db: db} +} + +// FindByID 通过 ID 查找一个 AreaController。 +func (r *gormAreaControllerRepository) FindByID(id uint) (*models.AreaController, error) { + var areaController models.AreaController + if err := r.db.First(&areaController, id).Error; err != nil { + return nil, err + } + return &areaController, nil +} + +// FindByNetworkID 通过 NetworkID 查找一个 AreaController。 +func (r *gormAreaControllerRepository) FindByNetworkID(networkID string) (*models.AreaController, error) { + var areaController models.AreaController + if err := r.db.Where("network_id = ?", networkID).First(&areaController).Error; err != nil { + return nil, err + } + return &areaController, nil +}