package transport import ( "encoding/base64" // 新增导入 "encoding/json" "io" "net/http" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" gproto "google.golang.org/protobuf/proto" "gorm.io/datatypes" ) // ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 const ( eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。 eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 ) // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { logger *logs.Logger sensorDataRepo repository.SensorDataRepository deviceRepo repository.DeviceRepository deviceCommandLogRepo repository.DeviceCommandLogRepository pendingCollectionRepo repository.PendingCollectionRepository // 新增 } // NewChirpStackListener 创建一个新的 ChirpStackListener 实例 func NewChirpStackListener( logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, pendingCollectionRepo repository.PendingCollectionRepository, // 新增 ) ListenHandler { // 返回接口类型 return &ChirpStackListener{ logger: logger, sensorDataRepo: sensorDataRepo, deviceRepo: deviceRepo, deviceCommandLogRepo: deviceCommandLogRepo, pendingCollectionRepo: pendingCollectionRepo, // 新增 } } // Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息 func (c *ChirpStackListener) Handler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() b, err := io.ReadAll(r.Body) if err != nil { c.logger.Errorf("读取请求体失败: %v", err) http.Error(w, "failed to read body", http.StatusBadRequest) return } event := r.URL.Query().Get("event") w.WriteHeader(http.StatusOK) // 将异步处理逻辑委托给 handler 方法 go c.handler(b, event) } } // handler 用于处理 ChirpStack 发送的事件 func (c *ChirpStackListener) handler(data []byte, eventType string) { switch eventType { case eventTypeUp: var msg UpEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) return } c.handleUpEvent(&msg) case eventTypeJoin: var msg JoinEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) return } c.handleJoinEvent(&msg) case eventTypeAck: var msg AckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) return } c.handleAckEvent(&msg) case eventTypeTxAck: var msg TxAckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) return } c.handleTxAckEvent(&msg) case eventTypeStatus: var msg StatusEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) return } c.handleStatusEvent(&msg) case eventTypeLog: var msg LogEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) return } c.handleLogEvent(&msg) case eventTypeLocation: var msg LocationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) return } c.handleLocationEvent(&msg) case eventTypeIntegration: var msg IntegrationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) return } c.handleIntegrationEvent(&msg) default: c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data)) } } // --- 业务处理函数 --- // GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 type GenericSensorReading struct { DeviceID uint `json:"device_id"` // 传感器设备的ID Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType) Value float64 `json:"value"` // 传感器读数 } // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) // 1. 查找区域主控设备 regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) if err != nil { c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) return } c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) // 2. 记录区域主控的信号强度 (如果存在) if len(event.RxInfo) > 0 { // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 // 因此,我们只取第一个 RxInfo 中的信号数据即可。 rx := event.RxInfo[0] // 取第一个接收到的网关信息 // 构建 SignalMetrics 结构体 signalMetrics := models.SignalMetrics{ RssiDbm: rx.Rssi, SnrDb: rx.Snr, } c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) } // 3. 处理上报的传感器数据 if event.Data == "" { c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui) return } // 3.1 Base64 解码 decodedData, err := base64.StdEncoding.DecodeString(event.Data) if err != nil { c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) return } // 3.2 解析外层 "信封" var instruction proto.Instruction if err := gproto.Unmarshal(decodedData, &instruction); err != nil { c.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData) return } // 3.3 检查是否是采集响应 if instruction.Method != proto.MethodType_COLLECT { c.logger.Infof("收到一个非采集响应的上行指令 (Method: %s),无需处理。", instruction.Method.String()) return } // 2.4 解包内层 CollectResult var collectResp proto.CollectResult if err := instruction.Data.UnmarshalTo(&collectResp); err != nil { c.logger.Errorf("解包数据信息失败: %v", err) return } correlationID := collectResp.CorrelationId c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) // 3. 根据 CorrelationID 查找待处理请求 pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID) if err != nil { c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) return } // 检查状态,防止重复处理 if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut { c.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status) return } // 4. 匹配数据并存入数据库 deviceIDs := pendingReq.CommandMetadata values := collectResp.Values if len(deviceIDs) != len(values) { c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) // TODO 数量不匹配是否全改成失败 // 即使数量不匹配,也更新状态为完成,以防止请求永远 pending err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time) if err != nil { c.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err) } return } for i, deviceID := range deviceIDs { value := values[i] dev, err := c.deviceRepo.FindByID(deviceID) if err != nil { c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) continue } sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType] if !ok { c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType) continue } var sensorData interface{} switch sensorDataType { case models.SensorDataTypeTemperature: sensorData = models.TemperatureData{TemperatureCelsius: float64(value)} case models.SensorDataTypeHumidity: sensorData = models.HumidityData{HumidityPercent: float64(value)} case models.SensorDataTypeWeight: sensorData = models.WeightData{WeightKilograms: float64(value)} default: c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID) continue } c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData) c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value) } // 5. 更新请求状态为“已完成” if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil { c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) } else { c.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID) } } // handleStatusEvent 处理设备状态事件 func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { c.logger.Infof("处接收到理 'status' 事件: %+v", event) // 记录信号强度 signalMetrics := models.SignalMetrics{ MarginDb: event.Margin, } // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) if err != nil { c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) return } // 记录区域主控的信号强度 c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) // 记录 电量 batteryLevel := models.BatteryLevel{ BatteryLevelRatio: event.BatteryLevel, BatteryLevelUnavailable: event.BatteryLevelUnavailable, ExternalPower: event.ExternalPower, } // 记录区域主控的电池电量 c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) } // handleAckEvent 处理下行确认事件 func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { c.logger.Infof("接收到 'ack' 事件: %+v", event) // 更新下行任务记录的确认时间及接收成功状态 err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged) if err != nil { c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v", event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err) return } c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)", event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339)) } // handleLogEvent 处理日志事件 func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { // 首先,打印完整的事件结构体,用于详细排查 c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event) // 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息 logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)" switch event.Level { case "INFO": c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) case "WARNING": c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) case "ERROR": c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) default: // 对于未知级别,使用 Warn 级别打印,并明确指出级别未知 c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)", event.Level, event.Code, event.Description, event.DeviceInfo.DevEui) } } // handleJoinEvent 处理入网事件 func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) { c.logger.Infof("接收到 'join' 事件: %+v", event) // 在这里添加您的业务逻辑 } // handleTxAckEvent 处理网关发送确认事件 func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) { c.logger.Infof("接收到 'txack' 事件: %+v", event) // 在这里添加您的业务逻辑 } // handleLocationEvent 处理位置事件 func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) { c.logger.Infof("接收到 'location' 事件: %+v", event) // 在这里添加您的业务逻辑 } // handleIntegrationEvent 处理集成事件 func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { c.logger.Infof("接收到 'integration' 事件: %+v", event) // 在这里添加您的业务逻辑 } // recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 // regionalControllerID: 区域主控设备的ID // sensorDeviceID: 实际产生传感器数据的普通设备的ID func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { // 2. 序列化数据结构体为 JSON jsonData, err := json.Marshal(data) if err != nil { c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) return } // 3. 构建 SensorData 模型 sensorData := &models.SensorData{ Time: eventTime, DeviceID: sensorDeviceID, RegionalControllerID: regionalControllerID, SensorDataType: dataType, Data: datatypes.JSON(jsonData), } // 4. 调用仓库创建记录 if err := c.sensorDataRepo.Create(sensorData); err != nil { c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) } }