diff --git a/config.yml b/config.yml index af5693c..4531cc0 100644 --- a/config.yml +++ b/config.yml @@ -50,6 +50,10 @@ chirp_stack: api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer fport: 1 api_timeout: 10 # ChirpStack API请求超时时间(秒) + # 等待设备上行响应的超时时间(秒)。 + # 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。 + collection_request_timeout: 300 + # 任务调度器配置 task: diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 89c48d7..6d2dbf9 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -53,8 +53,6 @@ func NewAPI(cfg config.ServerConfig, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, - sensorDataRepository repository.SensorDataRepository, - executionLogRepository repository.ExecutionLogRepository, tokenService token.TokenService, listenHandler transport.ListenHandler, analysisTaskManager *task.AnalysisPlanTaskManager) *API { diff --git a/internal/app/controller/device/device_controller.go b/internal/app/controller/device/device_controller.go index 3eac4b5..97d5d10 100644 --- a/internal/app/controller/device/device_controller.go +++ b/internal/app/controller/device/device_controller.go @@ -77,7 +77,7 @@ func newDeviceResponse(device *models.Device) (*DeviceResponse, error) { var props map[string]interface{} if len(device.Properties) > 0 && string(device.Properties) != "null" { - if err := json.Unmarshal(device.Properties, &props); err != nil { + if err := device.ParseProperties(&props); err != nil { return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err) } } @@ -143,6 +143,13 @@ func (c *Controller) CreateDevice(ctx *gin.Context) { Properties: propertiesJSON, } + // 在创建设备前进行自检 + if !device.SelfCheck() { + c.logger.Errorf("创建设备: 设备属性自检失败: %v", device) + controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求") + return + } + if err := c.repo.Create(device); err != nil { c.logger.Errorf("创建设备: 数据库操作失败: %v", err) controller.SendErrorResponse(ctx, controller.CodeInternalError, "创建设备失败") @@ -272,6 +279,13 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) { existingDevice.Location = req.Location existingDevice.Properties = propertiesJSON + // 在更新设备前进行自检 + if !existingDevice.SelfCheck() { + c.logger.Errorf("更新设备: 设备属性自检失败: %v", existingDevice) + controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求") + return + } + // 4. 将修改后的 existingDevice 对象保存回数据库 if err := c.repo.Update(existingDevice); err != nil { c.logger.Errorf("更新设备: 数据库操作失败: %v", err) diff --git a/internal/app/controller/plan/converter.go b/internal/app/controller/plan/converter.go index 1727c14..57dbf96 100644 --- a/internal/app/controller/plan/converter.go +++ b/internal/app/controller/plan/converter.go @@ -189,7 +189,7 @@ func TaskToResponse(task *models.Task) (TaskResponse, error) { var params map[string]interface{} if len(task.Parameters) > 0 && string(task.Parameters) != "null" { - if err := json.Unmarshal(task.Parameters, ¶ms); err != nil { + if err := task.ParseParameters(¶ms); err != nil { return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err) } } diff --git a/internal/app/service/device/device_service.go b/internal/app/service/device/device_service.go index 7b37832..d0296af 100644 --- a/internal/app/service/device/device_service.go +++ b/internal/app/service/device/device_service.go @@ -21,7 +21,10 @@ var ( type Service interface { // Switch 用于切换指定设备的状态, 比如启动和停止 - Switch(device models.Device, action DeviceAction) error + Switch(device *models.Device, action DeviceAction) error + + // Collect 用于发起对指定区域主控下的多个设备的批量采集请求。 + Collect(regionalControllerID uint, devicesToCollect []*models.Device) error } // 设备操作指令通用结构(最外层) diff --git a/internal/app/service/device/general_device_service.go b/internal/app/service/device/general_device_service.go index 473ff03..964d207 100644 --- a/internal/app/service/device/general_device_service.go +++ b/internal/app/service/device/general_device_service.go @@ -1,31 +1,43 @@ package device import ( + "errors" "fmt" "strconv" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" + "github.com/google/uuid" gproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) type GeneralDeviceService struct { - deviceRepo repository.DeviceRepository - logger *logs.Logger - - comm transport.Communicator + deviceRepo repository.DeviceRepository + deviceCommandLogRepo repository.DeviceCommandLogRepository + pendingCollectionRepo repository.PendingCollectionRepository + logger *logs.Logger + comm transport.Communicator } // NewGeneralDeviceService 创建一个通用设备服务 -func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { +func NewGeneralDeviceService( + deviceRepo repository.DeviceRepository, + deviceCommandLogRepo repository.DeviceCommandLogRepository, + pendingCollectionRepo repository.PendingCollectionRepository, + logger *logs.Logger, + comm transport.Communicator, +) Service { return &GeneralDeviceService{ - deviceRepo: deviceRepo, - logger: logger, - comm: comm, + deviceRepo: deviceRepo, + deviceCommandLogRepo: deviceCommandLogRepo, + pendingCollectionRepo: pendingCollectionRepo, + logger: logger, + comm: comm, } } @@ -45,18 +57,10 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err) } - busNumber, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber])) - if err != nil { - return fmt.Errorf("无效的总线号: %v", err) - } - busAddress, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress])) - if err != nil { - return fmt.Errorf("无效的总线地址: %v", err) - } - relayChannel, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel])) - if err != nil { - return fmt.Errorf("无效的继电器通道: %v", err) - } + // 已通过 SelfCheck 保证其为纯数字,此处仅进行类型转换 + busNumber, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber])) + busAddress, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress])) + relayChannel, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel])) data, err := anypb.New(&proto.Switch{ DeviceAction: string(action), @@ -87,11 +91,141 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction } loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress]) - // 生成消息并发送 + // 生成消息 message, err := gproto.Marshal(instruction) if err != nil { return fmt.Errorf("序列化指令失败: %v", err) } - return g.comm.Send(loraAddress, message) + // 发送指令并获取 SendResult + sendResult, err := g.comm.Send(loraAddress, message) + if err != nil { + // 发送失败,直接返回错误 + return fmt.Errorf("发送指令到设备 %s 失败: %w", loraAddress, err) + } + + // 创建并保存命令日志 + logRecord := &models.DeviceCommandLog{ + MessageID: sendResult.MessageID, + DeviceID: thisDevice.ID, // thisDevice 是我们查出来的区域主控 + SentAt: time.Now(), + } + + if err := g.deviceCommandLogRepo.Create(logRecord); err != nil { + // 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。 + // 我们记录一个错误日志,然后成功返回。 + g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err) + } + + g.logger.Infof("成功发送指令到设备 %s 并创建日志 (MessageID: %s)", loraAddress, sendResult.MessageID) + return nil +} + +// Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。 +// 它负责查找区域主控、生成关联ID、创建待处理记录、构建指令并最终发送。 +func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error { + if regionalControllerID == 0 { + return errors.New("区域主控ID不能为空") + } + + if len(devicesToCollect) == 0 { + // 如果没有要采集的设备,这不是一个错误,只是一个空操作。 + g.logger.Info("待采集设备列表为空,无需执行采集任务。") + return nil + } + + // 1. 查找并自检区域主控设备 + regionalController, err := g.deviceRepo.FindByID(regionalControllerID) + if err != nil { + return fmt.Errorf("查找区域主控 (ID: %d) 失败: %w", regionalControllerID, err) + } + if !regionalController.SelfCheck() { + return fmt.Errorf("区域主控 (ID: %d) 未通过自检,缺少必要属性", regionalControllerID) + } + + // 2. 准备采集任务列表和数据库存根,并验证设备 + var childDeviceIDs []uint + var collectTasks []*proto.CollectTask + + for _, dev := range devicesToCollect { + // 验证设备是否属于指定的区域主控 + if dev.ParentID == nil || *dev.ParentID != regionalControllerID { + return fmt.Errorf("设备 '%s' (ID: %d) 不属于指定的区域主控 (ID: %d)", dev.Name, dev.ID, regionalControllerID) + } + + // 对每个待采集的设备执行自检 + if !dev.SelfCheck() { + g.logger.Warnf("跳过设备 %d,因其未通过自检", dev.ID) + continue + } + + // 自检已通过,我们可以安全地解析属性 + var props map[string]interface{} + // 此时 ParseProperties 不应失败 + _ = dev.ParseProperties(&props) + + busNumber := props[models.BusNumber].(float64) + busAddress := props[models.BusAddress].(float64) + + collectTasks = append(collectTasks, &proto.CollectTask{ + DeviceAction: dev.Command, + BusNumber: int32(busNumber), + BusAddress: int32(busAddress), + }) + childDeviceIDs = append(childDeviceIDs, dev.ID) + } + + if len(childDeviceIDs) == 0 { + return errors.New("经过滤后,没有可通过自检的有效设备") + } + + // 3. 从区域主控的属性中解析出 DevEui (loraAddress) + var rcProps map[string]interface{} + // SelfCheck 已保证属性可解析 + _ = regionalController.ParseProperties(&rcProps) + loraAddress := rcProps[models.LoRaAddress].(string) + + // 4. 创建待处理请求记录 + correlationID := uuid.New().String() + pendingReq := &models.PendingCollection{ + CorrelationID: correlationID, + DeviceID: regionalController.ID, + CommandMetadata: childDeviceIDs, + Status: models.PendingStatusPending, + CreatedAt: time.Now(), + } + if err := g.pendingCollectionRepo.Create(pendingReq); err != nil { + g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err) + return err + } + g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID) + + // 5. 构建最终的空中载荷 + batchCmd := &proto.BatchCollectCommand{ + CorrelationId: correlationID, + Tasks: collectTasks, + } + anyData, err := anypb.New(batchCmd) + if err != nil { + g.logger.Errorf("创建 Any Protobuf 失败 (CorrelationID: %s): %v", correlationID, err) + return err + } + instruction := &proto.Instruction{ + Method: proto.MethodType_COLLECT, + Data: anyData, + } + payload, err := gproto.Marshal(instruction) + if err != nil { + g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err) + return err + } + + // 6. 发送指令 + if _, err := g.comm.Send(loraAddress, payload); err != nil { + g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err) + return err + } + + g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, loraAddress) + return nil } diff --git a/internal/app/service/device/proto/device.pb.go b/internal/app/service/device/proto/device.pb.go index a775c64..9150c9d 100644 --- a/internal/app/service/device/proto/device.pb.go +++ b/internal/app/service/device/proto/device.pb.go @@ -69,7 +69,7 @@ func (MethodType) EnumDescriptor() ([]byte, []int) { return file_device_proto_rawDescGZIP(), []int{0} } -// 指令 +// 指令 (所有空中数据都会被包装在这里面) type Instruction struct { state protoimpl.MessageState `protogen:"open.v1"` Method MethodType `protobuf:"varint,1,opt,name=method,proto3,enum=device.MethodType" json:"method,omitempty"` @@ -122,6 +122,7 @@ func (x *Instruction) GetData() *anypb.Any { return nil } +// Switch 指令的载荷 type Switch struct { state protoimpl.MessageState `protogen:"open.v1"` DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令 @@ -190,29 +191,31 @@ func (x *Switch) GetRelayChannel() int32 { return 0 } -type Collect struct { +// BatchCollectCommand +// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。 +// 这个消息本身不会被发送到设备。 +type BatchCollectCommand struct { state protoimpl.MessageState `protogen:"open.v1"` - BusNumber int32 `protobuf:"varint,1,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号 - BusAddress int32 `protobuf:"varint,2,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址 - Value float32 `protobuf:"fixed32,3,opt,name=value,proto3" json:"value,omitempty"` // 采集值 + CorrelationId string `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 用于关联请求和响应的唯一ID + Tasks []*CollectTask `protobuf:"bytes,2,rep,name=tasks,proto3" json:"tasks,omitempty"` // 采集任务列表 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *Collect) Reset() { - *x = Collect{} +func (x *BatchCollectCommand) Reset() { + *x = BatchCollectCommand{} mi := &file_device_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *Collect) String() string { +func (x *BatchCollectCommand) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Collect) ProtoMessage() {} +func (*BatchCollectCommand) ProtoMessage() {} -func (x *Collect) ProtoReflect() protoreflect.Message { +func (x *BatchCollectCommand) ProtoReflect() protoreflect.Message { mi := &file_device_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -224,30 +227,139 @@ func (x *Collect) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Collect.ProtoReflect.Descriptor instead. -func (*Collect) Descriptor() ([]byte, []int) { +// Deprecated: Use BatchCollectCommand.ProtoReflect.Descriptor instead. +func (*BatchCollectCommand) Descriptor() ([]byte, []int) { return file_device_proto_rawDescGZIP(), []int{2} } -func (x *Collect) GetBusNumber() int32 { +func (x *BatchCollectCommand) GetCorrelationId() string { + if x != nil { + return x.CorrelationId + } + return "" +} + +func (x *BatchCollectCommand) GetTasks() []*CollectTask { + if x != nil { + return x.Tasks + } + return nil +} + +// CollectTask +// 定义了单个采集任务的“意图”。 +type CollectTask struct { + state protoimpl.MessageState `protogen:"open.v1"` + DeviceAction string `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令 + BusNumber int32 `protobuf:"varint,2,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"` // 总线号 + BusAddress int32 `protobuf:"varint,3,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CollectTask) Reset() { + *x = CollectTask{} + mi := &file_device_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CollectTask) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectTask) ProtoMessage() {} + +func (x *CollectTask) ProtoReflect() protoreflect.Message { + mi := &file_device_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CollectTask.ProtoReflect.Descriptor instead. +func (*CollectTask) Descriptor() ([]byte, []int) { + return file_device_proto_rawDescGZIP(), []int{3} +} + +func (x *CollectTask) GetDeviceAction() string { + if x != nil { + return x.DeviceAction + } + return "" +} + +func (x *CollectTask) GetBusNumber() int32 { if x != nil { return x.BusNumber } return 0 } -func (x *Collect) GetBusAddress() int32 { +func (x *CollectTask) GetBusAddress() int32 { if x != nil { return x.BusAddress } return 0 } -func (x *Collect) GetValue() float32 { +// CollectResult +// 这是设备响应的、极致精简的数据包。 +type CollectResult struct { + state protoimpl.MessageState `protogen:"open.v1"` + CorrelationId string `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 从下行指令中原样返回的关联ID + Values []float32 `protobuf:"fixed32,2,rep,packed,name=values,proto3" json:"values,omitempty"` // 按预定顺序排列的采集值 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CollectResult) Reset() { + *x = CollectResult{} + mi := &file_device_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CollectResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectResult) ProtoMessage() {} + +func (x *CollectResult) ProtoReflect() protoreflect.Message { + mi := &file_device_proto_msgTypes[4] if x != nil { - return x.Value + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return 0 + return mi.MessageOf(x) +} + +// Deprecated: Use CollectResult.ProtoReflect.Descriptor instead. +func (*CollectResult) Descriptor() ([]byte, []int) { + return file_device_proto_rawDescGZIP(), []int{4} +} + +func (x *CollectResult) GetCorrelationId() string { + if x != nil { + return x.CorrelationId + } + return "" +} + +func (x *CollectResult) GetValues() []float32 { + if x != nil { + return x.Values + } + return nil } var File_device_proto protoreflect.FileDescriptor @@ -264,13 +376,19 @@ const file_device_proto_rawDesc = "" + "bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" + "\vbus_address\x18\x03 \x01(\x05R\n" + "busAddress\x12#\n" + - "\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"_\n" + - "\aCollect\x12\x1d\n" + + "\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"g\n" + + "\x13BatchCollectCommand\x12%\n" + + "\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12)\n" + + "\x05tasks\x18\x02 \x03(\v2\x13.device.CollectTaskR\x05tasks\"r\n" + + "\vCollectTask\x12#\n" + + "\rdevice_action\x18\x01 \x01(\tR\fdeviceAction\x12\x1d\n" + "\n" + - "bus_number\x18\x01 \x01(\x05R\tbusNumber\x12\x1f\n" + - "\vbus_address\x18\x02 \x01(\x05R\n" + - "busAddress\x12\x14\n" + - "\x05value\x18\x03 \x01(\x02R\x05value*%\n" + + "bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" + + "\vbus_address\x18\x03 \x01(\x05R\n" + + "busAddress\"N\n" + + "\rCollectResult\x12%\n" + + "\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12\x16\n" + + "\x06values\x18\x02 \x03(\x02R\x06values*%\n" + "\n" + "MethodType\x12\n" + "\n" + @@ -290,22 +408,25 @@ func file_device_proto_rawDescGZIP() []byte { } var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_device_proto_goTypes = []any{ - (MethodType)(0), // 0: device.MethodType - (*Instruction)(nil), // 1: device.Instruction - (*Switch)(nil), // 2: device.Switch - (*Collect)(nil), // 3: device.Collect - (*anypb.Any)(nil), // 4: google.protobuf.Any + (MethodType)(0), // 0: device.MethodType + (*Instruction)(nil), // 1: device.Instruction + (*Switch)(nil), // 2: device.Switch + (*BatchCollectCommand)(nil), // 3: device.BatchCollectCommand + (*CollectTask)(nil), // 4: device.CollectTask + (*CollectResult)(nil), // 5: device.CollectResult + (*anypb.Any)(nil), // 6: google.protobuf.Any } var file_device_proto_depIdxs = []int32{ 0, // 0: device.Instruction.method:type_name -> device.MethodType - 4, // 1: device.Instruction.data:type_name -> google.protobuf.Any - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 6, // 1: device.Instruction.data:type_name -> google.protobuf.Any + 4, // 2: device.BatchCollectCommand.tasks:type_name -> device.CollectTask + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_device_proto_init() } @@ -319,7 +440,7 @@ func file_device_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)), NumEnums: 1, - NumMessages: 3, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/app/service/device/proto/device.proto b/internal/app/service/device/proto/device.proto index 8a98e32..9476170 100644 --- a/internal/app/service/device/proto/device.proto +++ b/internal/app/service/device/proto/device.proto @@ -6,27 +6,50 @@ import "google/protobuf/any.proto"; option go_package = "internal/app/service/device/proto"; +// --- 通用指令结构 --- + // 指令类型 -enum MethodType{ - SWITCH = 0; // 启停 +enum MethodType { + SWITCH = 0; // 启停 COLLECT = 1; // 采集 } -// 指令 -message Instruction{ +// 指令 (所有空中数据都会被包装在这里面) +message Instruction { MethodType method = 1; google.protobuf.Any data = 2; } -message Switch{ +// Switch 指令的载荷 +message Switch { string device_action = 1; // 指令 - int32 bus_number = 2; // 总线号 - int32 bus_address = 3; // 总线地址 - int32 relay_channel = 4; // 继电器通道号 + int32 bus_number = 2; // 总线号 + int32 bus_address = 3; // 总线地址 + int32 relay_channel = 4; // 继电器通道号 } -message Collect{ - int32 bus_number = 1; // 总线号 - int32 bus_address = 2; // 总线地址 - float value = 3; // 采集值 + +// --- 批量采集相关结构 --- + +// BatchCollectCommand +// 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。 +// 这个消息本身不会被发送到设备。 +message BatchCollectCommand { + string correlation_id = 1; // 用于关联请求和响应的唯一ID + repeated CollectTask tasks = 2; // 采集任务列表 +} + +// CollectTask +// 定义了单个采集任务的“意图”。 +message CollectTask { + string device_action = 1; // 指令 + int32 bus_number = 2; // 总线号 + int32 bus_address = 3; // 总线地址 +} + +// CollectResult +// 这是设备响应的、极致精简的数据包。 +message CollectResult { + string correlation_id = 1; // 从下行指令中原样返回的关联ID + repeated float values = 2; // 按预定顺序排列的采集值 } \ No newline at end of file diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 5d96a36..dcfdb41 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -1,7 +1,6 @@ package task import ( - "encoding/json" "fmt" "time" @@ -46,7 +45,7 @@ func (d *DelayTask) parseParameters() error { } var params DelayTaskParams - err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms) + err := d.executionTask.Task.ParseParameters(¶ms) if err != nil { d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) diff --git a/internal/app/service/task/release_feed_weight_task.go b/internal/app/service/task/release_feed_weight_task.go index d3a89a4..6541206 100644 --- a/internal/app/service/task/release_feed_weight_task.go +++ b/internal/app/service/task/release_feed_weight_task.go @@ -9,7 +9,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" ) // ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 @@ -25,23 +24,28 @@ type ReleaseFeedWeightTask struct { sensorDataRepo repository.SensorDataRepository claimedLog *models.TaskExecutionLog - feedPortDevice *models.Device // 下料口基本信息 - releaseWeight float64 // 需要释放的重量 - mixingTankDeviceID uint // 搅拌罐称重传感器ID + feedPortDevice *models.Device + releaseWeight float64 + mixingTankDeviceID uint - comm transport.Communicator - feedPort *device.GeneralDeviceService // 下料口指令下发器 + feedPort device.Service logger *logs.Logger } // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 -func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task { +func NewReleaseFeedWeightTask( + claimedLog *models.TaskExecutionLog, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceService device.Service, + logger *logs.Logger, +) Task { return &ReleaseFeedWeightTask{ claimedLog: claimedLog, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, - comm: comm, + feedPort: deviceService, // 直接注入 logger: logger, } } @@ -118,7 +122,7 @@ func (r *ReleaseFeedWeightTask) parseParameters() error { } var params ReleaseFeedWeightTaskParams - err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms) + err := r.claimedLog.Task.ParseParameters(¶ms) if err != nil { r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) @@ -140,7 +144,6 @@ func (r *ReleaseFeedWeightTask) parseParameters() error { r.releaseWeight = params.ReleaseWeight r.mixingTankDeviceID = params.MixingTankDeviceID - r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm) r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) if err != nil { r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 53fe5ae..d60d6c4 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -1,15 +1,14 @@ package task import ( - "encoding/json" "errors" "sync" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -84,9 +83,9 @@ type Scheduler struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository - comm transport.Communicator analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker + deviceService device.Service pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -100,20 +99,21 @@ func NewScheduler( deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, - comm transport.Communicator, analysisPlanTaskManager *AnalysisPlanTaskManager, logger *logs.Logger, + deviceService device.Service, interval time.Duration, - numWorkers int) *Scheduler { + numWorkers int, +) *Scheduler { return &Scheduler{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, planRepo: planRepo, - comm: comm, analysisPlanTaskManager: analysisPlanTaskManager, logger: logger, + deviceService: deviceService, pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), @@ -289,7 +289,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { case models.TaskTypeWaiting: return NewDelayTask(s.logger, claimedLog) case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger) + return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 @@ -304,7 +304,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { var params struct { PlanID uint `json:"plan_id"` } - if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil { + if err := claimedLog.Task.ParseParameters(¶ms); err != nil { s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err) return err } diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index a3ec41f..2e8ed77 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -7,9 +7,11 @@ import ( "net/http" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + gproto "google.golang.org/protobuf/proto" "gorm.io/datatypes" ) @@ -27,23 +29,27 @@ const ( // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { - logger *logs.Logger - sensorDataRepo repository.SensorDataRepository - deviceRepo repository.DeviceRepository - deviceCommandLogRepo repository.DeviceCommandLogRepository + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceCommandLogRepo repository.DeviceCommandLogRepository + pendingCollectionRepo repository.PendingCollectionRepository // 新增 } +// NewChirpStackListener 创建一个新的 ChirpStackListener 实例 func NewChirpStackListener( logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, -) *ChirpStackListener { + pendingCollectionRepo repository.PendingCollectionRepository, // 新增 +) ListenHandler { // 返回接口类型 return &ChirpStackListener{ - logger: logger, - sensorDataRepo: sensorDataRepo, - deviceRepo: deviceRepo, - deviceCommandLogRepo: deviceCommandLogRepo, + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceCommandLogRepo: deviceCommandLogRepo, + pendingCollectionRepo: pendingCollectionRepo, // 新增 } } @@ -151,12 +157,20 @@ type GenericSensorReading struct { // handleUpEvent 处理上行数据事件 func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { - c.logger.Infof("处理 'up' 事件: %+v", event) + c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) - // 记录信号强度 - // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 - // 因此,我们只取第一个 RxInfo 中的信号数据即可。 + // 1. 查找区域主控设备 + regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + if err != nil { + c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) + return + } + c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) + + // 2. 记录区域主控的信号强度 (如果存在) if len(event.RxInfo) > 0 { + // 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 + // 因此,我们只取第一个 RxInfo 中的信号数据即可。 rx := event.RxInfo[0] // 取第一个接收到的网关信息 // 构建 SignalMetrics 结构体 @@ -164,73 +178,111 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { RssiDbm: rx.Rssi, SnrDb: rx.Snr, } - - // 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) - if err != nil { - c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) - return - } - // 记录区域主控的信号强度 c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) + c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) } else { c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) } - // 解析并记录传感器数据 (温度、湿度、重量) - // 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 - if event.Data != "" { - decodedData, err := base64.StdEncoding.DecodeString(event.Data) + // 3. 处理上报的传感器数据 + if event.Data == "" { + c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui) + return + } + + // 3.1 Base64 解码 + decodedData, err := base64.StdEncoding.DecodeString(event.Data) + if err != nil { + c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) + return + } + + // 3.2 解析外层 "信封" + var instruction proto.Instruction + if err := gproto.Unmarshal(decodedData, &instruction); err != nil { + c.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData) + return + } + + // 3.3 检查是否是采集响应 + if instruction.Method != proto.MethodType_COLLECT { + c.logger.Infof("收到一个非采集响应的上行指令 (Method: %s),无需处理。", instruction.Method.String()) + return + } + + // 2.4 解包内层 CollectResult + var collectResp proto.CollectResult + if err := instruction.Data.UnmarshalTo(&collectResp); err != nil { + c.logger.Errorf("解包数据信息失败: %v", err) + return + } + + correlationID := collectResp.CorrelationId + c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) + + // 3. 根据 CorrelationID 查找待处理请求 + pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID) + if err != nil { + c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) + return + } + + // 检查状态,防止重复处理 + if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut { + c.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status) + return + } + + // 4. 匹配数据并存入数据库 + deviceIDs := pendingReq.CommandMetadata + values := collectResp.Values + if len(deviceIDs) != len(values) { + c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) + // TODO 数量不匹配是否全改成失败 + // 即使数量不匹配,也更新状态为完成,以防止请求永远 pending + err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time) if err != nil { - c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) - return + c.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err) } + return + } - var readings []GenericSensorReading - if err := json.Unmarshal(decodedData, &readings); err != nil { - c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) - return - } - - // 查找区域主控设备,以便记录其ID - regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) + for i, deviceID := range deviceIDs { + value := values[i] + dev, err := c.deviceRepo.FindByID(deviceID) if err != nil { - c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) - return + c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) + continue } - for _, reading := range readings { - // 根据类型构建具体的传感器数据结构体 - var sensorData interface{} - var sensorDataType models.SensorDataType - - switch reading.Type { - case models.SensorDataTypeTemperature: // 使用枚举常量 - sensorData = models.TemperatureData{ - TemperatureCelsius: reading.Value, - } - sensorDataType = models.SensorDataTypeTemperature - case models.SensorDataTypeHumidity: // 使用枚举常量 - sensorData = models.HumidityData{ - HumidityPercent: reading.Value, - } - sensorDataType = models.SensorDataTypeHumidity - case models.SensorDataTypeWeight: // 使用枚举常量 - sensorData = models.WeightData{ - WeightKilograms: reading.Value, - } - sensorDataType = models.SensorDataTypeWeight - default: - c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", - reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) - continue // 跳过未知类型 - } - - // 记录普通设备的传感器数据 - c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) + sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType] + if !ok { + c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType) + continue } + + var sensorData interface{} + switch sensorDataType { + case models.SensorDataTypeTemperature: + sensorData = models.TemperatureData{TemperatureCelsius: float64(value)} + case models.SensorDataTypeHumidity: + sensorData = models.HumidityData{HumidityPercent: float64(value)} + case models.SensorDataTypeWeight: + sensorData = models.WeightData{WeightKilograms: float64(value)} + default: + c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID) + continue + } + + c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData) + c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value) + } + + // 5. 更新请求状态为“已完成” + if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil { + c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) } else { - c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) + c.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID) } } diff --git a/internal/core/application.go b/internal/core/application.go index fdc74bf..216c205 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -8,6 +8,7 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/token" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport" @@ -31,6 +32,7 @@ type Application struct { planRepo repository.PlanRepository pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository + pendingCollectionRepo repository.PendingCollectionRepository analysisPlanTaskManager *task.AnalysisPlanTaskManager } @@ -76,32 +78,64 @@ func NewApplication(configPath string) (*Application, error) { // 初始化命令下发历史仓库 deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) + // 初始化待采集请求仓库 + pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB()) + // 初始化设备上行监听器 - listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo) + listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo, pendingCollectionRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) - // 初始化设备通信器 - comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo) + // 初始化设备通信器 (纯粹的通信客户端) + comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger) + + // 初始化通用设备服务 + generalDeviceService := device.NewGeneralDeviceService( + deviceRepo, + deviceCommandLogRepo, + pendingCollectionRepo, + logger, + comm, + ) // 初始化任务执行器 - executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + executor := task.NewScheduler( + pendingTaskRepo, + executionLogRepo, + deviceRepo, + sensorDataRepo, + planRepo, + analysisPlanTaskManager, + logger, + generalDeviceService, + time.Duration(cfg.Task.Interval)*time.Second, + cfg.Task.NumWorkers, + ) // 初始化 API 服务器 - apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) + apiServer := api.NewAPI( + cfg.Server, + logger, + userRepo, + deviceRepo, + planRepo, + tokenService, + listenHandler, + analysisPlanTaskManager, + ) // 组装 Application 对象 app := &Application{ - Config: cfg, - Logger: logger, - Storage: storage, - Executor: executor, - API: apiServer, - // 填充新增的字段 + Config: cfg, + Logger: logger, + Storage: storage, + Executor: executor, + API: apiServer, planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, + pendingCollectionRepo: pendingCollectionRepo, analysisPlanTaskManager: analysisPlanTaskManager, } @@ -112,7 +146,13 @@ func NewApplication(configPath string) (*Application, error) { func (app *Application) Start() error { app.Logger.Info("应用启动中...") - // --- 新增逻辑:初始化待执行任务列表 --- + // --- 清理待采集任务 --- + if err := app.initializePendingCollections(); err != nil { + // 这是一个非致命错误,记录它,但应用应继续启动 + app.Logger.Error(err) + } + + // --- 初始化待执行任务列表 --- if err := app.initializePendingTasks( app.planRepo, // 传入 planRepo app.pendingTaskRepo, // 传入 pendingTaskRepo @@ -160,6 +200,25 @@ func (app *Application) Stop() error { return nil } +// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 +// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 +// 这保证了系统在每次启动时都处于一个干净、确定的状态。 +func (app *Application) initializePendingCollections() error { + app.Logger.Info("开始清理所有未完成的采集请求...") + + // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 + count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut() + if err != nil { + return fmt.Errorf("清理未完成的采集请求失败: %v", err) + } else if count > 0 { + app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) + } else { + app.Logger.Info("没有需要清理的采集请求。") + } + + return nil +} + // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 func (app *Application) initializePendingTasks( planRepo repository.PlanRepository, diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index e0a0a40..80833c4 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -117,10 +117,11 @@ type HeartbeatConfig struct { // ChirpStackConfig 代表 ChirpStack API 配置 type ChirpStackConfig struct { - APIHost string `yaml:"api_host"` - APIToken string `yaml:"api_token"` - FPort int `yaml:"fport"` - APITimeout int `yaml:"api_timeout"` + APIHost string `yaml:"api_host"` + APIToken string `yaml:"api_token"` + FPort int `yaml:"fport"` + APITimeout int `yaml:"api_timeout"` + CollectionRequestTimeout int `yaml:"collection_request_timeout"` } // TaskConfig 代表任务调度配置 diff --git a/internal/infra/models/device.go b/internal/infra/models/device.go index 469129e..c3cc889 100644 --- a/internal/infra/models/device.go +++ b/internal/infra/models/device.go @@ -6,6 +6,8 @@ import ( "gorm.io/datatypes" "gorm.io/gorm" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" ) // DeviceType 定义了设备的高级类别 @@ -30,6 +32,8 @@ const ( SubTypeSensorHumidity DeviceSubType = "humidity" // SubTypeSensorAmmonia 氨气传感器 SubTypeSensorAmmonia DeviceSubType = "ammonia" + // SubTypeSensorWeight 电子秤 + SubTypeSensorWeight DeviceSubType = "weight" // SubTypeValveFeed 下料阀门 SubTypeValveFeed DeviceSubType = "feed_valve" @@ -85,6 +89,10 @@ type Device struct { // Location 描述了设备的物理安装位置,例如 "1号猪舍东侧",方便运维。建立索引以优化按位置查询。 Location string `gorm:"index" json:"location"` + // Command 存储了与设备交互所需的具体指令。 + // 例如,对于传感器,这里存储 Modbus 采集指令;对于开关和区域主控,这里可以为空。 + Command string `gorm:"type:varchar(255)" json:"command"` + // Properties 用于存储特定类型设备的独有属性,采用JSON格式。 // 建议在应用层为不同子类型的设备定义专用的属性结构体(如 LoraProperties, BusProperties),以保证数据一致性。 Properties datatypes.JSON `json:"properties"` @@ -110,28 +118,52 @@ func (d *Device) ParseProperties(v interface{}) error { // SelfCheck 进行参数自检, 返回检测结果 // 方法会根据自身类型进行参数检查, 参数不全时返回false -// TODO 没写单测 func (d *Device) SelfCheck() bool { - - properties := make(map[string]interface{}) - if err := d.ParseProperties(&properties); err != nil { - return false - } - - has := func(key string) bool { - _, ok := properties[key] - return ok - } - - switch d.SubType { - case SubTypeFan: - if !has(BusNumber) || !has(BusAddress) || !has(RelayChannel) { + // 使用清晰的 switch 结构,确保所有情况都被覆盖 + switch d.Type { + case DeviceTypeAreaController: + props := make(map[string]interface{}) + if err := d.ParseProperties(&props); err != nil { return false } + _, ok := props[LoRaAddress].(string) + return ok + + case DeviceTypeDevice: + // 所有普通设备都必须有父级 + if d.ParentID == nil || *d.ParentID == 0 { + return false + } + + props := make(map[string]interface{}) + if err := d.ParseProperties(&props); err != nil { + return false + } + + // hasPureNumeric 检查一个key是否存在于map中,并且其值是纯数字(整数或可解析为整数的字符串) + hasPureNumeric := func(key string) bool { + val, ok := props[key] + if !ok { + return false // Key不存在 + } + return utils.IsPureNumeric(val) + } + + // 根据子类型进行具体校验 + switch d.SubType { + // 所有传感器类型都必须有 Command 和总线信息,且总线信息为纯数字 + case SubTypeSensorTemp, SubTypeSensorHumidity, SubTypeSensorWeight, SubTypeSensorAmmonia: + return d.Command != "" && hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) + // 所有开关类型都必须有继电器和总线信息,且都为纯数字 + case SubTypeFan, SubTypeWaterCurtain, SubTypeValveFeed: + return hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) && hasPureNumeric(RelayChannel) + // 如果是未知的子类型,或者没有子类型,则认为自检失败 + default: + return false + } + + // 如果设备类型不是已知的任何一种,则自检失败 default: - // 不应该有类型未知的设备 return false } - - return true } diff --git a/internal/infra/models/device_command_log.go b/internal/infra/models/device_command_log.go deleted file mode 100644 index 6fc420b..0000000 --- a/internal/infra/models/device_command_log.go +++ /dev/null @@ -1,32 +0,0 @@ -package models - -import ( - "time" -) - -// DeviceCommandLog 记录下行任务的下发情况和设备确认状态 -type DeviceCommandLog struct { - // MessageID 是下行消息的唯一标识符。 - // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 - MessageID string `gorm:"primaryKey" json:"message_id"` - - // DeviceID 是接收此下行任务的设备的ID。 - // 对于 LoRaWAN,这通常是区域主控设备的ID。 - DeviceID uint `gorm:"not null;index" json:"device_id"` - - // SentAt 记录下行任务最初发送的时间。 - SentAt time.Time `gorm:"not null" json:"sent_at"` - - // AcknowledgedAt 记录设备确认收到下行消息的时间。 - // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 - AcknowledgedAt *time.Time `json:"acknowledged_at"` - - // ReceivedSuccess 表示设备是否成功接收到下行消息。 - // true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 - ReceivedSuccess bool `gorm:"not null" json:"received_success"` -} - -// TableName 自定义 GORM 使用的数据库表名 -func (DeviceCommandLog) TableName() string { - return "device_command_log" -} diff --git a/internal/infra/models/execution.go b/internal/infra/models/execution.go index 94598ef..4e4eaf5 100644 --- a/internal/infra/models/execution.go +++ b/internal/infra/models/execution.go @@ -73,3 +73,70 @@ func (log *TaskExecutionLog) AfterFind(tx *gorm.DB) (err error) { } return } + +// --- 指令与采集 --- + +// PendingCollectionStatus 定义了待采集请求的状态 +type PendingCollectionStatus string + +const ( + PendingStatusPending PendingCollectionStatus = "pending" // 请求已发送,等待设备响应 + PendingStatusFulfilled PendingCollectionStatus = "fulfilled" // 已收到设备响应并成功处理 + PendingStatusTimedOut PendingCollectionStatus = "timed_out" // 请求超时,未收到设备响应 +) + +// DeviceCommandLog 记录所有“发后即忘”的下行指令日志。 +// 这张表主要用于追踪指令是否被网关成功发送 (ack)。 +type DeviceCommandLog struct { + // MessageID 是下行消息的唯一标识符。 + // 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 + MessageID string `gorm:"primaryKey" json:"message_id"` + + // DeviceID 是接收此下行任务的设备的ID。 + // 对于 LoRaWAN,这通常是区域主控设备的ID。 + DeviceID uint `gorm:"not null;index" json:"device_id"` + + // SentAt 记录下行任务最初发送的时间。 + SentAt time.Time `gorm:"not null" json:"sent_at"` + + // AcknowledgedAt 记录设备确认收到下行消息的时间。 + // 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 + AcknowledgedAt *time.Time `json:"acknowledged_at"` + + // ReceivedSuccess 表示设备是否成功接收到下行消息。 + // true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 + ReceivedSuccess bool `gorm:"not null" json:"received_success"` +} + +// TableName 自定义 GORM 使用的数据库表名 +func (DeviceCommandLog) TableName() string { + return "device_command_log" +} + +// PendingCollection 记录所有需要设备响应的“待采集请求”。 +// 这是一张状态机表,追踪从请求发送到收到响应的整个生命周期。 +type PendingCollection struct { + // CorrelationID 是由平台生成的、在请求和响应之间全局唯一的关联ID,作为主键。 + CorrelationID string `gorm:"primaryKey"` + + // DeviceID 是接收此任务的设备ID + // 对于 LoRaWAN,这通常是区域主控设备的ID。 + DeviceID uint `gorm:"index"` + + // CommandMetadata 存储了此次采集任务对应的设备ID列表,顺序与设备响应值的顺序一致。 + CommandMetadata UintArray `gorm:"type:bigint[]"` + + // Status 是该请求的当前状态,用于状态机管理和超时处理。 + Status PendingCollectionStatus `gorm:"index"` + + // FulfilledAt 是收到设备响应并成功处理的时间。使用指针以允许 NULL 值。 + FulfilledAt *time.Time + + // CreatedAt 是 GORM 的标准字段,记录请求创建时间。 + CreatedAt time.Time +} + +// TableName 自定义 GORM 使用的数据库表名 +func (PendingCollection) TableName() string { + return "pending_collections" +} diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 40382f4..c3ae59b 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -1,5 +1,13 @@ package models +import ( + "database/sql/driver" + "errors" + "fmt" + "strconv" + "strings" +) + // GetAllModels 返回一个包含所有数据库模型实例的切片。 // 这个函数用于在数据库初始化时自动迁移所有的表结构。 func GetAllModels() []interface{} { @@ -14,5 +22,70 @@ func GetAllModels() []interface{} { &PendingTask{}, &SensorData{}, &DeviceCommandLog{}, + &PendingCollection{}, } } + +// UintArray 是一个自定义类型,代表 uint 的切片。 +// 它实现了 gorm.Scanner 和 driver.Valuer 接口, +// 以便能与数据库的 bigint[] 类型进行原生映射。 +type UintArray []uint + +// Value 实现了 driver.Valuer 接口。 +// 它告诉 GORM 如何将 UintArray ([]) 转换为数据库能够理解的格式。 +func (a UintArray) Value() (driver.Value, error) { + if a == nil { + return "{}", nil + } + + var b strings.Builder + b.WriteString("{") + for i, v := range a { + if i > 0 { + b.WriteString(",") + } + b.WriteString(strconv.FormatUint(uint64(v), 10)) + } + b.WriteString("}") + return b.String(), nil +} + +// Scan 实现了 gorm.Scanner 接口。 +// 它告诉 GORM 如何将从数据库读取的数据转换为我们的 UintArray ([])。 +func (a *UintArray) Scan(src interface{}) error { + if src == nil { + *a = nil + return nil + } + + var srcStr string + switch v := src.(type) { + case []byte: + srcStr = string(v) + case string: + srcStr = v + default: + return errors.New("无法扫描非字符串或字节类型的源到 UintArray") + } + + // 去掉花括号 + srcStr = strings.Trim(srcStr, "{}") + if srcStr == "" { + *a = []uint{} + return nil + } + + // 按逗号分割 + parts := strings.Split(srcStr, ",") + arr := make([]uint, len(parts)) + for i, p := range parts { + val, err := strconv.ParseUint(p, 10, 64) + if err != nil { + return fmt.Errorf("解析 UintArray 元素失败: %w", err) + } + arr[i] = uint(val) + } + + *a = arr + return nil +} diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index bbb2315..bf2ce42 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -1,6 +1,8 @@ package models import ( + "encoding/json" + "errors" "fmt" "sort" "time" @@ -172,3 +174,16 @@ type Task struct { func (Task) TableName() string { return "tasks" } + +// ParseParameters 解析 JSON 属性到一个具体的结构体中。 +// 调用方需要传入一个指向目标结构体实例的指针。 +// 示例: +// +// var param LoraParameters +// if err := task.ParseParameters(¶m); err != nil { ... } +func (t Task) ParseParameters(v interface{}) error { + if t.Parameters == nil { + return errors.New("设备属性为空,无法解析") + } + return json.Unmarshal(t.Parameters, v) +} diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 0e1a6a9..31713af 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -17,6 +17,14 @@ const ( SensorDataTypeWeight SensorDataType = "weight" // 重量 ) +// DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射. +// 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询. +var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{ + SubTypeSensorTemp: SensorDataTypeTemperature, + SubTypeSensorHumidity: SensorDataTypeHumidity, + SubTypeSensorWeight: SensorDataTypeWeight, +} + // SignalMetrics 存储信号强度数据 type SignalMetrics struct { RssiDbm int `json:"rssi_dbm"` // 绝对信号强度(dBm),受距离、障碍物影响 diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index 5cc3ee8..1005a1c 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -35,6 +35,9 @@ type DeviceRepository interface { // FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增) FindByDevEui(devEui string) (*models.Device, error) + + // FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 + FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) } // gormDeviceRepository 是 DeviceRepository 的 GORM 实现 @@ -121,3 +124,18 @@ func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, erro } return &device, nil } + +// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 +func (r *gormDeviceRepository) FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) { + var device models.Device + // PostgreSQL 使用 ->> 操作符来查询 JSONB 字段的文本值 + err := r.db.Where("parent_id = ?", parentID). + Where("properties->>'bus_number' = ?", strconv.Itoa(int(busNumber))). + Where("properties->>'bus_address' = ?", strconv.Itoa(int(busAddress))). + First(&device).Error + + if err != nil { + return nil, fmt.Errorf("根据父设备ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", parentID, busNumber, busAddress, err) + } + return &device, nil +} diff --git a/internal/infra/repository/pending_collection_repository.go b/internal/infra/repository/pending_collection_repository.go new file mode 100644 index 0000000..5481569 --- /dev/null +++ b/internal/infra/repository/pending_collection_repository.go @@ -0,0 +1,67 @@ +package repository + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "gorm.io/gorm" +) + +// PendingCollectionRepository 定义了与待采集请求相关的数据库操作接口。 +type PendingCollectionRepository interface { + // Create 创建一个新的待采集请求。 + Create(req *models.PendingCollection) error + + // FindByCorrelationID 根据关联ID查找一个待采集请求。 + FindByCorrelationID(correlationID string) (*models.PendingCollection, error) + + // UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 + UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error + + // MarkAllPendingAsTimedOut 将所有“待处理”请求更新为“已超时”。 + MarkAllPendingAsTimedOut() (int64, error) +} + +// gormPendingCollectionRepository 是 PendingCollectionRepository 的 GORM 实现。 +type gormPendingCollectionRepository struct { + db *gorm.DB +} + +// NewGormPendingCollectionRepository 创建一个新的 PendingCollectionRepository GORM 实现实例。 +func NewGormPendingCollectionRepository(db *gorm.DB) PendingCollectionRepository { + return &gormPendingCollectionRepository{db: db} +} + +// Create 创建一个新的待采集请求。 +func (r *gormPendingCollectionRepository) Create(req *models.PendingCollection) error { + return r.db.Create(req).Error +} + +// FindByCorrelationID 根据关联ID查找一个待采集请求。 +func (r *gormPendingCollectionRepository) FindByCorrelationID(correlationID string) (*models.PendingCollection, error) { + var req models.PendingCollection + if err := r.db.First(&req, "correlation_id = ?", correlationID).Error; err != nil { + return nil, err + } + return &req, nil +} + +// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 +func (r *gormPendingCollectionRepository) UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error { + return r.db.Model(&models.PendingCollection{}). + Where("correlation_id = ?", correlationID). + Updates(map[string]interface{}{ + "status": models.PendingStatusFulfilled, + "fulfilled_at": &fulfilledAt, + }).Error +} + +// MarkAllPendingAsTimedOut 将所有状态为 'pending' 的记录更新为 'timed_out'。 +// 返回被更新的记录数量和错误。 +func (r *gormPendingCollectionRepository) MarkAllPendingAsTimedOut() (int64, error) { + result := r.db.Model(&models.PendingCollection{}). + Where("status = ?", models.PendingStatusPending). + Update("status", models.PendingStatusTimedOut) + + return result.RowsAffected, result.Error +} diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 4f653fa..6bc4a2c 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -1,12 +1,12 @@ package lora import ( + "errors" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" "github.com/go-openapi/runtime" httptransport "github.com/go-openapi/runtime/client" @@ -20,19 +20,13 @@ type ChirpStackTransport struct { client *client.ChirpStackRESTAPI authInfo runtime.ClientAuthInfoWriter config config.ChirpStackConfig - - deviceCommandLogRepo repository.DeviceCommandLogRepository - deviceRepo repository.DeviceRepository - - logger *logs.Logger + logger *logs.Logger } // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 func NewChirpStackTransport( config config.ChirpStackConfig, logger *logs.Logger, - deviceCommandLogRepo repository.DeviceCommandLogRepository, - deviceRepo repository.DeviceRepository, ) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 @@ -45,16 +39,14 @@ func NewChirpStackTransport( authInfo := httptransport.APIKeyAuth("grpc-metadata-authorization", "header", config.GenerateAPIKey()) return &ChirpStackTransport{ - client: apiClient, - authInfo: authInfo, - config: config, - logger: logger, - deviceCommandLogRepo: deviceCommandLogRepo, - deviceRepo: deviceRepo, + client: apiClient, + authInfo: authInfo, + config: config, + logger: logger, } } -func (c *ChirpStackTransport) Send(address string, payload []byte) error { +func (c *ChirpStackTransport) Send(address string, payload []byte) (*transport.SendResult, error) { // 1. 构建 API 请求体。 // - Confirmed: true 表示确认消息, 设为false将不保证消息送达(但可以节约下行容量)。 // - Data: 经过 Base64 编码的数据。 @@ -72,7 +64,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { // - WithQueueItemDevEui 指定目标设备的 EUI。 // - WithBody 设置请求体。 params := device_service.NewDeviceServiceEnqueueParams(). - WithTimeout(10 * time.Second). + WithTimeout(time.Duration(c.config.APITimeout) * time.Second). WithQueueItemDevEui(address). WithBody(body) @@ -81,53 +73,23 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) if err != nil { c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) - return err + return nil, err } - // 4. 成功发送后,尝试记录下行任务 - messageID := "" - if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID - messageID = resp.Payload.ID - } else { - c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address) - // 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了 - return nil + if resp == nil || resp.Payload == nil || resp.Payload.ID == "" { + // 这是一个需要明确处理的错误情况,因为调用方依赖 MessageID。 + errMsg := "ChirpStack Enqueue 响应未包含 MessageID (ID)" + c.logger.Errorf(errMsg) + return nil, errors.New(errMsg) } - // 调用私有方法记录下行任务 - if err := c.recordDownlinkTask(address, messageID); err != nil { - // 记录失败不影响下行命令的发送成功 - c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err) - return nil + c.logger.Infof("成功将 payload 发送到设备 %s 的队列 (MessageID: %s)", address, resp.Payload.ID) + + // 将 MessageID 包装在 SendResult 中返回 + result := &transport.SendResult{ + MessageID: resp.Payload.ID, } - c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID) + return result, nil - return nil -} - -// recordDownlinkTask 记录下行任务到数据库 -func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error { - // 获取区域主控的内部 DeviceID - regionalController, err := c.deviceRepo.FindByDevEui(devEui) - if err != nil { - c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err) - return err - } - - // 创建 DeviceCommandLog - record := &models.DeviceCommandLog{ - MessageID: messageID, - DeviceID: regionalController.ID, - SentAt: time.Now(), - AcknowledgedAt: nil, // 初始状态为未确认 - } - - if err := c.deviceCommandLogRepo.Create(record); err != nil { - c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err) - return err - } - - c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID) - return nil } diff --git a/internal/infra/transport/transport.go b/internal/infra/transport/transport.go index 3fb94a0..648aaeb 100644 --- a/internal/infra/transport/transport.go +++ b/internal/infra/transport/transport.go @@ -3,5 +3,13 @@ package transport // Communicator 用于其他设备通信 type Communicator interface { // Send 用于发送一条单向数据(不等待回信) - Send(address string, payload []byte) error + // 成功时,它返回一个包含 MessageID 的 SendResult,以便调用方追踪。 + Send(address string, payload []byte) (*SendResult, error) +} + +// SendResult 包含了 SendGo 方法成功执行后返回的结果。 +type SendResult struct { + // MessageID 是通信服务为此次发送分配的唯一标识符。 + // 调用方需要保存此 ID,以便后续关联 ACK 等事件。 + MessageID string } diff --git a/internal/infra/utils/validation.go b/internal/infra/utils/validation.go new file mode 100644 index 0000000..bab71e2 --- /dev/null +++ b/internal/infra/utils/validation.go @@ -0,0 +1,13 @@ +package utils + +import ( + "fmt" + "strconv" +) + +// IsPureNumeric 检查值是否纯数字(整数或可转换为整数的字符串)。 +func IsPureNumeric(val interface{}) bool { + v := fmt.Sprintf("%v", val) + _, err := strconv.Atoi(v) + return err == nil +}