diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 593f17a..a3ec41f 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -27,16 +27,23 @@ const ( // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { - logger *logs.Logger - sensorDataRepo repository.SensorDataRepository - deviceRepo repository.DeviceRepository + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceCommandLogRepo repository.DeviceCommandLogRepository } -func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener { +func NewChirpStackListener( + logger *logs.Logger, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceCommandLogRepo repository.DeviceCommandLogRepository, +) *ChirpStackListener { return &ChirpStackListener{ - logger: logger, - sensorDataRepo: sensorDataRepo, - deviceRepo: deviceRepo, + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceCommandLogRepo: deviceCommandLogRepo, } } @@ -258,7 +265,17 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { // handleAckEvent 处理下行确认事件 func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { c.logger.Infof("接收到 'ack' 事件: %+v", event) - // 在这里添加您的业务逻辑 + + // 更新下行任务记录的确认时间及接收成功状态 + err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged) + if err != nil { + c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err) + return + } + + c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)", + event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339)) } // handleLogEvent 处理日志事件 diff --git a/internal/core/application.go b/internal/core/application.go index 80ee2ae..2beddd6 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -72,8 +72,11 @@ func NewApplication(configPath string) (*Application, error) { // 初始化传感器数据仓库 sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) + // 初始化命令下发历史仓库 + deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) + // 初始化设备上行监听器 - listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo) + listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index d41ed90..27d0ecc 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -146,14 +146,14 @@ func (ps *PostgresStorage) creatingHyperTable() error { } ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") - // 将 downlink_task_records 转换为超表 - // 'sent_at' 是 DownlinkTaskRecord 模型中定义的时间列 - sqlDownlinkTaskRecords := "SELECT create_hypertable('downlink_task_records', 'sent_at', if_not_exists => TRUE);" - if err := ps.db.Exec(sqlDownlinkTaskRecords).Error; err != nil { - ps.logger.Errorw("将 downlink_task_records 转换为超表失败", "error", err) - return fmt.Errorf("将 downlink_task_records 转换为超表失败: %w", err) + // 将 device_command_log 转换为超表 + // 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 + sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);" + if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { + ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) + return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) } - ps.logger.Info("成功将 downlink_task_records 转换为超表 (或已转换)") + ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)") return nil } diff --git a/internal/infra/models/downlink_task_record.go b/internal/infra/models/device_command_log.go similarity index 60% rename from internal/infra/models/downlink_task_record.go rename to internal/infra/models/device_command_log.go index 5dba66a..6fc420b 100644 --- a/internal/infra/models/downlink_task_record.go +++ b/internal/infra/models/device_command_log.go @@ -4,11 +4,11 @@ import ( "time" ) -// DownlinkTaskRecord 记录下行任务的下发情况和设备确认状态 -type DownlinkTaskRecord struct { +// DeviceCommandLog 记录下行任务的下发情况和设备确认状态 +type DeviceCommandLog struct { // MessageID 是下行消息的唯一标识符。 // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 - MessageID string `gorm:"uniqueIndex;not null" json:"message_id"` + MessageID string `gorm:"primaryKey" json:"message_id"` // DeviceID 是接收此下行任务的设备的ID。 // 对于 LoRaWAN,这通常是区域主控设备的ID。 @@ -20,9 +20,13 @@ type DownlinkTaskRecord struct { // AcknowledgedAt 记录设备确认收到下行消息的时间。 // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 AcknowledgedAt *time.Time `json:"acknowledged_at"` + + // ReceivedSuccess 表示设备是否成功接收到下行消息。 + // true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 + ReceivedSuccess bool `gorm:"not null" json:"received_success"` } // TableName 自定义 GORM 使用的数据库表名 -func (DownlinkTaskRecord) TableName() string { - return "downlink_task_records" +func (DeviceCommandLog) TableName() string { + return "device_command_log" } diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index d7030fa..40382f4 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -13,6 +13,6 @@ func GetAllModels() []interface{} { &TaskExecutionLog{}, &PendingTask{}, &SensorData{}, - &DownlinkTaskRecord{}, + &DeviceCommandLog{}, } } diff --git a/internal/infra/repository/device_command_log_repository.go b/internal/infra/repository/device_command_log_repository.go new file mode 100644 index 0000000..a843785 --- /dev/null +++ b/internal/infra/repository/device_command_log_repository.go @@ -0,0 +1,52 @@ +package repository + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// DeviceCommandLogRepository 定义了设备下行命令历史记录的数据访问接口 +type DeviceCommandLogRepository interface { + Create(record *models.DeviceCommandLog) error + FindByMessageID(messageID string) (*models.DeviceCommandLog, error) + // UpdateAcknowledgedAt 用于更新指定 MessageID 的下行命令记录的确认时间及接收成功状态。 + // AcknowledgedAt 和 ReceivedSuccess 字段会被更新。 + UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error +} + +// gormDeviceCommandLogRepository 是 DeviceCommandLogRepository 接口的 GORM 实现 +type gormDeviceCommandLogRepository struct { + db *gorm.DB +} + +// NewGormDeviceCommandLogRepository 创建一个新的 DeviceCommandLogRepository GORM 实现 +func NewGormDeviceCommandLogRepository(db *gorm.DB) DeviceCommandLogRepository { + return &gormDeviceCommandLogRepository{db: db} +} + +// Create 实现 DeviceCommandLogRepository 接口的 Create 方法 +func (r *gormDeviceCommandLogRepository) Create(record *models.DeviceCommandLog) error { + return r.db.Create(record).Error +} + +// FindByMessageID 实现 DeviceCommandLogRepository 接口的 FindByMessageID 方法 +func (r *gormDeviceCommandLogRepository) FindByMessageID(messageID string) (*models.DeviceCommandLog, error) { + var record models.DeviceCommandLog + if err := r.db.Where("message_id = ?", messageID).First(&record).Error; err != nil { + return nil, err + } + return &record, nil +} + +// UpdateAcknowledgedAt 实现 DeviceCommandLogRepository 接口的 UpdateAcknowledgedAt 方法 +func (r *gormDeviceCommandLogRepository) UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error { + // 使用 Updates 方法更新指定字段 + return r.db.Model(&models.DeviceCommandLog{}). + Where("message_id = ?", messageID). + Updates(map[string]interface{}{ + "acknowledged_at": acknowledgedAt, + "received_success": receivedSuccess, + }).Error +} diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 2466182..16ab842 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -4,6 +4,8 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" "github.com/go-openapi/runtime" httptransport "github.com/go-openapi/runtime/client" @@ -33,11 +35,19 @@ type ChirpStackTransport struct { authInfo runtime.ClientAuthInfoWriter config ChirpStackConfig + deviceCommandLogRepo repository.DeviceCommandLogRepository + deviceRepo repository.DeviceRepository + logger *logs.Logger } // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 -func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *ChirpStackTransport { +func NewChirpStackTransport( + config ChirpStackConfig, + logger *logs.Logger, + deviceCommandLogRepo repository.DeviceCommandLogRepository, + deviceRepo repository.DeviceRepository, +) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) @@ -49,10 +59,12 @@ func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *Chirp authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey()) return &ChirpStackTransport{ - client: apiClient, - authInfo: authInfo, - config: config, - logger: logger, + client: apiClient, + authInfo: authInfo, + config: config, + logger: logger, + deviceCommandLogRepo: deviceCommandLogRepo, + deviceRepo: deviceRepo, } } @@ -80,12 +92,56 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { // 3. 调用生成的客户端方法来发送请求。 // c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。 - _, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) + resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) if err != nil { c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) return err } - c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功", address) + // 4. 成功发送后,尝试记录下行任务 + messageID := "" + if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID + messageID = resp.Payload.ID + } else { + c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address) + // 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了 + return nil + } + + // 调用私有方法记录下行任务 + if err := c.recordDownlinkTask(address, messageID); err != nil { + // 记录失败不影响下行命令的发送成功 + c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err) + return nil + } + + c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID) + + return nil +} + +// recordDownlinkTask 记录下行任务到数据库 +func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error { + // 获取区域主控的内部 DeviceID + regionalController, err := c.deviceRepo.FindByDevEui(devEui) + if err != nil { + c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err) + return err + } + + // 创建 DeviceCommandLog + record := &models.DeviceCommandLog{ + MessageID: messageID, + DeviceID: regionalController.ID, + SentAt: time.Now(), + AcknowledgedAt: nil, // 初始状态为未确认 + } + + if err := c.deviceCommandLogRepo.Create(record); err != nil { + c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err) + return err + } + + c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID) return nil }