From b668f3fbb51f266d673c3dbfc840fa85c3f6e600 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 16:06:05 +0800 Subject: [PATCH 01/11] =?UTF-8?q?=E5=AE=9A=E4=B9=89=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=AE=B0=E5=BD=95=E6=98=AF=E4=B8=8D=E6=98=AF?= =?UTF-8?q?timescaledb?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 ++++++ config.yml | 1 + internal/infra/config/config.go | 3 +++ internal/infra/database/postgres.go | 4 +++- internal/infra/database/storage.go | 2 ++ internal/infra/models/SensorData.go | 30 +++++++++++++++++++++++++++++ internal/infra/models/models.go | 1 + 7 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 internal/infra/models/SensorData.go diff --git a/README.md b/README.md index bba3f97..fbe6388 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # 猪场管理系统 +## 安装说明 + +### 推荐使用 TimescaleDB + +TimescaleDB 是基于 PostgreSQL 的开源数据库, 专门为处理时序数据而设计的。可以应对后续传海量传感器数据 + ## 功能介绍 ### 一. 猪舍控制 diff --git a/config.yml b/config.yml index 79bff63..33d0652 100644 --- a/config.yml +++ b/config.yml @@ -29,6 +29,7 @@ database: password: "pig-farm-controller" dbname: "pig-farm-controller" sslmode: "disable" # 在生产环境中建议使用 "require" + is_timescaledb: false max_open_conns: 25 # 最大开放连接数 max_idle_conns: 10 # 最大空闲连接数 conn_max_lifetime: 600 # 连接最大生命周期(秒) diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 273e37c..4944f99 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -81,6 +81,9 @@ type DatabaseConfig struct { // SSLMode SSL模式 SSLMode string `yaml:"sslmode"` + // IsTimescaleDB is timescaledb + IsTimescaleDB bool `yaml:"is_timescaledb"` + // MaxOpenConns 最大开放连接数 MaxOpenConns int `yaml:"max_open_conns"` diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index c5df1b3..5d77cf9 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -16,6 +16,7 @@ import ( // 使用GORM作为ORM库 type PostgresStorage struct { db *gorm.DB + isTimescaleDB bool connectionString string maxOpenConns int maxIdleConns int @@ -25,9 +26,10 @@ type PostgresStorage struct { // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例 // 它接收一个 logger 实例,而不是自己创建 -func NewPostgresStorage(connectionString string, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { +func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { return &PostgresStorage{ connectionString: connectionString, + isTimescaleDB: isTimescaleDB, maxOpenConns: maxOpenConns, maxIdleConns: maxIdleConns, connMaxLifetime: connMaxLifetime, diff --git a/internal/infra/database/storage.go b/internal/infra/database/storage.go index 72302d7..ceeb84c 100644 --- a/internal/infra/database/storage.go +++ b/internal/infra/database/storage.go @@ -42,9 +42,11 @@ func NewStorage(cfg config.DatabaseConfig, logger *logs.Logger) Storage { cfg.SSLMode, ) + // 当前默认返回PostgreSQL存储实现,并将 logger 注入 // 当前默认返回PostgreSQL存储实现,并将 logger 注入 return NewPostgresStorage( connectionString, + cfg.IsTimescaleDB, // <--- 添加 IsTimescaleDB cfg.MaxOpenConns, cfg.MaxIdleConns, cfg.ConnMaxLifetime, diff --git a/internal/infra/models/SensorData.go b/internal/infra/models/SensorData.go new file mode 100644 index 0000000..1fdb548 --- /dev/null +++ b/internal/infra/models/SensorData.go @@ -0,0 +1,30 @@ +package models + +import ( + "time" + + "gorm.io/datatypes" +) + +// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 +type SensorData struct { + // Time 是数据记录的时间戳,作为复合主键的一部分。 + // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 + Time time.Time `gorm:"primaryKey"` + + // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 + // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 + DeviceID string `gorm:"primaryKey;size:50"` + + // RegionalControllerID 是上报此数据的区域主控的ID。 + // 我们为其添加了数据库索引以优化按区域查询的性能。 + RegionalControllerID string `gorm:"size:50;index"` + + // Data 存储一个或多个传感器读数,格式为 JSON。 + // GORM 会使用 'jsonb' 类型来创建此列。 + Data datatypes.JSON `gorm:"type:jsonb"` +} + +func (SensorData) TableName() string { + return "sensor_data" +} diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 4d542da..812be1a 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -12,5 +12,6 @@ func GetAllModels() []interface{} { &PlanExecutionLog{}, &TaskExecutionLog{}, &PendingTask{}, + &SensorData{}, } } -- 2.49.1 From 47b8c5bc651f27ce4763d8ea86fc3c4326b37c42 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 16:34:16 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20timescaledb=20?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=E5=92=8Cgin=E7=B4=A2?= =?UTF-8?q?=E5=BC=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/database/postgres.go | 26 ++++++++++++++++++++++++++ internal/infra/models/SensorData.go | 8 ++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index 5d77cf9..6837f62 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -118,5 +118,31 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { return fmt.Errorf("数据库表结构迁移失败: %w", err) } ps.logger.Info("数据库表结构迁移完成") + + // -- 处理gorm做不到的初始化逻辑 -- + // 创建GIN索引(用于优化JSONB查询) + ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") + // 使用 IF NOT EXISTS 保证幂等性 + // 如果索引已存在,此命令不会报错 + ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" + if err := ps.db.Exec(ginIndexSQL).Error; err != nil { + ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)") + + // 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable + if ps.isTimescaleDB { + ps.logger.Info("检测到 TimescaleDB, 准备转换 sensor_data 为超表") + // 使用 if_not_exists => TRUE 保证幂等性 + // 如果 sensor_data 已经是超表,此命令不会报错 + // 'time' 是 SensorData 模型中定义的时间列 + sql := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" + if err := ps.db.Exec(sql).Error; err != nil { + ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) + return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) + } + ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") + } return nil } diff --git a/internal/infra/models/SensorData.go b/internal/infra/models/SensorData.go index 1fdb548..0697553 100644 --- a/internal/infra/models/SensorData.go +++ b/internal/infra/models/SensorData.go @@ -10,19 +10,19 @@ import ( type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。 // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 - Time time.Time `gorm:"primaryKey"` + Time time.Time `gorm:"primaryKey" json:"time"` // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 - DeviceID string `gorm:"primaryKey;size:50"` + DeviceID uint `gorm:"primaryKey" json:"device_id"` // RegionalControllerID 是上报此数据的区域主控的ID。 // 我们为其添加了数据库索引以优化按区域查询的性能。 - RegionalControllerID string `gorm:"size:50;index"` + RegionalControllerID uint `json:"regional_controller_id"` // Data 存储一个或多个传感器读数,格式为 JSON。 // GORM 会使用 'jsonb' 类型来创建此列。 - Data datatypes.JSON `gorm:"type:jsonb"` + Data datatypes.JSON `gorm:"type:jsonb" json:"data"` } func (SensorData) TableName() string { -- 2.49.1 From 2070653f2f23c8e18d86c2bd9ae7a6095fb3d0fa Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 16:48:41 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=B4=A2=E5=BC=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/database/postgres.go | 35 +++++++++++++++++++++-------- internal/infra/models/plan.go | 4 ++-- internal/infra/models/schedule.go | 4 ++-- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index 6837f62..0ce5090 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -120,16 +120,9 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { ps.logger.Info("数据库表结构迁移完成") // -- 处理gorm做不到的初始化逻辑 -- - // 创建GIN索引(用于优化JSONB查询) - ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") - // 使用 IF NOT EXISTS 保证幂等性 - // 如果索引已存在,此命令不会报错 - ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" - if err := ps.db.Exec(ginIndexSQL).Error; err != nil { - ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) - return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) + if err := ps.creatingIndex(); err != nil { + return err } - ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)") // 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable if ps.isTimescaleDB { @@ -146,3 +139,27 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { } return nil } + +// creatingIndex 用于创建gorm无法处理的索引, 如gin索引 +func (ps *PostgresStorage) creatingIndex() error { + // 创建GIN索引(用于优化JSONB查询) + ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") + // 使用 IF NOT EXISTS 保证幂等性 + // 如果索引已存在,此命令不会报错 + ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" + if err := ps.db.Exec(ginIndexSQL).Error; err != nil { + ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)") + + // 为 tasks.parameters 创建 GIN 索引 + ps.logger.Info("正在为 tasks 表的 parameters 字段创建 GIN 索引") + taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);" + if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil { + ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)") + return nil +} diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 1d46c4c..516f907 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -53,8 +53,8 @@ type Plan struct { Name string `gorm:"not null" json:"name"` Description string `json:"description"` - ExecutionType PlanExecutionType `gorm:"not null" json:"execution_type"` - Status PlanStatus `gorm:"default:0" json:"status"` // 计划是否被启动 + ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"` + Status PlanStatus `gorm:"default:0;index" json:"status"` // 计划是否被启动 ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数 ExecuteCount uint `gorm:"default:0" json:"execute_count"` // 执行计数器 diff --git a/internal/infra/models/schedule.go b/internal/infra/models/schedule.go index 40e43b1..12a6384 100644 --- a/internal/infra/models/schedule.go +++ b/internal/infra/models/schedule.go @@ -20,8 +20,8 @@ type PendingTask struct { // GORM 会根据 TaskID 字段自动填充此关联 Task *Task `gorm:"foreignKey:TaskID"` - ExecuteAt time.Time `gorm:"index"` // 任务执行时间 - TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID + ExecuteAt time.Time `gorm:"index"` // 任务执行时间 + TaskExecutionLogID uint `gorm:"unique;not null;index"` // 对应的执行历史记录ID // 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录 // ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理 -- 2.49.1 From 3a030f5bca48334cef0234c61dd0121863531ec7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 18:09:29 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0pprof?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 4 ++-- internal/app/api/api.go | 28 ++++++++++++++++++++++++++-- internal/core/application.go | 2 ++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/config.yml b/config.yml index 33d0652..2b428db 100644 --- a/config.yml +++ b/config.yml @@ -8,7 +8,7 @@ app: # HTTP 服务配置 server: port: 8086 - mode: "debug" # Gin 运行模式: "debug", "release", "test" + mode: "release" # Gin 运行模式: "debug", "release", "test" # 日志配置 log: @@ -29,7 +29,7 @@ database: password: "pig-farm-controller" dbname: "pig-farm-controller" sslmode: "disable" # 在生产环境中建议使用 "require" - is_timescaledb: false + is_timescaledb: true max_open_conns: 25 # 最大开放连接数 max_idle_conns: 10 # 最大空闲连接数 conn_max_lifetime: 600 # 连接最大生命周期(秒) diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 56f1dc8..6d2dbf9 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -12,6 +12,7 @@ import ( "context" "fmt" "net/http" + "net/http/pprof" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device" @@ -99,6 +100,7 @@ func (a *API) setupRoutes() { userGroup.POST("", a.userController.CreateUser) // 注册创建用户接口 (POST /api/v1/users) userGroup.POST("/login", a.userController.Login) // 注册用户登录接口 (POST /api/v1/users/login) } + a.logger.Info("用户相关接口注册成功") // 设备相关路由组 deviceGroup := v1.Group("/devices") @@ -109,6 +111,7 @@ func (a *API) setupRoutes() { deviceGroup.PUT("/:id", a.deviceController.UpdateDevice) deviceGroup.DELETE("/:id", a.deviceController.DeleteDevice) } + a.logger.Info("设备相关接口注册成功") // 计划相关路由组 planGroup := v1.Group("/plans") @@ -121,17 +124,38 @@ func (a *API) setupRoutes() { planGroup.POST("/:id/start", a.planController.StartPlan) planGroup.POST("/:id/stop", a.planController.StopPlan) } + a.logger.Info("计划相关接口注册成功") } + // 注册 pprof 路由 + pprofGroup := a.engine.Group("/debug/pprof") + { + pprofGroup.GET("/", gin.WrapF(pprof.Index)) + pprofGroup.GET("/cmdline", gin.WrapF(pprof.Cmdline)) + pprofGroup.GET("/profile", gin.WrapF(pprof.Profile)) + pprofGroup.POST("/symbol", gin.WrapF(pprof.Symbol)) + pprofGroup.GET("/symbol", gin.WrapF(pprof.Symbol)) + pprofGroup.GET("/trace", gin.WrapF(pprof.Trace)) + pprofGroup.GET("/allocs", gin.WrapH(pprof.Handler("allocs"))) + pprofGroup.GET("/block", gin.WrapH(pprof.Handler("block"))) + pprofGroup.GET("/goroutine", gin.WrapH(pprof.Handler("goroutine"))) + pprofGroup.GET("/heap", gin.WrapH(pprof.Handler("heap"))) + pprofGroup.GET("/mutex", gin.WrapH(pprof.Handler("mutex"))) + pprofGroup.GET("/threadcreate", gin.WrapH(pprof.Handler("threadcreate"))) + } + a.logger.Info("pprof 接口注册成功") + // 上行事件监听路由 a.engine.POST("/upstream", func(c *gin.Context) { h := a.listenHandler.Handler() h.ServeHTTP(c.Writer, c.Request) }) + a.logger.Info("上行事件监听接口注册成功") - // 添加 Swagger UI 路由 + // 添加 Swagger UI 路由, Swagger UI可在 /swagger/index.html 上找到 a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - a.logger.Info("Swagger UI is available at /swagger/index.html") + a.logger.Info("Swagger UI 接口注册成功") + } // Start 启动 HTTP 服务器 diff --git a/internal/core/application.go b/internal/core/application.go index e8815a7..a446e11 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -258,5 +258,7 @@ func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Stora if err := storage.Migrate(models.GetAllModels()...); err != nil { return nil, fmt.Errorf("数据库迁移失败: %w", err) } + + logger.Info("数据库初始化完成。") return storage, nil } -- 2.49.1 From 17e2c6471ae89403acd5349d7e34ee57d11a837f Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 19:13:15 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E5=AE=9A=E4=B9=89=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E4=BD=93=E5=92=8C=E6=8E=A5=E6=94=B6=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/transport/chirp_stack.go | 159 +++++++++++++++-- .../service/transport/chirp_stack_types.go | 165 ++++++++++++++++++ 2 files changed, 306 insertions(+), 18 deletions(-) create mode 100644 internal/app/service/transport/chirp_stack_types.go diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9828313..9bfb555 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,12 +1,25 @@ package transport import ( + "encoding/json" "io" "net/http" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" ) +// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 +const ( + eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 + eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 + eventJoin = "join" // 入网事件:当设备成功加入网络时触发。 + eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 + eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 + eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 + eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 + eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 +) + // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { logger *logs.Logger @@ -18,35 +31,145 @@ func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener { } } +// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息 func (c *ChirpStackListener) Handler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + b, err := io.ReadAll(r.Body) if err != nil { c.logger.Errorf("读取请求体失败: %v", err) - - // TODO 直接崩溃不太合适 - panic(err) + http.Error(w, "failed to read body", http.StatusBadRequest) + return } event := r.URL.Query().Get("event") - switch event { - case "up": // 链路上行事件 - err = c.up(b) - if err != nil { - c.logger.Errorf("处理链路上行事件失败: %v", err) + w.WriteHeader(http.StatusOK) - // TODO 直接崩溃不太合适 - panic(err) - } - default: - c.logger.Errorf("未知的ChirpStack事件: %s", event) - } + // 将异步处理逻辑委托给 handler 方法 + go c.handler(b, event) } } -// up 处理链路上行事件 -func (c *ChirpStackListener) up(data []byte) error { - // TODO implement me - panic("implement me") +// handler 用于处理 ChirpStack 发送的事件 +func (c *ChirpStackListener) handler(data []byte, eventType string) { + switch eventType { + case eventUp: + var msg UpEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleUpEvent(&msg) + + case eventJoin: + var msg JoinEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleJoinEvent(&msg) + + case eventAck: + var msg AckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleAckEvent(&msg) + + case eventTxAck: + var msg TxAckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleTxAckEvent(&msg) + + case eventStatus: + var msg StatusEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleStatusEvent(&msg) + + case eventLog: + var msg LogEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLogEvent(&msg) + + case eventLocation: + var msg LocationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLocationEvent(&msg) + + case eventIntegration: + var msg IntegrationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleIntegrationEvent(&msg) + + default: + c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data)) + } +} + +// --- 业务处理函数 --- + +// handleUpEvent 处理上行数据事件 +func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { + c.logger.Infof("处理 'up' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleStatusEvent 处理设备状态事件 +func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { + c.logger.Infof("处接收到理 'status' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleAckEvent 处理下行确认事件 +func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { + c.logger.Infof("接收到 'ack' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleLogEvent 处理日志事件 +func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { + c.logger.Infof("接收到 'log' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleJoinEvent 处理入网事件 +func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) { + c.logger.Infof("接收到 'join' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleTxAckEvent 处理网关发送确认事件 +func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) { + c.logger.Infof("接收到 'txack' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleLocationEvent 处理位置事件 +func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) { + c.logger.Infof("接收到 'location' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleIntegrationEvent 处理集成事件 +func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { + c.logger.Infof("接收到 'integration' 事件: %+v", event) + // 在这里添加您的业务逻辑 } diff --git a/internal/app/service/transport/chirp_stack_types.go b/internal/app/service/transport/chirp_stack_types.go new file mode 100644 index 0000000..4de752a --- /dev/null +++ b/internal/app/service/transport/chirp_stack_types.go @@ -0,0 +1,165 @@ +package transport + +import ( + "encoding/json" + "time" +) + +// --- 通用结构体 --- + +// DeviceInfo 包含了所有事件中通用的设备信息。 +type DeviceInfo struct { + TenantID string `json:"tenantId"` + TenantName string `json:"tenantName"` + ApplicationID string `json:"applicationId"` + ApplicationName string `json:"applicationName"` + DeviceProfileID string `json:"deviceProfileId"` + DeviceProfileName string `json:"deviceProfileName"` + DeviceName string `json:"deviceName"` + DevEui string `json:"devEui"` + Tags map[string]string `json:"tags"` +} + +// Location 包含了地理位置信息。 +type Location struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` +} + +// --- 可复用的子结构体 --- + +// UplinkRxInfo 包含了上行接收信息。 +type UplinkRxInfo struct { + GatewayID string `json:"gatewayId"` + UplinkID uint32 `json:"uplinkId"` + Time time.Time `json:"time"` + Rssi int `json:"rssi"` + Snr float64 `json:"snr"` + Channel int `json:"channel"` + Location *Location `json:"location"` + Context string `json:"context"` + Metadata map[string]string `json:"metadata"` +} + +// LoraModulationInfo 包含了 LoRa 调制的具体参数。 +type LoraModulationInfo struct { + Bandwidth int `json:"bandwidth"` + SpreadingFactor int `json:"spreadingFactor"` + CodeRate string `json:"codeRate"` + Polarization bool `json:"polarizationInvert,omitempty"` // omitempty 因为只在下行中出现 +} + +// Modulation 包含了具体的调制信息。 +type Modulation struct { + Lora LoraModulationInfo `json:"lora"` +} + +// UplinkTxInfo 包含了上行发送信息。 +type UplinkTxInfo struct { + Frequency int `json:"frequency"` + Modulation Modulation `json:"modulation"` +} + +// DownlinkTxInfo 包含了下行发送信息。 +type DownlinkTxInfo struct { + Frequency int `json:"frequency"` + Power int `json:"power"` + Modulation Modulation `json:"modulation"` +} + +// ResolvedLocation 包含了地理位置解析结果。 +type ResolvedLocation struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` + Source string `json:"source"` // e.g. "GEO_RESOLVER_TDOA" + Accuracy int `json:"accuracy"` +} + +// --- 事件专属结构体 --- + +// UpEvent 对应 ChirpStack 的 "up" 事件。 +type UpEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` + ADR bool `json:"adr"` + DR int `json:"dr"` + FCnt uint32 `json:"fCnt"` + FPort uint8 `json:"fPort"` + Confirmed bool `json:"confirmed"` + Data string `json:"data"` // Base64 编码的原始数据 + Object json.RawMessage `json:"object"` // Codec 解码后的 JSON 对象 + RxInfo []UplinkRxInfo `json:"rxInfo"` + TxInfo UplinkTxInfo `json:"txInfo"` +} + +// JoinEvent 对应 ChirpStack 的 "join" 事件。 +type JoinEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` +} + +// AckEvent 对应 ChirpStack 的 "ack" 事件。 +type AckEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Acknowledged bool `json:"acknowledged"` + FCntDown uint32 `json:"fCntDown"` + QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 +} + +// TxAckEvent 对应 ChirpStack 的 "txack" 事件。 +type TxAckEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + FCntDown uint32 `json:"fCntDown"` + GatewayID string `json:"gatewayId"` + QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 + TxInfo DownlinkTxInfo `json:"txInfo"` +} + +// StatusEvent 对应 ChirpStack 的 "status" 事件。 +type StatusEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Margin int `json:"margin"` // 信号余量,可以近似看作 SNR + ExternalPower bool `json:"externalPowerSource"` + BatteryLevel float32 `json:"batteryLevel"` // 电池电量百分比 + BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"` +} + +// LogEvent 对应 ChirpStack 的 "log" 事件。 +type LogEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Level string `json:"level"` // 日志级别, e.g., "INFO", "WARNING", "ERROR" + Code string `json:"code"` // 日志代码, e.g., "UPLINK_F_CNT_RETRANSMISSION" + Description string `json:"description"` + Context map[string]string `json:"context"` +} + +// LocationEvent 对应 ChirpStack 的 "location" 事件。 +type LocationEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Location ResolvedLocation `json:"location"` +} + +// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。 +type IntegrationEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + IntegrationName string `json:"integrationName"` + Object json.RawMessage `json:"object"` +} -- 2.49.1 From 53dbe41d7bf3f2bd6b28465fe24dabb557f64545 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 20:26:26 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E5=A4=84=E7=90=86=20ChirpStack=20?= =?UTF-8?q?=E7=9A=84=20log=20=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/transport/chirp_stack.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9bfb555..60f949b 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -146,7 +146,24 @@ func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { // handleLogEvent 处理日志事件 func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { - c.logger.Infof("接收到 'log' 事件: %+v", event) + // 首先,打印完整的事件结构体,用于详细排查 + c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event) + + // 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息 + logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)" + switch event.Level { + case "INFO": + c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + case "WARNING": + c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + case "ERROR": + c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui) + default: + // 对于未知级别,使用 Warn 级别打印,并明确指出级别未知 + c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)", + event.Level, event.Code, event.Description, event.DeviceInfo.DevEui) + } + // 在这里添加您的业务逻辑 } -- 2.49.1 From f764ad8962438aac7082b0664677c3a6b074e21a Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 20:43:40 +0800 Subject: [PATCH 07/11] =?UTF-8?q?=E5=AE=8C=E5=96=84=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=BD=93=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/transport/chirp_stack_types.go | 113 +++++++++++------- 1 file changed, 73 insertions(+), 40 deletions(-) diff --git a/internal/app/service/transport/chirp_stack_types.go b/internal/app/service/transport/chirp_stack_types.go index 4de752a..defc48a 100644 --- a/internal/app/service/transport/chirp_stack_types.go +++ b/internal/app/service/transport/chirp_stack_types.go @@ -8,16 +8,18 @@ import ( // --- 通用结构体 --- // DeviceInfo 包含了所有事件中通用的设备信息。 +// 基于 aiserver.proto v4 (integration) type DeviceInfo struct { - TenantID string `json:"tenantId"` - TenantName string `json:"tenantName"` - ApplicationID string `json:"applicationId"` - ApplicationName string `json:"applicationName"` - DeviceProfileID string `json:"deviceProfileId"` - DeviceProfileName string `json:"deviceProfileName"` - DeviceName string `json:"deviceName"` - DevEui string `json:"devEui"` - Tags map[string]string `json:"tags"` + TenantID string `json:"tenantId"` + TenantName string `json:"tenantName"` + ApplicationID string `json:"applicationId"` + ApplicationName string `json:"applicationName"` + DeviceProfileID string `json:"deviceProfileId"` + DeviceProfileName string `json:"deviceProfileName"` + DeviceName string `json:"deviceName"` + DevEui string `json:"devEui"` + DeviceClassEnabled string `json:"deviceClassEnabled,omitempty"` // Class A, B, or C + Tags map[string]string `json:"tags"` } // Location 包含了地理位置信息。 @@ -29,6 +31,30 @@ type Location struct { // --- 可复用的子结构体 --- +// UplinkRelayRxInfo 包含了上行中继接收信息。 +type UplinkRelayRxInfo struct { + DevEui string `json:"devEui"` + Frequency uint32 `json:"frequency"` + Dr uint32 `json:"dr"` + Snr int32 `json:"snr"` + Rssi int32 `json:"rssi"` + WorChannel uint32 `json:"worChannel"` +} + +// KeyEnvelope 包装了一个加密的密钥。 +// 基于 common.proto +type KeyEnvelope struct { + KEKLabel string `json:"kekLabel,omitempty"` + AESKey string `json:"aesKey,omitempty"` // Base64 编码的加密密钥 +} + +// JoinServerContext 包含了 Join-Server 上下文。 +// 基于 common.proto +type JoinServerContext struct { + SessionKeyID string `json:"sessionKeyId"` + AppSKey *KeyEnvelope `json:"appSKey,omitempty"` +} + // UplinkRxInfo 包含了上行接收信息。 type UplinkRxInfo struct { GatewayID string `json:"gatewayId"` @@ -47,7 +73,7 @@ type LoraModulationInfo struct { Bandwidth int `json:"bandwidth"` SpreadingFactor int `json:"spreadingFactor"` CodeRate string `json:"codeRate"` - Polarization bool `json:"polarizationInvert,omitempty"` // omitempty 因为只在下行中出现 + Polarization bool `json:"polarizationInvert,omitempty"` } // Modulation 包含了具体的调制信息。 @@ -73,7 +99,7 @@ type ResolvedLocation struct { Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` Altitude float64 `json:"altitude"` - Source string `json:"source"` // e.g. "GEO_RESOLVER_TDOA" + Source string `json:"source"` Accuracy int `json:"accuracy"` } @@ -81,27 +107,33 @@ type ResolvedLocation struct { // UpEvent 对应 ChirpStack 的 "up" 事件。 type UpEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - DevAddr string `json:"devAddr"` - ADR bool `json:"adr"` - DR int `json:"dr"` - FCnt uint32 `json:"fCnt"` - FPort uint8 `json:"fPort"` - Confirmed bool `json:"confirmed"` - Data string `json:"data"` // Base64 编码的原始数据 - Object json.RawMessage `json:"object"` // Codec 解码后的 JSON 对象 - RxInfo []UplinkRxInfo `json:"rxInfo"` - TxInfo UplinkTxInfo `json:"txInfo"` + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` + ADR bool `json:"adr"` + DR int `json:"dr"` + FCnt uint32 `json:"fCnt"` + FPort uint8 `json:"fPort"` + Confirmed bool `json:"confirmed"` + Data string `json:"data"` + Object json.RawMessage `json:"object"` + RxInfo []UplinkRxInfo `json:"rxInfo"` + TxInfo UplinkTxInfo `json:"txInfo"` + RelayRxInfo *UplinkRelayRxInfo `json:"relayRxInfo,omitempty"` + JoinServerContext *JoinServerContext `json:"joinServerContext,omitempty"` + RegionConfigID string `json:"regionConfigId,omitempty"` } // JoinEvent 对应 ChirpStack 的 "join" 事件。 type JoinEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - DevAddr string `json:"devAddr"` + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` + RelayRxInfo *UplinkRelayRxInfo `json:"relayRxInfo,omitempty"` + JoinServerContext *JoinServerContext `json:"joinServerContext,omitempty"` + RegionConfigID string `json:"regionConfigId,omitempty"` } // AckEvent 对应 ChirpStack 的 "ack" 事件。 @@ -111,18 +143,18 @@ type AckEvent struct { DeviceInfo DeviceInfo `json:"deviceInfo"` Acknowledged bool `json:"acknowledged"` FCntDown uint32 `json:"fCntDown"` - QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 + QueueItemID string `json:"queueItemId"` } // TxAckEvent 对应 ChirpStack 的 "txack" 事件。 type TxAckEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - FCntDown uint32 `json:"fCntDown"` - GatewayID string `json:"gatewayId"` - QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 - TxInfo DownlinkTxInfo `json:"txInfo"` + DownlinkID uint32 `json:"downlinkId"` // 修改: 替换 DeduplicationID + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + FCntDown uint32 `json:"fCntDown"` + GatewayID string `json:"gatewayId"` + QueueItemID string `json:"queueItemId"` + TxInfo DownlinkTxInfo `json:"txInfo"` } // StatusEvent 对应 ChirpStack 的 "status" 事件。 @@ -130,9 +162,9 @@ type StatusEvent struct { DeduplicationID string `json:"deduplicationId"` Time time.Time `json:"time"` DeviceInfo DeviceInfo `json:"deviceInfo"` - Margin int `json:"margin"` // 信号余量,可以近似看作 SNR + Margin int `json:"margin"` ExternalPower bool `json:"externalPowerSource"` - BatteryLevel float32 `json:"batteryLevel"` // 电池电量百分比 + BatteryLevel float32 `json:"batteryLevel"` BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"` } @@ -141,8 +173,8 @@ type LogEvent struct { DeduplicationID string `json:"deduplicationId"` Time time.Time `json:"time"` DeviceInfo DeviceInfo `json:"deviceInfo"` - Level string `json:"level"` // 日志级别, e.g., "INFO", "WARNING", "ERROR" - Code string `json:"code"` // 日志代码, e.g., "UPLINK_F_CNT_RETRANSMISSION" + Level string `json:"level"` + Code string `json:"code"` Description string `json:"description"` Context map[string]string `json:"context"` } @@ -161,5 +193,6 @@ type IntegrationEvent struct { Time time.Time `json:"time"` DeviceInfo DeviceInfo `json:"deviceInfo"` IntegrationName string `json:"integrationName"` + EventType string `json:"eventType,omitempty"` Object json.RawMessage `json:"object"` } -- 2.49.1 From 21fb9c7e57621e558ef53ca155c43c9484d8edf3 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 21:53:18 +0800 Subject: [PATCH 08/11] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E4=BF=A1=E5=8F=B7?= =?UTF-8?q?=E5=BC=BA=E5=BA=A6=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/api/api.go | 2 + internal/app/service/transport/chirp_stack.go | 80 ++++++- .../service/transport/chirp_stack_types.go | 214 +++++++++--------- internal/core/application.go | 7 +- internal/infra/models/SensorData.go | 30 --- internal/infra/models/sensor_data.go | 56 +++++ .../infra/repository/device_repository.go | 13 ++ .../repository/sensor_data_repository.go | 27 +++ 8 files changed, 284 insertions(+), 145 deletions(-) delete mode 100644 internal/infra/models/SensorData.go create mode 100644 internal/infra/models/sensor_data.go create mode 100644 internal/infra/repository/sensor_data_repository.go diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 6d2dbf9..89c48d7 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -53,6 +53,8 @@ func NewAPI(cfg config.ServerConfig, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, + sensorDataRepository repository.SensorDataRepository, + executionLogRepository repository.ExecutionLogRepository, tokenService token.TokenService, listenHandler transport.ListenHandler, analysisTaskManager *task.AnalysisPlanTaskManager) *API { diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 60f949b..9cd1831 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -4,8 +4,12 @@ import ( "encoding/json" "io" "net/http" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "gorm.io/datatypes" ) // ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 @@ -22,12 +26,16 @@ const ( // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { - logger *logs.Logger + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository } -func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener { +func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener { return &ChirpStackListener{ - logger: logger, + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, } } @@ -129,13 +137,44 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { c.logger.Infof("处理 'up' 事件: %+v", event) + + // 记录信号强度 + // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 + // 因此,我们只取第一个 RxInfo 中的信号数据即可。 + if len(event.RxInfo) > 0 { + rx := event.RxInfo[0] // 取第一个接收到的网关信息 + + // 构建 SignalMetrics 结构体 + signalMetrics := models.SignalMetrics{ + RSSI: rx.Rssi, + SNR: rx.Snr, + } + c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + } else { + c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) + } + // 在这里添加您的业务逻辑 } // handleStatusEvent 处理设备状态事件 func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { c.logger.Infof("处接收到理 'status' 事件: %+v", event) - // 在这里添加您的业务逻辑 + + // 记录信号强度 + signalMetrics := models.SignalMetrics{ + Margin: event.Margin, + } + c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + + // 记录 电量 + batteryLevel := models.BatteryLevel{ + BatteryLevel: event.BatteryLevel, + BatteryLevelUnavailable: event.BatteryLevelUnavailable, + ExternalPower: event.ExternalPower, + } + c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) + } // handleAckEvent 处理下行确认事件 @@ -163,8 +202,6 @@ func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)", event.Level, event.Code, event.Description, event.DeviceInfo.DevEui) } - - // 在这里添加您的业务逻辑 } // handleJoinEvent 处理入网事件 @@ -190,3 +227,34 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { c.logger.Infof("接收到 'integration' 事件: %+v", event) // 在这里添加您的业务逻辑 } + +// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 +func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time, dataType models.SensorDataType, data interface{}) { + // 1. 查找设备 + device, err := c.deviceRepo.FindByDevEui(devEui) + if err != nil { + c.logger.Warnf("记录传感器数据失败:无法通过 DevEui '%s' 找到设备: %v", devEui, err) + return + } + + // 2. 序列化数据结构体为 JSON + jsonData, err := json.Marshal(data) + if err != nil { + c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) + return + } + + // 3. 构建 SensorData 模型 + sensorData := &models.SensorData{ + Time: eventTime, + DeviceID: device.ID, + RegionalControllerID: *device.ParentID, + SensorDataType: dataType, // 设置传感器数据类型 + Data: datatypes.JSON(jsonData), + } + + // 4. 调用仓库创建记录 + if err := c.sensorDataRepo.Create(sensorData); err != nil { + c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) + } +} diff --git a/internal/app/service/transport/chirp_stack_types.go b/internal/app/service/transport/chirp_stack_types.go index defc48a..a8b5bdd 100644 --- a/internal/app/service/transport/chirp_stack_types.go +++ b/internal/app/service/transport/chirp_stack_types.go @@ -10,189 +10,189 @@ import ( // DeviceInfo 包含了所有事件中通用的设备信息。 // 基于 aiserver.proto v4 (integration) type DeviceInfo struct { - TenantID string `json:"tenantId"` - TenantName string `json:"tenantName"` - ApplicationID string `json:"applicationId"` - ApplicationName string `json:"applicationName"` - DeviceProfileID string `json:"deviceProfileId"` - DeviceProfileName string `json:"deviceProfileName"` - DeviceName string `json:"deviceName"` - DevEui string `json:"devEui"` - DeviceClassEnabled string `json:"deviceClassEnabled,omitempty"` // Class A, B, or C - Tags map[string]string `json:"tags"` + TenantID string `json:"tenant_id"` // 租户ID + TenantName string `json:"tenant_name"` // 租户名称 + ApplicationID string `json:"application_id"` // 应用ID + ApplicationName string `json:"application_name"` // 应用名称 + DeviceProfileID string `json:"device_profile_id"` // 设备配置文件ID + DeviceProfileName string `json:"device_profile_name"` // 设备配置文件名称 + DeviceName string `json:"device_name"` // 设备名称 + DevEui string `json:"dev_eui"` // 设备EUI (十六进制编码) + DeviceClassEnabled string `json:"device_class_enabled,omitempty"` // 设备启用的LoRaWAN类别 (A, B, 或 C) + Tags map[string]string `json:"tags"` // 用户定义的标签 } // Location 包含了地理位置信息。 type Location struct { - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Altitude float64 `json:"altitude"` + Latitude float64 `json:"latitude"` // 纬度 + Longitude float64 `json:"longitude"` // 经度 + Altitude float64 `json:"altitude"` // 海拔 } // --- 可复用的子结构体 --- // UplinkRelayRxInfo 包含了上行中继接收信息。 type UplinkRelayRxInfo struct { - DevEui string `json:"devEui"` - Frequency uint32 `json:"frequency"` - Dr uint32 `json:"dr"` - Snr int32 `json:"snr"` - Rssi int32 `json:"rssi"` - WorChannel uint32 `json:"worChannel"` + DevEui string `json:"dev_eui"` // 中继设备的DevEUI + Frequency uint32 `json:"frequency"` // 接收频率 + Dr uint32 `json:"dr"` // 数据速率 + Snr int32 `json:"snr"` // 信噪比 + Rssi int32 `json:"rssi"` // 接收信号强度指示 + WorChannel uint32 `json:"wor_channel"` // Work-on-Relay 通道 } // KeyEnvelope 包装了一个加密的密钥。 // 基于 common.proto type KeyEnvelope struct { - KEKLabel string `json:"kekLabel,omitempty"` - AESKey string `json:"aesKey,omitempty"` // Base64 编码的加密密钥 + KEKLabel string `json:"kek_label,omitempty"` // 密钥加密密钥 (KEK) 标签 + AESKey string `json:"aes_key,omitempty"` // Base64 编码的加密密钥 } // JoinServerContext 包含了 Join-Server 上下文。 // 基于 common.proto type JoinServerContext struct { - SessionKeyID string `json:"sessionKeyId"` - AppSKey *KeyEnvelope `json:"appSKey,omitempty"` + SessionKeyID string `json:"session_key_id"` // 会话密钥ID + AppSKey *KeyEnvelope `json:"app_s_key,omitempty"` // 应用会话密钥 } // UplinkRxInfo 包含了上行接收信息。 type UplinkRxInfo struct { - GatewayID string `json:"gatewayId"` - UplinkID uint32 `json:"uplinkId"` - Time time.Time `json:"time"` - Rssi int `json:"rssi"` - Snr float64 `json:"snr"` - Channel int `json:"channel"` - Location *Location `json:"location"` - Context string `json:"context"` - Metadata map[string]string `json:"metadata"` + GatewayID string `json:"gateway_id"` // 接收到上行数据的网关ID + UplinkID uint32 `json:"uplink_id"` // 上行ID + Time time.Time `json:"time"` // 接收时间 + Rssi int `json:"rssi"` // 接收信号强度指示 + Snr float64 `json:"snr"` // 信噪比 + Channel int `json:"channel"` // 接收通道 + Location *Location `json:"location"` // 网关位置 + Context string `json:"context"` // 上下文信息 + Metadata map[string]string `json:"metadata"` // 元数据 } // LoraModulationInfo 包含了 LoRa 调制的具体参数。 type LoraModulationInfo struct { - Bandwidth int `json:"bandwidth"` - SpreadingFactor int `json:"spreadingFactor"` - CodeRate string `json:"codeRate"` - Polarization bool `json:"polarizationInvert,omitempty"` + Bandwidth int `json:"bandwidth"` // 带宽 + SpreadingFactor int `json:"spreading_factor"` // 扩频因子 + CodeRate string `json:"code_rate"` // 编码率 + Polarization bool `json:"polarization_invert,omitempty"` // 极化反转 } // Modulation 包含了具体的调制信息。 type Modulation struct { - Lora LoraModulationInfo `json:"lora"` + Lora LoraModulationInfo `json:"lora"` // LoRa 调制信息 } // UplinkTxInfo 包含了上行发送信息。 type UplinkTxInfo struct { - Frequency int `json:"frequency"` - Modulation Modulation `json:"modulation"` + Frequency int `json:"frequency"` // 发送频率 + Modulation Modulation `json:"modulation"` // 调制信息 } // DownlinkTxInfo 包含了下行发送信息。 type DownlinkTxInfo struct { - Frequency int `json:"frequency"` - Power int `json:"power"` - Modulation Modulation `json:"modulation"` + Frequency int `json:"frequency"` // 发送频率 + Power int `json:"power"` // 发送功率 + Modulation Modulation `json:"modulation"` // 调制信息 } // ResolvedLocation 包含了地理位置解析结果。 type ResolvedLocation struct { - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Altitude float64 `json:"altitude"` - Source string `json:"source"` - Accuracy int `json:"accuracy"` + Latitude float64 `json:"latitude"` // 纬度 + Longitude float64 `json:"longitude"` // 经度 + Altitude float64 `json:"altitude"` // 海拔 + Source string `json:"source"` // 位置来源 + Accuracy int `json:"accuracy"` // 精度 } // --- 事件专属结构体 --- // UpEvent 对应 ChirpStack 的 "up" 事件。 type UpEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - DevAddr string `json:"devAddr"` - ADR bool `json:"adr"` - DR int `json:"dr"` - FCnt uint32 `json:"fCnt"` - FPort uint8 `json:"fPort"` - Confirmed bool `json:"confirmed"` - Data string `json:"data"` - Object json.RawMessage `json:"object"` - RxInfo []UplinkRxInfo `json:"rxInfo"` - TxInfo UplinkTxInfo `json:"txInfo"` - RelayRxInfo *UplinkRelayRxInfo `json:"relayRxInfo,omitempty"` - JoinServerContext *JoinServerContext `json:"joinServerContext,omitempty"` - RegionConfigID string `json:"regionConfigId,omitempty"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + DevAddr string `json:"dev_addr"` // 设备地址 + ADR bool `json:"adr"` // 自适应数据速率 (ADR) 是否启用 + DR int `json:"dr"` // 数据速率 + FCnt uint32 `json:"f_cnt"` // 帧计数器 + FPort uint8 `json:"f_port"` // 端口 + Confirmed bool `json:"confirmed"` // 是否是确认帧 + Data string `json:"data"` // Base64 编码的原始负载数据 + Object json.RawMessage `json:"object"` // 解码后的JSON对象负载 + RxInfo []UplinkRxInfo `json:"rx_info"` // 接收信息列表 + TxInfo UplinkTxInfo `json:"tx_info"` // 发送信息 + RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息 + JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文 + RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID } // JoinEvent 对应 ChirpStack 的 "join" 事件。 type JoinEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - DevAddr string `json:"devAddr"` - RelayRxInfo *UplinkRelayRxInfo `json:"relayRxInfo,omitempty"` - JoinServerContext *JoinServerContext `json:"joinServerContext,omitempty"` - RegionConfigID string `json:"regionConfigId,omitempty"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + DevAddr string `json:"dev_addr"` // 设备地址 + RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息 + JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文 + RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID } // AckEvent 对应 ChirpStack 的 "ack" 事件。 type AckEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - Acknowledged bool `json:"acknowledged"` - FCntDown uint32 `json:"fCntDown"` - QueueItemID string `json:"queueItemId"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Acknowledged bool `json:"acknowledged"` // 是否已确认 + FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器 + QueueItemID string `json:"queue_item_id"` // 队列项ID } // TxAckEvent 对应 ChirpStack 的 "txack" 事件。 type TxAckEvent struct { - DownlinkID uint32 `json:"downlinkId"` // 修改: 替换 DeduplicationID - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - FCntDown uint32 `json:"fCntDown"` - GatewayID string `json:"gatewayId"` - QueueItemID string `json:"queueItemId"` - TxInfo DownlinkTxInfo `json:"txInfo"` + DownlinkID uint32 `json:"downlink_id"` // 下行ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器 + GatewayID string `json:"gateway_id"` // 网关ID + QueueItemID string `json:"queue_item_id"` // 队列项ID + TxInfo DownlinkTxInfo `json:"tx_info"` // 下行发送信息 } // StatusEvent 对应 ChirpStack 的 "status" 事件。 type StatusEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - Margin int `json:"margin"` - ExternalPower bool `json:"externalPowerSource"` - BatteryLevel float32 `json:"batteryLevel"` - BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Margin int `json:"margin"` // 链路预算余量 (dB) + ExternalPower bool `json:"external_power_source"` // 设备是否连接外部电源 + BatteryLevel float32 `json:"battery_level"` // 电池剩余电量 + BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电池电量是否不可用 } // LogEvent 对应 ChirpStack 的 "log" 事件。 type LogEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - Level string `json:"level"` - Code string `json:"code"` - Description string `json:"description"` - Context map[string]string `json:"context"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Level string `json:"level"` // 日志级别 (e.g., INFO, WARNING, ERROR) + Code string `json:"code"` // 日志代码 + Description string `json:"description"` // 日志描述 + Context map[string]string `json:"context"` // 上下文信息 } // LocationEvent 对应 ChirpStack 的 "location" 事件。 type LocationEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - Location ResolvedLocation `json:"location"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + Location ResolvedLocation `json:"location"` // 解析后的位置信息 } // IntegrationEvent 对应 ChirpStack 的 "integration" 事件。 type IntegrationEvent struct { - DeduplicationID string `json:"deduplicationId"` - Time time.Time `json:"time"` - DeviceInfo DeviceInfo `json:"deviceInfo"` - IntegrationName string `json:"integrationName"` - EventType string `json:"eventType,omitempty"` - Object json.RawMessage `json:"object"` + DeduplicationID string `json:"deduplication_id"` // 去重ID + Time time.Time `json:"time"` // 事件时间 + DeviceInfo DeviceInfo `json:"device_info"` // 设备信息 + IntegrationName string `json:"integration_name"` // 集成名称 + EventType string `json:"event_type,omitempty"` // 事件类型 + Object json.RawMessage `json:"object"` // 集成事件的原始JSON负载 } diff --git a/internal/core/application.go b/internal/core/application.go index a446e11..80ee2ae 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -69,8 +69,11 @@ func NewApplication(configPath string) (*Application, error) { // 初始化执行日志仓库 executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) + // 初始化传感器数据仓库 + sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) + // 初始化设备上行监听器 - listenHandler := transport.NewChirpStackListener(logger) + listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) @@ -79,7 +82,7 @@ func NewApplication(configPath string) (*Application, error) { executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 - apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager) + apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) // 组装 Application 对象 app := &Application{ diff --git a/internal/infra/models/SensorData.go b/internal/infra/models/SensorData.go deleted file mode 100644 index 0697553..0000000 --- a/internal/infra/models/SensorData.go +++ /dev/null @@ -1,30 +0,0 @@ -package models - -import ( - "time" - - "gorm.io/datatypes" -) - -// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 -type SensorData struct { - // Time 是数据记录的时间戳,作为复合主键的一部分。 - // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 - Time time.Time `gorm:"primaryKey" json:"time"` - - // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 - // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 - DeviceID uint `gorm:"primaryKey" json:"device_id"` - - // RegionalControllerID 是上报此数据的区域主控的ID。 - // 我们为其添加了数据库索引以优化按区域查询的性能。 - RegionalControllerID uint `json:"regional_controller_id"` - - // Data 存储一个或多个传感器读数,格式为 JSON。 - // GORM 会使用 'jsonb' 类型来创建此列。 - Data datatypes.JSON `gorm:"type:jsonb" json:"data"` -} - -func (SensorData) TableName() string { - return "sensor_data" -} diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go new file mode 100644 index 0000000..93898ed --- /dev/null +++ b/internal/infra/models/sensor_data.go @@ -0,0 +1,56 @@ +package models + +import ( + "time" + + "gorm.io/datatypes" +) + +// SensorDataType 定义了 SensorData 记录中 Data 字段的整体类型 +type SensorDataType string + +const ( + SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度 + SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量 +) + +// SignalMetrics 存储信号强度数据 +type SignalMetrics struct { + RSSI int `json:"rssi"` // 绝对信号强度(dBm),受距离、障碍物影响 + SNR float64 `json:"snr"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) + Sensitivity int `json:"sensitivity"` // 网关的最低检测阈值(dBm) + Margin int `json:"margin"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity +} + +// BatteryLevel 存储电池电量数据 +type BatteryLevel struct { + BatteryLevel float32 `json:"battery_level"` // 电量剩余百分比 + BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用 + ExternalPower bool `json:"external_power"` // 是否使用外部电源 +} + +// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 +type SensorData struct { + // Time 是数据记录的时间戳,作为复合主键的一部分。 + // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 + Time time.Time `gorm:"primaryKey" json:"time"` + + // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 + // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 + DeviceID uint `gorm:"primaryKey" json:"device_id"` + + // RegionalControllerID 是上报此数据的区域主控的ID。 + // 我们为其添加了数据库索引以优化按区域查询的性能。 + RegionalControllerID uint `json:"regional_controller_id"` + + // SensorDataType 是传感数据的类型 + SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` + + // Data 存储一个或多个传感器读数,格式为 JSON。 + // GORM 会使用 'jsonb' 类型来创建此列。 + Data datatypes.JSON `gorm:"type:jsonb" json:"data"` +} + +func (SensorData) TableName() string { + return "sensor_data" +} diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index b5554b3..5cc3ee8 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -32,6 +32,9 @@ type DeviceRepository interface { // Delete 根据主键 ID 删除一个设备 Delete(id uint) error + + // FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增) + FindByDevEui(devEui string) (*models.Device, error) } // gormDeviceRepository 是 DeviceRepository 的 GORM 实现 @@ -108,3 +111,13 @@ func (r *gormDeviceRepository) Update(device *models.Device) error { func (r *gormDeviceRepository) Delete(id uint) error { return r.db.Delete(&models.Device{}, id).Error } + +// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 +func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, error) { + var device models.Device + // 使用 GORM 的 JSONB 查询语法: properties->>'lora_address' + if err := r.db.Where("properties->>'lora_address' = ?", devEui).First(&device).Error; err != nil { + return nil, err // 如果找不到或发生其他错误,GORM 会返回错误 + } + return &device, nil +} diff --git a/internal/infra/repository/sensor_data_repository.go b/internal/infra/repository/sensor_data_repository.go new file mode 100644 index 0000000..b9b80a9 --- /dev/null +++ b/internal/infra/repository/sensor_data_repository.go @@ -0,0 +1,27 @@ +package repository + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 +type SensorDataRepository interface { + Create(sensorData *models.SensorData) error +} + +// gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 +type gormSensorDataRepository struct { + db *gorm.DB +} + +// NewGormSensorDataRepository 创建一个新的 SensorDataRepository GORM 实现实例。 +// 它直接接收一个 *gorm.DB 实例作为依赖,完全遵循项目中的既定模式。 +func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository { + return &gormSensorDataRepository{db: db} +} + +// Create 将一条新的传感器数据记录插入数据库。 +func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { + return r.db.Create(sensorData).Error +} -- 2.49.1 From cf53cdfe28c33bbeabe7e1d6de00e0708b677db3 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 22:34:11 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E6=B8=A9=E5=BA=A6?= =?UTF-8?q?=E6=B9=BF=E5=BA=A6=E7=A7=B0=E9=87=8D=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/transport/chirp_stack.go | 143 +++++++++++++----- internal/infra/models/sensor_data.go | 28 +++- 2 files changed, 131 insertions(+), 40 deletions(-) diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9cd1831..593f17a 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,6 +1,7 @@ package transport import ( + "encoding/base64" // 新增导入 "encoding/json" "io" "net/http" @@ -14,14 +15,14 @@ import ( // ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 const ( - eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 - eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 - eventJoin = "join" // 入网事件:当设备成功加入网络时触发。 - eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 - eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 - eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 - eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 - eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 + eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 + eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 + eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。 + eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 + eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 + eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 + eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 + eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 ) // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 @@ -63,7 +64,7 @@ func (c *ChirpStackListener) Handler() http.HandlerFunc { // handler 用于处理 ChirpStack 发送的事件 func (c *ChirpStackListener) handler(data []byte, eventType string) { switch eventType { - case eventUp: + case eventTypeUp: var msg UpEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) @@ -71,7 +72,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleUpEvent(&msg) - case eventJoin: + case eventTypeJoin: var msg JoinEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) @@ -79,7 +80,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleJoinEvent(&msg) - case eventAck: + case eventTypeAck: var msg AckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) @@ -87,7 +88,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleAckEvent(&msg) - case eventTxAck: + case eventTypeTxAck: var msg TxAckEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) @@ -95,7 +96,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleTxAckEvent(&msg) - case eventStatus: + case eventTypeStatus: var msg StatusEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) @@ -103,7 +104,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleStatusEvent(&msg) - case eventLog: + case eventTypeLog: var msg LogEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) @@ -111,7 +112,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleLogEvent(&msg) - case eventLocation: + case eventTypeLocation: var msg LocationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) @@ -119,7 +120,7 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { } c.handleLocationEvent(&msg) - case eventIntegration: + case eventTypeIntegration: var msg IntegrationEvent if err := json.Unmarshal(data, &msg); err != nil { c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) @@ -134,6 +135,13 @@ func (c *ChirpStackListener) handler(data []byte, eventType string) { // --- 业务处理函数 --- +// GenericSensorReading 表示单个传感器读数,包含设备ID、类型和值。 +type GenericSensorReading struct { + DeviceID uint `json:"device_id"` // 传感器设备的ID + Type models.SensorDataType `json:"type"` // 传感器类型 (复用 models.SensorDataType) + Value float64 `json:"value"` // 传感器读数 +} + // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { c.logger.Infof("处理 'up' 事件: %+v", event) @@ -146,15 +154,77 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { // 构建 SignalMetrics 结构体 signalMetrics := models.SignalMetrics{ - RSSI: rx.Rssi, - SNR: rx.Snr, + RssiDbm: rx.Rssi, + SnrDb: rx.Snr, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) } - // 在这里添加您的业务逻辑 + // 解析并记录传感器数据 (温度、湿度、重量) + // 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 + if event.Data != "" { + decodedData, err := base64.StdEncoding.DecodeString(event.Data) + if err != nil { + c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) + return + } + + var readings []GenericSensorReading + if err := json.Unmarshal(decodedData, &readings); err != nil { + c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) + return + } + + // 查找区域主控设备,以便记录其ID + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + + for _, reading := range readings { + // 根据类型构建具体的传感器数据结构体 + var sensorData interface{} + var sensorDataType models.SensorDataType + + switch reading.Type { + case models.SensorDataTypeTemperature: // 使用枚举常量 + sensorData = models.TemperatureData{ + TemperatureCelsius: reading.Value, + } + sensorDataType = models.SensorDataTypeTemperature + case models.SensorDataTypeHumidity: // 使用枚举常量 + sensorData = models.HumidityData{ + HumidityPercent: reading.Value, + } + sensorDataType = models.SensorDataTypeHumidity + case models.SensorDataTypeWeight: // 使用枚举常量 + sensorData = models.WeightData{ + WeightKilograms: reading.Value, + } + sensorDataType = models.SensorDataTypeWeight + default: + c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", + reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) + continue // 跳过未知类型 + } + + // 记录普通设备的传感器数据 + c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) + } + } else { + c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) + } } // handleStatusEvent 处理设备状态事件 @@ -163,17 +233,25 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { // 记录信号强度 signalMetrics := models.SignalMetrics{ - Margin: event.Margin, + MarginDb: event.Margin, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + // 记录区域主控的信号强度 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) // 记录 电量 batteryLevel := models.BatteryLevel{ - BatteryLevel: event.BatteryLevel, + BatteryLevelRatio: event.BatteryLevel, BatteryLevelUnavailable: event.BatteryLevelUnavailable, ExternalPower: event.ExternalPower, } - c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) + // 记录区域主控的电池电量 + c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel) } @@ -229,14 +307,9 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { } // recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 -func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time, dataType models.SensorDataType, data interface{}) { - // 1. 查找设备 - device, err := c.deviceRepo.FindByDevEui(devEui) - if err != nil { - c.logger.Warnf("记录传感器数据失败:无法通过 DevEui '%s' 找到设备: %v", devEui, err) - return - } - +// regionalControllerID: 区域主控设备的ID +// sensorDeviceID: 实际产生传感器数据的普通设备的ID +func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, dataType models.SensorDataType, data interface{}) { // 2. 序列化数据结构体为 JSON jsonData, err := json.Marshal(data) if err != nil { @@ -247,9 +320,9 @@ func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time // 3. 构建 SensorData 模型 sensorData := &models.SensorData{ Time: eventTime, - DeviceID: device.ID, - RegionalControllerID: *device.ParentID, - SensorDataType: dataType, // 设置传感器数据类型 + DeviceID: sensorDeviceID, + RegionalControllerID: regionalControllerID, + SensorDataType: dataType, Data: datatypes.JSON(jsonData), } diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 93898ed..a667f65 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -12,23 +12,41 @@ type SensorDataType string const ( SensorDataTypeSignalMetrics SensorDataType = "signal_metrics" // 信号强度 SensorDataTypeBatteryLevel SensorDataType = "battery_level" // 电池电量 + SensorDataTypeTemperature SensorDataType = "temperature" // 温度 + SensorDataTypeHumidity SensorDataType = "humidity" // 湿度 + SensorDataTypeWeight SensorDataType = "weight" // 重量 ) // SignalMetrics 存储信号强度数据 type SignalMetrics struct { - RSSI int `json:"rssi"` // 绝对信号强度(dBm),受距离、障碍物影响 - SNR float64 `json:"snr"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) - Sensitivity int `json:"sensitivity"` // 网关的最低检测阈值(dBm) - Margin int `json:"margin"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity + RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响 + SnrDb float64 `json:"snr_db"` // 信号与噪声的相对比率(dB),由 RSSI 减去噪声地板(Noise Floor) + SensitivityDbm int `json:"sensitivity_dbm"` // 网关的最低检测阈值(dBm) + MarginDb int `json:"margin_db"` // SNR 相对于接收器灵敏度的余量, Margin = SNR - Sensitivity } // BatteryLevel 存储电池电量数据 type BatteryLevel struct { - BatteryLevel float32 `json:"battery_level"` // 电量剩余百分比 + BatteryLevelRatio float32 `json:"battery_level_ratio"` // 电量剩余百分比(%) BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电量数据不可用 ExternalPower bool `json:"external_power"` // 是否使用外部电源 } +// TemperatureData 存储温度数据 +type TemperatureData struct { + TemperatureCelsius float64 `json:"temperature_celsius"` // 温度值 (摄氏度) +} + +// HumidityData 存储湿度数据 +type HumidityData struct { + HumidityPercent float64 `json:"humidity_percent"` // 湿度值 (%) +} + +// WeightData 存储重量数据 +type WeightData struct { + WeightKilograms float64 `json:"weight_kilograms"` // 重量值 (公斤) +} + // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。 -- 2.49.1 From 6b931648dc75629f204521d7c22dde3900115038 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 24 Sep 2025 23:08:59 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E5=AE=9A=E4=B9=89ack=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/database/postgres.go | 55 ++++++++++++++----- internal/infra/models/downlink_task_record.go | 28 ++++++++++ internal/infra/models/models.go | 1 + 3 files changed, 71 insertions(+), 13 deletions(-) create mode 100644 internal/infra/models/downlink_task_record.go diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index 0ce5090..d41ed90 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -126,28 +126,47 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { // 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable if ps.isTimescaleDB { - ps.logger.Info("检测到 TimescaleDB, 准备转换 sensor_data 为超表") - // 使用 if_not_exists => TRUE 保证幂等性 - // 如果 sensor_data 已经是超表,此命令不会报错 - // 'time' 是 SensorData 模型中定义的时间列 - sql := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" - if err := ps.db.Exec(sql).Error; err != nil { - ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) - return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) + ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换") + if err := ps.creatingHyperTable(); err != nil { + return err } - ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") } return nil } +// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表 +func (ps *PostgresStorage) creatingHyperTable() error { + // 将 sensor_data 转换为超表 + // 使用 if_not_exists => TRUE 保证幂等性 + // 'time' 是 SensorData 模型中定义的时间列 + sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlSensorData).Error; err != nil { + ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) + return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) + } + ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") + + // 将 downlink_task_records 转换为超表 + // 'sent_at' 是 DownlinkTaskRecord 模型中定义的时间列 + sqlDownlinkTaskRecords := "SELECT create_hypertable('downlink_task_records', 'sent_at', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlDownlinkTaskRecords).Error; err != nil { + ps.logger.Errorw("将 downlink_task_records 转换为超表失败", "error", err) + return fmt.Errorf("将 downlink_task_records 转换为超表失败: %w", err) + } + ps.logger.Info("成功将 downlink_task_records 转换为超表 (或已转换)") + + return nil +} + // creatingIndex 用于创建gorm无法处理的索引, 如gin索引 func (ps *PostgresStorage) creatingIndex() error { - // 创建GIN索引(用于优化JSONB查询) - ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") // 使用 IF NOT EXISTS 保证幂等性 // 如果索引已存在,此命令不会报错 - ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" - if err := ps.db.Exec(ginIndexSQL).Error; err != nil { + + // 为 sensor_data 表的 data 字段创建 GIN 索引 + ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") + ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" + if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil { ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) } @@ -161,5 +180,15 @@ func (ps *PostgresStorage) creatingIndex() error { return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err) } ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)") + + // 为 devices 表的 properties 字段创建 GIN 索引 + ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引") + ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);" + if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil { + ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err) + return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err) + } + ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)") + return nil } diff --git a/internal/infra/models/downlink_task_record.go b/internal/infra/models/downlink_task_record.go new file mode 100644 index 0000000..5dba66a --- /dev/null +++ b/internal/infra/models/downlink_task_record.go @@ -0,0 +1,28 @@ +package models + +import ( + "time" +) + +// DownlinkTaskRecord 记录下行任务的下发情况和设备确认状态 +type DownlinkTaskRecord struct { + // MessageID 是下行消息的唯一标识符。 + // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 + MessageID string `gorm:"uniqueIndex;not null" json:"message_id"` + + // DeviceID 是接收此下行任务的设备的ID。 + // 对于 LoRaWAN,这通常是区域主控设备的ID。 + DeviceID uint `gorm:"not null;index" json:"device_id"` + + // SentAt 记录下行任务最初发送的时间。 + SentAt time.Time `gorm:"not null" json:"sent_at"` + + // AcknowledgedAt 记录设备确认收到下行消息的时间。 + // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 + AcknowledgedAt *time.Time `json:"acknowledged_at"` +} + +// TableName 自定义 GORM 使用的数据库表名 +func (DownlinkTaskRecord) TableName() string { + return "downlink_task_records" +} diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 812be1a..d7030fa 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -13,5 +13,6 @@ func GetAllModels() []interface{} { &TaskExecutionLog{}, &PendingTask{}, &SensorData{}, + &DownlinkTaskRecord{}, } } -- 2.49.1 From e2e21601f4407aefc7d138cf2d0ec12c00d3052d Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Thu, 25 Sep 2025 00:17:01 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E4=B8=8B=E5=8F=91=E5=8E=86=E5=8F=B2=E5=92=8C=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/service/transport/chirp_stack.go | 33 ++++++--- internal/core/application.go | 5 +- internal/infra/database/postgres.go | 14 ++-- ...k_task_record.go => device_command_log.go} | 14 ++-- internal/infra/models/models.go | 2 +- .../device_command_log_repository.go | 52 ++++++++++++++ internal/infra/transport/lora/chirp_stack.go | 70 +++++++++++++++++-- 7 files changed, 161 insertions(+), 29 deletions(-) rename internal/infra/models/{downlink_task_record.go => device_command_log.go} (60%) create mode 100644 internal/infra/repository/device_command_log_repository.go diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 593f17a..a3ec41f 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -27,16 +27,23 @@ const ( // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { - logger *logs.Logger - sensorDataRepo repository.SensorDataRepository - deviceRepo repository.DeviceRepository + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceCommandLogRepo repository.DeviceCommandLogRepository } -func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener { +func NewChirpStackListener( + logger *logs.Logger, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceCommandLogRepo repository.DeviceCommandLogRepository, +) *ChirpStackListener { return &ChirpStackListener{ - logger: logger, - sensorDataRepo: sensorDataRepo, - deviceRepo: deviceRepo, + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceCommandLogRepo: deviceCommandLogRepo, } } @@ -258,7 +265,17 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { // handleAckEvent 处理下行确认事件 func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { c.logger.Infof("接收到 'ack' 事件: %+v", event) - // 在这里添加您的业务逻辑 + + // 更新下行任务记录的确认时间及接收成功状态 + err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged) + if err != nil { + c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err) + return + } + + c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339)) } // handleLogEvent 处理日志事件 diff --git a/internal/core/application.go b/internal/core/application.go index 80ee2ae..2beddd6 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -72,8 +72,11 @@ func NewApplication(configPath string) (*Application, error) { // 初始化传感器数据仓库 sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) + // 初始化命令下发历史仓库 + deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) + // 初始化设备上行监听器 - listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo) + listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index d41ed90..27d0ecc 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -146,14 +146,14 @@ func (ps *PostgresStorage) creatingHyperTable() error { } ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") - // 将 downlink_task_records 转换为超表 - // 'sent_at' 是 DownlinkTaskRecord 模型中定义的时间列 - sqlDownlinkTaskRecords := "SELECT create_hypertable('downlink_task_records', 'sent_at', if_not_exists => TRUE);" - if err := ps.db.Exec(sqlDownlinkTaskRecords).Error; err != nil { - ps.logger.Errorw("将 downlink_task_records 转换为超表失败", "error", err) - return fmt.Errorf("将 downlink_task_records 转换为超表失败: %w", err) + // 将 device_command_log 转换为超表 + // 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 + sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { + ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) + return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) } - ps.logger.Info("成功将 downlink_task_records 转换为超表 (或已转换)") + ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)") return nil } diff --git a/internal/infra/models/downlink_task_record.go b/internal/infra/models/device_command_log.go similarity index 60% rename from internal/infra/models/downlink_task_record.go rename to internal/infra/models/device_command_log.go index 5dba66a..6fc420b 100644 --- a/internal/infra/models/downlink_task_record.go +++ b/internal/infra/models/device_command_log.go @@ -4,11 +4,11 @@ import ( "time" ) -// DownlinkTaskRecord 记录下行任务的下发情况和设备确认状态 -type DownlinkTaskRecord struct { +// DeviceCommandLog 记录下行任务的下发情况和设备确认状态 +type DeviceCommandLog struct { // MessageID 是下行消息的唯一标识符。 // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 - MessageID string `gorm:"uniqueIndex;not null" json:"message_id"` + MessageID string `gorm:"primaryKey" json:"message_id"` // DeviceID 是接收此下行任务的设备的ID。 // 对于 LoRaWAN,这通常是区域主控设备的ID。 @@ -20,9 +20,13 @@ type DownlinkTaskRecord struct { // AcknowledgedAt 记录设备确认收到下行消息的时间。 // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 AcknowledgedAt *time.Time `json:"acknowledged_at"` + + // ReceivedSuccess 表示设备是否成功接收到下行消息。 + // true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 + ReceivedSuccess bool `gorm:"not null" json:"received_success"` } // TableName 自定义 GORM 使用的数据库表名 -func (DownlinkTaskRecord) TableName() string { - return "downlink_task_records" +func (DeviceCommandLog) TableName() string { + return "device_command_log" } diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index d7030fa..40382f4 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -13,6 +13,6 @@ func GetAllModels() []interface{} { &TaskExecutionLog{}, &PendingTask{}, &SensorData{}, - &DownlinkTaskRecord{}, + &DeviceCommandLog{}, } } diff --git a/internal/infra/repository/device_command_log_repository.go b/internal/infra/repository/device_command_log_repository.go new file mode 100644 index 0000000..a843785 --- /dev/null +++ b/internal/infra/repository/device_command_log_repository.go @@ -0,0 +1,52 @@ +package repository + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// DeviceCommandLogRepository 定义了设备下行命令历史记录的数据访问接口 +type DeviceCommandLogRepository interface { + Create(record *models.DeviceCommandLog) error + FindByMessageID(messageID string) (*models.DeviceCommandLog, error) + // UpdateAcknowledgedAt 用于更新指定 MessageID 的下行命令记录的确认时间及接收成功状态。 + // AcknowledgedAt 和 ReceivedSuccess 字段会被更新。 + UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error +} + +// gormDeviceCommandLogRepository 是 DeviceCommandLogRepository 接口的 GORM 实现 +type gormDeviceCommandLogRepository struct { + db *gorm.DB +} + +// NewGormDeviceCommandLogRepository 创建一个新的 DeviceCommandLogRepository GORM 实现 +func NewGormDeviceCommandLogRepository(db *gorm.DB) DeviceCommandLogRepository { + return &gormDeviceCommandLogRepository{db: db} +} + +// Create 实现 DeviceCommandLogRepository 接口的 Create 方法 +func (r *gormDeviceCommandLogRepository) Create(record *models.DeviceCommandLog) error { + return r.db.Create(record).Error +} + +// FindByMessageID 实现 DeviceCommandLogRepository 接口的 FindByMessageID 方法 +func (r *gormDeviceCommandLogRepository) FindByMessageID(messageID string) (*models.DeviceCommandLog, error) { + var record models.DeviceCommandLog + if err := r.db.Where("message_id = ?", messageID).First(&record).Error; err != nil { + return nil, err + } + return &record, nil +} + +// UpdateAcknowledgedAt 实现 DeviceCommandLogRepository 接口的 UpdateAcknowledgedAt 方法 +func (r *gormDeviceCommandLogRepository) UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error { + // 使用 Updates 方法更新指定字段 + return r.db.Model(&models.DeviceCommandLog{}). + Where("message_id = ?", messageID). + Updates(map[string]interface{}{ + "acknowledged_at": acknowledgedAt, + "received_success": receivedSuccess, + }).Error +} diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 2466182..16ab842 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -4,6 +4,8 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" "github.com/go-openapi/runtime" httptransport "github.com/go-openapi/runtime/client" @@ -33,11 +35,19 @@ type ChirpStackTransport struct { authInfo runtime.ClientAuthInfoWriter config ChirpStackConfig + deviceCommandLogRepo repository.DeviceCommandLogRepository + deviceRepo repository.DeviceRepository + logger *logs.Logger } // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 -func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *ChirpStackTransport { +func NewChirpStackTransport( + config ChirpStackConfig, + logger *logs.Logger, + deviceCommandLogRepo repository.DeviceCommandLogRepository, + deviceRepo repository.DeviceRepository, +) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) @@ -49,10 +59,12 @@ func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *Chirp authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey()) return &ChirpStackTransport{ - client: apiClient, - authInfo: authInfo, - config: config, - logger: logger, + client: apiClient, + authInfo: authInfo, + config: config, + logger: logger, + deviceCommandLogRepo: deviceCommandLogRepo, + deviceRepo: deviceRepo, } } @@ -80,12 +92,56 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { // 3. 调用生成的客户端方法来发送请求。 // c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。 - _, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) + resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) if err != nil { c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) return err } - c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功", address) + // 4. 成功发送后,尝试记录下行任务 + messageID := "" + if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID + messageID = resp.Payload.ID + } else { + c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address) + // 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了 + return nil + } + + // 调用私有方法记录下行任务 + if err := c.recordDownlinkTask(address, messageID); err != nil { + // 记录失败不影响下行命令的发送成功 + c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err) + return nil + } + + c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID) + + return nil +} + +// recordDownlinkTask 记录下行任务到数据库 +func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error { + // 获取区域主控的内部 DeviceID + regionalController, err := c.deviceRepo.FindByDevEui(devEui) + if err != nil { + c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err) + return err + } + + // 创建 DeviceCommandLog + record := &models.DeviceCommandLog{ + MessageID: messageID, + DeviceID: regionalController.ID, + SentAt: time.Now(), + AcknowledgedAt: nil, // 初始状态为未确认 + } + + if err := c.deviceCommandLogRepo.Create(record); err != nil { + c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err) + return err + } + + c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID) return nil } -- 2.49.1