issue_9 #14

Merged
huang merged 11 commits from issue_9 into main 2025-09-25 00:16:50 +08:00
18 changed files with 910 additions and 38 deletions

View File

@@ -1,5 +1,11 @@
# 猪场管理系统 # 猪场管理系统
## 安装说明
### 推荐使用 TimescaleDB
TimescaleDB 是基于 PostgreSQL 的开源数据库, 专门为处理时序数据而设计的。可以应对后续传海量传感器数据
## 功能介绍 ## 功能介绍
### 一. 猪舍控制 ### 一. 猪舍控制

View File

@@ -8,7 +8,7 @@ app:
# HTTP 服务配置 # HTTP 服务配置
server: server:
port: 8086 port: 8086
mode: "debug" # Gin 运行模式: "debug", "release", "test" mode: "release" # Gin 运行模式: "debug", "release", "test"
# 日志配置 # 日志配置
log: log:
@@ -29,6 +29,7 @@ database:
password: "pig-farm-controller" password: "pig-farm-controller"
dbname: "pig-farm-controller" dbname: "pig-farm-controller"
sslmode: "disable" # 在生产环境中建议使用 "require" sslmode: "disable" # 在生产环境中建议使用 "require"
is_timescaledb: true
max_open_conns: 25 # 最大开放连接数 max_open_conns: 25 # 最大开放连接数
max_idle_conns: 10 # 最大空闲连接数 max_idle_conns: 10 # 最大空闲连接数
conn_max_lifetime: 600 # 连接最大生命周期(秒) conn_max_lifetime: 600 # 连接最大生命周期(秒)

View File

@@ -12,6 +12,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"net/http/pprof"
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device"
@@ -52,6 +53,8 @@ func NewAPI(cfg config.ServerConfig,
userRepo repository.UserRepository, userRepo repository.UserRepository,
deviceRepository repository.DeviceRepository, deviceRepository repository.DeviceRepository,
planRepository repository.PlanRepository, planRepository repository.PlanRepository,
sensorDataRepository repository.SensorDataRepository,
executionLogRepository repository.ExecutionLogRepository,
tokenService token.TokenService, tokenService token.TokenService,
listenHandler transport.ListenHandler, listenHandler transport.ListenHandler,
analysisTaskManager *task.AnalysisPlanTaskManager) *API { analysisTaskManager *task.AnalysisPlanTaskManager) *API {
@@ -99,6 +102,7 @@ func (a *API) setupRoutes() {
userGroup.POST("", a.userController.CreateUser) // 注册创建用户接口 (POST /api/v1/users) userGroup.POST("", a.userController.CreateUser) // 注册创建用户接口 (POST /api/v1/users)
userGroup.POST("/login", a.userController.Login) // 注册用户登录接口 (POST /api/v1/users/login) userGroup.POST("/login", a.userController.Login) // 注册用户登录接口 (POST /api/v1/users/login)
} }
a.logger.Info("用户相关接口注册成功")
// 设备相关路由组 // 设备相关路由组
deviceGroup := v1.Group("/devices") deviceGroup := v1.Group("/devices")
@@ -109,6 +113,7 @@ func (a *API) setupRoutes() {
deviceGroup.PUT("/:id", a.deviceController.UpdateDevice) deviceGroup.PUT("/:id", a.deviceController.UpdateDevice)
deviceGroup.DELETE("/:id", a.deviceController.DeleteDevice) deviceGroup.DELETE("/:id", a.deviceController.DeleteDevice)
} }
a.logger.Info("设备相关接口注册成功")
// 计划相关路由组 // 计划相关路由组
planGroup := v1.Group("/plans") planGroup := v1.Group("/plans")
@@ -121,17 +126,38 @@ func (a *API) setupRoutes() {
planGroup.POST("/:id/start", a.planController.StartPlan) planGroup.POST("/:id/start", a.planController.StartPlan)
planGroup.POST("/:id/stop", a.planController.StopPlan) planGroup.POST("/:id/stop", a.planController.StopPlan)
} }
a.logger.Info("计划相关接口注册成功")
} }
// 注册 pprof 路由
pprofGroup := a.engine.Group("/debug/pprof")
{
pprofGroup.GET("/", gin.WrapF(pprof.Index))
pprofGroup.GET("/cmdline", gin.WrapF(pprof.Cmdline))
pprofGroup.GET("/profile", gin.WrapF(pprof.Profile))
pprofGroup.POST("/symbol", gin.WrapF(pprof.Symbol))
pprofGroup.GET("/symbol", gin.WrapF(pprof.Symbol))
pprofGroup.GET("/trace", gin.WrapF(pprof.Trace))
pprofGroup.GET("/allocs", gin.WrapH(pprof.Handler("allocs")))
pprofGroup.GET("/block", gin.WrapH(pprof.Handler("block")))
pprofGroup.GET("/goroutine", gin.WrapH(pprof.Handler("goroutine")))
pprofGroup.GET("/heap", gin.WrapH(pprof.Handler("heap")))
pprofGroup.GET("/mutex", gin.WrapH(pprof.Handler("mutex")))
pprofGroup.GET("/threadcreate", gin.WrapH(pprof.Handler("threadcreate")))
}
a.logger.Info("pprof 接口注册成功")
// 上行事件监听路由 // 上行事件监听路由
a.engine.POST("/upstream", func(c *gin.Context) { a.engine.POST("/upstream", func(c *gin.Context) {
h := a.listenHandler.Handler() h := a.listenHandler.Handler()
h.ServeHTTP(c.Writer, c.Request) h.ServeHTTP(c.Writer, c.Request)
}) })
a.logger.Info("上行事件监听接口注册成功")
// 添加 Swagger UI 路由 // 添加 Swagger UI 路由, Swagger UI可在 /swagger/index.html 上找到
a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
a.logger.Info("Swagger UI is available at /swagger/index.html") a.logger.Info("Swagger UI 接口注册成功")
} }
// Start 启动 HTTP 服务器 // Start 启动 HTTP 服务器

View File

@@ -1,52 +1,350 @@
package transport package transport
import ( import (
"encoding/base64" // 新增导入
"encoding/json"
"io" "io"
"net/http" "net/http"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"gorm.io/datatypes"
)
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
const (
eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。
eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
) )
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
type ChirpStackListener struct { type ChirpStackListener struct {
logger *logs.Logger logger *logs.Logger
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
} }
func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener { func NewChirpStackListener(
logger *logs.Logger,
sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
) *ChirpStackListener {
return &ChirpStackListener{ return &ChirpStackListener{
logger: logger, logger: logger,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
} }
} }
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
func (c *ChirpStackListener) Handler() http.HandlerFunc { func (c *ChirpStackListener) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
b, err := io.ReadAll(r.Body) b, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
c.logger.Errorf("读取请求体失败: %v", err) c.logger.Errorf("读取请求体失败: %v", err)
http.Error(w, "failed to read body", http.StatusBadRequest)
// TODO 直接崩溃不太合适 return
panic(err)
} }
event := r.URL.Query().Get("event") event := r.URL.Query().Get("event")
switch event { w.WriteHeader(http.StatusOK)
case "up": // 链路上行事件
err = c.up(b)
if err != nil {
c.logger.Errorf("处理链路上行事件失败: %v", err)
// TODO 直接崩溃不太合适 // 将异步处理逻辑委托给 handler 方法
panic(err) go c.handler(b, event)
} }
}
// handler 用于处理 ChirpStack 发送的事件
func (c *ChirpStackListener) handler(data []byte, eventType string) {
switch eventType {
case eventTypeUp:
var msg UpEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleUpEvent(&msg)
case eventTypeJoin:
var msg JoinEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleJoinEvent(&msg)
case eventTypeAck:
var msg AckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleAckEvent(&msg)
case eventTypeTxAck:
var msg TxAckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleTxAckEvent(&msg)
case eventTypeStatus:
var msg StatusEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleStatusEvent(&msg)
case eventTypeLog:
var msg LogEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLogEvent(&msg)
case eventTypeLocation:
var msg LocationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLocationEvent(&msg)
case eventTypeIntegration:
var msg IntegrationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleIntegrationEvent(&msg)
default: default:
c.logger.Errorf("未知的ChirpStack事件: %s", event) c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
}
} }
} }
// up 处理链路上行事件 // --- 业务处理函数 ---
func (c *ChirpStackListener) up(data []byte) error {
// TODO implement me // GenericSensorReading 表示单个传感器读数包含设备ID、类型和值。
panic("implement me") type GenericSensorReading struct {
DeviceID uint `json:"device_id"` // 传感器设备的ID
Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType)
Value float64 `json:"value"` // 传感器读数
}
// handleUpEvent 处理上行数据事件
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
c.logger.Infof("处理 'up' 事件: %+v", event)
// 记录信号强度
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
if len(event.RxInfo) > 0 {
rx := event.RxInfo[0] // 取第一个接收到的网关信息
// 构建 SignalMetrics 结构体
signalMetrics := models.SignalMetrics{
RssiDbm: rx.Rssi,
SnrDb: rx.Snr,
}
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
if err != nil {
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 记录区域主控的信号强度
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
} else {
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
}
// 解析并记录传感器数据 (温度、湿度、重量)
// 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串
if event.Data != "" {
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
if err != nil {
c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
return
}
var readings []GenericSensorReading
if err := json.Unmarshal(decodedData, &readings); err != nil {
c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData))
return
}
// 查找区域主控设备以便记录其ID
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
if err != nil {
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
for _, reading := range readings {
// 根据类型构建具体的传感器数据结构体
var sensorData interface{}
var sensorDataType models.SensorDataType
switch reading.Type {
case models.SensorDataTypeTemperature: // 使用枚举常量
sensorData = models.TemperatureData{
TemperatureCelsius: reading.Value,
}
sensorDataType = models.SensorDataTypeTemperature
case models.SensorDataTypeHumidity: // 使用枚举常量
sensorData = models.HumidityData{
HumidityPercent: reading.Value,
}
sensorDataType = models.SensorDataTypeHumidity
case models.SensorDataTypeWeight: // 使用枚举常量
sensorData = models.WeightData{
WeightKilograms: reading.Value,
}
sensorDataType = models.SensorDataTypeWeight
default:
c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d",
reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID)
continue // 跳过未知类型
}
// 记录普通设备的传感器数据
c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData)
}
} else {
c.logger.Warnf("处理 'up' 事件时 Data 字段为空无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui)
}
}
// handleStatusEvent 处理设备状态事件
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
// 记录信号强度
signalMetrics := models.SignalMetrics{
MarginDb: event.Margin,
}
// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui
regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui)
if err != nil {
c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 记录区域主控的信号强度
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
// 记录 电量
batteryLevel := models.BatteryLevel{
BatteryLevelRatio: event.BatteryLevel,
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
ExternalPower: event.ExternalPower,
}
// 记录区域主控的电池电量
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel)
}
// handleAckEvent 处理下行确认事件
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
c.logger.Infof("接收到 'ack' 事件: %+v", event)
// 更新下行任务记录的确认时间及接收成功状态
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged)
if err != nil {
c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
return
}
c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
}
// handleLogEvent 处理日志事件
func (c *ChirpStackListener) handleLogEvent(event *LogEvent) {
// 首先,打印完整的事件结构体,用于详细排查
c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
switch event.Level {
case "INFO":
c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "WARNING":
c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "ERROR":
c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
default:
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
}
}
// handleJoinEvent 处理入网事件
func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) {
c.logger.Infof("接收到 'join' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleTxAckEvent 处理网关发送确认事件
func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) {
c.logger.Infof("接收到 'txack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLocationEvent 处理位置事件
func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) {
c.logger.Infof("接收到 'location' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleIntegrationEvent 处理集成事件
func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
c.logger.Infof("接收到 'integration' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
// regionalControllerID: 区域主控设备的ID
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) {
// 2. 序列化数据结构体为 JSON
jsonData, err := json.Marshal(data)
if err != nil {
c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
return
}
// 3. 构建 SensorData 模型
sensorData := &models.SensorData{
Time: eventTime,
DeviceID: sensorDeviceID,
RegionalControllerID: regionalControllerID,
SensorDataType: dataType,
Data: datatypes.JSON(jsonData),
}
// 4. 调用仓库创建记录
if err := c.sensorDataRepo.Create(sensorData); err != nil {
c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
}
} }

View File

@@ -0,0 +1,198 @@
package transport
import (
"encoding/json"
"time"
)
// --- 通用结构体 ---
// DeviceInfo 包含了所有事件中通用的设备信息。
// 基于 aiserver.proto v4 (integration)
type DeviceInfo struct {
TenantID string `json:"tenant_id"` // 租户ID
TenantName string `json:"tenant_name"` // 租户名称
ApplicationID string `json:"application_id"` // 应用ID
ApplicationName string `json:"application_name"` // 应用名称
DeviceProfileID string `json:"device_profile_id"` // 设备配置文件ID
DeviceProfileName string `json:"device_profile_name"` // 设备配置文件名称
DeviceName string `json:"device_name"` // 设备名称
DevEui string `json:"dev_eui"` // 设备EUI (十六进制编码)
DeviceClassEnabled string `json:"device_class_enabled,omitempty"` // 设备启用的LoRaWAN类别 (A, B, 或 C)
Tags map[string]string `json:"tags"` // 用户定义的标签
}
// Location 包含了地理位置信息。
type Location struct {
Latitude float64 `json:"latitude"` // 纬度
Longitude float64 `json:"longitude"` // 经度
Altitude float64 `json:"altitude"` // 海拔
}
// --- 可复用的子结构体 ---
// UplinkRelayRxInfo 包含了上行中继接收信息。
type UplinkRelayRxInfo struct {
DevEui string `json:"dev_eui"` // 中继设备的DevEUI
Frequency uint32 `json:"frequency"` // 接收频率
Dr uint32 `json:"dr"` // 数据速率
Snr int32 `json:"snr"` // 信噪比
Rssi int32 `json:"rssi"` // 接收信号强度指示
WorChannel uint32 `json:"wor_channel"` // Work-on-Relay 通道
}
// KeyEnvelope 包装了一个加密的密钥。
// 基于 common.proto
type KeyEnvelope struct {
KEKLabel string `json:"kek_label,omitempty"` // 密钥加密密钥 (KEK) 标签
AESKey string `json:"aes_key,omitempty"` // Base64 编码的加密密钥
}
// JoinServerContext 包含了 Join-Server 上下文。
// 基于 common.proto
type JoinServerContext struct {
SessionKeyID string `json:"session_key_id"` // 会话密钥ID
AppSKey *KeyEnvelope `json:"app_s_key,omitempty"` // 应用会话密钥
}
// UplinkRxInfo 包含了上行接收信息。
type UplinkRxInfo struct {
GatewayID string `json:"gateway_id"` // 接收到上行数据的网关ID
UplinkID uint32 `json:"uplink_id"` // 上行ID
Time time.Time `json:"time"` // 接收时间
Rssi int `json:"rssi"` // 接收信号强度指示
Snr float64 `json:"snr"` // 信噪比
Channel int `json:"channel"` // 接收通道
Location *Location `json:"location"` // 网关位置
Context string `json:"context"` // 上下文信息
Metadata map[string]string `json:"metadata"` // 元数据
}
// LoraModulationInfo 包含了 LoRa 调制的具体参数。
type LoraModulationInfo struct {
Bandwidth int `json:"bandwidth"` // 带宽
SpreadingFactor int `json:"spreading_factor"` // 扩频因子
CodeRate string `json:"code_rate"` // 编码率
Polarization bool `json:"polarization_invert,omitempty"` // 极化反转
}
// Modulation 包含了具体的调制信息。
type Modulation struct {
Lora LoraModulationInfo `json:"lora"` // LoRa 调制信息
}
// UplinkTxInfo 包含了上行发送信息。
type UplinkTxInfo struct {
Frequency int `json:"frequency"` // 发送频率
Modulation Modulation `json:"modulation"` // 调制信息
}
// DownlinkTxInfo 包含了下行发送信息。
type DownlinkTxInfo struct {
Frequency int `json:"frequency"` // 发送频率
Power int `json:"power"` // 发送功率
Modulation Modulation `json:"modulation"` // 调制信息
}
// ResolvedLocation 包含了地理位置解析结果。
type ResolvedLocation struct {
Latitude float64 `json:"latitude"` // 纬度
Longitude float64 `json:"longitude"` // 经度
Altitude float64 `json:"altitude"` // 海拔
Source string `json:"source"` // 位置来源
Accuracy int `json:"accuracy"` // 精度
}
// --- 事件专属结构体 ---
// UpEvent 对应 ChirpStack 的 "up" 事件。
type UpEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
DevAddr string `json:"dev_addr"` // 设备地址
ADR bool `json:"adr"` // 自适应数据速率 (ADR) 是否启用
DR int `json:"dr"` // 数据速率
FCnt uint32 `json:"f_cnt"` // 帧计数器
FPort uint8 `json:"f_port"` // 端口
Confirmed bool `json:"confirmed"` // 是否是确认帧
Data string `json:"data"` // Base64 编码的原始负载数据
Object json.RawMessage `json:"object"` // 解码后的JSON对象负载
RxInfo []UplinkRxInfo `json:"rx_info"` // 接收信息列表
TxInfo UplinkTxInfo `json:"tx_info"` // 发送信息
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
}
// JoinEvent 对应 ChirpStack 的 "join" 事件。
type JoinEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
DevAddr string `json:"dev_addr"` // 设备地址
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
}
// AckEvent 对应 ChirpStack 的 "ack" 事件。
type AckEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Acknowledged bool `json:"acknowledged"` // 是否已确认
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
QueueItemID string `json:"queue_item_id"` // 队列项ID
}
// TxAckEvent 对应 ChirpStack 的 "txack" 事件。
type TxAckEvent struct {
DownlinkID uint32 `json:"downlink_id"` // 下行ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
GatewayID string `json:"gateway_id"` // 网关ID
QueueItemID string `json:"queue_item_id"` // 队列项ID
TxInfo DownlinkTxInfo `json:"tx_info"` // 下行发送信息
}
// StatusEvent 对应 ChirpStack 的 "status" 事件。
type StatusEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Margin int `json:"margin"` // 链路预算余量 (dB)
ExternalPower bool `json:"external_power_source"` // 设备是否连接外部电源
BatteryLevel float32 `json:"battery_level"` // 电池剩余电量
BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电池电量是否不可用
}
// LogEvent 对应 ChirpStack 的 "log" 事件。
type LogEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Level string `json:"level"` // 日志级别 (e.g., INFO, WARNING, ERROR)
Code string `json:"code"` // 日志代码
Description string `json:"description"` // 日志描述
Context map[string]string `json:"context"` // 上下文信息
}
// LocationEvent 对应 ChirpStack 的 "location" 事件。
type LocationEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Location ResolvedLocation `json:"location"` // 解析后的位置信息
}
// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。
type IntegrationEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
IntegrationName string `json:"integration_name"` // 集成名称
EventType string `json:"event_type,omitempty"` // 事件类型
Object json.RawMessage `json:"object"` // 集成事件的原始JSON负载
}

View File

@@ -69,8 +69,14 @@ func NewApplication(configPath string) (*Application, error) {
// 初始化执行日志仓库 // 初始化执行日志仓库
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
// 初始化传感器数据仓库
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
// 初始化命令下发历史仓库
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
// 初始化设备上行监听器 // 初始化设备上行监听器
listenHandler := transport.NewChirpStackListener(logger) listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo)
// 初始化计划触发器管理器 // 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
@@ -79,7 +85,7 @@ func NewApplication(configPath string) (*Application, error) {
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
// 初始化 API 服务器 // 初始化 API 服务器
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager) apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)
// 组装 Application 对象 // 组装 Application 对象
app := &Application{ app := &Application{
@@ -258,5 +264,7 @@ func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Stora
if err := storage.Migrate(models.GetAllModels()...); err != nil { if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err) return nil, fmt.Errorf("数据库迁移失败: %w", err)
} }
logger.Info("数据库初始化完成。")
return storage, nil return storage, nil
} }

View File

@@ -81,6 +81,9 @@ type DatabaseConfig struct {
// SSLMode SSL模式 // SSLMode SSL模式
SSLMode string `yaml:"sslmode"` SSLMode string `yaml:"sslmode"`
// IsTimescaleDB is timescaledb
IsTimescaleDB bool `yaml:"is_timescaledb"`
// MaxOpenConns 最大开放连接数 // MaxOpenConns 最大开放连接数
MaxOpenConns int `yaml:"max_open_conns"` MaxOpenConns int `yaml:"max_open_conns"`

View File

@@ -16,6 +16,7 @@ import (
// 使用GORM作为ORM库 // 使用GORM作为ORM库
type PostgresStorage struct { type PostgresStorage struct {
db *gorm.DB db *gorm.DB
isTimescaleDB bool
connectionString string connectionString string
maxOpenConns int maxOpenConns int
maxIdleConns int maxIdleConns int
@@ -25,9 +26,10 @@ type PostgresStorage struct {
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例 // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
// 它接收一个 logger 实例,而不是自己创建 // 它接收一个 logger 实例,而不是自己创建
func NewPostgresStorage(connectionString string, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage {
return &PostgresStorage{ return &PostgresStorage{
connectionString: connectionString, connectionString: connectionString,
isTimescaleDB: isTimescaleDB,
maxOpenConns: maxOpenConns, maxOpenConns: maxOpenConns,
maxIdleConns: maxIdleConns, maxIdleConns: maxIdleConns,
connMaxLifetime: connMaxLifetime, connMaxLifetime: connMaxLifetime,
@@ -116,5 +118,77 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error {
return fmt.Errorf("数据库表结构迁移失败: %w", err) return fmt.Errorf("数据库表结构迁移失败: %w", err)
} }
ps.logger.Info("数据库表结构迁移完成") ps.logger.Info("数据库表结构迁移完成")
// -- 处理gorm做不到的初始化逻辑 --
if err := ps.creatingIndex(); err != nil {
return err
}
// 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable
if ps.isTimescaleDB {
ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换")
if err := ps.creatingHyperTable(); err != nil {
return err
}
}
return nil
}
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
func (ps *PostgresStorage) creatingHyperTable() error {
// 将 sensor_data 转换为超表
// 使用 if_not_exists => TRUE 保证幂等性
// 'time' 是 SensorData 模型中定义的时间列
sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);"
if err := ps.db.Exec(sqlSensorData).Error; err != nil {
ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err)
return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err)
}
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
// 将 device_command_log 转换为超表
// 'sent_at' 是 DeviceCommandLog 模型中定义的时间列
sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);"
if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil {
ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err)
return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err)
}
ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)")
return nil
}
// creatingIndex 用于创建gorm无法处理的索引, 如gin索引
func (ps *PostgresStorage) creatingIndex() error {
// 使用 IF NOT EXISTS 保证幂等性
// 如果索引已存在,此命令不会报错
// 为 sensor_data 表的 data 字段创建 GIN 索引
ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引")
ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil {
ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)")
// 为 tasks.parameters 创建 GIN 索引
ps.logger.Info("正在为 tasks 表的 parameters 字段创建 GIN 索引")
taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);"
if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil {
ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
// 为 devices 表的 properties 字段创建 GIN 索引
ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引")
ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);"
if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil {
ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)")
return nil return nil
} }

View File

@@ -42,9 +42,11 @@ func NewStorage(cfg config.DatabaseConfig, logger *logs.Logger) Storage {
cfg.SSLMode, cfg.SSLMode,
) )
// 当前默认返回PostgreSQL存储实现并将 logger 注入
// 当前默认返回PostgreSQL存储实现并将 logger 注入 // 当前默认返回PostgreSQL存储实现并将 logger 注入
return NewPostgresStorage( return NewPostgresStorage(
connectionString, connectionString,
cfg.IsTimescaleDB, // <--- 添加 IsTimescaleDB
cfg.MaxOpenConns, cfg.MaxOpenConns,
cfg.MaxIdleConns, cfg.MaxIdleConns,
cfg.ConnMaxLifetime, cfg.ConnMaxLifetime,

View File

@@ -0,0 +1,32 @@
package models
import (
"time"
)
// DeviceCommandLog 记录下行任务的下发情况和设备确认状态
type DeviceCommandLog struct {
// MessageID 是下行消息的唯一标识符。
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
MessageID string `gorm:"primaryKey" json:"message_id"`
// DeviceID 是接收此下行任务的设备的ID。
// 对于 LoRaWAN这通常是区域主控设备的ID。
DeviceID uint `gorm:"not null;index" json:"device_id"`
// SentAt 记录下行任务最初发送的时间。
SentAt time.Time `gorm:"not null" json:"sent_at"`
// AcknowledgedAt 记录设备确认收到下行消息的时间。
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
AcknowledgedAt *time.Time `json:"acknowledged_at"`
// ReceivedSuccess 表示设备是否成功接收到下行消息。
// true 表示设备已确认收到false 表示设备未确认收到或下发失败。
ReceivedSuccess bool `gorm:"not null" json:"received_success"`
}
// TableName 自定义 GORM 使用的数据库表名
func (DeviceCommandLog) TableName() string {
return "device_command_log"
}

View File

@@ -12,5 +12,7 @@ func GetAllModels() []interface{} {
&PlanExecutionLog{}, &PlanExecutionLog{},
&TaskExecutionLog{}, &TaskExecutionLog{},
&PendingTask{}, &PendingTask{},
&SensorData{},
&DeviceCommandLog{},
} }
} }

View File

@@ -53,8 +53,8 @@ type Plan struct {
Name string `gorm:"not null" json:"name"` Name string `gorm:"not null" json:"name"`
Description string `json:"description"` Description string `json:"description"`
ExecutionType PlanExecutionType `gorm:"not null" json:"execution_type"` ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"`
Status PlanStatus `gorm:"default:0" json:"status"` // 计划是否被启动 Status PlanStatus `gorm:"default:0;index" json:"status"` // 计划是否被启动
ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数 ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数
ExecuteCount uint `gorm:"default:0" json:"execute_count"` // 执行计数器 ExecuteCount uint `gorm:"default:0" json:"execute_count"` // 执行计数器

View File

@@ -21,7 +21,7 @@ type PendingTask struct {
Task *Task `gorm:"foreignKey:TaskID"` Task *Task `gorm:"foreignKey:TaskID"`
ExecuteAt time.Time `gorm:"index"` // 任务执行时间 ExecuteAt time.Time `gorm:"index"` // 任务执行时间
TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID TaskExecutionLogID uint `gorm:"unique;not null;index"` // 对应的执行历史记录ID
// 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录 // 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录
// ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理 // ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理

View File

@@ -0,0 +1,74 @@
package models
import (
"time"
"gorm.io/datatypes"
)
// SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型
type SensorDataType string
const (
SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度
SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量
SensorDataTypeTemperature SensorDataType = "temperature" // 温度
SensorDataTypeHumidity SensorDataType = "humidity" // 湿度
SensorDataTypeWeight SensorDataType = "weight" // 重量
)
// SignalMetrics 存储信号强度数据
type SignalMetrics struct {
RssiDbm int `json:"rssi_dbm"` // 绝对信号强度dBm受距离、障碍物影响
SnrDb float64 `json:"snr_db"` // 信号与噪声的相对比率dB由 RSSI 减去噪声地板Noise Floor
SensitivityDbm int `json:"sensitivity_dbm"` // 网关的最低检测阈值(dBm)
MarginDb int `json:"margin_db"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity
}
// BatteryLevel 存储电池电量数据
type BatteryLevel struct {
BatteryLevelRatio float32 `json:"battery_level_ratio"` // 电量剩余百分比(%)
BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用
ExternalPower bool `json:"external_power"` // 是否使用外部电源
}
// TemperatureData 存储温度数据
type TemperatureData struct {
TemperatureCelsius float64 `json:"temperature_celsius"` // 温度值 (摄氏度)
}
// HumidityData 存储湿度数据
type HumidityData struct {
HumidityPercent float64 `json:"humidity_percent"` // 湿度值 (%)
}
// WeightData 存储重量数据
type WeightData struct {
WeightKilograms float64 `json:"weight_kilograms"` // 重量值 (公斤)
}
// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。
type SensorData struct {
// Time 是数据记录的时间戳,作为复合主键的一部分。
// GORM 会将其映射到 'time' TIMESTAMPTZ 列。
Time time.Time `gorm:"primaryKey" json:"time"`
// DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。
// GORM 会将其映射到 'device_id' VARCHAR(50) 列。
DeviceID uint `gorm:"primaryKey" json:"device_id"`
// RegionalControllerID 是上报此数据的区域主控的ID。
// 我们为其添加了数据库索引以优化按区域查询的性能。
RegionalControllerID uint `json:"regional_controller_id"`
// SensorDataType 是传感数据的类型
SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"`
// Data 存储一个或多个传感器读数,格式为 JSON。
// GORM 会使用 'jsonb' 类型来创建此列。
Data datatypes.JSON `gorm:"type:jsonb" json:"data"`
}
func (SensorData) TableName() string {
return "sensor_data"
}

View File

@@ -0,0 +1,52 @@
package repository
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// DeviceCommandLogRepository 定义了设备下行命令历史记录的数据访问接口
type DeviceCommandLogRepository interface {
Create(record *models.DeviceCommandLog) error
FindByMessageID(messageID string) (*models.DeviceCommandLog, error)
// UpdateAcknowledgedAt 用于更新指定 MessageID 的下行命令记录的确认时间及接收成功状态。
// AcknowledgedAt 和 ReceivedSuccess 字段会被更新。
UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error
}
// gormDeviceCommandLogRepository 是 DeviceCommandLogRepository 接口的 GORM 实现
type gormDeviceCommandLogRepository struct {
db *gorm.DB
}
// NewGormDeviceCommandLogRepository 创建一个新的 DeviceCommandLogRepository GORM 实现
func NewGormDeviceCommandLogRepository(db *gorm.DB) DeviceCommandLogRepository {
return &gormDeviceCommandLogRepository{db: db}
}
// Create 实现 DeviceCommandLogRepository 接口的 Create 方法
func (r *gormDeviceCommandLogRepository) Create(record *models.DeviceCommandLog) error {
return r.db.Create(record).Error
}
// FindByMessageID 实现 DeviceCommandLogRepository 接口的 FindByMessageID 方法
func (r *gormDeviceCommandLogRepository) FindByMessageID(messageID string) (*models.DeviceCommandLog, error) {
var record models.DeviceCommandLog
if err := r.db.Where("message_id = ?", messageID).First(&record).Error; err != nil {
return nil, err
}
return &record, nil
}
// UpdateAcknowledgedAt 实现 DeviceCommandLogRepository 接口的 UpdateAcknowledgedAt 方法
func (r *gormDeviceCommandLogRepository) UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error {
// 使用 Updates 方法更新指定字段
return r.db.Model(&models.DeviceCommandLog{}).
Where("message_id = ?", messageID).
Updates(map[string]interface{}{
"acknowledged_at": acknowledgedAt,
"received_success": receivedSuccess,
}).Error
}

View File

@@ -32,6 +32,9 @@ type DeviceRepository interface {
// Delete 根据主键 ID 删除一个设备 // Delete 根据主键 ID 删除一个设备
Delete(id uint) error Delete(id uint) error
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增)
FindByDevEui(devEui string) (*models.Device, error)
} }
// gormDeviceRepository 是 DeviceRepository 的 GORM 实现 // gormDeviceRepository 是 DeviceRepository 的 GORM 实现
@@ -108,3 +111,13 @@ func (r *gormDeviceRepository) Update(device *models.Device) error {
func (r *gormDeviceRepository) Delete(id uint) error { func (r *gormDeviceRepository) Delete(id uint) error {
return r.db.Delete(&models.Device{}, id).Error return r.db.Delete(&models.Device{}, id).Error
} }
// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备
func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, error) {
var device models.Device
// 使用 GORM 的 JSONB 查询语法: properties->>'lora_address'
if err := r.db.Where("properties->>'lora_address' = ?", devEui).First(&device).Error; err != nil {
return nil, err // 如果找不到或发生其他错误GORM 会返回错误
}
return &device, nil
}

View File

@@ -0,0 +1,27 @@
package repository
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。
type SensorDataRepository interface {
Create(sensorData *models.SensorData) error
}
// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。
type gormSensorDataRepository struct {
db *gorm.DB
}
// NewGormSensorDataRepository 创建一个新的 SensorDataRepository GORM 实现实例。
// 它直接接收一个 *gorm.DB 实例作为依赖,完全遵循项目中的既定模式。
func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository {
return &gormSensorDataRepository{db: db}
}
// Create 将一条新的传感器数据记录插入数据库。
func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error {
return r.db.Create(sensorData).Error
}

View File

@@ -4,6 +4,8 @@ import (
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service"
"github.com/go-openapi/runtime" "github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client" httptransport "github.com/go-openapi/runtime/client"
@@ -33,11 +35,19 @@ type ChirpStackTransport struct {
authInfo runtime.ClientAuthInfoWriter authInfo runtime.ClientAuthInfoWriter
config ChirpStackConfig config ChirpStackConfig
deviceCommandLogRepo repository.DeviceCommandLogRepository
deviceRepo repository.DeviceRepository
logger *logs.Logger logger *logs.Logger
} }
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *ChirpStackTransport { func NewChirpStackTransport(
config ChirpStackConfig,
logger *logs.Logger,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
deviceRepo repository.DeviceRepository,
) *ChirpStackTransport {
// 使用配置中的服务器地址创建一个 HTTP transport。 // 使用配置中的服务器地址创建一个 HTTP transport。
// 它会使用生成的客户端中定义的默认 base path 和 schemes。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。
transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes)
@@ -53,6 +63,8 @@ func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *Chirp
authInfo: authInfo, authInfo: authInfo,
config: config, config: config,
logger: logger, logger: logger,
deviceCommandLogRepo: deviceCommandLogRepo,
deviceRepo: deviceRepo,
} }
} }
@@ -80,12 +92,56 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error {
// 3. 调用生成的客户端方法来发送请求。 // 3. 调用生成的客户端方法来发送请求。
// c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。 // c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。
_, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo)
if err != nil { if err != nil {
c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err)
return err return err
} }
c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功", address) // 4. 成功发送后,尝试记录下行任务
messageID := ""
if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID
messageID = resp.Payload.ID
} else {
c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address)
// 即使无法获取 MessageID也认为发送成功因为 ChirpStack Enqueue 成功了
return nil
}
// 调用私有方法记录下行任务
if err := c.recordDownlinkTask(address, messageID); err != nil {
// 记录失败不影响下行命令的发送成功
c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err)
return nil
}
c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功并创建下行任务记录 (MessageID: %s)", address, messageID)
return nil
}
// recordDownlinkTask 记录下行任务到数据库
func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error {
// 获取区域主控的内部 DeviceID
regionalController, err := c.deviceRepo.FindByDevEui(devEui)
if err != nil {
c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err)
return err
}
// 创建 DeviceCommandLog
record := &models.DeviceCommandLog{
MessageID: messageID,
DeviceID: regionalController.ID,
SentAt: time.Now(),
AcknowledgedAt: nil, // 初始状态为未确认
}
if err := c.deviceCommandLogRepo.Create(record); err != nil {
c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err)
return err
}
c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID)
return nil return nil
} }