package device import ( "errors" "fmt" "strconv" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "github.com/google/uuid" gproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) type GeneralDeviceService struct { deviceRepo repository.DeviceRepository deviceCommandLogRepo repository.DeviceCommandLogRepository pendingCollectionRepo repository.PendingCollectionRepository logger *logs.Logger comm transport.Communicator } // NewGeneralDeviceService 创建一个通用设备服务 func NewGeneralDeviceService( deviceRepo repository.DeviceRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, pendingCollectionRepo repository.PendingCollectionRepository, logger *logs.Logger, comm transport.Communicator, ) Service { return &GeneralDeviceService{ deviceRepo: deviceRepo, deviceCommandLogRepo: deviceCommandLogRepo, pendingCollectionRepo: pendingCollectionRepo, logger: logger, comm: comm, } } func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error { // 校验设备参数及生成指令 if *device.ParentID == 0 { return fmt.Errorf("设备 %v(id=%v) 的上级区域主控(id=%v) ID不合理, 无法执行指令", device.Name, device.ID, *device.ParentID) } if !device.SelfCheck() { return fmt.Errorf("设备 %v(id=%v) 缺少必要信息, 无法发送指令", device.Name, device.ID) } deviceInfo := make(map[string]interface{}) if err := device.ParseProperties(&deviceInfo); err != nil { return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err) } // 已通过 SelfCheck 保证其为纯数字,此处仅进行类型转换 busNumber, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber])) busAddress, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress])) relayChannel, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel])) data, err := anypb.New(&proto.Switch{ DeviceAction: string(action), BusNumber: int32(busNumber), BusAddress: int32(busAddress), RelayChannel: int32(relayChannel), }) if err != nil { return fmt.Errorf("创建指令失败: %v", err) } instruction := &proto.Instruction{ Method: proto.MethodType_SWITCH, Data: data, } // 获取自身LoRa设备ID, 因为可能变更, 所以每次都现获取 thisDevice, err := g.deviceRepo.FindByID(*device.ParentID) if err != nil { return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", *device.ParentID, err) } if !thisDevice.SelfCheck() { return fmt.Errorf("区域主控 %v(id=%v) 缺少必要信息, 无法发送指令", thisDevice.Name, thisDevice.ID) } thisDeviceinfo := make(map[string]interface{}) if err := thisDevice.ParseProperties(&thisDeviceinfo); err != nil { return fmt.Errorf("解析区域主控 %v(id=%v) 配置失败: %v", device.Name, device.ID, err) } loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress]) // 生成消息 message, err := gproto.Marshal(instruction) if err != nil { return fmt.Errorf("序列化指令失败: %v", err) } // 发送指令并获取 SendResult sendResult, err := g.comm.Send(loraAddress, message) if err != nil { // 发送失败,直接返回错误 return fmt.Errorf("发送指令到设备 %s 失败: %w", loraAddress, err) } // 创建并保存命令日志 logRecord := &models.DeviceCommandLog{ MessageID: sendResult.MessageID, DeviceID: thisDevice.ID, // thisDevice 是我们查出来的区域主控 SentAt: time.Now(), } if err := g.deviceCommandLogRepo.Create(logRecord); err != nil { // 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。 // 我们记录一个错误日志,然后成功返回。 g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err) } g.logger.Infof("成功发送指令到设备 %s 并创建日志 (MessageID: %s)", loraAddress, sendResult.MessageID) return nil } // Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。 // 它负责查找区域主控、生成关联ID、创建待处理记录、构建指令并最终发送。 func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error { if regionalControllerID == 0 { return errors.New("区域主控ID不能为空") } if len(devicesToCollect) == 0 { // 如果没有要采集的设备,这不是一个错误,只是一个空操作。 g.logger.Info("待采集设备列表为空,无需执行采集任务。") return nil } // 1. 查找并自检区域主控设备 regionalController, err := g.deviceRepo.FindByID(regionalControllerID) if err != nil { return fmt.Errorf("查找区域主控 (ID: %d) 失败: %w", regionalControllerID, err) } if !regionalController.SelfCheck() { return fmt.Errorf("区域主控 (ID: %d) 未通过自检,缺少必要属性", regionalControllerID) } // 2. 准备采集任务列表和数据库存根,并验证设备 var childDeviceIDs []uint var collectTasks []*proto.CollectTask for _, dev := range devicesToCollect { // 验证设备是否属于指定的区域主控 if dev.ParentID == nil || *dev.ParentID != regionalControllerID { return fmt.Errorf("设备 '%s' (ID: %d) 不属于指定的区域主控 (ID: %d)", dev.Name, dev.ID, regionalControllerID) } // 对每个待采集的设备执行自检 if !dev.SelfCheck() { g.logger.Warnf("跳过设备 %d,因其未通过自检", dev.ID) continue } // 自检已通过,我们可以安全地解析属性 var props map[string]interface{} // 此时 ParseProperties 不应失败 _ = dev.ParseProperties(&props) busNumber := props[models.BusNumber].(float64) busAddress := props[models.BusAddress].(float64) collectTasks = append(collectTasks, &proto.CollectTask{ DeviceAction: dev.Command, BusNumber: int32(busNumber), BusAddress: int32(busAddress), }) childDeviceIDs = append(childDeviceIDs, dev.ID) } if len(childDeviceIDs) == 0 { return errors.New("经过滤后,没有可通过自检的有效设备") } // 3. 从区域主控的属性中解析出 DevEui (loraAddress) var rcProps map[string]interface{} // SelfCheck 已保证属性可解析 _ = regionalController.ParseProperties(&rcProps) loraAddress := rcProps[models.LoRaAddress].(string) // 4. 创建待处理请求记录 correlationID := uuid.New().String() pendingReq := &models.PendingCollection{ CorrelationID: correlationID, DeviceID: regionalController.ID, CommandMetadata: childDeviceIDs, Status: models.PendingStatusPending, CreatedAt: time.Now(), } if err := g.pendingCollectionRepo.Create(pendingReq); err != nil { g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err) return err } g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID) // 5. 构建最终的空中载荷 batchCmd := &proto.BatchCollectCommand{ CorrelationId: correlationID, Tasks: collectTasks, } anyData, err := anypb.New(batchCmd) if err != nil { g.logger.Errorf("创建 Any Protobuf 失败 (CorrelationID: %s): %v", correlationID, err) return err } instruction := &proto.Instruction{ Method: proto.MethodType_COLLECT, Data: anyData, } payload, err := gproto.Marshal(instruction) if err != nil { g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err) return err } // 6. 发送指令 if _, err := g.comm.Send(loraAddress, payload); err != nil { g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err) return err } g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, loraAddress) return nil }