调整 ChirpStackListener
This commit is contained in:
		| @@ -1,7 +1,7 @@ | ||||
| package transport | ||||
|  | ||||
| import ( | ||||
| 	"encoding/base64" // 新增导入 | ||||
| 	"encoding/base64" | ||||
| 	"encoding/json" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| @@ -32,8 +32,9 @@ type ChirpStackListener struct { | ||||
| 	logger                *logs.Logger | ||||
| 	sensorDataRepo        repository.SensorDataRepository | ||||
| 	deviceRepo            repository.DeviceRepository | ||||
| 	areaControllerRepo    repository.AreaControllerRepository // 新增 | ||||
| 	deviceCommandLogRepo  repository.DeviceCommandLogRepository | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository // 新增 | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository | ||||
| } | ||||
|  | ||||
| // NewChirpStackListener 创建一个新的 ChirpStackListener 实例 | ||||
| @@ -41,15 +42,17 @@ func NewChirpStackListener( | ||||
| 	logger *logs.Logger, | ||||
| 	sensorDataRepo repository.SensorDataRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	areaControllerRepo repository.AreaControllerRepository, // 新增 | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository, | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository, // 新增 | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository, | ||||
| ) ListenHandler { // 返回接口类型 | ||||
| 	return &ChirpStackListener{ | ||||
| 		logger:                logger, | ||||
| 		sensorDataRepo:        sensorDataRepo, | ||||
| 		deviceRepo:            deviceRepo, | ||||
| 		areaControllerRepo:    areaControllerRepo, // 新增 | ||||
| 		deviceCommandLogRepo:  deviceCommandLogRepo, | ||||
| 		pendingCollectionRepo: pendingCollectionRepo, // 新增 | ||||
| 		pendingCollectionRepo: pendingCollectionRepo, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -149,10 +152,11 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { | ||||
| // --- 业务处理函数 --- | ||||
|  | ||||
| // GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 | ||||
| // 此结构体已不再使用,但保留以供参考或兼容旧代码。 | ||||
| type GenericSensorReading struct { | ||||
| 	DeviceID uint                  `json:"device_id"` // 传感器设备的ID | ||||
| 	Type     models.SensorDataType `json:"type"`      // 传感器类型 (复用 models.SensorDataType) | ||||
| 	Value    float64               `json:"value"`     // 传感器读数 | ||||
| 	DeviceID uint    `json:"device_id"` // 传感器设备的ID | ||||
| 	Type     string  `json:"type"`      // 传感器类型 (现在直接使用 ValueDescriptor.Name) | ||||
| 	Value    float64 `json:"value"`     // 传感器读数 | ||||
| } | ||||
|  | ||||
| // handleUpEvent 处理上行数据事件 | ||||
| @@ -160,11 +164,16 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 	c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) | ||||
|  | ||||
| 	// 1. 查找区域主控设备 | ||||
| 	regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) | ||||
| 	regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) | ||||
| 		return | ||||
| 	} | ||||
| 	// 依赖 SelfCheck 确保区域主控有效 | ||||
| 	if err := regionalController.SelfCheck(); err != nil { | ||||
| 		c.logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err) | ||||
| 		return | ||||
| 	} | ||||
| 	c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) | ||||
|  | ||||
| 	// 2. 记录区域主控的信号强度 (如果存在) | ||||
| @@ -178,7 +187,9 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 			RssiDbm: rx.Rssi, | ||||
| 			SnrDb:   rx.Snr, | ||||
| 		} | ||||
| 		c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) | ||||
| 		// 记录信号强度,使用 ValueDescriptor 的 Name 作为 sensorName | ||||
| 		// 简化处理,只记录 RSSI,如果需要记录 SNR,可以再添加一个 ValueDescriptor | ||||
| 		c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "signal_strength", float64(signalMetrics.RssiDbm)) | ||||
| 		c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) | ||||
| 	} else { | ||||
| 		c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) | ||||
| @@ -210,7 +221,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 2.4 解包内层 CollectResult | ||||
| 	// 3.4 解包内层 CollectResult | ||||
| 	var collectResp proto.CollectResult | ||||
| 	if err := instruction.Data.UnmarshalTo(&collectResp); err != nil { | ||||
| 		c.logger.Errorf("解包数据信息失败: %v", err) | ||||
| @@ -220,7 +231,7 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 	correlationID := collectResp.CorrelationId | ||||
| 	c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) | ||||
|  | ||||
| 	// 3. 根据 CorrelationID 查找待处理请求 | ||||
| 	// 4. 根据 CorrelationID 查找待处理请求 | ||||
| 	pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) | ||||
| @@ -233,12 +244,11 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 4. 匹配数据并存入数据库 | ||||
| 	// 5. 匹配数据并存入数据库 | ||||
| 	deviceIDs := pendingReq.CommandMetadata | ||||
| 	values := collectResp.Values | ||||
| 	if len(deviceIDs) != len(values) { | ||||
| 		c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) | ||||
| 		// TODO 数量不匹配是否全改成失败 | ||||
| 		// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending | ||||
| 		err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time) | ||||
| 		if err != nil { | ||||
| @@ -248,37 +258,46 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 	} | ||||
|  | ||||
| 	for i, deviceID := range deviceIDs { | ||||
| 		value := values[i] | ||||
| 		rawSensorValue := values[i] // 这是设备上报的原始值 | ||||
|  | ||||
| 		// 5.1 获取设备及其模板 | ||||
| 		dev, err := c.deviceRepo.FindByID(deviceID) | ||||
| 		if err != nil { | ||||
| 			c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType] | ||||
| 		if !ok { | ||||
| 			c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType) | ||||
| 		// 依赖 SelfCheck 确保设备和模板有效 | ||||
| 		if err := dev.SelfCheck(); err != nil { | ||||
| 			c.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := dev.DeviceTemplate.SelfCheck(); err != nil { | ||||
| 			c.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		var sensorData interface{} | ||||
| 		switch sensorDataType { | ||||
| 		case models.SensorDataTypeTemperature: | ||||
| 			sensorData = models.TemperatureData{TemperatureCelsius: float64(value)} | ||||
| 		case models.SensorDataTypeHumidity: | ||||
| 			sensorData = models.HumidityData{HumidityPercent: float64(value)} | ||||
| 		case models.SensorDataTypeWeight: | ||||
| 			sensorData = models.WeightData{WeightKilograms: float64(value)} | ||||
| 		default: | ||||
| 			c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID) | ||||
| 		// 5.2 从设备模板中解析 ValueDescriptor | ||||
| 		var valueDescriptors []*models.ValueDescriptor | ||||
| 		if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil { | ||||
| 			c.logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		// 根据 DeviceTemplate.SelfCheck,这里应该只有一个 ValueDescriptor | ||||
| 		if len(valueDescriptors) == 0 { | ||||
| 			c.logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID) | ||||
| 			continue | ||||
| 		} | ||||
| 		valueDescriptor := valueDescriptors[0] | ||||
|  | ||||
| 		c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData) | ||||
| 		c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value) | ||||
| 		// 5.3 应用乘数和偏移量计算最终值 | ||||
| 		parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset | ||||
|  | ||||
| 		// 5.4 记录传感器数据 | ||||
| 		c.recordSensorData(regionalController.ID, dev.ID, event.Time, valueDescriptor.Name, parsedValue) | ||||
| 		c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%d, 解析值=%.2f", dev.ID, valueDescriptor.Name, rawSensorValue, parsedValue) | ||||
| 	} | ||||
|  | ||||
| 	// 5. 更新请求状态为“已完成” | ||||
| 	// 6. 更新请求状态为“已完成” | ||||
| 	if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil { | ||||
| 		c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) | ||||
| 	} else { | ||||
| @@ -295,13 +314,13 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { | ||||
| 		MarginDb: event.Margin, | ||||
| 	} | ||||
| 	// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui | ||||
| 	regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) | ||||
| 	regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) | ||||
| 		return | ||||
| 	} | ||||
| 	// 记录区域主控的信号强度 | ||||
| 	c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) | ||||
| 	c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "signal_metrics", float64(signalMetrics.RssiDbm)) | ||||
|  | ||||
| 	// 记录 电量 | ||||
| 	batteryLevel := models.BatteryLevel{ | ||||
| @@ -310,7 +329,7 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { | ||||
| 		ExternalPower:           event.ExternalPower, | ||||
| 	} | ||||
| 	// 记录区域主控的电池电量 | ||||
| 	c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) | ||||
| 	c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, "battery_level", float64(batteryLevel.BatteryLevelRatio)) | ||||
|  | ||||
| } | ||||
|  | ||||
| @@ -378,24 +397,29 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { | ||||
| // recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 | ||||
| // regionalControllerID: 区域主控设备的ID | ||||
| // sensorDeviceID: 实际产生传感器数据的普通设备的ID | ||||
| func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { | ||||
| 	// 2. 序列化数据结构体为 JSON | ||||
| 	jsonData, err := json.Marshal(data) | ||||
| // sensorName: 传感器值的名称 (例如 "temperature", "weight") | ||||
| // parsedValue: 已经解析并应用了乘数和偏移量的浮点数值 | ||||
| func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorName string, parsedValue float64) { | ||||
| 	// 1. 将解析后的值封装成一个简单的 JSON 对象 | ||||
| 	dataToMarshal := map[string]float64{ | ||||
| 		"value": parsedValue, | ||||
| 	} | ||||
| 	jsonData, err := json.Marshal(dataToMarshal) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 3. 构建 SensorData 模型 | ||||
| 	// 2. 构建 SensorData 模型 | ||||
| 	sensorData := &models.SensorData{ | ||||
| 		Time:                 eventTime, | ||||
| 		DeviceID:             sensorDeviceID, | ||||
| 		RegionalControllerID: regionalControllerID, | ||||
| 		SensorDataType:       dataType, | ||||
| 		SensorDataType:       sensorName, // 直接使用 sensorName 作为数据类型 | ||||
| 		Data:                 datatypes.JSON(jsonData), | ||||
| 	} | ||||
|  | ||||
| 	// 4. 调用仓库创建记录 | ||||
| 	// 3. 调用仓库创建记录 | ||||
| 	if err := c.sensorDataRepo.Create(sensorData); err != nil { | ||||
| 		c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user