package device import ( "errors" "fmt" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater" "github.com/google/uuid" gproto "google.golang.org/protobuf/proto" ) type GeneralDeviceService struct { deviceRepo repository.DeviceRepository deviceCommandLogRepo repository.DeviceCommandLogRepository pendingCollectionRepo repository.PendingCollectionRepository logger *logs.Logger comm transport.Communicator } // NewGeneralDeviceService 创建一个通用设备服务 func NewGeneralDeviceService( deviceRepo repository.DeviceRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, pendingCollectionRepo repository.PendingCollectionRepository, logger *logs.Logger, comm transport.Communicator, ) Service { return &GeneralDeviceService{ deviceRepo: deviceRepo, deviceCommandLogRepo: deviceCommandLogRepo, pendingCollectionRepo: pendingCollectionRepo, logger: logger, comm: comm, } } func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error { // 1. 依赖模型自身的 SelfCheck 进行全面校验 if err := device.SelfCheck(); err != nil { return fmt.Errorf("设备 %v(id=%v) 未通过自检: %w", device.Name, device.ID, err) } if err := device.DeviceTemplate.SelfCheck(); err != nil { return fmt.Errorf("设备 %v(id=%v) 的模板未通过自检: %w", device.Name, device.ID, err) } // 2. 检查预加载的 AreaController 是否有效 areaController := &device.AreaController if err := areaController.SelfCheck(); err != nil { return fmt.Errorf("区域主控 %v(id=%v) 未通过自检: %w", areaController.Name, areaController.ID, err) } // 3. 使用模型层预定义的 Bus485Properties 结构体解析设备属性 var deviceProps models.Bus485Properties if err := device.ParseProperties(&deviceProps); err != nil { return fmt.Errorf("解析设备 %v(id=%v) 的属性失败: %w", device.Name, device.ID, err) } // 4. 解析设备模板中的开关指令参数 var switchCmd models.SwitchCommands if err := device.DeviceTemplate.ParseCommands(&switchCmd); err != nil { return fmt.Errorf("解析设备 %v(id=%v) 的开关指令失败: %w", device.Name, device.ID, err) } // 5. 根据 action 生成 Modbus RTU 写入指令 onOffState := true // 默认为开启 if action == DeviceActionStop { // 如果是停止动作,则设置为关闭 onOffState = false } modbusCommandBytes, err := command_generater.GenerateModbusRTUSwitchCommand( deviceProps.BusAddress, switchCmd.ModbusStartAddress, onOffState, ) if err != nil { return fmt.Errorf("生成Modbus RTU写入指令失败: %w", err) } // 6. 构建 Protobuf Raw485Command,包含总线号 raw485Cmd := &proto.Raw485Command{ BusNumber: int32(deviceProps.BusNumber), // 添加总线号 CommandBytes: modbusCommandBytes, } instruction := &proto.Instruction{ Payload: &proto.Instruction_Raw_485Command{ Raw_485Command: raw485Cmd, }, } message, err := gproto.Marshal(instruction) if err != nil { return fmt.Errorf("序列化指令失败: %w", err) } // 7. 发送指令 networkID := areaController.NetworkID sendResult, err := g.comm.Send(networkID, message) if err != nil { return fmt.Errorf("发送指令到 %s 失败: %w", networkID, err) } // 8. 创建并保存命令日志 logRecord := &models.DeviceCommandLog{ MessageID: sendResult.MessageID, DeviceID: areaController.ID, SentAt: time.Now(), } if err := g.deviceCommandLogRepo.Create(logRecord); err != nil { // 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。 // 我们记录一个错误日志,然后成功返回。 g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err) } g.logger.Infof("成功发送指令到 %s 并创建日志 (MessageID: %s)", networkID, sendResult.MessageID) return nil } // Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。 func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error { if len(devicesToCollect) == 0 { g.logger.Info("待采集设备列表为空,无需执行采集任务。") return nil } // 1. 从设备列表中获取预加载的区域主控,并进行校验 regionalController := &devicesToCollect[0].AreaController if regionalController.ID != regionalControllerID { return fmt.Errorf("设备列表与指定的区域主控ID (%d) 不匹配", regionalControllerID) } if err := regionalController.SelfCheck(); err != nil { return fmt.Errorf("区域主控 (ID: %d) 未通过自检: %w", regionalControllerID, err) } // 2. 准备采集任务列表 var childDeviceIDs []uint var collectTasks []*proto.CollectTask for _, dev := range devicesToCollect { // 依赖模型自身的 SelfCheck 进行全面校验 if err := dev.SelfCheck(); err != nil { g.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err) continue } if err := dev.DeviceTemplate.SelfCheck(); err != nil { g.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err) continue } // 使用模板的 ParseCommands 方法获取传感器指令参数 var sensorCmd models.SensorCommands if err := dev.DeviceTemplate.ParseCommands(&sensorCmd); err != nil { g.logger.Warnf("跳过设备 %d,因其模板指令无法解析为 SensorCommands: %v", dev.ID, err) continue } // 使用模型层预定义的 Bus485Properties 结构体解析设备属性 var deviceProps models.Bus485Properties if err := dev.ParseProperties(&deviceProps); err != nil { g.logger.Warnf("跳过设备 %d,因其属性解析失败: %v", dev.ID, err) continue } // 生成 Modbus RTU 读取指令 modbusCommandBytes, err := command_generater.GenerateModbusRTUReadCommand( deviceProps.BusAddress, sensorCmd.ModbusFunctionCode, sensorCmd.ModbusStartAddress, sensorCmd.ModbusQuantity, ) if err != nil { g.logger.Warnf("跳过设备 %d,因生成Modbus RTU读取指令失败: %v", dev.ID, err) continue } g.logger.Debugf("生成485指令: %v", modbusCommandBytes) // 构建 Raw485Command,包含总线号 raw485Cmd := &proto.Raw485Command{ BusNumber: int32(deviceProps.BusNumber), // 添加总线号 CommandBytes: modbusCommandBytes, } collectTasks = append(collectTasks, &proto.CollectTask{ Command: raw485Cmd, }) childDeviceIDs = append(childDeviceIDs, dev.ID) } if len(childDeviceIDs) == 0 { return errors.New("经过滤后,没有可通过自检的有效设备") } // 3. 构建并发送指令 networkID := regionalController.NetworkID // 4. 创建待处理请求记录 correlationID := uuid.New().String() pendingReq := &models.PendingCollection{ CorrelationID: correlationID, DeviceID: regionalController.ID, CommandMetadata: childDeviceIDs, Status: models.PendingStatusPending, CreatedAt: time.Now(), } if err := g.pendingCollectionRepo.Create(pendingReq); err != nil { g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err) return err } g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID) // 5. 构建最终的空中载荷 batchCmd := &proto.BatchCollectCommand{ CorrelationId: correlationID, Tasks: collectTasks, } instruction := &proto.Instruction{ Payload: &proto.Instruction_BatchCollectCommand{ BatchCollectCommand: batchCmd, }, } payload, err := gproto.Marshal(instruction) if err != nil { g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err) return err } g.logger.Infof("构造空中载荷成功: networkID: %v, payload: %v", networkID, instruction) if _, err := g.comm.Send(networkID, payload); err != nil { g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err) return err } g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID) return nil }