diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9cd1831..593f17a 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,6 +1,7 @@ package transport import ( + "encoding/base64" // 新增导入 "encoding/json" "io" "net/http" @@ -14,14 +15,14 @@ import ( // ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 const ( - eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 - eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 - eventJoin = "join" // 入网事件:当设备成功加入网络时触发。 - eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 - eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 - eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 - eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 - eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 + eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 + eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 + eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。 + eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 + eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 + eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 + eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 + eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 ) // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 @@ -63,7 +64,7 @@ func (c *ChirpStackListener) Handler() http.HandlerFunc { // handler 用于处理 ChirpStack 发送的事件 func (c *ChirpStackListener) handler(data []byte, eventType string) { switch eventType { - case eventUp: + case eventTypeUp: var msg UpEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) @@ -71,7 +72,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleUpEvent(&msg) - case eventJoin: + case eventTypeJoin: var msg JoinEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) @@ -79,7 +80,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleJoinEvent(&msg) - case eventAck: + case eventTypeAck: var msg AckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) @@ -87,7 +88,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleAckEvent(&msg) - case eventTxAck: + case eventTypeTxAck: var msg TxAckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) @@ -95,7 +96,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleTxAckEvent(&msg) - case eventStatus: + case eventTypeStatus: var msg StatusEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) @@ -103,7 +104,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleStatusEvent(&msg) - case eventLog: + case eventTypeLog: var msg LogEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) @@ -111,7 +112,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleLogEvent(&msg) - case eventLocation: + case eventTypeLocation: var msg LocationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) @@ -119,7 +120,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleLocationEvent(&msg) - case eventIntegration: + case eventTypeIntegration: var msg IntegrationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) @@ -134,6 +135,13 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { // --- 业务处理函数 --- +// GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 +type GenericSensorReading struct { + DeviceID uint `json:"device_id"` // 传感器设备的ID + Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType) + Value float64 `json:"value"` // 传感器读数 +} + // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { c.logger.Infof("处理 'up' 事件: %+v", event) @@ -146,15 +154,77 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { // 构建 SignalMetrics 结构体 signalMetrics := models.SignalMetrics{ - RSSI: rx.Rssi, - SNR: rx.Snr, + RssiDbm: rx.Rssi, + SnrDb: rx.Snr, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) } - // 在这里添加您的业务逻辑 + // 解析并记录传感器数据 (温度、湿度、重量) + // 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 + if event.Data != "" { + decodedData, err := base64.StdEncoding.DecodeString(event.Data) + if err != nil { + c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) + return + } + + var readings []GenericSensorReading + if err := json.Unmarshal(decodedData, &readings); err != nil { + c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) + return + } + + // 查找区域主控设备,以便记录其ID + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + + for _, reading := range readings { + // 根据类型构建具体的传感器数据结构体 + var sensorData interface{} + var sensorDataType models.SensorDataType + + switch reading.Type { + case models.SensorDataTypeTemperature: // 使用枚举常量 + sensorData = models.TemperatureData{ + TemperatureCelsius: reading.Value, + } + sensorDataType = models.SensorDataTypeTemperature + case models.SensorDataTypeHumidity: // 使用枚举常量 + sensorData = models.HumidityData{ + HumidityPercent: reading.Value, + } + sensorDataType = models.SensorDataTypeHumidity + case models.SensorDataTypeWeight: // 使用枚举常量 + sensorData = models.WeightData{ + WeightKilograms: reading.Value, + } + sensorDataType = models.SensorDataTypeWeight + default: + c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", + reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) + continue // 跳过未知类型 + } + + // 记录普通设备的传感器数据 + c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) + } + } else { + c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) + } } // handleStatusEvent 处理设备状态事件 @@ -163,17 +233,25 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { // 记录信号强度 signalMetrics := models.SignalMetrics{ - Margin: event.Margin, + MarginDb: event.Margin, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) // 记录 电量 batteryLevel := models.BatteryLevel{ - BatteryLevel: event.BatteryLevel, + BatteryLevelRatio: event.BatteryLevel, BatteryLevelUnavailable: event.BatteryLevelUnavailable, ExternalPower: event.ExternalPower, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) + // 记录区域主控的电池电量 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) } @@ -229,14 +307,9 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { } // recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 -func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time, dataType models.SensorDataType, data interface{}) { - // 1. 查找设备 - device, err := c.deviceRepo.FindByDevEui(devEui) - if err != nil { - c.logger.Warnf("记录传感器数据失败:无法通过 DevEui '%s' 找到设备: %v", devEui, err) - return - } - +// regionalControllerID: 区域主控设备的ID +// sensorDeviceID: 实际产生传感器数据的普通设备的ID +func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { // 2. 序列化数据结构体为 JSON jsonData, err := json.Marshal(data) if err != nil { @@ -247,9 +320,9 @@ func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time // 3. 构建 SensorData 模型 sensorData := &models.SensorData{ Time: eventTime, - DeviceID: device.ID, - RegionalControllerID: *device.ParentID, - SensorDataType: dataType, // 设置传感器数据类型 + DeviceID: sensorDeviceID, + RegionalControllerID: regionalControllerID, + SensorDataType: dataType, Data: datatypes.JSON(jsonData), } diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 93898ed..a667f65 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -12,23 +12,41 @@ type SensorDataType string const ( SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度 SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量 + SensorDataTypeTemperature SensorDataType = "temperature" // 温度 + SensorDataTypeHumidity SensorDataType = "humidity" // 湿度 + SensorDataTypeWeight SensorDataType = "weight" // 重量 ) // SignalMetrics 存储信号强度数据 type SignalMetrics struct { - RSSI int `json:"rssi"` // 绝对信号强度(dBm),受距离、障碍物影响 - SNR float64 `json:"snr"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) - Sensitivity int `json:"sensitivity"` // 网关的最低检测阈值(dBm) - Margin int `json:"margin"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity + RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响 + SnrDb float64 `json:"snr_db"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) + SensitivityDbm int `json:"sensitivity_dbm"` // 网关的最低检测阈值(dBm) + MarginDb int `json:"margin_db"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity } // BatteryLevel 存储电池电量数据 type BatteryLevel struct { - BatteryLevel float32 `json:"battery_level"` // 电量剩余百分比 + BatteryLevelRatio float32 `json:"battery_level_ratio"` // 电量剩余百分比(%) BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用 ExternalPower bool `json:"external_power"` // 是否使用外部电源 } +// TemperatureData 存储温度数据 +type TemperatureData struct { + TemperatureCelsius float64 `json:"temperature_celsius"` // 温度值 (摄氏度) +} + +// HumidityData 存储湿度数据 +type HumidityData struct { + HumidityPercent float64 `json:"humidity_percent"` // 湿度值 (%) +} + +// WeightData 存储重量数据 +type WeightData struct { + WeightKilograms float64 `json:"weight_kilograms"` // 重量值 (公斤) +} + // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。