diff --git a/README.md b/README.md index bba3f97..fbe6388 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # 猪场管理系统 +## 安装说明 + +### 推荐使用 TimescaleDB + +TimescaleDB 是基于 PostgreSQL 的开源数据库, 专门为处理时序数据而设计的。可以应对后续传海量传感器数据 + ## 功能介绍 ### 一. 猪舍控制 diff --git a/config.yml b/config.yml index 79bff63..2b428db 100644 --- a/config.yml +++ b/config.yml @@ -8,7 +8,7 @@ app: # HTTP 服务配置 server: port: 8086 - mode: "debug" # Gin 运行模式: "debug", "release", "test" + mode: "release" # Gin 运行模式: "debug", "release", "test" # 日志配置 log: @@ -29,6 +29,7 @@ database: password: "pig-farm-controller" dbname: "pig-farm-controller" sslmode: "disable" # 在生产环境中建议使用 "require" + is_timescaledb: true max_open_conns: 25 # 最大开放连接数 max_idle_conns: 10 # 最大空闲连接数 conn_max_lifetime: 600 # 连接最大生命周期(秒) diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 56f1dc8..89c48d7 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -12,6 +12,7 @@ import ( "context" "fmt" "net/http" + "net/http/pprof" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device" @@ -52,6 +53,8 @@ func NewAPI(cfg config.ServerConfig, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, + sensorDataRepository repository.SensorDataRepository, + executionLogRepository repository.ExecutionLogRepository, tokenService token.TokenService, listenHandler transport.ListenHandler, analysisTaskManager *task.AnalysisPlanTaskManager) *API { @@ -99,6 +102,7 @@ func (a *API) setupRoutes() { userGroup.POST("", a.userController.CreateUser) // 注册创建用户接口 (POST /api/v1/users) userGroup.POST("/login", a.userController.Login) // 注册用户登录接口 (POST /api/v1/users/login) } + a.logger.Info("用户相关接口注册成功") // 设备相关路由组 deviceGroup := v1.Group("/devices") @@ -109,6 +113,7 @@ func (a *API) setupRoutes() { deviceGroup.PUT("/:id", a.deviceController.UpdateDevice) deviceGroup.DELETE("/:id", a.deviceController.DeleteDevice) } + a.logger.Info("设备相关接口注册成功") // 计划相关路由组 planGroup := v1.Group("/plans") @@ -121,17 +126,38 @@ func (a *API) setupRoutes() { planGroup.POST("/:id/start", a.planController.StartPlan) planGroup.POST("/:id/stop", a.planController.StopPlan) } + a.logger.Info("计划相关接口注册成功") } + // 注册 pprof 路由 + pprofGroup := a.engine.Group("/debug/pprof") + { + pprofGroup.GET("/", gin.WrapF(pprof.Index)) + pprofGroup.GET("/cmdline", gin.WrapF(pprof.Cmdline)) + pprofGroup.GET("/profile", gin.WrapF(pprof.Profile)) + pprofGroup.POST("/symbol", gin.WrapF(pprof.Symbol)) + pprofGroup.GET("/symbol", gin.WrapF(pprof.Symbol)) + pprofGroup.GET("/trace", gin.WrapF(pprof.Trace)) + pprofGroup.GET("/allocs", gin.WrapH(pprof.Handler("allocs"))) + pprofGroup.GET("/block", gin.WrapH(pprof.Handler("block"))) + pprofGroup.GET("/goroutine", gin.WrapH(pprof.Handler("goroutine"))) + pprofGroup.GET("/heap", gin.WrapH(pprof.Handler("heap"))) + pprofGroup.GET("/mutex", gin.WrapH(pprof.Handler("mutex"))) + pprofGroup.GET("/threadcreate", gin.WrapH(pprof.Handler("threadcreate"))) + } + a.logger.Info("pprof 接口注册成功") + // 上行事件监听路由 a.engine.POST("/upstream", func(c *gin.Context) { h := a.listenHandler.Handler() h.ServeHTTP(c.Writer, c.Request) }) + a.logger.Info("上行事件监听接口注册成功") - // 添加 Swagger UI 路由 + // 添加 Swagger UI 路由, Swagger UI可在 /swagger/index.html 上找到 a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - a.logger.Info("Swagger UI is available at /swagger/index.html") + a.logger.Info("Swagger UI 接口注册成功") + } // Start 启动 HTTP 服务器 diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9828313..a3ec41f 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,52 +1,350 @@ package transport import ( + "encoding/base64" // 新增导入 + "encoding/json" "io" "net/http" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "gorm.io/datatypes" +) + +// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 +const ( + eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 + eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 + eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。 + eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 + eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 + eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 + eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 + eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 ) // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { - logger *logs.Logger + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceCommandLogRepo repository.DeviceCommandLogRepository } -func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener { +func NewChirpStackListener( + logger *logs.Logger, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceCommandLogRepo repository.DeviceCommandLogRepository, +) *ChirpStackListener { return &ChirpStackListener{ - logger: logger, + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceCommandLogRepo: deviceCommandLogRepo, } } +// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息 func (c *ChirpStackListener) Handler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + b, err := io.ReadAll(r.Body) if err != nil { c.logger.Errorf("读取请求体失败: %v", err) - - // TODO 直接崩溃不太合适 - panic(err) + http.Error(w, "failed to read body", http.StatusBadRequest) + return } event := r.URL.Query().Get("event") - switch event { - case "up": // 链路上行事件 - err = c.up(b) - if err != nil { - c.logger.Errorf("处理链路上行事件失败: %v", err) + w.WriteHeader(http.StatusOK) - // TODO 直接崩溃不太合适 - panic(err) - } - default: - c.logger.Errorf("未知的ChirpStack事件: %s", event) - } + // 将异步处理逻辑委托给 handler 方法 + go c.handler(b, event) } } -// up 处理链路上行事件 -func (c *ChirpStackListener) up(data []byte) error { - // TODO implement me - panic("implement me") +// handler 用于处理 ChirpStack 发送的事件 +func (c *ChirpStackListener) handler(data []byte, eventType string) { + switch eventType { + case eventTypeUp: + var msg UpEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleUpEvent(&msg) + + case eventTypeJoin: + var msg JoinEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleJoinEvent(&msg) + + case eventTypeAck: + var msg AckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleAckEvent(&msg) + + case eventTypeTxAck: + var msg TxAckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleTxAckEvent(&msg) + + case eventTypeStatus: + var msg StatusEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleStatusEvent(&msg) + + case eventTypeLog: + var msg LogEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLogEvent(&msg) + + case eventTypeLocation: + var msg LocationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLocationEvent(&msg) + + case eventTypeIntegration: + var msg IntegrationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleIntegrationEvent(&msg) + + default: + c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data)) + } +} + +// --- 业务处理函数 --- + +// GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 +type GenericSensorReading struct { + DeviceID uint `json:"device_id"` // 传感器设备的ID + Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType) + Value float64 `json:"value"` // 传感器读数 +} + +// handleUpEvent 处理上行数据事件 +func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { + c.logger.Infof("处理 'up' 事件: %+v", event) + + // 记录信号强度 + // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 + // 因此,我们只取第一个 RxInfo 中的信号数据即可。 + if len(event.RxInfo) > 0 { + rx := event.RxInfo[0] // 取第一个接收到的网关信息 + + // 构建 SignalMetrics 结构体 + signalMetrics := models.SignalMetrics{ + RssiDbm: rx.Rssi, + SnrDb: rx.Snr, + } + + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + } else { + c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) + } + + // 解析并记录传感器数据 (温度、湿度、重量) + // 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 + if event.Data != "" { + decodedData, err := base64.StdEncoding.DecodeString(event.Data) + if err != nil { + c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) + return + } + + var readings []GenericSensorReading + if err := json.Unmarshal(decodedData, &readings); err != nil { + c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) + return + } + + // 查找区域主控设备,以便记录其ID + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + + for _, reading := range readings { + // 根据类型构建具体的传感器数据结构体 + var sensorData interface{} + var sensorDataType models.SensorDataType + + switch reading.Type { + case models.SensorDataTypeTemperature: // 使用枚举常量 + sensorData = models.TemperatureData{ + TemperatureCelsius: reading.Value, + } + sensorDataType = models.SensorDataTypeTemperature + case models.SensorDataTypeHumidity: // 使用枚举常量 + sensorData = models.HumidityData{ + HumidityPercent: reading.Value, + } + sensorDataType = models.SensorDataTypeHumidity + case models.SensorDataTypeWeight: // 使用枚举常量 + sensorData = models.WeightData{ + WeightKilograms: reading.Value, + } + sensorDataType = models.SensorDataTypeWeight + default: + c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", + reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) + continue // 跳过未知类型 + } + + // 记录普通设备的传感器数据 + c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) + } + } else { + c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) + } +} + +// handleStatusEvent 处理设备状态事件 +func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { + c.logger.Infof("处接收到理 'status' 事件: %+v", event) + + // 记录信号强度 + signalMetrics := models.SignalMetrics{ + MarginDb: event.Margin, + } + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + + // 记录 电量 + batteryLevel := models.BatteryLevel{ + BatteryLevelRatio: event.BatteryLevel, + BatteryLevelUnavailable: event.BatteryLevelUnavailable, + ExternalPower: event.ExternalPower, + } + // 记录区域主控的电池电量 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) + +} + +// handleAckEvent 处理下行确认事件 +func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { + c.logger.Infof("接收到 'ack' 事件: %+v", event) + + // 更新下行任务记录的确认时间及接收成功状态 + err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged) + if err != nil { + c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err) + return + } + + c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339)) +} + +// handleLogEvent 处理日志事件 +func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { + // 首先,打印完整的事件结构体,用于详细排查 + c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event) + + // 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息 + logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)" + switch event.Level { + case "INFO": + c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + case "WARNING": + c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + case "ERROR": + c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + default: + // 对于未知级别,使用 Warn 级别打印,并明确指出级别未知 + c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)", + event.Level, event.Code, event.Description, event.DeviceInfo.DevEui) + } +} + +// handleJoinEvent 处理入网事件 +func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) { + c.logger.Infof("接收到 'join' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleTxAckEvent 处理网关发送确认事件 +func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) { + c.logger.Infof("接收到 'txack' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleLocationEvent 处理位置事件 +func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) { + c.logger.Infof("接收到 'location' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleIntegrationEvent 处理集成事件 +func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { + c.logger.Infof("接收到 'integration' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 +// regionalControllerID: 区域主控设备的ID +// sensorDeviceID: 实际产生传感器数据的普通设备的ID +func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { + // 2. 序列化数据结构体为 JSON + jsonData, err := json.Marshal(data) + if err != nil { + c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) + return + } + + // 3. 构建 SensorData 模型 + sensorData := &models.SensorData{ + Time: eventTime, + DeviceID: sensorDeviceID, + RegionalControllerID: regionalControllerID, + SensorDataType: dataType, + Data: datatypes.JSON(jsonData), + } + + // 4. 调用仓库创建记录 + if err := c.sensorDataRepo.Create(sensorData); err != nil { + c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) + } } diff --git a/internal/app/service/transport/chirp_stack_types.go b/internal/app/service/transport/chirp_stack_types.go new file mode 100644 index 0000000..a8b5bdd --- /dev/null +++ b/internal/app/service/transport/chirp_stack_types.go @@ -0,0 +1,198 @@ +package transport + +import ( + "encoding/json" + "time" +) + +// --- 通用结构体 --- + +// DeviceInfo 包含了所有事件中通用的设备信息。 +// 基于 aiserver.proto v4 (integration) +type DeviceInfo struct { + TenantID string `json:"tenant_id"` // 租户ID + TenantName string `json:"tenant_name"` // 租户名称 + ApplicationID string `json:"application_id"` // 应用ID + ApplicationName string `json:"application_name"` // 应用名称 + DeviceProfileID string `json:"device_profile_id"` // 设备配置文件ID + DeviceProfileName string `json:"device_profile_name"` // 设备配置文件名称 + DeviceName string `json:"device_name"` // 设备名称 + DevEui string `json:"dev_eui"` // 设备EUI (十六进制编码) + DeviceClassEnabled string `json:"device_class_enabled,omitempty"` // 设备启用的LoRaWAN类别 (A, B, 或 C) + Tags map[string]string `json:"tags"` // 用户定义的标签 +} + +// Location 包含了地理位置信息。 +type Location struct { + Latitude float64 `json:"latitude"` // 纬度 + Longitude float64 `json:"longitude"` // 经度 + Altitude float64 `json:"altitude"` // 海拔 +} + +// --- 可复用的子结构体 --- + +// UplinkRelayRxInfo 包含了上行中继接收信息。 +type UplinkRelayRxInfo struct { + DevEui string `json:"dev_eui"` // 中继设备的DevEUI + Frequency uint32 `json:"frequency"` // 接收频率 + Dr uint32 `json:"dr"` // 数据速率 + Snr int32 `json:"snr"` // 信噪比 + Rssi int32 `json:"rssi"` // 接收信号强度指示 + WorChannel uint32 `json:"wor_channel"` // Work-on-Relay 通道 +} + +// KeyEnvelope 包装了一个加密的密钥。 +// 基于 common.proto +type KeyEnvelope struct { + KEKLabel string `json:"kek_label,omitempty"` // 密钥加密密钥 (KEK) 标签 + AESKey string `json:"aes_key,omitempty"` // Base64 编码的加密密钥 +} + +// JoinServerContext 包含了 Join-Server 上下文。 +// 基于 common.proto +type JoinServerContext struct { + SessionKeyID string `json:"session_key_id"` // 会话密钥ID + AppSKey *KeyEnvelope `json:"app_s_key,omitempty"` // 应用会话密钥 +} + +// UplinkRxInfo 包含了上行接收信息。 +type UplinkRxInfo struct { + GatewayID string `json:"gateway_id"` // 接收到上行数据的网关ID + UplinkID uint32 `json:"uplink_id"` // 上行ID + Time time.Time `json:"time"` // 接收时间 + Rssi int `json:"rssi"` // 接收信号强度指示 + Snr float64 `json:"snr"` // 信噪比 + Channel int `json:"channel"` // 接收通道 + Location *Location `json:"location"` // 网关位置 + Context string `json:"context"` // 上下文信息 + Metadata map[string]string `json:"metadata"` // 元数据 +} + +// LoraModulationInfo 包含了 LoRa 调制的具体参数。 +type LoraModulationInfo struct { + Bandwidth int `json:"bandwidth"` // 带宽 + SpreadingFactor int `json:"spreading_factor"` // 扩频因子 + CodeRate string `json:"code_rate"` // 编码率 + Polarization bool `json:"polarization_invert,omitempty"` // 极化反转 +} + +// Modulation 包含了具体的调制信息。 +type Modulation struct { + Lora LoraModulationInfo `json:"lora"` // LoRa 调制信息 +} + +// UplinkTxInfo 包含了上行发送信息。 +type UplinkTxInfo struct { + Frequency int `json:"frequency"` // 发送频率 + Modulation Modulation `json:"modulation"` // 调制信息 +} + +// DownlinkTxInfo 包含了下行发送信息。 +type DownlinkTxInfo struct { + Frequency int `json:"frequency"` // 发送频率 + Power int `json:"power"` // 发送功率 + Modulation Modulation `json:"modulation"` // 调制信息 +} + +// ResolvedLocation 包含了地理位置解析结果。 +type ResolvedLocation struct { + Latitude float64 `json:"latitude"` // 纬度 + Longitude float64 `json:"longitude"` // 经度 + Altitude float64 `json:"altitude"` // 海拔 + Source string `json:"source"` // 位置来源 + Accuracy int `json:"accuracy"` // 精度 +} + +// --- 事件专属结构体 --- + +// UpEvent 对应 ChirpStack 的 "up" 事件。 +type UpEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + DevAddr string `json:"dev_addr"` // 设备地址 + ADR bool `json:"adr"` // 自适应数据速率 (ADR) 是否启用 + DR int `json:"dr"` // 数据速率 + FCnt uint32 `json:"f_cnt"` // 帧计数器 + FPort uint8 `json:"f_port"` // 端口 + Confirmed bool `json:"confirmed"` // 是否是确认帧 + Data string `json:"data"` // Base64 编码的原始负载数据 + Object json.RawMessage `json:"object"` // 解码后的JSON对象负载 + RxInfo []UplinkRxInfo `json:"rx_info"` // 接收信息列表 + TxInfo UplinkTxInfo `json:"tx_info"` // 发送信息 + RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息 + JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文 + RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID +} + +// JoinEvent 对应 ChirpStack 的 "join" 事件。 +type JoinEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + DevAddr string `json:"dev_addr"` // 设备地址 + RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息 + JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文 + RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID +} + +// AckEvent 对应 ChirpStack 的 "ack" 事件。 +type AckEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Acknowledged bool `json:"acknowledged"` // 是否已确认 + FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器 + QueueItemID string `json:"queue_item_id"` // 队列项ID +} + +// TxAckEvent 对应 ChirpStack 的 "txack" 事件。 +type TxAckEvent struct { + DownlinkID uint32 `json:"downlink_id"` // 下行ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器 + GatewayID string `json:"gateway_id"` // 网关ID + QueueItemID string `json:"queue_item_id"` // 队列项ID + TxInfo DownlinkTxInfo `json:"tx_info"` // 下行发送信息 +} + +// StatusEvent 对应 ChirpStack 的 "status" 事件。 +type StatusEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Margin int `json:"margin"` // 链路预算余量 (dB) + ExternalPower bool `json:"external_power_source"` // 设备是否连接外部电源 + BatteryLevel float32 `json:"battery_level"` // 电池剩余电量 + BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电池电量是否不可用 +} + +// LogEvent 对应 ChirpStack 的 "log" 事件。 +type LogEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Level string `json:"level"` // 日志级别 (e.g., INFO, WARNING, ERROR) + Code string `json:"code"` // 日志代码 + Description string `json:"description"` // 日志描述 + Context map[string]string `json:"context"` // 上下文信息 +} + +// LocationEvent 对应 ChirpStack 的 "location" 事件。 +type LocationEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Location ResolvedLocation `json:"location"` // 解析后的位置信息 +} + +// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。 +type IntegrationEvent struct { + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + IntegrationName string `json:"integration_name"` // 集成名称 + EventType string `json:"event_type,omitempty"` // 事件类型 + Object json.RawMessage `json:"object"` // 集成事件的原始JSON负载 +} diff --git a/internal/core/application.go b/internal/core/application.go index e8815a7..2beddd6 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -69,8 +69,14 @@ func NewApplication(configPath string) (*Application, error) { // 初始化执行日志仓库 executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) + // 初始化传感器数据仓库 + sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) + + // 初始化命令下发历史仓库 + deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) + // 初始化设备上行监听器 - listenHandler := transport.NewChirpStackListener(logger) + listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) @@ -79,7 +85,7 @@ func NewApplication(configPath string) (*Application, error) { executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 - apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager) + apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) // 组装 Application 对象 app := &Application{ @@ -258,5 +264,7 @@ func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Stora if err := storage.Migrate(models.GetAllModels()...); err != nil { return nil, fmt.Errorf("数据库迁移失败: %w", err) } + + logger.Info("数据库初始化完成。") return storage, nil } diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 273e37c..4944f99 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -81,6 +81,9 @@ type DatabaseConfig struct { // SSLMode SSL模式 SSLMode string `yaml:"sslmode"` + // IsTimescaleDB is timescaledb + IsTimescaleDB bool `yaml:"is_timescaledb"` + // MaxOpenConns 最大开放连接数 MaxOpenConns int `yaml:"max_open_conns"` diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index c5df1b3..27d0ecc 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -16,6 +16,7 @@ import ( // 使用GORM作为ORM库 type PostgresStorage struct { db *gorm.DB + isTimescaleDB bool connectionString string maxOpenConns int maxIdleConns int @@ -25,9 +26,10 @@ type PostgresStorage struct { // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例 // 它接收一个 logger 实例,而不是自己创建 -func NewPostgresStorage(connectionString string, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { +func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { return &PostgresStorage{ connectionString: connectionString, + isTimescaleDB: isTimescaleDB, maxOpenConns: maxOpenConns, maxIdleConns: maxIdleConns, connMaxLifetime: connMaxLifetime, @@ -116,5 +118,77 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { return fmt.Errorf("数据库表结构迁移失败: %w", err) } ps.logger.Info("数据库表结构迁移完成") + + // -- 处理gorm做不到的初始化逻辑 -- + if err := ps.creatingIndex(); err != nil { + return err + } + + // 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable + if ps.isTimescaleDB { + ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换") + if err := ps.creatingHyperTable(); err != nil { + return err + } + } + return nil +} + +// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表 +func (ps *PostgresStorage) creatingHyperTable() error { + // 将 sensor_data 转换为超表 + // 使用 if_not_exists => TRUE 保证幂等性 + // 'time' 是 SensorData 模型中定义的时间列 + sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlSensorData).Error; err != nil { + ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) + return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) + } + ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") + + // 将 device_command_log 转换为超表 + // 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 + sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { + ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) + return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) + } + ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)") + + return nil +} + +// creatingIndex 用于创建gorm无法处理的索引, 如gin索引 +func (ps *PostgresStorage) creatingIndex() error { + // 使用 IF NOT EXISTS 保证幂等性 + // 如果索引已存在,此命令不会报错 + + // 为 sensor_data 表的 data 字段创建 GIN 索引 + ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") + ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" + if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil { + ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)") + + // 为 tasks.parameters 创建 GIN 索引 + ps.logger.Info("正在为 tasks 表的 parameters 字段创建 GIN 索引") + taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);" + if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil { + ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)") + + // 为 devices 表的 properties 字段创建 GIN 索引 + ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引") + ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);" + if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil { + ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)") + return nil } diff --git a/internal/infra/database/storage.go b/internal/infra/database/storage.go index 72302d7..ceeb84c 100644 --- a/internal/infra/database/storage.go +++ b/internal/infra/database/storage.go @@ -42,9 +42,11 @@ func NewStorage(cfg config.DatabaseConfig, logger *logs.Logger) Storage { cfg.SSLMode, ) + // 当前默认返回PostgreSQL存储实现,并将 logger 注入 // 当前默认返回PostgreSQL存储实现,并将 logger 注入 return NewPostgresStorage( connectionString, + cfg.IsTimescaleDB, // <--- 添加 IsTimescaleDB cfg.MaxOpenConns, cfg.MaxIdleConns, cfg.ConnMaxLifetime, diff --git a/internal/infra/models/device_command_log.go b/internal/infra/models/device_command_log.go new file mode 100644 index 0000000..6fc420b --- /dev/null +++ b/internal/infra/models/device_command_log.go @@ -0,0 +1,32 @@ +package models + +import ( + "time" +) + +// DeviceCommandLog 记录下行任务的下发情况和设备确认状态 +type DeviceCommandLog struct { + // MessageID 是下行消息的唯一标识符。 + // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 + MessageID string `gorm:"primaryKey" json:"message_id"` + + // DeviceID 是接收此下行任务的设备的ID。 + // 对于 LoRaWAN,这通常是区域主控设备的ID。 + DeviceID uint `gorm:"not null;index" json:"device_id"` + + // SentAt 记录下行任务最初发送的时间。 + SentAt time.Time `gorm:"not null" json:"sent_at"` + + // AcknowledgedAt 记录设备确认收到下行消息的时间。 + // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 + AcknowledgedAt *time.Time `json:"acknowledged_at"` + + // ReceivedSuccess 表示设备是否成功接收到下行消息。 + // true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 + ReceivedSuccess bool `gorm:"not null" json:"received_success"` +} + +// TableName 自定义 GORM 使用的数据库表名 +func (DeviceCommandLog) TableName() string { + return "device_command_log" +} diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 4d542da..40382f4 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -12,5 +12,7 @@ func GetAllModels() []interface{} { &PlanExecutionLog{}, &TaskExecutionLog{}, &PendingTask{}, + &SensorData{}, + &DeviceCommandLog{}, } } diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 1d46c4c..516f907 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -53,8 +53,8 @@ type Plan struct { Name string `gorm:"not null" json:"name"` Description string `json:"description"` - ExecutionType PlanExecutionType `gorm:"not null" json:"execution_type"` - Status PlanStatus `gorm:"default:0" json:"status"` // 计划是否被启动 + ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"` + Status PlanStatus `gorm:"default:0;index" json:"status"` // 计划是否被启动 ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数 ExecuteCount uint `gorm:"default:0" json:"execute_count"` // 执行计数器 diff --git a/internal/infra/models/schedule.go b/internal/infra/models/schedule.go index 40e43b1..12a6384 100644 --- a/internal/infra/models/schedule.go +++ b/internal/infra/models/schedule.go @@ -20,8 +20,8 @@ type PendingTask struct { // GORM 会根据 TaskID 字段自动填充此关联 Task *Task `gorm:"foreignKey:TaskID"` - ExecuteAt time.Time `gorm:"index"` // 任务执行时间 - TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID + ExecuteAt time.Time `gorm:"index"` // 任务执行时间 + TaskExecutionLogID uint `gorm:"unique;not null;index"` // 对应的执行历史记录ID // 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录 // ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理 diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go new file mode 100644 index 0000000..a667f65 --- /dev/null +++ b/internal/infra/models/sensor_data.go @@ -0,0 +1,74 @@ +package models + +import ( + "time" + + "gorm.io/datatypes" +) + +// SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型 +type SensorDataType string + +const ( + SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度 + SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量 + SensorDataTypeTemperature SensorDataType = "temperature" // 温度 + SensorDataTypeHumidity SensorDataType = "humidity" // 湿度 + SensorDataTypeWeight SensorDataType = "weight" // 重量 +) + +// SignalMetrics 存储信号强度数据 +type SignalMetrics struct { + RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响 + SnrDb float64 `json:"snr_db"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) + SensitivityDbm int `json:"sensitivity_dbm"` // 网关的最低检测阈值(dBm) + MarginDb int `json:"margin_db"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity +} + +// BatteryLevel 存储电池电量数据 +type BatteryLevel struct { + BatteryLevelRatio float32 `json:"battery_level_ratio"` // 电量剩余百分比(%) + BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用 + ExternalPower bool `json:"external_power"` // 是否使用外部电源 +} + +// TemperatureData 存储温度数据 +type TemperatureData struct { + TemperatureCelsius float64 `json:"temperature_celsius"` // 温度值 (摄氏度) +} + +// HumidityData 存储湿度数据 +type HumidityData struct { + HumidityPercent float64 `json:"humidity_percent"` // 湿度值 (%) +} + +// WeightData 存储重量数据 +type WeightData struct { + WeightKilograms float64 `json:"weight_kilograms"` // 重量值 (公斤) +} + +// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 +type SensorData struct { + // Time 是数据记录的时间戳,作为复合主键的一部分。 + // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 + Time time.Time `gorm:"primaryKey" json:"time"` + + // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 + // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 + DeviceID uint `gorm:"primaryKey" json:"device_id"` + + // RegionalControllerID 是上报此数据的区域主控的ID。 + // 我们为其添加了数据库索引以优化按区域查询的性能。 + RegionalControllerID uint `json:"regional_controller_id"` + + // SensorDataType 是传感数据的类型 + SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` + + // Data 存储一个或多个传感器读数,格式为 JSON。 + // GORM 会使用 'jsonb' 类型来创建此列。 + Data datatypes.JSON `gorm:"type:jsonb" json:"data"` +} + +func (SensorData) TableName() string { + return "sensor_data" +} diff --git a/internal/infra/repository/device_command_log_repository.go b/internal/infra/repository/device_command_log_repository.go new file mode 100644 index 0000000..a843785 --- /dev/null +++ b/internal/infra/repository/device_command_log_repository.go @@ -0,0 +1,52 @@ +package repository + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// DeviceCommandLogRepository 定义了设备下行命令历史记录的数据访问接口 +type DeviceCommandLogRepository interface { + Create(record *models.DeviceCommandLog) error + FindByMessageID(messageID string) (*models.DeviceCommandLog, error) + // UpdateAcknowledgedAt 用于更新指定 MessageID 的下行命令记录的确认时间及接收成功状态。 + // AcknowledgedAt 和 ReceivedSuccess 字段会被更新。 + UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error +} + +// gormDeviceCommandLogRepository 是 DeviceCommandLogRepository 接口的 GORM 实现 +type gormDeviceCommandLogRepository struct { + db *gorm.DB +} + +// NewGormDeviceCommandLogRepository 创建一个新的 DeviceCommandLogRepository GORM 实现 +func NewGormDeviceCommandLogRepository(db *gorm.DB) DeviceCommandLogRepository { + return &gormDeviceCommandLogRepository{db: db} +} + +// Create 实现 DeviceCommandLogRepository 接口的 Create 方法 +func (r *gormDeviceCommandLogRepository) Create(record *models.DeviceCommandLog) error { + return r.db.Create(record).Error +} + +// FindByMessageID 实现 DeviceCommandLogRepository 接口的 FindByMessageID 方法 +func (r *gormDeviceCommandLogRepository) FindByMessageID(messageID string) (*models.DeviceCommandLog, error) { + var record models.DeviceCommandLog + if err := r.db.Where("message_id = ?", messageID).First(&record).Error; err != nil { + return nil, err + } + return &record, nil +} + +// UpdateAcknowledgedAt 实现 DeviceCommandLogRepository 接口的 UpdateAcknowledgedAt 方法 +func (r *gormDeviceCommandLogRepository) UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error { + // 使用 Updates 方法更新指定字段 + return r.db.Model(&models.DeviceCommandLog{}). + Where("message_id = ?", messageID). + Updates(map[string]interface{}{ + "acknowledged_at": acknowledgedAt, + "received_success": receivedSuccess, + }).Error +} diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index b5554b3..5cc3ee8 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -32,6 +32,9 @@ type DeviceRepository interface { // Delete 根据主键 ID 删除一个设备 Delete(id uint) error + + // FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增) + FindByDevEui(devEui string) (*models.Device, error) } // gormDeviceRepository 是 DeviceRepository 的 GORM 实现 @@ -108,3 +111,13 @@ func (r *gormDeviceRepository) Update(device *models.Device) error { func (r *gormDeviceRepository) Delete(id uint) error { return r.db.Delete(&models.Device{}, id).Error } + +// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 +func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, error) { + var device models.Device + // 使用 GORM 的 JSONB 查询语法: properties->>'lora_address' + if err := r.db.Where("properties->>'lora_address' = ?", devEui).First(&device).Error; err != nil { + return nil, err // 如果找不到或发生其他错误,GORM 会返回错误 + } + return &device, nil +} diff --git a/internal/infra/repository/sensor_data_repository.go b/internal/infra/repository/sensor_data_repository.go new file mode 100644 index 0000000..b9b80a9 --- /dev/null +++ b/internal/infra/repository/sensor_data_repository.go @@ -0,0 +1,27 @@ +package repository + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 +type SensorDataRepository interface { + Create(sensorData *models.SensorData) error +} + +// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 +type gormSensorDataRepository struct { + db *gorm.DB +} + +// NewGormSensorDataRepository 创建一个新的 SensorDataRepository GORM 实现实例。 +// 它直接接收一个 *gorm.DB 实例作为依赖,完全遵循项目中的既定模式。 +func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository { + return &gormSensorDataRepository{db: db} +} + +// Create 将一条新的传感器数据记录插入数据库。 +func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { + return r.db.Create(sensorData).Error +} diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 2466182..16ab842 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -4,6 +4,8 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" "github.com/go-openapi/runtime" httptransport "github.com/go-openapi/runtime/client" @@ -33,11 +35,19 @@ type ChirpStackTransport struct { authInfo runtime.ClientAuthInfoWriter config ChirpStackConfig + deviceCommandLogRepo repository.DeviceCommandLogRepository + deviceRepo repository.DeviceRepository + logger *logs.Logger } // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 -func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *ChirpStackTransport { +func NewChirpStackTransport( + config ChirpStackConfig, + logger *logs.Logger, + deviceCommandLogRepo repository.DeviceCommandLogRepository, + deviceRepo repository.DeviceRepository, +) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) @@ -49,10 +59,12 @@ func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *Chirp authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey()) return &ChirpStackTransport{ - client: apiClient, - authInfo: authInfo, - config: config, - logger: logger, + client: apiClient, + authInfo: authInfo, + config: config, + logger: logger, + deviceCommandLogRepo: deviceCommandLogRepo, + deviceRepo: deviceRepo, } } @@ -80,12 +92,56 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { // 3. 调用生成的客户端方法来发送请求。 // c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。 - _, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) + resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) if err != nil { c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) return err } - c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功", address) + // 4. 成功发送后,尝试记录下行任务 + messageID := "" + if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID + messageID = resp.Payload.ID + } else { + c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address) + // 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了 + return nil + } + + // 调用私有方法记录下行任务 + if err := c.recordDownlinkTask(address, messageID); err != nil { + // 记录失败不影响下行命令的发送成功 + c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err) + return nil + } + + c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID) + + return nil +} + +// recordDownlinkTask 记录下行任务到数据库 +func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error { + // 获取区域主控的内部 DeviceID + regionalController, err := c.deviceRepo.FindByDevEui(devEui) + if err != nil { + c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err) + return err + } + + // 创建 DeviceCommandLog + record := &models.DeviceCommandLog{ + MessageID: messageID, + DeviceID: regionalController.ID, + SentAt: time.Now(), + AcknowledgedAt: nil, // 初始状态为未确认 + } + + if err := c.deviceCommandLogRepo.Create(record); err != nil { + c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err) + return err + } + + c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID) return nil }