diff --git a/Makefile b/Makefile index a050764..35ef016 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,7 @@ swag: # 生成protobuf文件 .PHONY: proto proto: - protoc --go_out=internal/domain/device/proto --go_opt=paths=source_relative --go-grpc_out=internal/domain/device/proto --go-grpc_opt=paths=source_relative -Iinternal/domain/device/proto internal/domain/device/proto/device.proto + protoc --go_out=internal/infra/transport/proto --go_opt=paths=source_relative --go-grpc_out=internal/infra/transport/proto --go-grpc_opt=paths=source_relative -Iinternal/infra/transport/proto internal/infra/transport/proto/device.proto # 运行代码检查 .PHONY: lint diff --git a/config.yml b/config.yml index 653b377..a3aa9e2 100644 --- a/config.yml +++ b/config.yml @@ -69,18 +69,21 @@ lora_mesh: uart_port: "/dev/ttyS1" # LoRa模块的通信波特率 baud_rate: 9600 - # 等待LoRa模块AT指令响应的超时时间 - timeout: 5 + # 等待LoRa模块AT指令响应的超时时间(ms) + timeout: 50 # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包) # e.g. # EC: 接收端只会接收到消息, 不会接收到请求头 # e.g. 发送: EC 05 02 01 48 65 6c 6c 6f # (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体)) # 接收: 48 65 6c 6c 6f ("Hello") - # ED: 接收端会接收完整数据包,包含请求头 - # e.g. 发送: ED 05 02 01 48 65 6c 6c 6f - # (ED + 05(消息长度) + 0201(地址) + "Hello"(消息本体)) - # 接收: ED 05 02 01 48 65 6c 6c 6f + # ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。 + # e.g. 发送: ED 05 12 34 01 00 01 02 03 + # (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块)) + # 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾) lora_mesh_mode: "ED" # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238 - max_chunk_size: 238 \ No newline at end of file + max_chunk_size: 238 + #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 + # 还没收到完整的包,则认为接收失败。 + reassembly_timeout: 30 \ No newline at end of file diff --git a/internal/app/webhook/chirp_stack.go b/internal/app/webhook/chirp_stack.go index 2f46263..7be960e 100644 --- a/internal/app/webhook/chirp_stack.go +++ b/internal/app/webhook/chirp_stack.go @@ -8,10 +8,10 @@ import ( "net/http" "time" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" gproto "google.golang.org/protobuf/proto" "gorm.io/datatypes" ) diff --git a/internal/domain/device/general_device_service.go b/internal/domain/device/general_device_service.go index 8fa2894..480ab98 100644 --- a/internal/domain/device/general_device_service.go +++ b/internal/domain/device/general_device_service.go @@ -5,11 +5,11 @@ import ( "fmt" "time" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater" "github.com/google/uuid" diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 29f489a..0fd56ff 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -150,11 +150,12 @@ type LoraConfig struct { // LoraMeshConfig 代表Lora Mesh配置 type LoraMeshConfig struct { - UARTPort string `yaml:"uart_port"` - BaudRate int `yaml:"baud_rate"` - Timeout int `yaml:"timeout"` - LoraMeshMode string `yaml:"lora_mesh_mode"` - MaxChunkSize int `yaml:"max_chunk_size"` + UARTPort string `yaml:"uart_port"` + BaudRate int `yaml:"baud_rate"` + Timeout int `yaml:"timeout"` + LoraMeshMode string `yaml:"lora_mesh_mode"` + MaxChunkSize int `yaml:"max_chunk_size"` + ReassemblyTimeout int `yaml:"reassembly_timeout"` } // NewConfig 创建并返回一个新的配置实例 diff --git a/internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go b/internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go index f0a1b30..3514634 100644 --- a/internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go +++ b/internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go @@ -1,31 +1,103 @@ package lora import ( + "bytes" + "encoding/binary" + "encoding/json" "fmt" + "io" + "math" + "strconv" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" - + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" + "github.com/google/uuid" "github.com/tarm/serial" + gproto "google.golang.org/protobuf/proto" + "gorm.io/datatypes" ) -// LoRaMeshUartPassthroughTransport 实现 transport.Communicator 接口, 用于 LoRa 网状网络 UART 透传 +// transportState 定义了传输层的内部状态 +type transportState int + +const ( + stateIdle transportState = iota // 空闲状态 + stateReceiving // 接收状态:正在接收一个(可能分片的)消息 + stateSending // 发送状态:正在发送一个(可能分片的)消息 +) + +// message 是一个内部结构,用于封装一个完整的、已重组的消息及其元数据 +type message struct { + SourceAddr string // 源地址 + DestAddr string // 目标地址 + Payload []byte // 有效载荷 +} + +// LoRaMeshUartPassthroughTransport 实现了 transport.Communicator 和 transport.Listener 接口 type LoRaMeshUartPassthroughTransport struct { config config.LoraMeshConfig logger *logs.Logger - mu sync.Mutex // 保护对 LoRa 模块的并发访问 port *serial.Port + + mu sync.Mutex // 用于保护对外的公共方法(如Send)的并发调用 + state transportState + + stopChan chan struct{} // 用于优雅地停止worker协程 + wg sync.WaitGroup // 用于等待worker协程完全退出 + sendChan chan *sendRequest // 发送任务的请求通道 + + // --- 接收与重组相关 --- + reassemblyBuffers map[uint16]*reassemblyBuffer // 键为源地址SourceAddr,值为对应的重组缓冲区 + currentRecvSource uint16 // 当前正在接收的源地址 + reassemblyTimeout *time.Timer // 分片重组的超时定时器 + reassemblyTimeoutCh chan uint16 // 当超时触发时,用于传递源地址 + + // --- 依赖注入的仓库 --- + areaControllerRepo repository.AreaControllerRepository + pendingCollectionRepo repository.PendingCollectionRepository + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository } -// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport -func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *logs.Logger) (*LoRaMeshUartPassthroughTransport, error) { +// sendRequest 封装了一次发送请求 +type sendRequest struct { + address string + payload []byte + result chan *sendResultTuple +} + +// sendResultTuple 用于在通道中安全地传递Send方法的返回值 +type sendResultTuple struct { + result *transport.SendResult + err error +} + +// reassemblyBuffer 用于缓存和重组来自同一源的分片 +type reassemblyBuffer struct { + chunks map[uint8][]byte // 键为当前包序号CurrentChunk + totalChunks uint8 + receivedChunks int +} + +// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport 实例 +func NewLoRaMeshUartPassthroughTransport( + config config.LoraMeshConfig, + logger *logs.Logger, + areaControllerRepo repository.AreaControllerRepository, + pendingCollectionRepo repository.PendingCollectionRepository, + deviceRepo repository.DeviceRepository, + sensorDataRepo repository.SensorDataRepository, +) (*LoRaMeshUartPassthroughTransport, error) { c := &serial.Config{ Name: config.UARTPort, Baud: config.BaudRate, - ReadTimeout: time.Second * time.Duration(config.Timeout), + ReadTimeout: time.Millisecond * time.Duration(config.Timeout), } port, err := serial.OpenPort(c) @@ -33,26 +105,441 @@ func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *l return nil, fmt.Errorf("无法打开串口 %s: %w", config.UARTPort, err) } - return &LoRaMeshUartPassthroughTransport{ - config: config, - logger: logger, - mu: sync.Mutex{}, - port: port, - }, nil -} - -// Send 将数据发送到指定的地址 -func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) { - // TODO: 实现发送逻辑 - return nil, nil + t := &LoRaMeshUartPassthroughTransport{ + config: config, + logger: logger, + port: port, + state: stateIdle, + stopChan: make(chan struct{}), + sendChan: make(chan *sendRequest), + reassemblyBuffers: make(map[uint16]*reassemblyBuffer), + reassemblyTimeoutCh: make(chan uint16, 1), + + // 注入依赖 + areaControllerRepo: areaControllerRepo, + pendingCollectionRepo: pendingCollectionRepo, + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, + } + + return t, nil } +// Listen 启动后台监听协程(非阻塞) func (t *LoRaMeshUartPassthroughTransport) Listen() error { - //TODO implement me - panic("implement me") + t.wg.Add(1) + go t.workerLoop() + t.logger.Info("LoRa传输层工作协程已启动") + return nil } -func (t *LoRaMeshUartPassthroughTransport) Stop() error { - //TODO implement me - panic("implement me") +// Send 将发送任务提交给worker协程 +func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) { + t.mu.Lock() + defer t.mu.Unlock() + + resultChan := make(chan *sendResultTuple, 1) + req := &sendRequest{ + address: address, + payload: payload, + result: resultChan, + } + + select { + case t.sendChan <- req: + // 等待worker协程处理完毕 + res := <-resultChan + return res.result, res.err + case <-t.stopChan: + return nil, fmt.Errorf("传输层正在停止") + } +} + +// Stop 停止传输层 +func (t *LoRaMeshUartPassthroughTransport) Stop() error { + close(t.stopChan) + t.wg.Wait() + return t.port.Close() +} + +// workerLoop 是核心的状态机和调度器 +func (t *LoRaMeshUartPassthroughTransport) workerLoop() { + defer t.wg.Done() + + readBuffer := make([]byte, 1024) + parserBuffer := new(bytes.Buffer) + + for { + // 1. 检查是否需要停止 (优先检查,以便快速退出) + select { + case <-t.stopChan: + if t.reassemblyTimeout != nil { + t.reassemblyTimeout.Stop() + } + t.logger.Info("LoRa传输层工作协程已停止") + return + default: + } + + // 2. 尝试从串口读取数据 + n, err := t.port.Read(readBuffer) + if n > 0 { + parserBuffer.Write(readBuffer[:n]) + } + if err != nil && err != io.EOF { + // 忽略预期的超时错误(io.EOF),只记录真正的IO错误 + t.logger.Errorf("从串口读取数据时发生错误: %v", err) + } + + // 3. 循环解析缓冲区中的完整物理帧 + for { + frame := t.parseCompleteFrame(parserBuffer) + if frame == nil { + break // 缓冲区中没有更多完整帧了 + } + t.handleFrame(frame) + } + + // 4. 根据当前状态执行主要逻辑 + switch t.state { + case stateIdle: + t.runIdleState() + case stateReceiving: + t.runReceivingState() + } + } +} + +// runIdleState 处理空闲状态下的逻辑,主要是检查并启动发送任务 +func (t *LoRaMeshUartPassthroughTransport) runIdleState() { + select { + case req := <-t.sendChan: + t.state = stateSending + // 此处为阻塞式发送 + result, err := t.executeSend(req) + req.result <- &sendResultTuple{result: result, err: err} + t.state = stateIdle + default: + // 没有发送任务,保持空闲 + } +} + +// runReceivingState 处理接收状态下的逻辑,主要是检查超时 +func (t *LoRaMeshUartPassthroughTransport) runReceivingState() { + select { + case sourceAddr := <-t.reassemblyTimeoutCh: + t.logger.Warnf("接收来自 0x%04X 的消息超时", sourceAddr) + delete(t.reassemblyBuffers, sourceAddr) + t.state = stateIdle + default: + // 等待更多分片或超时 + } +} + +// executeSend 执行完整的发送流程(分片、构建、写入) +func (t *LoRaMeshUartPassthroughTransport) executeSend(req *sendRequest) (*transport.SendResult, error) { + chunks := splitPayload(req.payload, t.config.MaxChunkSize) + totalChunks := uint8(len(chunks)) + + destAddr, err := strconv.ParseUint(req.address, 16, 16) + if err != nil { + return nil, fmt.Errorf("无效的目标地址: %s", req.address) + } + + for i, chunk := range chunks { + currentChunk := uint8(i) + frame := new(bytes.Buffer) + frame.WriteByte(0xED) // 帧头 + frame.WriteByte(uint8(len(chunk) + 2)) // 数据长度 = 数据块 + 2 (总包数+当前包序号) + binary.Write(frame, binary.BigEndian, uint16(destAddr)) // 目标地址 + frame.WriteByte(totalChunks) // 总包数 + frame.WriteByte(currentChunk) // 当前包序号 + frame.Write(chunk) // 数据块 + + _, err := t.port.Write(frame.Bytes()) + if err != nil { + return nil, fmt.Errorf("写入串口失败: %w", err) + } + } + + msgID := uuid.New().String() + return &transport.SendResult{MessageID: msgID}, nil +} + +// handleFrame 处理一个从串口解析出的完整物理帧 +func (t *LoRaMeshUartPassthroughTransport) handleFrame(frame []byte) { + if len(frame) < 8 { + t.logger.Warnf("收到了一个无效长度的帧: %d", len(frame)) + return + } + + destAddr := binary.BigEndian.Uint16(frame[2:4]) + totalChunks := frame[4] + currentChunk := frame[5] + sourceAddr := binary.BigEndian.Uint16(frame[len(frame)-2:]) + chunkData := frame[6 : len(frame)-2] + + // 如果是单包消息 + if totalChunks == 1 { + msg := &message{ + SourceAddr: fmt.Sprintf("%04X", sourceAddr), + DestAddr: fmt.Sprintf("%04X", destAddr), + Payload: chunkData, + } + go t.handleUpstreamMessage(msg) + return + } + + // --- 处理分片消息 --- + switch t.state { + case stateIdle: + if currentChunk == 0 { + t.state = stateReceiving + t.currentRecvSource = sourceAddr + t.reassemblyBuffers[sourceAddr] = &reassemblyBuffer{ + chunks: make(map[uint8][]byte), + totalChunks: totalChunks, + receivedChunks: 0, + } + t.reassemblyBuffers[sourceAddr].chunks[currentChunk] = chunkData + t.reassemblyBuffers[sourceAddr].receivedChunks++ + + if t.reassemblyTimeout != nil { + t.reassemblyTimeout.Stop() + } + t.reassemblyTimeout = time.AfterFunc(time.Duration(t.config.ReassemblyTimeout)*time.Second, func() { + t.reassemblyTimeoutCh <- sourceAddr + }) + } else { + t.logger.Warnf("在空闲状态下收到了一个来自 0x%04X 的非首包分片,已忽略。", sourceAddr) + } + + case stateReceiving: + if sourceAddr != t.currentRecvSource { + t.logger.Warnf("正在接收来自 0x%04X 的数据时,收到了另一个源 0x%04X 的分片,已忽略。", t.currentRecvSource, sourceAddr) + return + } + + buffer, ok := t.reassemblyBuffers[sourceAddr] + if !ok { + t.logger.Errorf("内部错误: 处于接收状态,但没有为 0x%04X 找到缓冲区", sourceAddr) + t.state = stateIdle // 重置状态 + return + } + + // 存入分片并重置超时 + buffer.chunks[currentChunk] = chunkData + buffer.receivedChunks++ + t.reassemblyTimeout.Reset(time.Duration(t.config.ReassemblyTimeout) * time.Second) + + // 检查是否已全部收到 + if buffer.receivedChunks == int(buffer.totalChunks) { + t.reassemblyTimeout.Stop() + + // 重组消息 + fullPayload := new(bytes.Buffer) + for i := 0; i < int(buffer.totalChunks); i++ { + fullPayload.Write(buffer.chunks[uint8(i)]) + } + + msg := &message{ + SourceAddr: fmt.Sprintf("%04X", sourceAddr), + DestAddr: fmt.Sprintf("%04X", destAddr), + Payload: fullPayload.Bytes(), + } + go t.handleUpstreamMessage(msg) + + // 清理并返回空闲状态 + delete(t.reassemblyBuffers, sourceAddr) + t.state = stateIdle + } + } +} + +// handleUpstreamMessage 在独立的协程中处理单个上行的、完整的消息。 +func (t *LoRaMeshUartPassthroughTransport) handleUpstreamMessage(msg *message) { + t.logger.Infof("开始处理来自 %s 的上行消息", msg.SourceAddr) + + // 1. 解析外层 "信封" + var instruction proto.Instruction + if err := gproto.Unmarshal(msg.Payload, &instruction); err != nil { + t.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, 源地址: %s, 原始数据: %x", err, msg.SourceAddr, msg.Payload) + return + } + + // 2. 使用 type switch 从 oneof payload 中提取 CollectResult + var collectResp *proto.CollectResult + switch p := instruction.GetPayload().(type) { + case *proto.Instruction_CollectResult: + collectResp = p.CollectResult + default: + // 如果上行的数据不是采集结果,记录日志并忽略 + t.logger.Infof("收到一个非采集响应的上行指令 (类型: %T),无需处理。源地址: %s", p, msg.SourceAddr) + return + } + + if collectResp == nil { + t.logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil。源地址: %s", msg.SourceAddr) + return + } + + correlationID := collectResp.CorrelationId + t.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values)) + + // 3. 查找区域主控 (注意:LoRa Mesh 的 SourceAddr 对应于区域主控的 NetworkID) + regionalController, err := t.areaControllerRepo.FindByNetworkID(msg.SourceAddr) + if err != nil { + t.logger.Errorf("处理上行消息失败:无法通过源地址 '%s' 找到区域主控设备: %v", msg.SourceAddr, err) + return + } + if err := regionalController.SelfCheck(); err != nil { + t.logger.Errorf("处理上行消息失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err) + return + } + + // 4. 根据 CorrelationID 查找待处理请求 + pendingReq, err := t.pendingCollectionRepo.FindByCorrelationID(correlationID) + if err != nil { + t.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err) + return + } + + // 检查状态,防止重复处理 + if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut { + t.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status) + return + } + + // 5. 匹配数据并存入数据库 + deviceIDs := pendingReq.CommandMetadata + values := collectResp.Values + if len(deviceIDs) != len(values) { + t.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID) + err = t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now()) + if err != nil { + t.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err) + } + return + } + + for i, deviceID := range deviceIDs { + rawSensorValue := values[i] + + if math.IsNaN(float64(rawSensorValue)) { + t.logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID) + continue + } + + dev, err := t.deviceRepo.FindByID(deviceID) + if err != nil { + t.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err) + continue + } + if err := dev.SelfCheck(); err != nil { + t.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err) + continue + } + if err := dev.DeviceTemplate.SelfCheck(); err != nil { + t.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err) + continue + } + + var valueDescriptors []*models.ValueDescriptor + if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil { + t.logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err) + continue + } + if len(valueDescriptors) == 0 { + t.logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID) + continue + } + valueDescriptor := valueDescriptors[0] + + parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset + + var dataToRecord interface{} + switch valueDescriptor.Type { + case models.SensorTypeTemperature: + dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue} + case models.SensorTypeHumidity: + dataToRecord = models.HumidityData{HumidityPercent: parsedValue} + case models.SensorTypeWeight: + dataToRecord = models.WeightData{WeightKilograms: parsedValue} + default: + t.logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type) + dataToRecord = map[string]float64{"value": parsedValue} + } + + t.recordSensorData(regionalController.ID, dev.ID, time.Now(), valueDescriptor.Type, dataToRecord) + t.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue) + } + + // 6. 更新请求状态为“已完成” + if err := t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now()); err != nil { + t.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err) + } else { + t.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID) + } +} + +// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。 +func (t *LoRaMeshUartPassthroughTransport) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) { + jsonData, err := json.Marshal(data) + if err != nil { + t.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err) + return + } + + sensorData := &models.SensorData{ + Time: eventTime, + DeviceID: sensorDeviceID, + RegionalControllerID: regionalControllerID, + SensorType: sensorType, + Data: datatypes.JSON(jsonData), + } + + if err := t.sensorDataRepo.Create(sensorData); err != nil { + t.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err) + } +} + +// parseCompleteFrame 实现粘包和半包处理 +func (t *LoRaMeshUartPassthroughTransport) parseCompleteFrame(buffer *bytes.Buffer) []byte { + for { + headerIndex := bytes.IndexByte(buffer.Bytes(), 0xED) + if headerIndex == -1 { + return nil + } + buffer.Next(headerIndex) + + if buffer.Len() < 2 { + return nil + } + + lengthField := buffer.Bytes()[1] + frameLength := 1 + 1 + 2 + int(lengthField) + 2 + + if buffer.Len() < frameLength { + return nil + } + + return buffer.Next(frameLength) + } +} + +// splitPayload 将数据块按最大长度进行切分 +func splitPayload(payload []byte, maxChunkSize int) [][]byte { + if len(payload) == 0 { + return [][]byte{{}} + } + + var chunks [][]byte + for i := 0; i < len(payload); i += maxChunkSize { + end := i + maxChunkSize + if end > len(payload) { + end = len(payload) + } + chunks = append(chunks, payload[i:end]) + } + return chunks } diff --git a/internal/domain/device/proto/device.pb.go b/internal/infra/transport/proto/device.pb.go similarity index 100% rename from internal/domain/device/proto/device.pb.go rename to internal/infra/transport/proto/device.pb.go diff --git a/internal/domain/device/proto/device.proto b/internal/infra/transport/proto/device.proto similarity index 100% rename from internal/domain/device/proto/device.proto rename to internal/infra/transport/proto/device.proto