Compare commits
4 Commits
17e2c6471a
...
cf53cdfe28
| Author | SHA1 | Date | |
|---|---|---|---|
| cf53cdfe28 | |||
| 21fb9c7e57 | |||
| f764ad8962 | |||
| 53dbe41d7b |
@@ -53,6 +53,8 @@ func NewAPI(cfg config.ServerConfig,
|
||||
userRepo repository.UserRepository,
|
||||
deviceRepository repository.DeviceRepository,
|
||||
planRepository repository.PlanRepository,
|
||||
sensorDataRepository repository.SensorDataRepository,
|
||||
executionLogRepository repository.ExecutionLogRepository,
|
||||
tokenService token.TokenService,
|
||||
listenHandler transport.ListenHandler,
|
||||
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
|
||||
|
||||
@@ -1,33 +1,42 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"encoding/base64" // 新增导入
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
|
||||
const (
|
||||
eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
|
||||
eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
|
||||
eventJoin = "join" // 入网事件:当设备成功加入网络时触发。
|
||||
eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
|
||||
eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
|
||||
eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
|
||||
eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
|
||||
eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
|
||||
eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
|
||||
eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
|
||||
eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。
|
||||
eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
|
||||
eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
|
||||
eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
|
||||
eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
|
||||
eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
|
||||
)
|
||||
|
||||
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
||||
type ChirpStackListener struct {
|
||||
logger *logs.Logger
|
||||
logger *logs.Logger
|
||||
sensorDataRepo repository.SensorDataRepository
|
||||
deviceRepo repository.DeviceRepository
|
||||
}
|
||||
|
||||
func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener {
|
||||
func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener {
|
||||
return &ChirpStackListener{
|
||||
logger: logger,
|
||||
logger: logger,
|
||||
sensorDataRepo: sensorDataRepo,
|
||||
deviceRepo: deviceRepo,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,7 +64,7 @@ func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
||||
// handler 用于处理 ChirpStack 发送的事件
|
||||
func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
switch eventType {
|
||||
case eventUp:
|
||||
case eventTypeUp:
|
||||
var msg UpEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -63,7 +72,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleUpEvent(&msg)
|
||||
|
||||
case eventJoin:
|
||||
case eventTypeJoin:
|
||||
var msg JoinEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -71,7 +80,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleJoinEvent(&msg)
|
||||
|
||||
case eventAck:
|
||||
case eventTypeAck:
|
||||
var msg AckEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -79,7 +88,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleAckEvent(&msg)
|
||||
|
||||
case eventTxAck:
|
||||
case eventTypeTxAck:
|
||||
var msg TxAckEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -87,7 +96,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleTxAckEvent(&msg)
|
||||
|
||||
case eventStatus:
|
||||
case eventTypeStatus:
|
||||
var msg StatusEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -95,7 +104,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleStatusEvent(&msg)
|
||||
|
||||
case eventLog:
|
||||
case eventTypeLog:
|
||||
var msg LogEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -103,7 +112,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleLogEvent(&msg)
|
||||
|
||||
case eventLocation:
|
||||
case eventTypeLocation:
|
||||
var msg LocationEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -111,7 +120,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
}
|
||||
c.handleLocationEvent(&msg)
|
||||
|
||||
case eventIntegration:
|
||||
case eventTypeIntegration:
|
||||
var msg IntegrationEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
|
||||
@@ -126,16 +135,124 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
||||
|
||||
// --- 业务处理函数 ---
|
||||
|
||||
// GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。
|
||||
type GenericSensorReading struct {
|
||||
DeviceID uint `json:"device_id"` // 传感器设备的ID
|
||||
Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType)
|
||||
Value float64 `json:"value"` // 传感器读数
|
||||
}
|
||||
|
||||
// handleUpEvent 处理上行数据事件
|
||||
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
||||
c.logger.Infof("处理 'up' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
|
||||
// 记录信号强度
|
||||
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
|
||||
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
|
||||
if len(event.RxInfo) > 0 {
|
||||
rx := event.RxInfo[0] // 取第一个接收到的网关信息
|
||||
|
||||
// 构建 SignalMetrics 结构体
|
||||
signalMetrics := models.SignalMetrics{
|
||||
RssiDbm: rx.Rssi,
|
||||
SnrDb: rx.Snr,
|
||||
}
|
||||
|
||||
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
|
||||
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
||||
if err != nil {
|
||||
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||
return
|
||||
}
|
||||
// 记录区域主控的信号强度
|
||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
|
||||
} else {
|
||||
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||
}
|
||||
|
||||
// 解析并记录传感器数据 (温度、湿度、重量)
|
||||
// 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串
|
||||
if event.Data != "" {
|
||||
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
||||
return
|
||||
}
|
||||
|
||||
var readings []GenericSensorReading
|
||||
if err := json.Unmarshal(decodedData, &readings); err != nil {
|
||||
c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData))
|
||||
return
|
||||
}
|
||||
|
||||
// 查找区域主控设备,以便记录其ID
|
||||
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
||||
if err != nil {
|
||||
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, reading := range readings {
|
||||
// 根据类型构建具体的传感器数据结构体
|
||||
var sensorData interface{}
|
||||
var sensorDataType models.SensorDataType
|
||||
|
||||
switch reading.Type {
|
||||
case models.SensorDataTypeTemperature: // 使用枚举常量
|
||||
sensorData = models.TemperatureData{
|
||||
TemperatureCelsius: reading.Value,
|
||||
}
|
||||
sensorDataType = models.SensorDataTypeTemperature
|
||||
case models.SensorDataTypeHumidity: // 使用枚举常量
|
||||
sensorData = models.HumidityData{
|
||||
HumidityPercent: reading.Value,
|
||||
}
|
||||
sensorDataType = models.SensorDataTypeHumidity
|
||||
case models.SensorDataTypeWeight: // 使用枚举常量
|
||||
sensorData = models.WeightData{
|
||||
WeightKilograms: reading.Value,
|
||||
}
|
||||
sensorDataType = models.SensorDataTypeWeight
|
||||
default:
|
||||
c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d",
|
||||
reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID)
|
||||
continue // 跳过未知类型
|
||||
}
|
||||
|
||||
// 记录普通设备的传感器数据
|
||||
c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData)
|
||||
}
|
||||
} else {
|
||||
c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||
}
|
||||
}
|
||||
|
||||
// handleStatusEvent 处理设备状态事件
|
||||
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
|
||||
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
|
||||
// 记录信号强度
|
||||
signalMetrics := models.SignalMetrics{
|
||||
MarginDb: event.Margin,
|
||||
}
|
||||
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
|
||||
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
|
||||
if err != nil {
|
||||
c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||
return
|
||||
}
|
||||
// 记录区域主控的信号强度
|
||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
|
||||
|
||||
// 记录 电量
|
||||
batteryLevel := models.BatteryLevel{
|
||||
BatteryLevelRatio: event.BatteryLevel,
|
||||
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
|
||||
ExternalPower: event.ExternalPower,
|
||||
}
|
||||
// 记录区域主控的电池电量
|
||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel)
|
||||
|
||||
}
|
||||
|
||||
// handleAckEvent 处理下行确认事件
|
||||
@@ -146,8 +263,23 @@ func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
|
||||
|
||||
// handleLogEvent 处理日志事件
|
||||
func (c *ChirpStackListener) handleLogEvent(event *LogEvent) {
|
||||
c.logger.Infof("接收到 'log' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
// 首先,打印完整的事件结构体,用于详细排查
|
||||
c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
|
||||
|
||||
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
|
||||
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
|
||||
switch event.Level {
|
||||
case "INFO":
|
||||
c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
case "WARNING":
|
||||
c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
case "ERROR":
|
||||
c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
default:
|
||||
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
|
||||
c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
|
||||
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
}
|
||||
}
|
||||
|
||||
// handleJoinEvent 处理入网事件
|
||||
@@ -173,3 +305,29 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
|
||||
c.logger.Infof("接收到 'integration' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
}
|
||||
|
||||
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
|
||||
// regionalControllerID: 区域主控设备的ID
|
||||
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
|
||||
func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) {
|
||||
// 2. 序列化数据结构体为 JSON
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 3. 构建 SensorData 模型
|
||||
sensorData := &models.SensorData{
|
||||
Time: eventTime,
|
||||
DeviceID: sensorDeviceID,
|
||||
RegionalControllerID: regionalControllerID,
|
||||
SensorDataType: dataType,
|
||||
Data: datatypes.JSON(jsonData),
|
||||
}
|
||||
|
||||
// 4. 调用仓库创建记录
|
||||
if err := c.sensorDataRepo.Create(sensorData); err != nil {
|
||||
c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,158 +8,191 @@ import (
|
||||
// --- 通用结构体 ---
|
||||
|
||||
// DeviceInfo 包含了所有事件中通用的设备信息。
|
||||
// 基于 aiserver.proto v4 (integration)
|
||||
type DeviceInfo struct {
|
||||
TenantID string `json:"tenantId"`
|
||||
TenantName string `json:"tenantName"`
|
||||
ApplicationID string `json:"applicationId"`
|
||||
ApplicationName string `json:"applicationName"`
|
||||
DeviceProfileID string `json:"deviceProfileId"`
|
||||
DeviceProfileName string `json:"deviceProfileName"`
|
||||
DeviceName string `json:"deviceName"`
|
||||
DevEui string `json:"devEui"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
TenantID string `json:"tenant_id"` // 租户ID
|
||||
TenantName string `json:"tenant_name"` // 租户名称
|
||||
ApplicationID string `json:"application_id"` // 应用ID
|
||||
ApplicationName string `json:"application_name"` // 应用名称
|
||||
DeviceProfileID string `json:"device_profile_id"` // 设备配置文件ID
|
||||
DeviceProfileName string `json:"device_profile_name"` // 设备配置文件名称
|
||||
DeviceName string `json:"device_name"` // 设备名称
|
||||
DevEui string `json:"dev_eui"` // 设备EUI (十六进制编码)
|
||||
DeviceClassEnabled string `json:"device_class_enabled,omitempty"` // 设备启用的LoRaWAN类别 (A, B, 或 C)
|
||||
Tags map[string]string `json:"tags"` // 用户定义的标签
|
||||
}
|
||||
|
||||
// Location 包含了地理位置信息。
|
||||
type Location struct {
|
||||
Latitude float64 `json:"latitude"`
|
||||
Longitude float64 `json:"longitude"`
|
||||
Altitude float64 `json:"altitude"`
|
||||
Latitude float64 `json:"latitude"` // 纬度
|
||||
Longitude float64 `json:"longitude"` // 经度
|
||||
Altitude float64 `json:"altitude"` // 海拔
|
||||
}
|
||||
|
||||
// --- 可复用的子结构体 ---
|
||||
|
||||
// UplinkRelayRxInfo 包含了上行中继接收信息。
|
||||
type UplinkRelayRxInfo struct {
|
||||
DevEui string `json:"dev_eui"` // 中继设备的DevEUI
|
||||
Frequency uint32 `json:"frequency"` // 接收频率
|
||||
Dr uint32 `json:"dr"` // 数据速率
|
||||
Snr int32 `json:"snr"` // 信噪比
|
||||
Rssi int32 `json:"rssi"` // 接收信号强度指示
|
||||
WorChannel uint32 `json:"wor_channel"` // Work-on-Relay 通道
|
||||
}
|
||||
|
||||
// KeyEnvelope 包装了一个加密的密钥。
|
||||
// 基于 common.proto
|
||||
type KeyEnvelope struct {
|
||||
KEKLabel string `json:"kek_label,omitempty"` // 密钥加密密钥 (KEK) 标签
|
||||
AESKey string `json:"aes_key,omitempty"` // Base64 编码的加密密钥
|
||||
}
|
||||
|
||||
// JoinServerContext 包含了 Join-Server 上下文。
|
||||
// 基于 common.proto
|
||||
type JoinServerContext struct {
|
||||
SessionKeyID string `json:"session_key_id"` // 会话密钥ID
|
||||
AppSKey *KeyEnvelope `json:"app_s_key,omitempty"` // 应用会话密钥
|
||||
}
|
||||
|
||||
// UplinkRxInfo 包含了上行接收信息。
|
||||
type UplinkRxInfo struct {
|
||||
GatewayID string `json:"gatewayId"`
|
||||
UplinkID uint32 `json:"uplinkId"`
|
||||
Time time.Time `json:"time"`
|
||||
Rssi int `json:"rssi"`
|
||||
Snr float64 `json:"snr"`
|
||||
Channel int `json:"channel"`
|
||||
Location *Location `json:"location"`
|
||||
Context string `json:"context"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
GatewayID string `json:"gateway_id"` // 接收到上行数据的网关ID
|
||||
UplinkID uint32 `json:"uplink_id"` // 上行ID
|
||||
Time time.Time `json:"time"` // 接收时间
|
||||
Rssi int `json:"rssi"` // 接收信号强度指示
|
||||
Snr float64 `json:"snr"` // 信噪比
|
||||
Channel int `json:"channel"` // 接收通道
|
||||
Location *Location `json:"location"` // 网关位置
|
||||
Context string `json:"context"` // 上下文信息
|
||||
Metadata map[string]string `json:"metadata"` // 元数据
|
||||
}
|
||||
|
||||
// LoraModulationInfo 包含了 LoRa 调制的具体参数。
|
||||
type LoraModulationInfo struct {
|
||||
Bandwidth int `json:"bandwidth"`
|
||||
SpreadingFactor int `json:"spreadingFactor"`
|
||||
CodeRate string `json:"codeRate"`
|
||||
Polarization bool `json:"polarizationInvert,omitempty"` // omitempty 因为只在下行中出现
|
||||
Bandwidth int `json:"bandwidth"` // 带宽
|
||||
SpreadingFactor int `json:"spreading_factor"` // 扩频因子
|
||||
CodeRate string `json:"code_rate"` // 编码率
|
||||
Polarization bool `json:"polarization_invert,omitempty"` // 极化反转
|
||||
}
|
||||
|
||||
// Modulation 包含了具体的调制信息。
|
||||
type Modulation struct {
|
||||
Lora LoraModulationInfo `json:"lora"`
|
||||
Lora LoraModulationInfo `json:"lora"` // LoRa 调制信息
|
||||
}
|
||||
|
||||
// UplinkTxInfo 包含了上行发送信息。
|
||||
type UplinkTxInfo struct {
|
||||
Frequency int `json:"frequency"`
|
||||
Modulation Modulation `json:"modulation"`
|
||||
Frequency int `json:"frequency"` // 发送频率
|
||||
Modulation Modulation `json:"modulation"` // 调制信息
|
||||
}
|
||||
|
||||
// DownlinkTxInfo 包含了下行发送信息。
|
||||
type DownlinkTxInfo struct {
|
||||
Frequency int `json:"frequency"`
|
||||
Power int `json:"power"`
|
||||
Modulation Modulation `json:"modulation"`
|
||||
Frequency int `json:"frequency"` // 发送频率
|
||||
Power int `json:"power"` // 发送功率
|
||||
Modulation Modulation `json:"modulation"` // 调制信息
|
||||
}
|
||||
|
||||
// ResolvedLocation 包含了地理位置解析结果。
|
||||
type ResolvedLocation struct {
|
||||
Latitude float64 `json:"latitude"`
|
||||
Longitude float64 `json:"longitude"`
|
||||
Altitude float64 `json:"altitude"`
|
||||
Source string `json:"source"` // e.g. "GEO_RESOLVER_TDOA"
|
||||
Accuracy int `json:"accuracy"`
|
||||
Latitude float64 `json:"latitude"` // 纬度
|
||||
Longitude float64 `json:"longitude"` // 经度
|
||||
Altitude float64 `json:"altitude"` // 海拔
|
||||
Source string `json:"source"` // 位置来源
|
||||
Accuracy int `json:"accuracy"` // 精度
|
||||
}
|
||||
|
||||
// --- 事件专属结构体 ---
|
||||
|
||||
// UpEvent 对应 ChirpStack 的 "up" 事件。
|
||||
type UpEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
DevAddr string `json:"devAddr"`
|
||||
ADR bool `json:"adr"`
|
||||
DR int `json:"dr"`
|
||||
FCnt uint32 `json:"fCnt"`
|
||||
FPort uint8 `json:"fPort"`
|
||||
Confirmed bool `json:"confirmed"`
|
||||
Data string `json:"data"` // Base64 编码的原始数据
|
||||
Object json.RawMessage `json:"object"` // Codec 解码后的 JSON 对象
|
||||
RxInfo []UplinkRxInfo `json:"rxInfo"`
|
||||
TxInfo UplinkTxInfo `json:"txInfo"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
DevAddr string `json:"dev_addr"` // 设备地址
|
||||
ADR bool `json:"adr"` // 自适应数据速率 (ADR) 是否启用
|
||||
DR int `json:"dr"` // 数据速率
|
||||
FCnt uint32 `json:"f_cnt"` // 帧计数器
|
||||
FPort uint8 `json:"f_port"` // 端口
|
||||
Confirmed bool `json:"confirmed"` // 是否是确认帧
|
||||
Data string `json:"data"` // Base64 编码的原始负载数据
|
||||
Object json.RawMessage `json:"object"` // 解码后的JSON对象负载
|
||||
RxInfo []UplinkRxInfo `json:"rx_info"` // 接收信息列表
|
||||
TxInfo UplinkTxInfo `json:"tx_info"` // 发送信息
|
||||
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
|
||||
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
|
||||
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
|
||||
}
|
||||
|
||||
// JoinEvent 对应 ChirpStack 的 "join" 事件。
|
||||
type JoinEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
DevAddr string `json:"devAddr"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
DevAddr string `json:"dev_addr"` // 设备地址
|
||||
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
|
||||
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
|
||||
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
|
||||
}
|
||||
|
||||
// AckEvent 对应 ChirpStack 的 "ack" 事件。
|
||||
type AckEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
Acknowledged bool `json:"acknowledged"`
|
||||
FCntDown uint32 `json:"fCntDown"`
|
||||
QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
Acknowledged bool `json:"acknowledged"` // 是否已确认
|
||||
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
|
||||
QueueItemID string `json:"queue_item_id"` // 队列项ID
|
||||
}
|
||||
|
||||
// TxAckEvent 对应 ChirpStack 的 "txack" 事件。
|
||||
type TxAckEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
FCntDown uint32 `json:"fCntDown"`
|
||||
GatewayID string `json:"gatewayId"`
|
||||
QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令
|
||||
TxInfo DownlinkTxInfo `json:"txInfo"`
|
||||
DownlinkID uint32 `json:"downlink_id"` // 下行ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
|
||||
GatewayID string `json:"gateway_id"` // 网关ID
|
||||
QueueItemID string `json:"queue_item_id"` // 队列项ID
|
||||
TxInfo DownlinkTxInfo `json:"tx_info"` // 下行发送信息
|
||||
}
|
||||
|
||||
// StatusEvent 对应 ChirpStack 的 "status" 事件。
|
||||
type StatusEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
Margin int `json:"margin"` // 信号余量,可以近似看作 SNR
|
||||
ExternalPower bool `json:"externalPowerSource"`
|
||||
BatteryLevel float32 `json:"batteryLevel"` // 电池电量百分比
|
||||
BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
Margin int `json:"margin"` // 链路预算余量 (dB)
|
||||
ExternalPower bool `json:"external_power_source"` // 设备是否连接外部电源
|
||||
BatteryLevel float32 `json:"battery_level"` // 电池剩余电量
|
||||
BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电池电量是否不可用
|
||||
}
|
||||
|
||||
// LogEvent 对应 ChirpStack 的 "log" 事件。
|
||||
type LogEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
Level string `json:"level"` // 日志级别, e.g., "INFO", "WARNING", "ERROR"
|
||||
Code string `json:"code"` // 日志代码, e.g., "UPLINK_F_CNT_RETRANSMISSION"
|
||||
Description string `json:"description"`
|
||||
Context map[string]string `json:"context"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
Level string `json:"level"` // 日志级别 (e.g., INFO, WARNING, ERROR)
|
||||
Code string `json:"code"` // 日志代码
|
||||
Description string `json:"description"` // 日志描述
|
||||
Context map[string]string `json:"context"` // 上下文信息
|
||||
}
|
||||
|
||||
// LocationEvent 对应 ChirpStack 的 "location" 事件。
|
||||
type LocationEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
Location ResolvedLocation `json:"location"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
Location ResolvedLocation `json:"location"` // 解析后的位置信息
|
||||
}
|
||||
|
||||
// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。
|
||||
type IntegrationEvent struct {
|
||||
DeduplicationID string `json:"deduplicationId"`
|
||||
Time time.Time `json:"time"`
|
||||
DeviceInfo DeviceInfo `json:"deviceInfo"`
|
||||
IntegrationName string `json:"integrationName"`
|
||||
Object json.RawMessage `json:"object"`
|
||||
DeduplicationID string `json:"deduplication_id"` // 去重ID
|
||||
Time time.Time `json:"time"` // 事件时间
|
||||
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
|
||||
IntegrationName string `json:"integration_name"` // 集成名称
|
||||
EventType string `json:"event_type,omitempty"` // 事件类型
|
||||
Object json.RawMessage `json:"object"` // 集成事件的原始JSON负载
|
||||
}
|
||||
|
||||
@@ -69,8 +69,11 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
// 初始化执行日志仓库
|
||||
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
|
||||
|
||||
// 初始化传感器数据仓库
|
||||
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
|
||||
|
||||
// 初始化设备上行监听器
|
||||
listenHandler := transport.NewChirpStackListener(logger)
|
||||
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo)
|
||||
|
||||
// 初始化计划触发器管理器
|
||||
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
||||
@@ -79,7 +82,7 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
|
||||
|
||||
// 初始化 API 服务器
|
||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager)
|
||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)
|
||||
|
||||
// 组装 Application 对象
|
||||
app := &Application{
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。
|
||||
type SensorData struct {
|
||||
// Time 是数据记录的时间戳,作为复合主键的一部分。
|
||||
// GORM 会将其映射到 'time' TIMESTAMPTZ 列。
|
||||
Time time.Time `gorm:"primaryKey" json:"time"`
|
||||
|
||||
// DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。
|
||||
// GORM 会将其映射到 'device_id' VARCHAR(50) 列。
|
||||
DeviceID uint `gorm:"primaryKey" json:"device_id"`
|
||||
|
||||
// RegionalControllerID 是上报此数据的区域主控的ID。
|
||||
// 我们为其添加了数据库索引以优化按区域查询的性能。
|
||||
RegionalControllerID uint `json:"regional_controller_id"`
|
||||
|
||||
// Data 存储一个或多个传感器读数,格式为 JSON。
|
||||
// GORM 会使用 'jsonb' 类型来创建此列。
|
||||
Data datatypes.JSON `gorm:"type:jsonb" json:"data"`
|
||||
}
|
||||
|
||||
func (SensorData) TableName() string {
|
||||
return "sensor_data"
|
||||
}
|
||||
74
internal/infra/models/sensor_data.go
Normal file
74
internal/infra/models/sensor_data.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型
|
||||
type SensorDataType string
|
||||
|
||||
const (
|
||||
SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度
|
||||
SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量
|
||||
SensorDataTypeTemperature SensorDataType = "temperature" // 温度
|
||||
SensorDataTypeHumidity SensorDataType = "humidity" // 湿度
|
||||
SensorDataTypeWeight SensorDataType = "weight" // 重量
|
||||
)
|
||||
|
||||
// SignalMetrics 存储信号强度数据
|
||||
type SignalMetrics struct {
|
||||
RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响
|
||||
SnrDb float64 `json:"snr_db"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor)
|
||||
SensitivityDbm int `json:"sensitivity_dbm"` // 网关的最低检测阈值(dBm)
|
||||
MarginDb int `json:"margin_db"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity
|
||||
}
|
||||
|
||||
// BatteryLevel 存储电池电量数据
|
||||
type BatteryLevel struct {
|
||||
BatteryLevelRatio float32 `json:"battery_level_ratio"` // 电量剩余百分比(%)
|
||||
BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用
|
||||
ExternalPower bool `json:"external_power"` // 是否使用外部电源
|
||||
}
|
||||
|
||||
// TemperatureData 存储温度数据
|
||||
type TemperatureData struct {
|
||||
TemperatureCelsius float64 `json:"temperature_celsius"` // 温度值 (摄氏度)
|
||||
}
|
||||
|
||||
// HumidityData 存储湿度数据
|
||||
type HumidityData struct {
|
||||
HumidityPercent float64 `json:"humidity_percent"` // 湿度值 (%)
|
||||
}
|
||||
|
||||
// WeightData 存储重量数据
|
||||
type WeightData struct {
|
||||
WeightKilograms float64 `json:"weight_kilograms"` // 重量值 (公斤)
|
||||
}
|
||||
|
||||
// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。
|
||||
type SensorData struct {
|
||||
// Time 是数据记录的时间戳,作为复合主键的一部分。
|
||||
// GORM 会将其映射到 'time' TIMESTAMPTZ 列。
|
||||
Time time.Time `gorm:"primaryKey" json:"time"`
|
||||
|
||||
// DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。
|
||||
// GORM 会将其映射到 'device_id' VARCHAR(50) 列。
|
||||
DeviceID uint `gorm:"primaryKey" json:"device_id"`
|
||||
|
||||
// RegionalControllerID 是上报此数据的区域主控的ID。
|
||||
// 我们为其添加了数据库索引以优化按区域查询的性能。
|
||||
RegionalControllerID uint `json:"regional_controller_id"`
|
||||
|
||||
// SensorDataType 是传感数据的类型
|
||||
SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"`
|
||||
|
||||
// Data 存储一个或多个传感器读数,格式为 JSON。
|
||||
// GORM 会使用 'jsonb' 类型来创建此列。
|
||||
Data datatypes.JSON `gorm:"type:jsonb" json:"data"`
|
||||
}
|
||||
|
||||
func (SensorData) TableName() string {
|
||||
return "sensor_data"
|
||||
}
|
||||
@@ -32,6 +32,9 @@ type DeviceRepository interface {
|
||||
|
||||
// Delete 根据主键 ID 删除一个设备
|
||||
Delete(id uint) error
|
||||
|
||||
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增)
|
||||
FindByDevEui(devEui string) (*models.Device, error)
|
||||
}
|
||||
|
||||
// gormDeviceRepository 是 DeviceRepository 的 GORM 实现
|
||||
@@ -108,3 +111,13 @@ func (r *gormDeviceRepository) Update(device *models.Device) error {
|
||||
func (r *gormDeviceRepository) Delete(id uint) error {
|
||||
return r.db.Delete(&models.Device{}, id).Error
|
||||
}
|
||||
|
||||
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备
|
||||
func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, error) {
|
||||
var device models.Device
|
||||
// 使用 GORM 的 JSONB 查询语法: properties->>'lora_address'
|
||||
if err := r.db.Where("properties->>'lora_address' = ?", devEui).First(&device).Error; err != nil {
|
||||
return nil, err // 如果找不到或发生其他错误,GORM 会返回错误
|
||||
}
|
||||
return &device, nil
|
||||
}
|
||||
|
||||
27
internal/infra/repository/sensor_data_repository.go
Normal file
27
internal/infra/repository/sensor_data_repository.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。
|
||||
type SensorDataRepository interface {
|
||||
Create(sensorData *models.SensorData) error
|
||||
}
|
||||
|
||||
// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。
|
||||
type gormSensorDataRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewGormSensorDataRepository 创建一个新的 SensorDataRepository GORM 实现实例。
|
||||
// 它直接接收一个 *gorm.DB 实例作为依赖,完全遵循项目中的既定模式。
|
||||
func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository {
|
||||
return &gormSensorDataRepository{db: db}
|
||||
}
|
||||
|
||||
// Create 将一条新的传感器数据记录插入数据库。
|
||||
func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error {
|
||||
return r.db.Create(sensorData).Error
|
||||
}
|
||||
Reference in New Issue
Block a user