issue_18 #19
| @@ -50,6 +50,10 @@ chirp_stack: | ||||
|   api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN> | ||||
|   fport: 1 | ||||
|   api_timeout: 10 # ChirpStack API请求超时时间(秒) | ||||
|   # 等待设备上行响应的超时时间(秒)。 | ||||
|   # 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。 | ||||
|   collection_request_timeout: 300 | ||||
|  | ||||
|  | ||||
| # 任务调度器配置 | ||||
| task: | ||||
|   | ||||
| @@ -53,8 +53,6 @@ func NewAPI(cfg config.ServerConfig, | ||||
| 	userRepo repository.UserRepository, | ||||
| 	deviceRepository repository.DeviceRepository, | ||||
| 	planRepository repository.PlanRepository, | ||||
| 	sensorDataRepository repository.SensorDataRepository, | ||||
| 	executionLogRepository repository.ExecutionLogRepository, | ||||
| 	tokenService token.TokenService, | ||||
| 	listenHandler transport.ListenHandler, | ||||
| 	analysisTaskManager *task.AnalysisPlanTaskManager) *API { | ||||
|   | ||||
| @@ -77,7 +77,7 @@ func newDeviceResponse(device *models.Device) (*DeviceResponse, error) { | ||||
|  | ||||
| 	var props map[string]interface{} | ||||
| 	if len(device.Properties) > 0 && string(device.Properties) != "null" { | ||||
| 		if err := json.Unmarshal(device.Properties, &props); err != nil { | ||||
| 		if err := device.ParseProperties(&props); err != nil { | ||||
| 			return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err) | ||||
| 		} | ||||
| 	} | ||||
| @@ -143,6 +143,13 @@ func (c *Controller) CreateDevice(ctx *gin.Context) { | ||||
| 		Properties: propertiesJSON, | ||||
| 	} | ||||
|  | ||||
| 	// 在创建设备前进行自检 | ||||
| 	if !device.SelfCheck() { | ||||
| 		c.logger.Errorf("创建设备: 设备属性自检失败: %v", device) | ||||
| 		controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if err := c.repo.Create(device); err != nil { | ||||
| 		c.logger.Errorf("创建设备: 数据库操作失败: %v", err) | ||||
| 		controller.SendErrorResponse(ctx, controller.CodeInternalError, "创建设备失败") | ||||
| @@ -272,6 +279,13 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) { | ||||
| 	existingDevice.Location = req.Location | ||||
| 	existingDevice.Properties = propertiesJSON | ||||
|  | ||||
| 	// 在更新设备前进行自检 | ||||
| 	if !existingDevice.SelfCheck() { | ||||
| 		c.logger.Errorf("更新设备: 设备属性自检失败: %v", existingDevice) | ||||
| 		controller.SendErrorResponse(ctx, controller.CodeBadRequest, "设备属性不符合要求") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 4. 将修改后的 existingDevice 对象保存回数据库 | ||||
| 	if err := c.repo.Update(existingDevice); err != nil { | ||||
| 		c.logger.Errorf("更新设备: 数据库操作失败: %v", err) | ||||
|   | ||||
| @@ -189,7 +189,7 @@ func TaskToResponse(task *models.Task) (TaskResponse, error) { | ||||
|  | ||||
| 	var params map[string]interface{} | ||||
| 	if len(task.Parameters) > 0 && string(task.Parameters) != "null" { | ||||
| 		if err := json.Unmarshal(task.Parameters, ¶ms); err != nil { | ||||
| 		if err := task.ParseParameters(¶ms); err != nil { | ||||
| 			return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err) | ||||
| 		} | ||||
| 	} | ||||
|   | ||||
| @@ -21,7 +21,10 @@ var ( | ||||
| type Service interface { | ||||
|  | ||||
| 	// Switch 用于切换指定设备的状态, 比如启动和停止 | ||||
| 	Switch(device models.Device, action DeviceAction) error | ||||
| 	Switch(device *models.Device, action DeviceAction) error | ||||
|  | ||||
| 	// Collect 用于发起对指定区域主控下的多个设备的批量采集请求。 | ||||
| 	Collect(regionalControllerID uint, devicesToCollect []*models.Device) error | ||||
| } | ||||
|  | ||||
| // 设备操作指令通用结构(最外层) | ||||
|   | ||||
| @@ -1,29 +1,41 @@ | ||||
| package device | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| 	"github.com/google/uuid" | ||||
| 	gproto "google.golang.org/protobuf/proto" | ||||
| 	"google.golang.org/protobuf/types/known/anypb" | ||||
| ) | ||||
|  | ||||
| type GeneralDeviceService struct { | ||||
| 	deviceRepo            repository.DeviceRepository | ||||
| 	deviceCommandLogRepo  repository.DeviceCommandLogRepository | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository | ||||
| 	logger                *logs.Logger | ||||
|  | ||||
| 	comm                  transport.Communicator | ||||
| } | ||||
|  | ||||
| // NewGeneralDeviceService 创建一个通用设备服务 | ||||
| func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { | ||||
| func NewGeneralDeviceService( | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository, | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository, | ||||
| 	logger *logs.Logger, | ||||
| 	comm transport.Communicator, | ||||
| ) Service { | ||||
| 	return &GeneralDeviceService{ | ||||
| 		deviceRepo:            deviceRepo, | ||||
| 		deviceCommandLogRepo:  deviceCommandLogRepo, | ||||
| 		pendingCollectionRepo: pendingCollectionRepo, | ||||
| 		logger:                logger, | ||||
| 		comm:                  comm, | ||||
| 	} | ||||
| @@ -45,18 +57,10 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction | ||||
| 		return fmt.Errorf("解析设备 %v(id=%v) 配置失败: %v", device.Name, device.ID, err) | ||||
| 	} | ||||
|  | ||||
| 	busNumber, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber])) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("无效的总线号: %v", err) | ||||
| 	} | ||||
| 	busAddress, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress])) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("无效的总线地址: %v", err) | ||||
| 	} | ||||
| 	relayChannel, err := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel])) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("无效的继电器通道: %v", err) | ||||
| 	} | ||||
| 	// 已通过 SelfCheck 保证其为纯数字,此处仅进行类型转换 | ||||
| 	busNumber, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusNumber])) | ||||
| 	busAddress, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.BusAddress])) | ||||
| 	relayChannel, _ := strconv.Atoi(fmt.Sprintf("%v", deviceInfo[models.RelayChannel])) | ||||
|  | ||||
| 	data, err := anypb.New(&proto.Switch{ | ||||
| 		DeviceAction: string(action), | ||||
| @@ -87,11 +91,141 @@ func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction | ||||
| 	} | ||||
| 	loraAddress := fmt.Sprintf("%v", thisDeviceinfo[models.LoRaAddress]) | ||||
|  | ||||
| 	// 生成消息并发送 | ||||
| 	// 生成消息 | ||||
| 	message, err := gproto.Marshal(instruction) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("序列化指令失败: %v", err) | ||||
| 	} | ||||
| 	return g.comm.Send(loraAddress, message) | ||||
|  | ||||
| 	// 发送指令并获取 SendResult | ||||
| 	sendResult, err := g.comm.Send(loraAddress, message) | ||||
| 	if err != nil { | ||||
| 		// 发送失败,直接返回错误 | ||||
| 		return fmt.Errorf("发送指令到设备 %s 失败: %w", loraAddress, err) | ||||
| 	} | ||||
|  | ||||
| 	// 创建并保存命令日志 | ||||
| 	logRecord := &models.DeviceCommandLog{ | ||||
| 		MessageID: sendResult.MessageID, | ||||
| 		DeviceID:  thisDevice.ID, // thisDevice 是我们查出来的区域主控 | ||||
| 		SentAt:    time.Now(), | ||||
| 	} | ||||
|  | ||||
| 	if err := g.deviceCommandLogRepo.Create(logRecord); err != nil { | ||||
| 		// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。 | ||||
| 		// 我们记录一个错误日志,然后成功返回。 | ||||
| 		g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err) | ||||
| 	} | ||||
|  | ||||
| 	g.logger.Infof("成功发送指令到设备 %s 并创建日志 (MessageID: %s)", loraAddress, sendResult.MessageID) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。 | ||||
| // 它负责查找区域主控、生成关联ID、创建待处理记录、构建指令并最终发送。 | ||||
| func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error { | ||||
| 	if regionalControllerID == 0 { | ||||
| 		return errors.New("区域主控ID不能为空") | ||||
| 	} | ||||
|  | ||||
| 	if len(devicesToCollect) == 0 { | ||||
| 		// 如果没有要采集的设备,这不是一个错误,只是一个空操作。 | ||||
| 		g.logger.Info("待采集设备列表为空,无需执行采集任务。") | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// 1. 查找并自检区域主控设备 | ||||
| 	regionalController, err := g.deviceRepo.FindByID(regionalControllerID) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("查找区域主控 (ID: %d) 失败: %w", regionalControllerID, err) | ||||
| 	} | ||||
| 	if !regionalController.SelfCheck() { | ||||
| 		return fmt.Errorf("区域主控 (ID: %d) 未通过自检,缺少必要属性", regionalControllerID) | ||||
| 	} | ||||
|  | ||||
| 	// 2. 准备采集任务列表和数据库存根,并验证设备 | ||||
| 	var childDeviceIDs []uint | ||||
| 	var collectTasks []*proto.CollectTask | ||||
|  | ||||
| 	for _, dev := range devicesToCollect { | ||||
| 		// 验证设备是否属于指定的区域主控 | ||||
| 		if dev.ParentID == nil || *dev.ParentID != regionalControllerID { | ||||
| 			return fmt.Errorf("设备 '%s' (ID: %d) 不属于指定的区域主控 (ID: %d)", dev.Name, dev.ID, regionalControllerID) | ||||
| 		} | ||||
|  | ||||
| 		// 对每个待采集的设备执行自检 | ||||
| 		if !dev.SelfCheck() { | ||||
| 			g.logger.Warnf("跳过设备 %d,因其未通过自检", dev.ID) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// 自检已通过,我们可以安全地解析属性 | ||||
| 		var props map[string]interface{} | ||||
| 		// 此时 ParseProperties 不应失败 | ||||
| 		_ = dev.ParseProperties(&props) | ||||
|  | ||||
| 		busNumber := props[models.BusNumber].(float64) | ||||
| 		busAddress := props[models.BusAddress].(float64) | ||||
|  | ||||
| 		collectTasks = append(collectTasks, &proto.CollectTask{ | ||||
| 			DeviceAction: dev.Command, | ||||
| 			BusNumber:    int32(busNumber), | ||||
| 			BusAddress:   int32(busAddress), | ||||
| 		}) | ||||
| 		childDeviceIDs = append(childDeviceIDs, dev.ID) | ||||
| 	} | ||||
|  | ||||
| 	if len(childDeviceIDs) == 0 { | ||||
| 		return errors.New("经过滤后,没有可通过自检的有效设备") | ||||
| 	} | ||||
|  | ||||
| 	// 3. 从区域主控的属性中解析出 DevEui (loraAddress) | ||||
| 	var rcProps map[string]interface{} | ||||
| 	// SelfCheck 已保证属性可解析 | ||||
| 	_ = regionalController.ParseProperties(&rcProps) | ||||
| 	loraAddress := rcProps[models.LoRaAddress].(string) | ||||
|  | ||||
| 	// 4. 创建待处理请求记录 | ||||
| 	correlationID := uuid.New().String() | ||||
| 	pendingReq := &models.PendingCollection{ | ||||
| 		CorrelationID:   correlationID, | ||||
| 		DeviceID:        regionalController.ID, | ||||
| 		CommandMetadata: childDeviceIDs, | ||||
| 		Status:          models.PendingStatusPending, | ||||
| 		CreatedAt:       time.Now(), | ||||
| 	} | ||||
| 	if err := g.pendingCollectionRepo.Create(pendingReq); err != nil { | ||||
| 		g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err) | ||||
| 		return err | ||||
| 	} | ||||
| 	g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID) | ||||
|  | ||||
| 	// 5. 构建最终的空中载荷 | ||||
| 	batchCmd := &proto.BatchCollectCommand{ | ||||
| 		CorrelationId: correlationID, | ||||
| 		Tasks:         collectTasks, | ||||
| 	} | ||||
| 	anyData, err := anypb.New(batchCmd) | ||||
| 	if err != nil { | ||||
| 		g.logger.Errorf("创建 Any Protobuf 失败 (CorrelationID: %s): %v", correlationID, err) | ||||
| 		return err | ||||
| 	} | ||||
| 	instruction := &proto.Instruction{ | ||||
| 		Method: proto.MethodType_COLLECT, | ||||
| 		Data:   anyData, | ||||
| 	} | ||||
| 	payload, err := gproto.Marshal(instruction) | ||||
| 	if err != nil { | ||||
| 		g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 6. 发送指令 | ||||
| 	if _, err := g.comm.Send(loraAddress, payload); err != nil { | ||||
| 		g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, loraAddress) | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -69,7 +69,7 @@ func (MethodType) EnumDescriptor() ([]byte, []int) { | ||||
| 	return file_device_proto_rawDescGZIP(), []int{0} | ||||
| } | ||||
|  | ||||
| // 指令 | ||||
| // 指令 (所有空中数据都会被包装在这里面) | ||||
| type Instruction struct { | ||||
| 	state         protoimpl.MessageState `protogen:"open.v1"` | ||||
| 	Method        MethodType             `protobuf:"varint,1,opt,name=method,proto3,enum=device.MethodType" json:"method,omitempty"` | ||||
| @@ -122,6 +122,7 @@ func (x *Instruction) GetData() *anypb.Any { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Switch 指令的载荷 | ||||
| type Switch struct { | ||||
| 	state         protoimpl.MessageState `protogen:"open.v1"` | ||||
| 	DeviceAction  string                 `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"`  // 指令 | ||||
| @@ -190,29 +191,31 @@ func (x *Switch) GetRelayChannel() int32 { | ||||
| 	return 0 | ||||
| } | ||||
|  | ||||
| type Collect struct { | ||||
| // BatchCollectCommand | ||||
| // 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。 | ||||
| // 这个消息本身不会被发送到设备。 | ||||
| type BatchCollectCommand struct { | ||||
| 	state         protoimpl.MessageState `protogen:"open.v1"` | ||||
| 	BusNumber     int32                  `protobuf:"varint,1,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"`    // 总线号 | ||||
| 	BusAddress    int32                  `protobuf:"varint,2,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"` // 总线地址 | ||||
| 	Value         float32                `protobuf:"fixed32,3,opt,name=value,proto3" json:"value,omitempty"`                            // 采集值 | ||||
| 	CorrelationId string                 `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 用于关联请求和响应的唯一ID | ||||
| 	Tasks         []*CollectTask         `protobuf:"bytes,2,rep,name=tasks,proto3" json:"tasks,omitempty"`                                      // 采集任务列表 | ||||
| 	unknownFields protoimpl.UnknownFields | ||||
| 	sizeCache     protoimpl.SizeCache | ||||
| } | ||||
|  | ||||
| func (x *Collect) Reset() { | ||||
| 	*x = Collect{} | ||||
| func (x *BatchCollectCommand) Reset() { | ||||
| 	*x = BatchCollectCommand{} | ||||
| 	mi := &file_device_proto_msgTypes[2] | ||||
| 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| 	ms.StoreMessageInfo(mi) | ||||
| } | ||||
|  | ||||
| func (x *Collect) String() string { | ||||
| func (x *BatchCollectCommand) String() string { | ||||
| 	return protoimpl.X.MessageStringOf(x) | ||||
| } | ||||
|  | ||||
| func (*Collect) ProtoMessage() {} | ||||
| func (*BatchCollectCommand) ProtoMessage() {} | ||||
|  | ||||
| func (x *Collect) ProtoReflect() protoreflect.Message { | ||||
| func (x *BatchCollectCommand) ProtoReflect() protoreflect.Message { | ||||
| 	mi := &file_device_proto_msgTypes[2] | ||||
| 	if x != nil { | ||||
| 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| @@ -224,30 +227,139 @@ func (x *Collect) ProtoReflect() protoreflect.Message { | ||||
| 	return mi.MessageOf(x) | ||||
| } | ||||
|  | ||||
| // Deprecated: Use Collect.ProtoReflect.Descriptor instead. | ||||
| func (*Collect) Descriptor() ([]byte, []int) { | ||||
| // Deprecated: Use BatchCollectCommand.ProtoReflect.Descriptor instead. | ||||
| func (*BatchCollectCommand) Descriptor() ([]byte, []int) { | ||||
| 	return file_device_proto_rawDescGZIP(), []int{2} | ||||
| } | ||||
|  | ||||
| func (x *Collect) GetBusNumber() int32 { | ||||
| func (x *BatchCollectCommand) GetCorrelationId() string { | ||||
| 	if x != nil { | ||||
| 		return x.CorrelationId | ||||
| 	} | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| func (x *BatchCollectCommand) GetTasks() []*CollectTask { | ||||
| 	if x != nil { | ||||
| 		return x.Tasks | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // CollectTask | ||||
| // 定义了单个采集任务的“意图”。 | ||||
| type CollectTask struct { | ||||
| 	state         protoimpl.MessageState `protogen:"open.v1"` | ||||
| 	DeviceAction  string                 `protobuf:"bytes,1,opt,name=device_action,json=deviceAction,proto3" json:"device_action,omitempty"` // 指令 | ||||
| 	BusNumber     int32                  `protobuf:"varint,2,opt,name=bus_number,json=busNumber,proto3" json:"bus_number,omitempty"`         // 总线号 | ||||
| 	BusAddress    int32                  `protobuf:"varint,3,opt,name=bus_address,json=busAddress,proto3" json:"bus_address,omitempty"`      // 总线地址 | ||||
| 	unknownFields protoimpl.UnknownFields | ||||
| 	sizeCache     protoimpl.SizeCache | ||||
| } | ||||
|  | ||||
| func (x *CollectTask) Reset() { | ||||
| 	*x = CollectTask{} | ||||
| 	mi := &file_device_proto_msgTypes[3] | ||||
| 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| 	ms.StoreMessageInfo(mi) | ||||
| } | ||||
|  | ||||
| func (x *CollectTask) String() string { | ||||
| 	return protoimpl.X.MessageStringOf(x) | ||||
| } | ||||
|  | ||||
| func (*CollectTask) ProtoMessage() {} | ||||
|  | ||||
| func (x *CollectTask) ProtoReflect() protoreflect.Message { | ||||
| 	mi := &file_device_proto_msgTypes[3] | ||||
| 	if x != nil { | ||||
| 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| 		if ms.LoadMessageInfo() == nil { | ||||
| 			ms.StoreMessageInfo(mi) | ||||
| 		} | ||||
| 		return ms | ||||
| 	} | ||||
| 	return mi.MessageOf(x) | ||||
| } | ||||
|  | ||||
| // Deprecated: Use CollectTask.ProtoReflect.Descriptor instead. | ||||
| func (*CollectTask) Descriptor() ([]byte, []int) { | ||||
| 	return file_device_proto_rawDescGZIP(), []int{3} | ||||
| } | ||||
|  | ||||
| func (x *CollectTask) GetDeviceAction() string { | ||||
| 	if x != nil { | ||||
| 		return x.DeviceAction | ||||
| 	} | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| func (x *CollectTask) GetBusNumber() int32 { | ||||
| 	if x != nil { | ||||
| 		return x.BusNumber | ||||
| 	} | ||||
| 	return 0 | ||||
| } | ||||
|  | ||||
| func (x *Collect) GetBusAddress() int32 { | ||||
| func (x *CollectTask) GetBusAddress() int32 { | ||||
| 	if x != nil { | ||||
| 		return x.BusAddress | ||||
| 	} | ||||
| 	return 0 | ||||
| } | ||||
|  | ||||
| func (x *Collect) GetValue() float32 { | ||||
| 	if x != nil { | ||||
| 		return x.Value | ||||
| // CollectResult | ||||
| // 这是设备响应的、极致精简的数据包。 | ||||
| type CollectResult struct { | ||||
| 	state         protoimpl.MessageState `protogen:"open.v1"` | ||||
| 	CorrelationId string                 `protobuf:"bytes,1,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` // 从下行指令中原样返回的关联ID | ||||
| 	Values        []float32              `protobuf:"fixed32,2,rep,packed,name=values,proto3" json:"values,omitempty"`                           // 按预定顺序排列的采集值 | ||||
| 	unknownFields protoimpl.UnknownFields | ||||
| 	sizeCache     protoimpl.SizeCache | ||||
| } | ||||
| 	return 0 | ||||
|  | ||||
| func (x *CollectResult) Reset() { | ||||
| 	*x = CollectResult{} | ||||
| 	mi := &file_device_proto_msgTypes[4] | ||||
| 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| 	ms.StoreMessageInfo(mi) | ||||
| } | ||||
|  | ||||
| func (x *CollectResult) String() string { | ||||
| 	return protoimpl.X.MessageStringOf(x) | ||||
| } | ||||
|  | ||||
| func (*CollectResult) ProtoMessage() {} | ||||
|  | ||||
| func (x *CollectResult) ProtoReflect() protoreflect.Message { | ||||
| 	mi := &file_device_proto_msgTypes[4] | ||||
| 	if x != nil { | ||||
| 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | ||||
| 		if ms.LoadMessageInfo() == nil { | ||||
| 			ms.StoreMessageInfo(mi) | ||||
| 		} | ||||
| 		return ms | ||||
| 	} | ||||
| 	return mi.MessageOf(x) | ||||
| } | ||||
|  | ||||
| // Deprecated: Use CollectResult.ProtoReflect.Descriptor instead. | ||||
| func (*CollectResult) Descriptor() ([]byte, []int) { | ||||
| 	return file_device_proto_rawDescGZIP(), []int{4} | ||||
| } | ||||
|  | ||||
| func (x *CollectResult) GetCorrelationId() string { | ||||
| 	if x != nil { | ||||
| 		return x.CorrelationId | ||||
| 	} | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| func (x *CollectResult) GetValues() []float32 { | ||||
| 	if x != nil { | ||||
| 		return x.Values | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| var File_device_proto protoreflect.FileDescriptor | ||||
| @@ -264,13 +376,19 @@ const file_device_proto_rawDesc = "" + | ||||
| 	"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" + | ||||
| 	"\vbus_address\x18\x03 \x01(\x05R\n" + | ||||
| 	"busAddress\x12#\n" + | ||||
| 	"\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"_\n" + | ||||
| 	"\aCollect\x12\x1d\n" + | ||||
| 	"\rrelay_channel\x18\x04 \x01(\x05R\frelayChannel\"g\n" + | ||||
| 	"\x13BatchCollectCommand\x12%\n" + | ||||
| 	"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12)\n" + | ||||
| 	"\x05tasks\x18\x02 \x03(\v2\x13.device.CollectTaskR\x05tasks\"r\n" + | ||||
| 	"\vCollectTask\x12#\n" + | ||||
| 	"\rdevice_action\x18\x01 \x01(\tR\fdeviceAction\x12\x1d\n" + | ||||
| 	"\n" + | ||||
| 	"bus_number\x18\x01 \x01(\x05R\tbusNumber\x12\x1f\n" + | ||||
| 	"\vbus_address\x18\x02 \x01(\x05R\n" + | ||||
| 	"busAddress\x12\x14\n" + | ||||
| 	"\x05value\x18\x03 \x01(\x02R\x05value*%\n" + | ||||
| 	"bus_number\x18\x02 \x01(\x05R\tbusNumber\x12\x1f\n" + | ||||
| 	"\vbus_address\x18\x03 \x01(\x05R\n" + | ||||
| 	"busAddress\"N\n" + | ||||
| 	"\rCollectResult\x12%\n" + | ||||
| 	"\x0ecorrelation_id\x18\x01 \x01(\tR\rcorrelationId\x12\x16\n" + | ||||
| 	"\x06values\x18\x02 \x03(\x02R\x06values*%\n" + | ||||
| 	"\n" + | ||||
| 	"MethodType\x12\n" + | ||||
| 	"\n" + | ||||
| @@ -290,22 +408,25 @@ func file_device_proto_rawDescGZIP() []byte { | ||||
| } | ||||
|  | ||||
| var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1) | ||||
| var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 3) | ||||
| var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 5) | ||||
| var file_device_proto_goTypes = []any{ | ||||
| 	(MethodType)(0),             // 0: device.MethodType | ||||
| 	(*Instruction)(nil),         // 1: device.Instruction | ||||
| 	(*Switch)(nil),              // 2: device.Switch | ||||
| 	(*Collect)(nil),     // 3: device.Collect | ||||
| 	(*anypb.Any)(nil),   // 4: google.protobuf.Any | ||||
| 	(*BatchCollectCommand)(nil), // 3: device.BatchCollectCommand | ||||
| 	(*CollectTask)(nil),         // 4: device.CollectTask | ||||
| 	(*CollectResult)(nil),       // 5: device.CollectResult | ||||
| 	(*anypb.Any)(nil),           // 6: google.protobuf.Any | ||||
| } | ||||
| var file_device_proto_depIdxs = []int32{ | ||||
| 	0, // 0: device.Instruction.method:type_name -> device.MethodType | ||||
| 	4, // 1: device.Instruction.data:type_name -> google.protobuf.Any | ||||
| 	2, // [2:2] is the sub-list for method output_type | ||||
| 	2, // [2:2] is the sub-list for method input_type | ||||
| 	2, // [2:2] is the sub-list for extension type_name | ||||
| 	2, // [2:2] is the sub-list for extension extendee | ||||
| 	0, // [0:2] is the sub-list for field type_name | ||||
| 	6, // 1: device.Instruction.data:type_name -> google.protobuf.Any | ||||
| 	4, // 2: device.BatchCollectCommand.tasks:type_name -> device.CollectTask | ||||
| 	3, // [3:3] is the sub-list for method output_type | ||||
| 	3, // [3:3] is the sub-list for method input_type | ||||
| 	3, // [3:3] is the sub-list for extension type_name | ||||
| 	3, // [3:3] is the sub-list for extension extendee | ||||
| 	0, // [0:3] is the sub-list for field type_name | ||||
| } | ||||
|  | ||||
| func init() { file_device_proto_init() } | ||||
| @@ -319,7 +440,7 @@ func file_device_proto_init() { | ||||
| 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(), | ||||
| 			RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)), | ||||
| 			NumEnums:      1, | ||||
| 			NumMessages:   3, | ||||
| 			NumMessages:   5, | ||||
| 			NumExtensions: 0, | ||||
| 			NumServices:   0, | ||||
| 		}, | ||||
|   | ||||
| @@ -6,18 +6,21 @@ import "google/protobuf/any.proto"; | ||||
|  | ||||
| option go_package = "internal/app/service/device/proto"; | ||||
|  | ||||
| // --- 通用指令结构 --- | ||||
|  | ||||
| // 指令类型 | ||||
| enum MethodType { | ||||
|   SWITCH = 0;  // 启停 | ||||
|   COLLECT = 1; // 采集 | ||||
| } | ||||
|  | ||||
| // 指令 | ||||
| // 指令 (所有空中数据都会被包装在这里面) | ||||
| message Instruction { | ||||
|   MethodType method = 1; | ||||
|   google.protobuf.Any data = 2; | ||||
| } | ||||
|  | ||||
| // Switch 指令的载荷 | ||||
| message Switch { | ||||
|   string device_action = 1; // 指令 | ||||
|   int32 bus_number = 2;     // 总线号 | ||||
| @@ -25,8 +28,28 @@ message Switch{ | ||||
|   int32 relay_channel = 4;  // 继电器通道号 | ||||
| } | ||||
|  | ||||
| message Collect{ | ||||
|   int32 bus_number = 1; // 总线号 | ||||
|   int32 bus_address = 2; // 总线地址 | ||||
|   float value = 3; // 采集值 | ||||
|  | ||||
| // --- 批量采集相关结构 --- | ||||
|  | ||||
| // BatchCollectCommand | ||||
| // 用于在平台内部构建一个完整的、包含所有元数据的批量采集任务。 | ||||
| // 这个消息本身不会被发送到设备。 | ||||
| message BatchCollectCommand { | ||||
|   string correlation_id = 1; // 用于关联请求和响应的唯一ID | ||||
|   repeated CollectTask tasks = 2;   // 采集任务列表 | ||||
| } | ||||
|  | ||||
| // CollectTask | ||||
| // 定义了单个采集任务的“意图”。 | ||||
| message CollectTask { | ||||
|   string device_action = 1; // 指令 | ||||
|   int32 bus_number = 2;  // 总线号 | ||||
|   int32 bus_address = 3; // 总线地址 | ||||
| } | ||||
|  | ||||
| // CollectResult | ||||
| // 这是设备响应的、极致精简的数据包。 | ||||
| message CollectResult { | ||||
|   string correlation_id = 1; // 从下行指令中原样返回的关联ID | ||||
|   repeated float values = 2;   // 按预定顺序排列的采集值 | ||||
| } | ||||
| @@ -1,7 +1,6 @@ | ||||
| package task | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| @@ -46,7 +45,7 @@ func (d *DelayTask) parseParameters() error { | ||||
| 	} | ||||
|  | ||||
| 	var params DelayTaskParams | ||||
| 	err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms) | ||||
| 	err := d.executionTask.Task.ParseParameters(¶ms) | ||||
| 	if err != nil { | ||||
| 		d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) | ||||
| 		return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) | ||||
|   | ||||
| @@ -9,7 +9,6 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| ) | ||||
|  | ||||
| // ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 | ||||
| @@ -25,23 +24,28 @@ type ReleaseFeedWeightTask struct { | ||||
| 	sensorDataRepo repository.SensorDataRepository | ||||
| 	claimedLog     *models.TaskExecutionLog | ||||
|  | ||||
| 	feedPortDevice     *models.Device // 下料口基本信息 | ||||
| 	releaseWeight      float64        // 需要释放的重量 | ||||
| 	mixingTankDeviceID uint           // 搅拌罐称重传感器ID | ||||
| 	feedPortDevice     *models.Device | ||||
| 	releaseWeight      float64 | ||||
| 	mixingTankDeviceID uint | ||||
|  | ||||
| 	comm     transport.Communicator | ||||
| 	feedPort *device.GeneralDeviceService // 下料口指令下发器 | ||||
| 	feedPort device.Service | ||||
|  | ||||
| 	logger *logs.Logger | ||||
| } | ||||
|  | ||||
| // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 | ||||
| func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task { | ||||
| func NewReleaseFeedWeightTask( | ||||
| 	claimedLog *models.TaskExecutionLog, | ||||
| 	sensorDataRepo repository.SensorDataRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	deviceService device.Service, | ||||
| 	logger *logs.Logger, | ||||
| ) Task { | ||||
| 	return &ReleaseFeedWeightTask{ | ||||
| 		claimedLog:     claimedLog, | ||||
| 		deviceRepo:     deviceRepo, | ||||
| 		sensorDataRepo: sensorDataRepo, | ||||
| 		comm:           comm, | ||||
| 		feedPort:       deviceService, // 直接注入 | ||||
| 		logger:         logger, | ||||
| 	} | ||||
| } | ||||
| @@ -118,7 +122,7 @@ func (r *ReleaseFeedWeightTask) parseParameters() error { | ||||
| 	} | ||||
|  | ||||
| 	var params ReleaseFeedWeightTaskParams | ||||
| 	err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms) | ||||
| 	err := r.claimedLog.Task.ParseParameters(¶ms) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) | ||||
| 		return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) | ||||
| @@ -140,7 +144,6 @@ func (r *ReleaseFeedWeightTask) parseParameters() error { | ||||
|  | ||||
| 	r.releaseWeight = params.ReleaseWeight | ||||
| 	r.mixingTankDeviceID = params.MixingTankDeviceID | ||||
| 	r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm) | ||||
| 	r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) | ||||
|   | ||||
| @@ -1,15 +1,14 @@ | ||||
| package task | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| 	"github.com/panjf2000/ants/v2" | ||||
| 	"gorm.io/gorm" | ||||
| ) | ||||
| @@ -84,9 +83,9 @@ type Scheduler struct { | ||||
| 	deviceRepo              repository.DeviceRepository | ||||
| 	sensorDataRepo          repository.SensorDataRepository | ||||
| 	planRepo                repository.PlanRepository | ||||
| 	comm                    transport.Communicator | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager | ||||
| 	progressTracker         *ProgressTracker | ||||
| 	deviceService           device.Service | ||||
|  | ||||
| 	pool     *ants.Pool // 使用 ants 协程池来管理并发 | ||||
| 	wg       sync.WaitGroup | ||||
| @@ -100,20 +99,21 @@ func NewScheduler( | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	sensorDataRepo repository.SensorDataRepository, | ||||
| 	planRepo repository.PlanRepository, | ||||
| 	comm transport.Communicator, | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager, | ||||
| 	logger *logs.Logger, | ||||
| 	deviceService device.Service, | ||||
| 	interval time.Duration, | ||||
| 	numWorkers int) *Scheduler { | ||||
| 	numWorkers int, | ||||
| ) *Scheduler { | ||||
| 	return &Scheduler{ | ||||
| 		pendingTaskRepo:         pendingTaskRepo, | ||||
| 		executionLogRepo:        executionLogRepo, | ||||
| 		deviceRepo:              deviceRepo, | ||||
| 		sensorDataRepo:          sensorDataRepo, | ||||
| 		planRepo:                planRepo, | ||||
| 		comm:                    comm, | ||||
| 		analysisPlanTaskManager: analysisPlanTaskManager, | ||||
| 		logger:                  logger, | ||||
| 		deviceService:           deviceService, | ||||
| 		pollingInterval:         interval, | ||||
| 		workers:                 numWorkers, | ||||
| 		progressTracker:         NewProgressTracker(), | ||||
| @@ -289,7 +289,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { | ||||
| 	case models.TaskTypeWaiting: | ||||
| 		return NewDelayTask(s.logger, claimedLog) | ||||
| 	case models.TaskTypeReleaseFeedWeight: | ||||
| 		return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger) | ||||
| 		return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger) | ||||
|  | ||||
| 	default: | ||||
| 		// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 | ||||
| @@ -304,7 +304,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { | ||||
| 	var params struct { | ||||
| 		PlanID uint `json:"plan_id"` | ||||
| 	} | ||||
| 	if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil { | ||||
| 	if err := claimedLog.Task.ParseParameters(¶ms); err != nil { | ||||
| 		s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err) | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
| @@ -7,9 +7,11 @@ import ( | ||||
| 	"net/http" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device/proto" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	gproto "google.golang.org/protobuf/proto" | ||||
| 	"gorm.io/datatypes" | ||||
| ) | ||||
|  | ||||
| @@ -31,19 +33,23 @@ type ChirpStackListener struct { | ||||
| 	sensorDataRepo        repository.SensorDataRepository | ||||
| 	deviceRepo            repository.DeviceRepository | ||||
| 	deviceCommandLogRepo  repository.DeviceCommandLogRepository | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository // 新增 | ||||
| } | ||||
|  | ||||
| // NewChirpStackListener 创建一个新的 ChirpStackListener 实例 | ||||
| func NewChirpStackListener( | ||||
| 	logger *logs.Logger, | ||||
| 	sensorDataRepo repository.SensorDataRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository, | ||||
| ) *ChirpStackListener { | ||||
| 	pendingCollectionRepo repository.PendingCollectionRepository, // 新增 | ||||
| ) ListenHandler { // 返回接口类型 | ||||
| 	return &ChirpStackListener{ | ||||
| 		logger:                logger, | ||||
| 		sensorDataRepo:        sensorDataRepo, | ||||
| 		deviceRepo:            deviceRepo, | ||||
| 		deviceCommandLogRepo:  deviceCommandLogRepo, | ||||
| 		pendingCollectionRepo: pendingCollectionRepo, // 新增 | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -151,12 +157,20 @@ type GenericSensorReading struct { | ||||
|  | ||||
| // handleUpEvent 处理上行数据事件 | ||||
| func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 	c.logger.Infof("处理 'up' 事件: %+v", event) | ||||
| 	c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui) | ||||
|  | ||||
| 	// 记录信号强度 | ||||
| 	// 1. 查找区域主控设备 | ||||
| 	regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) | ||||
| 		return | ||||
| 	} | ||||
| 	c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID) | ||||
|  | ||||
| 	// 2. 记录区域主控的信号强度 (如果存在) | ||||
| 	if len(event.RxInfo) > 0 { | ||||
| 		// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。 | ||||
| 		// 因此,我们只取第一个 RxInfo 中的信号数据即可。 | ||||
| 	if len(event.RxInfo) > 0 { | ||||
| 		rx := event.RxInfo[0] // 取第一个接收到的网关信息 | ||||
|  | ||||
| 		// 构建 SignalMetrics 结构体 | ||||
| @@ -164,73 +178,111 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { | ||||
| 			RssiDbm: rx.Rssi, | ||||
| 			SnrDb:   rx.Snr, | ||||
| 		} | ||||
|  | ||||
| 		// 这里的 event.DeviceInfo.DevEui 对应的是区域主控的 DevEui | ||||
| 		regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) | ||||
| 		if err != nil { | ||||
| 			c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) | ||||
| 			return | ||||
| 		} | ||||
| 		// 记录区域主控的信号强度 | ||||
| 		c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics) | ||||
| 		c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr) | ||||
| 	} else { | ||||
| 		c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui) | ||||
| 	} | ||||
|  | ||||
| 	// 解析并记录传感器数据 (温度、湿度、重量) | ||||
| 	// 假设 event.Data (frmPayload) 是 Base64 编码的 JSON 数组字符串 | ||||
| 	if event.Data != "" { | ||||
| 	// 3. 处理上报的传感器数据 | ||||
| 	if event.Data == "" { | ||||
| 		c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 3.1 Base64 解码 | ||||
| 	decodedData, err := base64.StdEncoding.DecodeString(event.Data) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 		var readings []GenericSensorReading | ||||
| 		if err := json.Unmarshal(decodedData, &readings); err != nil { | ||||
| 			c.logger.Errorf("解析 'up' 事件的解码后 Data (JSON 数组) 失败: %v, Decoded Data: %s", err, string(decodedData)) | ||||
| 	// 3.2 解析外层 "信封" | ||||
| 	var instruction proto.Instruction | ||||
| 	if err := gproto.Unmarshal(decodedData, &instruction); err != nil { | ||||
| 		c.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 		// 查找区域主控设备,以便记录其ID | ||||
| 		regionalController, err := c.deviceRepo.FindByDevEui(event.DeviceInfo.DevEui) | ||||
| 	// 3.3 检查是否是采集响应 | ||||
| 	if instruction.Method != proto.MethodType_COLLECT { | ||||
| 		c.logger.Infof("收到一个非采集响应的上行指令 (Method: %s),无需处理。", instruction.Method.String()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 2.4 解包内层 CollectResult | ||||
| 	var collectResp proto.CollectResult | ||||
| 	if err := instruction.Data.UnmarshalTo(&collectResp); err != nil { | ||||
| 		c.logger.Errorf("解包数据信息失败: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	correlationID := collectResp.CorrelationId | ||||
| 	c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) | ||||
|  | ||||
| 	// 3. 根据 CorrelationID 查找待处理请求 | ||||
| 	pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID) | ||||
| 	if err != nil { | ||||
| 			c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err) | ||||
| 		c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 		for _, reading := range readings { | ||||
| 			// 根据类型构建具体的传感器数据结构体 | ||||
| 	// 检查状态,防止重复处理 | ||||
| 	if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut { | ||||
| 		c.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// 4. 匹配数据并存入数据库 | ||||
| 	deviceIDs := pendingReq.CommandMetadata | ||||
| 	values := collectResp.Values | ||||
| 	if len(deviceIDs) != len(values) { | ||||
| 		c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) | ||||
| 		// TODO 数量不匹配是否全改成失败 | ||||
| 		// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending | ||||
| 		err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time) | ||||
| 		if err != nil { | ||||
| 			c.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err) | ||||
| 		} | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	for i, deviceID := range deviceIDs { | ||||
| 		value := values[i] | ||||
| 		dev, err := c.deviceRepo.FindByID(deviceID) | ||||
| 		if err != nil { | ||||
| 			c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		sensorDataType, ok := models.DeviceSubTypeToSensorDataTypeMap[dev.SubType] | ||||
| 		if !ok { | ||||
| 			c.logger.Warnf("设备 %d 的子类型 '%s' 没有对应的传感器数据类型,跳过记录。", dev.ID, dev.SubType) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		var sensorData interface{} | ||||
| 			var sensorDataType models.SensorDataType | ||||
|  | ||||
| 			switch reading.Type { | ||||
| 			case models.SensorDataTypeTemperature: // 使用枚举常量 | ||||
| 				sensorData = models.TemperatureData{ | ||||
| 					TemperatureCelsius: reading.Value, | ||||
| 				} | ||||
| 				sensorDataType = models.SensorDataTypeTemperature | ||||
| 			case models.SensorDataTypeHumidity: // 使用枚举常量 | ||||
| 				sensorData = models.HumidityData{ | ||||
| 					HumidityPercent: reading.Value, | ||||
| 				} | ||||
| 				sensorDataType = models.SensorDataTypeHumidity | ||||
| 			case models.SensorDataTypeWeight: // 使用枚举常量 | ||||
| 				sensorData = models.WeightData{ | ||||
| 					WeightKilograms: reading.Value, | ||||
| 				} | ||||
| 				sensorDataType = models.SensorDataTypeWeight | ||||
| 		switch sensorDataType { | ||||
| 		case models.SensorDataTypeTemperature: | ||||
| 			sensorData = models.TemperatureData{TemperatureCelsius: float64(value)} | ||||
| 		case models.SensorDataTypeHumidity: | ||||
| 			sensorData = models.HumidityData{HumidityPercent: float64(value)} | ||||
| 		case models.SensorDataTypeWeight: | ||||
| 			sensorData = models.WeightData{WeightKilograms: float64(value)} | ||||
| 		default: | ||||
| 				c.logger.Warnf("处理 'up' 事件时遇到未知传感器类型: %s, Value: %f. 区域主控DevEui: %s, 传感器设备ID: %d", | ||||
| 					reading.Type, reading.Value, event.DeviceInfo.DevEui, reading.DeviceID) | ||||
| 				continue // 跳过未知类型 | ||||
| 			c.logger.Warnf("未处理的传感器数据类型 '%s' (设备ID: %d)", sensorDataType, dev.ID) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 			// 记录普通设备的传感器数据 | ||||
| 			c.recordSensorData(regionalController.ID, reading.DeviceID, event.Time, sensorDataType, sensorData) | ||||
| 		c.recordSensorData(pendingReq.DeviceID, dev.ID, event.Time, sensorDataType, sensorData) | ||||
| 		c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 值=%.2f", dev.ID, sensorDataType, value) | ||||
| 	} | ||||
|  | ||||
| 	// 5. 更新请求状态为“已完成” | ||||
| 	if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil { | ||||
| 		c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) | ||||
| 	} else { | ||||
| 		c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无法记录传感器数据。DevEui: %s", event.DeviceInfo.DevEui) | ||||
| 		c.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID) | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -8,6 +8,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/api" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport" | ||||
| @@ -31,6 +32,7 @@ type Application struct { | ||||
| 	planRepo                repository.PlanRepository | ||||
| 	pendingTaskRepo         repository.PendingTaskRepository | ||||
| 	executionLogRepo        repository.ExecutionLogRepository | ||||
| 	pendingCollectionRepo   repository.PendingCollectionRepository | ||||
| 	analysisPlanTaskManager *task.AnalysisPlanTaskManager | ||||
| } | ||||
|  | ||||
| @@ -76,20 +78,52 @@ func NewApplication(configPath string) (*Application, error) { | ||||
| 	// 初始化命令下发历史仓库 | ||||
| 	deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) | ||||
|  | ||||
| 	// 初始化待采集请求仓库 | ||||
| 	pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB()) | ||||
|  | ||||
| 	// 初始化设备上行监听器 | ||||
| 	listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo) | ||||
| 	listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo, pendingCollectionRepo) | ||||
|  | ||||
| 	// 初始化计划触发器管理器 | ||||
| 	analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) | ||||
|  | ||||
| 	// 初始化设备通信器 | ||||
| 	comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo) | ||||
| 	// 初始化设备通信器 (纯粹的通信客户端) | ||||
| 	comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger) | ||||
|  | ||||
| 	// 初始化通用设备服务 | ||||
| 	generalDeviceService := device.NewGeneralDeviceService( | ||||
| 		deviceRepo, | ||||
| 		deviceCommandLogRepo, | ||||
| 		pendingCollectionRepo, | ||||
| 		logger, | ||||
| 		comm, | ||||
| 	) | ||||
|  | ||||
| 	//  初始化任务执行器 | ||||
| 	executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) | ||||
| 	executor := task.NewScheduler( | ||||
| 		pendingTaskRepo, | ||||
| 		executionLogRepo, | ||||
| 		deviceRepo, | ||||
| 		sensorDataRepo, | ||||
| 		planRepo, | ||||
| 		analysisPlanTaskManager, | ||||
| 		logger, | ||||
| 		generalDeviceService, | ||||
| 		time.Duration(cfg.Task.Interval)*time.Second, | ||||
| 		cfg.Task.NumWorkers, | ||||
| 	) | ||||
|  | ||||
| 	//  初始化 API 服务器 | ||||
| 	apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) | ||||
| 	apiServer := api.NewAPI( | ||||
| 		cfg.Server, | ||||
| 		logger, | ||||
| 		userRepo, | ||||
| 		deviceRepo, | ||||
| 		planRepo, | ||||
| 		tokenService, | ||||
| 		listenHandler, | ||||
| 		analysisPlanTaskManager, | ||||
| 	) | ||||
|  | ||||
| 	//  组装 Application 对象 | ||||
| 	app := &Application{ | ||||
| @@ -98,10 +132,10 @@ func NewApplication(configPath string) (*Application, error) { | ||||
| 		Storage:                 storage, | ||||
| 		Executor:                executor, | ||||
| 		API:                     apiServer, | ||||
| 		// 填充新增的字段 | ||||
| 		planRepo:                planRepo, | ||||
| 		pendingTaskRepo:         pendingTaskRepo, | ||||
| 		executionLogRepo:        executionLogRepo, | ||||
| 		pendingCollectionRepo:   pendingCollectionRepo, | ||||
| 		analysisPlanTaskManager: analysisPlanTaskManager, | ||||
| 	} | ||||
|  | ||||
| @@ -112,7 +146,13 @@ func NewApplication(configPath string) (*Application, error) { | ||||
| func (app *Application) Start() error { | ||||
| 	app.Logger.Info("应用启动中...") | ||||
|  | ||||
| 	// --- 新增逻辑:初始化待执行任务列表 --- | ||||
| 	// --- 清理待采集任务 --- | ||||
| 	if err := app.initializePendingCollections(); err != nil { | ||||
| 		// 这是一个非致命错误,记录它,但应用应继续启动 | ||||
| 		app.Logger.Error(err) | ||||
| 	} | ||||
|  | ||||
| 	// --- 初始化待执行任务列表 --- | ||||
| 	if err := app.initializePendingTasks( | ||||
| 		app.planRepo,                // 传入 planRepo | ||||
| 		app.pendingTaskRepo,         // 传入 pendingTaskRepo | ||||
| @@ -160,6 +200,25 @@ func (app *Application) Stop() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 | ||||
| // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 | ||||
| // 这保证了系统在每次启动时都处于一个干净、确定的状态。 | ||||
| func (app *Application) initializePendingCollections() error { | ||||
| 	app.Logger.Info("开始清理所有未完成的采集请求...") | ||||
|  | ||||
| 	// 直接将所有 'pending' 状态的请求更新为 'timed_out'。 | ||||
| 	count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("清理未完成的采集请求失败: %v", err) | ||||
| 	} else if count > 0 { | ||||
| 		app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) | ||||
| 	} else { | ||||
| 		app.Logger.Info("没有需要清理的采集请求。") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 | ||||
| func (app *Application) initializePendingTasks( | ||||
| 	planRepo repository.PlanRepository, | ||||
|   | ||||
| @@ -121,6 +121,7 @@ type ChirpStackConfig struct { | ||||
| 	APIToken                 string `yaml:"api_token"` | ||||
| 	FPort                    int    `yaml:"fport"` | ||||
| 	APITimeout               int    `yaml:"api_timeout"` | ||||
| 	CollectionRequestTimeout int    `yaml:"collection_request_timeout"` | ||||
| } | ||||
|  | ||||
| // TaskConfig 代表任务调度配置 | ||||
|   | ||||
| @@ -6,6 +6,8 @@ import ( | ||||
|  | ||||
| 	"gorm.io/datatypes" | ||||
| 	"gorm.io/gorm" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" | ||||
| ) | ||||
|  | ||||
| // DeviceType 定义了设备的高级类别 | ||||
| @@ -30,6 +32,8 @@ const ( | ||||
| 	SubTypeSensorHumidity DeviceSubType = "humidity" | ||||
| 	// SubTypeSensorAmmonia 氨气传感器 | ||||
| 	SubTypeSensorAmmonia DeviceSubType = "ammonia" | ||||
| 	// SubTypeSensorWeight 电子秤 | ||||
| 	SubTypeSensorWeight DeviceSubType = "weight" | ||||
|  | ||||
| 	// SubTypeValveFeed 下料阀门 | ||||
| 	SubTypeValveFeed DeviceSubType = "feed_valve" | ||||
| @@ -85,6 +89,10 @@ type Device struct { | ||||
| 	// Location 描述了设备的物理安装位置,例如 "1号猪舍东侧",方便运维。建立索引以优化按位置查询。 | ||||
| 	Location string `gorm:"index" json:"location"` | ||||
|  | ||||
| 	// Command 存储了与设备交互所需的具体指令。 | ||||
| 	// 例如,对于传感器,这里存储 Modbus 采集指令;对于开关和区域主控,这里可以为空。 | ||||
| 	Command string `gorm:"type:varchar(255)" json:"command"` | ||||
|  | ||||
| 	// Properties 用于存储特定类型设备的独有属性,采用JSON格式。 | ||||
| 	// 建议在应用层为不同子类型的设备定义专用的属性结构体(如 LoraProperties, BusProperties),以保证数据一致性。 | ||||
| 	Properties datatypes.JSON `json:"properties"` | ||||
| @@ -110,28 +118,52 @@ func (d *Device) ParseProperties(v interface{}) error { | ||||
|  | ||||
| // SelfCheck 进行参数自检, 返回检测结果 | ||||
| // 方法会根据自身类型进行参数检查, 参数不全时返回false | ||||
| // TODO 没写单测 | ||||
| func (d *Device) SelfCheck() bool { | ||||
|  | ||||
| 	properties := make(map[string]interface{}) | ||||
| 	if err := d.ParseProperties(&properties); err != nil { | ||||
| 	// 使用清晰的 switch 结构,确保所有情况都被覆盖 | ||||
| 	switch d.Type { | ||||
| 	case DeviceTypeAreaController: | ||||
| 		props := make(map[string]interface{}) | ||||
| 		if err := d.ParseProperties(&props); err != nil { | ||||
| 			return false | ||||
| 		} | ||||
|  | ||||
| 	has := func(key string) bool { | ||||
| 		_, ok := properties[key] | ||||
| 		_, ok := props[LoRaAddress].(string) | ||||
| 		return ok | ||||
|  | ||||
| 	case DeviceTypeDevice: | ||||
| 		// 所有普通设备都必须有父级 | ||||
| 		if d.ParentID == nil || *d.ParentID == 0 { | ||||
| 			return false | ||||
| 		} | ||||
|  | ||||
| 		props := make(map[string]interface{}) | ||||
| 		if err := d.ParseProperties(&props); err != nil { | ||||
| 			return false | ||||
| 		} | ||||
|  | ||||
| 		// hasPureNumeric 检查一个key是否存在于map中,并且其值是纯数字(整数或可解析为整数的字符串) | ||||
| 		hasPureNumeric := func(key string) bool { | ||||
| 			val, ok := props[key] | ||||
| 			if !ok { | ||||
| 				return false // Key不存在 | ||||
| 			} | ||||
| 			return utils.IsPureNumeric(val) | ||||
| 		} | ||||
|  | ||||
| 		// 根据子类型进行具体校验 | ||||
| 		switch d.SubType { | ||||
| 	case SubTypeFan: | ||||
| 		if !has(BusNumber) || !has(BusAddress) || !has(RelayChannel) { | ||||
| 			return false | ||||
| 		} | ||||
| 		// 所有传感器类型都必须有 Command 和总线信息,且总线信息为纯数字 | ||||
| 		case SubTypeSensorTemp, SubTypeSensorHumidity, SubTypeSensorWeight, SubTypeSensorAmmonia: | ||||
| 			return d.Command != "" && hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) | ||||
| 		// 所有开关类型都必须有继电器和总线信息,且都为纯数字 | ||||
| 		case SubTypeFan, SubTypeWaterCurtain, SubTypeValveFeed: | ||||
| 			return hasPureNumeric(BusNumber) && hasPureNumeric(BusAddress) && hasPureNumeric(RelayChannel) | ||||
| 		// 如果是未知的子类型,或者没有子类型,则认为自检失败 | ||||
| 		default: | ||||
| 		// 不应该有类型未知的设备 | ||||
| 			return false | ||||
| 		} | ||||
|  | ||||
| 	return true | ||||
| 	// 如果设备类型不是已知的任何一种,则自检失败 | ||||
| 	default: | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -1,32 +0,0 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| // DeviceCommandLog 记录下行任务的下发情况和设备确认状态 | ||||
| type DeviceCommandLog struct { | ||||
| 	// MessageID 是下行消息的唯一标识符。 | ||||
| 	// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 | ||||
| 	MessageID string `gorm:"primaryKey" json:"message_id"` | ||||
|  | ||||
| 	// DeviceID 是接收此下行任务的设备的ID。 | ||||
| 	// 对于 LoRaWAN,这通常是区域主控设备的ID。 | ||||
| 	DeviceID uint `gorm:"not null;index" json:"device_id"` | ||||
|  | ||||
| 	// SentAt 记录下行任务最初发送的时间。 | ||||
| 	SentAt time.Time `gorm:"not null" json:"sent_at"` | ||||
|  | ||||
| 	// AcknowledgedAt 记录设备确认收到下行消息的时间。 | ||||
| 	// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 | ||||
| 	AcknowledgedAt *time.Time `json:"acknowledged_at"` | ||||
|  | ||||
| 	// ReceivedSuccess 表示设备是否成功接收到下行消息。 | ||||
| 	// true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 | ||||
| 	ReceivedSuccess bool `gorm:"not null" json:"received_success"` | ||||
| } | ||||
|  | ||||
| // TableName 自定义 GORM 使用的数据库表名 | ||||
| func (DeviceCommandLog) TableName() string { | ||||
| 	return "device_command_log" | ||||
| } | ||||
| @@ -73,3 +73,70 @@ func (log *TaskExecutionLog) AfterFind(tx *gorm.DB) (err error) { | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // --- 指令与采集 --- | ||||
|  | ||||
| // PendingCollectionStatus 定义了待采集请求的状态 | ||||
| type PendingCollectionStatus string | ||||
|  | ||||
| const ( | ||||
| 	PendingStatusPending   PendingCollectionStatus = "pending"   // 请求已发送,等待设备响应 | ||||
| 	PendingStatusFulfilled PendingCollectionStatus = "fulfilled" // 已收到设备响应并成功处理 | ||||
| 	PendingStatusTimedOut  PendingCollectionStatus = "timed_out" // 请求超时,未收到设备响应 | ||||
| ) | ||||
|  | ||||
| // DeviceCommandLog 记录所有“发后即忘”的下行指令日志。 | ||||
| // 这张表主要用于追踪指令是否被网关成功发送 (ack)。 | ||||
| type DeviceCommandLog struct { | ||||
| 	// MessageID 是下行消息的唯一标识符。 | ||||
| 	// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。 | ||||
| 	MessageID string `gorm:"primaryKey" json:"message_id"` | ||||
|  | ||||
| 	// DeviceID 是接收此下行任务的设备的ID。 | ||||
| 	// 对于 LoRaWAN,这通常是区域主控设备的ID。 | ||||
| 	DeviceID uint `gorm:"not null;index" json:"device_id"` | ||||
|  | ||||
| 	// SentAt 记录下行任务最初发送的时间。 | ||||
| 	SentAt time.Time `gorm:"not null" json:"sent_at"` | ||||
|  | ||||
| 	// AcknowledgedAt 记录设备确认收到下行消息的时间。 | ||||
| 	// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。 | ||||
| 	AcknowledgedAt *time.Time `json:"acknowledged_at"` | ||||
|  | ||||
| 	// ReceivedSuccess 表示设备是否成功接收到下行消息。 | ||||
| 	// true 表示设备已确认收到,false 表示设备未确认收到或下发失败。 | ||||
| 	ReceivedSuccess bool `gorm:"not null" json:"received_success"` | ||||
| } | ||||
|  | ||||
| // TableName 自定义 GORM 使用的数据库表名 | ||||
| func (DeviceCommandLog) TableName() string { | ||||
| 	return "device_command_log" | ||||
| } | ||||
|  | ||||
| // PendingCollection 记录所有需要设备响应的“待采集请求”。 | ||||
| // 这是一张状态机表,追踪从请求发送到收到响应的整个生命周期。 | ||||
| type PendingCollection struct { | ||||
| 	// CorrelationID 是由平台生成的、在请求和响应之间全局唯一的关联ID,作为主键。 | ||||
| 	CorrelationID string `gorm:"primaryKey"` | ||||
|  | ||||
| 	// DeviceID 是接收此任务的设备ID | ||||
| 	// 对于 LoRaWAN,这通常是区域主控设备的ID。 | ||||
| 	DeviceID uint `gorm:"index"` | ||||
|  | ||||
| 	// CommandMetadata 存储了此次采集任务对应的设备ID列表,顺序与设备响应值的顺序一致。 | ||||
| 	CommandMetadata UintArray `gorm:"type:bigint[]"` | ||||
|  | ||||
| 	// Status 是该请求的当前状态,用于状态机管理和超时处理。 | ||||
| 	Status PendingCollectionStatus `gorm:"index"` | ||||
|  | ||||
| 	// FulfilledAt 是收到设备响应并成功处理的时间。使用指针以允许 NULL 值。 | ||||
| 	FulfilledAt *time.Time | ||||
|  | ||||
| 	// CreatedAt 是 GORM 的标准字段,记录请求创建时间。 | ||||
| 	CreatedAt time.Time | ||||
| } | ||||
|  | ||||
| // TableName 自定义 GORM 使用的数据库表名 | ||||
| func (PendingCollection) TableName() string { | ||||
| 	return "pending_collections" | ||||
| } | ||||
|   | ||||
| @@ -1,5 +1,13 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"database/sql/driver" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // GetAllModels 返回一个包含所有数据库模型实例的切片。 | ||||
| // 这个函数用于在数据库初始化时自动迁移所有的表结构。 | ||||
| func GetAllModels() []interface{} { | ||||
| @@ -14,5 +22,70 @@ func GetAllModels() []interface{} { | ||||
| 		&PendingTask{}, | ||||
| 		&SensorData{}, | ||||
| 		&DeviceCommandLog{}, | ||||
| 		&PendingCollection{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // UintArray 是一个自定义类型,代表 uint 的切片。 | ||||
| // 它实现了 gorm.Scanner 和 driver.Valuer 接口, | ||||
| // 以便能与数据库的 bigint[] 类型进行原生映射。 | ||||
| type UintArray []uint | ||||
|  | ||||
| // Value 实现了 driver.Valuer 接口。 | ||||
| // 它告诉 GORM 如何将 UintArray ([]) 转换为数据库能够理解的格式。 | ||||
| func (a UintArray) Value() (driver.Value, error) { | ||||
| 	if a == nil { | ||||
| 		return "{}", nil | ||||
| 	} | ||||
|  | ||||
| 	var b strings.Builder | ||||
| 	b.WriteString("{") | ||||
| 	for i, v := range a { | ||||
| 		if i > 0 { | ||||
| 			b.WriteString(",") | ||||
| 		} | ||||
| 		b.WriteString(strconv.FormatUint(uint64(v), 10)) | ||||
| 	} | ||||
| 	b.WriteString("}") | ||||
| 	return b.String(), nil | ||||
| } | ||||
|  | ||||
| // Scan 实现了 gorm.Scanner 接口。 | ||||
| // 它告诉 GORM 如何将从数据库读取的数据转换为我们的 UintArray ([])。 | ||||
| func (a *UintArray) Scan(src interface{}) error { | ||||
| 	if src == nil { | ||||
| 		*a = nil | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var srcStr string | ||||
| 	switch v := src.(type) { | ||||
| 	case []byte: | ||||
| 		srcStr = string(v) | ||||
| 	case string: | ||||
| 		srcStr = v | ||||
| 	default: | ||||
| 		return errors.New("无法扫描非字符串或字节类型的源到 UintArray") | ||||
| 	} | ||||
|  | ||||
| 	// 去掉花括号 | ||||
| 	srcStr = strings.Trim(srcStr, "{}") | ||||
| 	if srcStr == "" { | ||||
| 		*a = []uint{} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// 按逗号分割 | ||||
| 	parts := strings.Split(srcStr, ",") | ||||
| 	arr := make([]uint, len(parts)) | ||||
| 	for i, p := range parts { | ||||
| 		val, err := strconv.ParseUint(p, 10, 64) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("解析 UintArray 元素失败: %w", err) | ||||
| 		} | ||||
| 		arr[i] = uint(val) | ||||
| 	} | ||||
|  | ||||
| 	*a = arr | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sort" | ||||
| 	"time" | ||||
| @@ -172,3 +174,16 @@ type Task struct { | ||||
| func (Task) TableName() string { | ||||
| 	return "tasks" | ||||
| } | ||||
|  | ||||
| // ParseParameters 解析 JSON 属性到一个具体的结构体中。 | ||||
| // 调用方需要传入一个指向目标结构体实例的指针。 | ||||
| // 示例: | ||||
| // | ||||
| //	var param LoraParameters | ||||
| //	if err := task.ParseParameters(¶m); err != nil { ... } | ||||
| func (t Task) ParseParameters(v interface{}) error { | ||||
| 	if t.Parameters == nil { | ||||
| 		return errors.New("设备属性为空,无法解析") | ||||
| 	} | ||||
| 	return json.Unmarshal(t.Parameters, v) | ||||
| } | ||||
|   | ||||
| @@ -17,6 +17,14 @@ const ( | ||||
| 	SensorDataTypeWeight        SensorDataType = "weight"         // 重量 | ||||
| ) | ||||
|  | ||||
| // DeviceSubTypeToSensorDataTypeMap 定义了设备子类型到其产生的传感器数据类型的静态映射. | ||||
| // 这个公开的 map 是连接设备定义和数据记录的桥梁, 供其他包直接查询. | ||||
| var DeviceSubTypeToSensorDataTypeMap = map[DeviceSubType]SensorDataType{ | ||||
| 	SubTypeSensorTemp:     SensorDataTypeTemperature, | ||||
| 	SubTypeSensorHumidity: SensorDataTypeHumidity, | ||||
| 	SubTypeSensorWeight:   SensorDataTypeWeight, | ||||
| } | ||||
|  | ||||
| // SignalMetrics 存储信号强度数据 | ||||
| type SignalMetrics struct { | ||||
| 	RssiDbm        int     `json:"rssi_dbm"`        // 绝对信号强度(dBm),受距离、障碍物影响 | ||||
|   | ||||
| @@ -35,6 +35,9 @@ type DeviceRepository interface { | ||||
|  | ||||
| 	// FindByDevEui 根据 DevEui (存储在 properties JSONB 中的 lora_address) 查找设备 (新增) | ||||
| 	FindByDevEui(devEui string) (*models.Device, error) | ||||
|  | ||||
| 	// FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 | ||||
| 	FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) | ||||
| } | ||||
|  | ||||
| // gormDeviceRepository 是 DeviceRepository 的 GORM 实现 | ||||
| @@ -121,3 +124,18 @@ func (r *gormDeviceRepository) FindByDevEui(devEui string) (*models.Device, erro | ||||
| 	} | ||||
| 	return &device, nil | ||||
| } | ||||
|  | ||||
| // FindByParentAndPhysicalAddress 根据父设备ID和物理地址(总线号、总线地址)查找设备 | ||||
| func (r *gormDeviceRepository) FindByParentAndPhysicalAddress(parentID uint, busNumber int32, busAddress int32) (*models.Device, error) { | ||||
| 	var device models.Device | ||||
| 	// PostgreSQL 使用 ->> 操作符来查询 JSONB 字段的文本值 | ||||
| 	err := r.db.Where("parent_id = ?", parentID). | ||||
| 		Where("properties->>'bus_number' = ?", strconv.Itoa(int(busNumber))). | ||||
| 		Where("properties->>'bus_address' = ?", strconv.Itoa(int(busAddress))). | ||||
| 		First(&device).Error | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("根据父设备ID %d 和物理地址 (总线号: %d, 总线地址: %d) 查找设备失败: %w", parentID, busNumber, busAddress, err) | ||||
| 	} | ||||
| 	return &device, nil | ||||
| } | ||||
|   | ||||
							
								
								
									
										67
									
								
								internal/infra/repository/pending_collection_repository.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								internal/infra/repository/pending_collection_repository.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,67 @@ | ||||
| package repository | ||||
|  | ||||
| import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"gorm.io/gorm" | ||||
| ) | ||||
|  | ||||
| // PendingCollectionRepository 定义了与待采集请求相关的数据库操作接口。 | ||||
| type PendingCollectionRepository interface { | ||||
| 	// Create 创建一个新的待采集请求。 | ||||
| 	Create(req *models.PendingCollection) error | ||||
|  | ||||
| 	// FindByCorrelationID 根据关联ID查找一个待采集请求。 | ||||
| 	FindByCorrelationID(correlationID string) (*models.PendingCollection, error) | ||||
|  | ||||
| 	// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 | ||||
| 	UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error | ||||
|  | ||||
| 	// MarkAllPendingAsTimedOut 将所有“待处理”请求更新为“已超时”。 | ||||
| 	MarkAllPendingAsTimedOut() (int64, error) | ||||
| } | ||||
|  | ||||
| // gormPendingCollectionRepository 是 PendingCollectionRepository 的 GORM 实现。 | ||||
| type gormPendingCollectionRepository struct { | ||||
| 	db *gorm.DB | ||||
| } | ||||
|  | ||||
| // NewGormPendingCollectionRepository 创建一个新的 PendingCollectionRepository GORM 实现实例。 | ||||
| func NewGormPendingCollectionRepository(db *gorm.DB) PendingCollectionRepository { | ||||
| 	return &gormPendingCollectionRepository{db: db} | ||||
| } | ||||
|  | ||||
| // Create 创建一个新的待采集请求。 | ||||
| func (r *gormPendingCollectionRepository) Create(req *models.PendingCollection) error { | ||||
| 	return r.db.Create(req).Error | ||||
| } | ||||
|  | ||||
| // FindByCorrelationID 根据关联ID查找一个待采集请求。 | ||||
| func (r *gormPendingCollectionRepository) FindByCorrelationID(correlationID string) (*models.PendingCollection, error) { | ||||
| 	var req models.PendingCollection | ||||
| 	if err := r.db.First(&req, "correlation_id = ?", correlationID).Error; err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &req, nil | ||||
| } | ||||
|  | ||||
| // UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 | ||||
| func (r *gormPendingCollectionRepository) UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error { | ||||
| 	return r.db.Model(&models.PendingCollection{}). | ||||
| 		Where("correlation_id = ?", correlationID). | ||||
| 		Updates(map[string]interface{}{ | ||||
| 			"status":       models.PendingStatusFulfilled, | ||||
| 			"fulfilled_at": &fulfilledAt, | ||||
| 		}).Error | ||||
| } | ||||
|  | ||||
| // MarkAllPendingAsTimedOut 将所有状态为 'pending' 的记录更新为 'timed_out'。 | ||||
| // 返回被更新的记录数量和错误。 | ||||
| func (r *gormPendingCollectionRepository) MarkAllPendingAsTimedOut() (int64, error) { | ||||
| 	result := r.db.Model(&models.PendingCollection{}). | ||||
| 		Where("status = ?", models.PendingStatusPending). | ||||
| 		Update("status", models.PendingStatusTimedOut) | ||||
|  | ||||
| 	return result.RowsAffected, result.Error | ||||
| } | ||||
| @@ -1,12 +1,12 @@ | ||||
| package lora | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service" | ||||
| 	"github.com/go-openapi/runtime" | ||||
| 	httptransport "github.com/go-openapi/runtime/client" | ||||
| @@ -20,10 +20,6 @@ type ChirpStackTransport struct { | ||||
| 	client   *client.ChirpStackRESTAPI | ||||
| 	authInfo runtime.ClientAuthInfoWriter | ||||
| 	config   config.ChirpStackConfig | ||||
|  | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository | ||||
| 	deviceRepo           repository.DeviceRepository | ||||
|  | ||||
| 	logger   *logs.Logger | ||||
| } | ||||
|  | ||||
| @@ -31,8 +27,6 @@ type ChirpStackTransport struct { | ||||
| func NewChirpStackTransport( | ||||
| 	config config.ChirpStackConfig, | ||||
| 	logger *logs.Logger, | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| ) *ChirpStackTransport { | ||||
| 	// 使用配置中的服务器地址创建一个 HTTP transport。 | ||||
| 	// 它会使用生成的客户端中定义的默认 base path 和 schemes。 | ||||
| @@ -49,12 +43,10 @@ func NewChirpStackTransport( | ||||
| 		authInfo: authInfo, | ||||
| 		config:   config, | ||||
| 		logger:   logger, | ||||
| 		deviceCommandLogRepo: deviceCommandLogRepo, | ||||
| 		deviceRepo:           deviceRepo, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c *ChirpStackTransport) Send(address string, payload []byte) error { | ||||
| func (c *ChirpStackTransport) Send(address string, payload []byte) (*transport.SendResult, error) { | ||||
| 	// 1. 构建 API 请求体。 | ||||
| 	//    - Confirmed: true 表示确认消息, 设为false将不保证消息送达(但可以节约下行容量)。 | ||||
| 	//    - Data: 经过 Base64 编码的数据。 | ||||
| @@ -72,7 +64,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { | ||||
| 	//    - WithQueueItemDevEui 指定目标设备的 EUI。 | ||||
| 	//    - WithBody 设置请求体。 | ||||
| 	params := device_service.NewDeviceServiceEnqueueParams(). | ||||
| 		WithTimeout(10 * time.Second). | ||||
| 		WithTimeout(time.Duration(c.config.APITimeout) * time.Second). | ||||
| 		WithQueueItemDevEui(address). | ||||
| 		WithBody(body) | ||||
|  | ||||
| @@ -81,53 +73,23 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { | ||||
| 	resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err) | ||||
| 		return err | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// 4. 成功发送后,尝试记录下行任务 | ||||
| 	messageID := "" | ||||
| 	if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID | ||||
| 		messageID = resp.Payload.ID | ||||
| 	} else { | ||||
| 		c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address) | ||||
| 		// 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了 | ||||
| 		return nil | ||||
| 	if resp == nil || resp.Payload == nil || resp.Payload.ID == "" { | ||||
| 		// 这是一个需要明确处理的错误情况,因为调用方依赖 MessageID。 | ||||
| 		errMsg := "ChirpStack Enqueue 响应未包含 MessageID (ID)" | ||||
| 		c.logger.Errorf(errMsg) | ||||
| 		return nil, errors.New(errMsg) | ||||
| 	} | ||||
|  | ||||
| 	// 调用私有方法记录下行任务 | ||||
| 	if err := c.recordDownlinkTask(address, messageID); err != nil { | ||||
| 		// 记录失败不影响下行命令的发送成功 | ||||
| 		c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err) | ||||
| 		return nil | ||||
| 	c.logger.Infof("成功将 payload 发送到设备 %s 的队列 (MessageID: %s)", address, resp.Payload.ID) | ||||
|  | ||||
| 	// 将 MessageID 包装在 SendResult 中返回 | ||||
| 	result := &transport.SendResult{ | ||||
| 		MessageID: resp.Payload.ID, | ||||
| 	} | ||||
|  | ||||
| 	c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID) | ||||
| 	return result, nil | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // recordDownlinkTask 记录下行任务到数据库 | ||||
| func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error { | ||||
| 	// 获取区域主控的内部 DeviceID | ||||
| 	regionalController, err := c.deviceRepo.FindByDevEui(devEui) | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 创建 DeviceCommandLog | ||||
| 	record := &models.DeviceCommandLog{ | ||||
| 		MessageID:      messageID, | ||||
| 		DeviceID:       regionalController.ID, | ||||
| 		SentAt:         time.Now(), | ||||
| 		AcknowledgedAt: nil, // 初始状态为未确认 | ||||
| 	} | ||||
|  | ||||
| 	if err := c.deviceCommandLogRepo.Create(record); err != nil { | ||||
| 		c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID) | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -3,5 +3,13 @@ package transport | ||||
| // Communicator 用于其他设备通信 | ||||
| type Communicator interface { | ||||
| 	// Send 用于发送一条单向数据(不等待回信) | ||||
| 	Send(address string, payload []byte) error | ||||
| 	// 成功时,它返回一个包含 MessageID 的 SendResult,以便调用方追踪。 | ||||
| 	Send(address string, payload []byte) (*SendResult, error) | ||||
| } | ||||
|  | ||||
| // SendResult 包含了 SendGo 方法成功执行后返回的结果。 | ||||
| type SendResult struct { | ||||
| 	// MessageID 是通信服务为此次发送分配的唯一标识符。 | ||||
| 	// 调用方需要保存此 ID,以便后续关联 ACK 等事件。 | ||||
| 	MessageID string | ||||
| } | ||||
|   | ||||
							
								
								
									
										13
									
								
								internal/infra/utils/validation.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								internal/infra/utils/validation.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,13 @@ | ||||
| package utils | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| ) | ||||
|  | ||||
| // IsPureNumeric 检查值是否纯数字(整数或可转换为整数的字符串)。 | ||||
| func IsPureNumeric(val interface{}) bool { | ||||
| 	v := fmt.Sprintf("%v", val) | ||||
| 	_, err := strconv.Atoi(v) | ||||
| 	return err == nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user