Compare commits
	
		
			2 Commits
		
	
	
		
			8a5f6dc34e
			...
			503feb1b21
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 503feb1b21 | |||
| 50a843c9ef | 
							
								
								
									
										2
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Makefile
									
									
									
									
									
								
							@@ -45,7 +45,7 @@ swag:
 | 
				
			|||||||
# 生成protobuf文件
 | 
					# 生成protobuf文件
 | 
				
			||||||
.PHONY: proto
 | 
					.PHONY: proto
 | 
				
			||||||
proto:
 | 
					proto:
 | 
				
			||||||
	protoc --go_out=internal/domain/device/proto --go_opt=paths=source_relative --go-grpc_out=internal/domain/device/proto --go-grpc_opt=paths=source_relative -Iinternal/domain/device/proto internal/domain/device/proto/device.proto
 | 
						protoc --go_out=internal/infra/transport/proto --go_opt=paths=source_relative --go-grpc_out=internal/infra/transport/proto --go-grpc_opt=paths=source_relative -Iinternal/infra/transport/proto internal/infra/transport/proto/device.proto
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# 运行代码检查
 | 
					# 运行代码检查
 | 
				
			||||||
.PHONY: lint
 | 
					.PHONY: lint
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										15
									
								
								config.yml
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								config.yml
									
									
									
									
									
								
							@@ -69,18 +69,21 @@ lora_mesh:
 | 
				
			|||||||
  uart_port: "/dev/ttyS1"
 | 
					  uart_port: "/dev/ttyS1"
 | 
				
			||||||
  # LoRa模块的通信波特率
 | 
					  # LoRa模块的通信波特率
 | 
				
			||||||
  baud_rate: 9600
 | 
					  baud_rate: 9600
 | 
				
			||||||
  # 等待LoRa模块AT指令响应的超时时间
 | 
					  # 等待LoRa模块AT指令响应的超时时间(ms)
 | 
				
			||||||
  timeout: 5
 | 
					  timeout: 50
 | 
				
			||||||
  # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
 | 
					  # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
 | 
				
			||||||
  # e.g.
 | 
					  # e.g.
 | 
				
			||||||
  #   EC: 接收端只会接收到消息, 不会接收到请求头
 | 
					  #   EC: 接收端只会接收到消息, 不会接收到请求头
 | 
				
			||||||
  #       e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
 | 
					  #       e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
 | 
				
			||||||
  #            (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
 | 
					  #            (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
 | 
				
			||||||
  #            接收: 48 65 6c 6c 6f ("Hello")
 | 
					  #            接收: 48 65 6c 6c 6f ("Hello")
 | 
				
			||||||
  #   ED: 接收端会接收完整数据包,包含请求头
 | 
					  #   ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。
 | 
				
			||||||
  #       e.g. 发送: ED 05 02 01 48 65 6c 6c 6f
 | 
					  #       e.g. 发送: ED 05 12 34 01 00 01 02 03
 | 
				
			||||||
  #            (ED + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
 | 
					  #            (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块))
 | 
				
			||||||
  #            接收: ED 05 02 01 48 65 6c 6c 6f
 | 
					  #            接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾)
 | 
				
			||||||
  lora_mesh_mode: "ED"
 | 
					  lora_mesh_mode: "ED"
 | 
				
			||||||
  # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
 | 
					  # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
 | 
				
			||||||
  max_chunk_size: 238
 | 
					  max_chunk_size: 238
 | 
				
			||||||
 | 
					  #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间
 | 
				
			||||||
 | 
					  # 还没收到完整的包,则认为接收失败。
 | 
				
			||||||
 | 
					  reassembly_timeout: 30
 | 
				
			||||||
@@ -8,10 +8,10 @@ import (
 | 
				
			|||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device/proto"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
 | 
				
			||||||
	gproto "google.golang.org/protobuf/proto"
 | 
						gproto "google.golang.org/protobuf/proto"
 | 
				
			||||||
	"gorm.io/datatypes"
 | 
						"gorm.io/datatypes"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -115,7 +115,7 @@ func NewApplication(configPath string) (*Application, error) {
 | 
				
			|||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
 | 
							logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
 | 
				
			||||||
		listenHandler = webhook.NewPlaceholderListener(logger)
 | 
							listenHandler = webhook.NewPlaceholderListener(logger)
 | 
				
			||||||
		tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger)
 | 
							tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo)
 | 
				
			||||||
		loraListener = tp
 | 
							loraListener = tp
 | 
				
			||||||
		comm = tp
 | 
							comm = tp
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,11 +5,11 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device/proto"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/google/uuid"
 | 
						"github.com/google/uuid"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -155,6 +155,7 @@ type LoraMeshConfig struct {
 | 
				
			|||||||
	Timeout           int    `yaml:"timeout"`
 | 
						Timeout           int    `yaml:"timeout"`
 | 
				
			||||||
	LoraMeshMode      string `yaml:"lora_mesh_mode"`
 | 
						LoraMeshMode      string `yaml:"lora_mesh_mode"`
 | 
				
			||||||
	MaxChunkSize      int    `yaml:"max_chunk_size"`
 | 
						MaxChunkSize      int    `yaml:"max_chunk_size"`
 | 
				
			||||||
 | 
						ReassemblyTimeout int    `yaml:"reassembly_timeout"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewConfig 创建并返回一个新的配置实例
 | 
					// NewConfig 创建并返回一个新的配置实例
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,31 +1,103 @@
 | 
				
			|||||||
package lora
 | 
					package lora
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"bytes"
 | 
				
			||||||
 | 
						"encoding/binary"
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"math"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
 | 
				
			||||||
 | 
						"github.com/google/uuid"
 | 
				
			||||||
	"github.com/tarm/serial"
 | 
						"github.com/tarm/serial"
 | 
				
			||||||
 | 
						gproto "google.golang.org/protobuf/proto"
 | 
				
			||||||
 | 
						"gorm.io/datatypes"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LoRaMeshUartPassthroughTransport 实现 transport.Communicator 接口, 用于 LoRa 网状网络 UART 透传
 | 
					// transportState 定义了传输层的内部状态
 | 
				
			||||||
 | 
					type transportState int
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						stateIdle      transportState = iota // 空闲状态
 | 
				
			||||||
 | 
						stateReceiving                       // 接收状态:正在接收一个(可能分片的)消息
 | 
				
			||||||
 | 
						stateSending                         // 发送状态:正在发送一个(可能分片的)消息
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// message 是一个内部结构,用于封装一个完整的、已重组的消息及其元数据
 | 
				
			||||||
 | 
					type message struct {
 | 
				
			||||||
 | 
						SourceAddr string // 源地址
 | 
				
			||||||
 | 
						DestAddr   string // 目标地址
 | 
				
			||||||
 | 
						Payload    []byte // 有效载荷
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// LoRaMeshUartPassthroughTransport 实现了 transport.Communicator 和 transport.Listener 接口
 | 
				
			||||||
type LoRaMeshUartPassthroughTransport struct {
 | 
					type LoRaMeshUartPassthroughTransport struct {
 | 
				
			||||||
	config config.LoraMeshConfig
 | 
						config config.LoraMeshConfig
 | 
				
			||||||
	logger *logs.Logger
 | 
						logger *logs.Logger
 | 
				
			||||||
	mu     sync.Mutex // 保护对 LoRa 模块的并发访问
 | 
					 | 
				
			||||||
	port   *serial.Port
 | 
						port   *serial.Port
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mu    sync.Mutex // 用于保护对外的公共方法(如Send)的并发调用
 | 
				
			||||||
 | 
						state transportState
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stopChan chan struct{}     // 用于优雅地停止worker协程
 | 
				
			||||||
 | 
						wg       sync.WaitGroup    // 用于等待worker协程完全退出
 | 
				
			||||||
 | 
						sendChan chan *sendRequest // 发送任务的请求通道
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// --- 接收与重组相关 ---
 | 
				
			||||||
 | 
						reassemblyBuffers   map[uint16]*reassemblyBuffer // 键为源地址SourceAddr,值为对应的重组缓冲区
 | 
				
			||||||
 | 
						currentRecvSource   uint16                       // 当前正在接收的源地址
 | 
				
			||||||
 | 
						reassemblyTimeout   *time.Timer                  // 分片重组的超时定时器
 | 
				
			||||||
 | 
						reassemblyTimeoutCh chan uint16                  // 当超时触发时,用于传递源地址
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// --- 依赖注入的仓库 ---
 | 
				
			||||||
 | 
						areaControllerRepo    repository.AreaControllerRepository
 | 
				
			||||||
 | 
						pendingCollectionRepo repository.PendingCollectionRepository
 | 
				
			||||||
 | 
						deviceRepo            repository.DeviceRepository
 | 
				
			||||||
 | 
						sensorDataRepo        repository.SensorDataRepository
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport
 | 
					// sendRequest 封装了一次发送请求
 | 
				
			||||||
func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *logs.Logger) (*LoRaMeshUartPassthroughTransport, error) {
 | 
					type sendRequest struct {
 | 
				
			||||||
 | 
						address string
 | 
				
			||||||
 | 
						payload []byte
 | 
				
			||||||
 | 
						result  chan *sendResultTuple
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// sendResultTuple 用于在通道中安全地传递Send方法的返回值
 | 
				
			||||||
 | 
					type sendResultTuple struct {
 | 
				
			||||||
 | 
						result *transport.SendResult
 | 
				
			||||||
 | 
						err    error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// reassemblyBuffer 用于缓存和重组来自同一源的分片
 | 
				
			||||||
 | 
					type reassemblyBuffer struct {
 | 
				
			||||||
 | 
						chunks         map[uint8][]byte // 键为当前包序号CurrentChunk
 | 
				
			||||||
 | 
						totalChunks    uint8
 | 
				
			||||||
 | 
						receivedChunks int
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport 实例
 | 
				
			||||||
 | 
					func NewLoRaMeshUartPassthroughTransport(
 | 
				
			||||||
 | 
						config config.LoraMeshConfig,
 | 
				
			||||||
 | 
						logger *logs.Logger,
 | 
				
			||||||
 | 
						areaControllerRepo repository.AreaControllerRepository,
 | 
				
			||||||
 | 
						pendingCollectionRepo repository.PendingCollectionRepository,
 | 
				
			||||||
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
 | 
						sensorDataRepo repository.SensorDataRepository,
 | 
				
			||||||
 | 
					) (*LoRaMeshUartPassthroughTransport, error) {
 | 
				
			||||||
	c := &serial.Config{
 | 
						c := &serial.Config{
 | 
				
			||||||
		Name:        config.UARTPort,
 | 
							Name:        config.UARTPort,
 | 
				
			||||||
		Baud:        config.BaudRate,
 | 
							Baud:        config.BaudRate,
 | 
				
			||||||
		ReadTimeout: time.Second * time.Duration(config.Timeout),
 | 
							ReadTimeout: time.Millisecond * time.Duration(config.Timeout),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	port, err := serial.OpenPort(c)
 | 
						port, err := serial.OpenPort(c)
 | 
				
			||||||
@@ -33,26 +105,441 @@ func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *l
 | 
				
			|||||||
		return nil, fmt.Errorf("无法打开串口 %s: %w", config.UARTPort, err)
 | 
							return nil, fmt.Errorf("无法打开串口 %s: %w", config.UARTPort, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &LoRaMeshUartPassthroughTransport{
 | 
						t := &LoRaMeshUartPassthroughTransport{
 | 
				
			||||||
		config:              config,
 | 
							config:              config,
 | 
				
			||||||
		logger:              logger,
 | 
							logger:              logger,
 | 
				
			||||||
		mu:     sync.Mutex{},
 | 
					 | 
				
			||||||
		port:                port,
 | 
							port:                port,
 | 
				
			||||||
	}, nil
 | 
							state:               stateIdle,
 | 
				
			||||||
}
 | 
							stopChan:            make(chan struct{}),
 | 
				
			||||||
 | 
							sendChan:            make(chan *sendRequest),
 | 
				
			||||||
// Send 将数据发送到指定的地址
 | 
							reassemblyBuffers:   make(map[uint16]*reassemblyBuffer),
 | 
				
			||||||
func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) {
 | 
							reassemblyTimeoutCh: make(chan uint16, 1),
 | 
				
			||||||
	// TODO: 实现发送逻辑
 | 
					
 | 
				
			||||||
	return nil, nil
 | 
							// 注入依赖
 | 
				
			||||||
 | 
							areaControllerRepo:    areaControllerRepo,
 | 
				
			||||||
 | 
							pendingCollectionRepo: pendingCollectionRepo,
 | 
				
			||||||
 | 
							deviceRepo:            deviceRepo,
 | 
				
			||||||
 | 
							sensorDataRepo:        sensorDataRepo,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return t, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Listen 启动后台监听协程(非阻塞)
 | 
				
			||||||
func (t *LoRaMeshUartPassthroughTransport) Listen() error {
 | 
					func (t *LoRaMeshUartPassthroughTransport) Listen() error {
 | 
				
			||||||
	//TODO implement me
 | 
						t.wg.Add(1)
 | 
				
			||||||
	panic("implement me")
 | 
						go t.workerLoop()
 | 
				
			||||||
 | 
						t.logger.Info("LoRa传输层工作协程已启动")
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *LoRaMeshUartPassthroughTransport) Stop() error {
 | 
					// Send 将发送任务提交给worker协程
 | 
				
			||||||
	//TODO implement me
 | 
					func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) {
 | 
				
			||||||
	panic("implement me")
 | 
						t.mu.Lock()
 | 
				
			||||||
 | 
						defer t.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						resultChan := make(chan *sendResultTuple, 1)
 | 
				
			||||||
 | 
						req := &sendRequest{
 | 
				
			||||||
 | 
							address: address,
 | 
				
			||||||
 | 
							payload: payload,
 | 
				
			||||||
 | 
							result:  resultChan,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case t.sendChan <- req:
 | 
				
			||||||
 | 
							// 等待worker协程处理完毕
 | 
				
			||||||
 | 
							res := <-resultChan
 | 
				
			||||||
 | 
							return res.result, res.err
 | 
				
			||||||
 | 
						case <-t.stopChan:
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("传输层正在停止")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop 停止传输层
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) Stop() error {
 | 
				
			||||||
 | 
						close(t.stopChan)
 | 
				
			||||||
 | 
						t.wg.Wait()
 | 
				
			||||||
 | 
						return t.port.Close()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// workerLoop 是核心的状态机和调度器
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) workerLoop() {
 | 
				
			||||||
 | 
						defer t.wg.Done()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						readBuffer := make([]byte, 1024)
 | 
				
			||||||
 | 
						parserBuffer := new(bytes.Buffer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							// 1. 检查是否需要停止 (优先检查,以便快速退出)
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-t.stopChan:
 | 
				
			||||||
 | 
								if t.reassemblyTimeout != nil {
 | 
				
			||||||
 | 
									t.reassemblyTimeout.Stop()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.logger.Info("LoRa传输层工作协程已停止")
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 2. 尝试从串口读取数据
 | 
				
			||||||
 | 
							n, err := t.port.Read(readBuffer)
 | 
				
			||||||
 | 
							if n > 0 {
 | 
				
			||||||
 | 
								parserBuffer.Write(readBuffer[:n])
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err != nil && err != io.EOF {
 | 
				
			||||||
 | 
								// 忽略预期的超时错误(io.EOF),只记录真正的IO错误
 | 
				
			||||||
 | 
								t.logger.Errorf("从串口读取数据时发生错误: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 3. 循环解析缓冲区中的完整物理帧
 | 
				
			||||||
 | 
							for {
 | 
				
			||||||
 | 
								frame := t.parseCompleteFrame(parserBuffer)
 | 
				
			||||||
 | 
								if frame == nil {
 | 
				
			||||||
 | 
									break // 缓冲区中没有更多完整帧了
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.handleFrame(frame)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 4. 根据当前状态执行主要逻辑
 | 
				
			||||||
 | 
							switch t.state {
 | 
				
			||||||
 | 
							case stateIdle:
 | 
				
			||||||
 | 
								t.runIdleState()
 | 
				
			||||||
 | 
							case stateReceiving:
 | 
				
			||||||
 | 
								t.runReceivingState()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// runIdleState 处理空闲状态下的逻辑,主要是检查并启动发送任务
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) runIdleState() {
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case req := <-t.sendChan:
 | 
				
			||||||
 | 
							t.state = stateSending
 | 
				
			||||||
 | 
							// 此处为阻塞式发送
 | 
				
			||||||
 | 
							result, err := t.executeSend(req)
 | 
				
			||||||
 | 
							req.result <- &sendResultTuple{result: result, err: err}
 | 
				
			||||||
 | 
							t.state = stateIdle
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// 没有发送任务,保持空闲
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// runReceivingState 处理接收状态下的逻辑,主要是检查超时
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) runReceivingState() {
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case sourceAddr := <-t.reassemblyTimeoutCh:
 | 
				
			||||||
 | 
							t.logger.Warnf("接收来自 0x%04X 的消息超时", sourceAddr)
 | 
				
			||||||
 | 
							delete(t.reassemblyBuffers, sourceAddr)
 | 
				
			||||||
 | 
							t.state = stateIdle
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// 等待更多分片或超时
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// executeSend 执行完整的发送流程(分片、构建、写入)
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) executeSend(req *sendRequest) (*transport.SendResult, error) {
 | 
				
			||||||
 | 
						chunks := splitPayload(req.payload, t.config.MaxChunkSize)
 | 
				
			||||||
 | 
						totalChunks := uint8(len(chunks))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						destAddr, err := strconv.ParseUint(req.address, 16, 16)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("无效的目标地址: %s", req.address)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i, chunk := range chunks {
 | 
				
			||||||
 | 
							currentChunk := uint8(i)
 | 
				
			||||||
 | 
							frame := new(bytes.Buffer)
 | 
				
			||||||
 | 
							frame.WriteByte(0xED)                                   // 帧头
 | 
				
			||||||
 | 
							frame.WriteByte(uint8(len(chunk) + 2))                  // 数据长度 = 数据块 + 2 (总包数+当前包序号)
 | 
				
			||||||
 | 
							binary.Write(frame, binary.BigEndian, uint16(destAddr)) // 目标地址
 | 
				
			||||||
 | 
							frame.WriteByte(totalChunks)                            // 总包数
 | 
				
			||||||
 | 
							frame.WriteByte(currentChunk)                           // 当前包序号
 | 
				
			||||||
 | 
							frame.Write(chunk)                                      // 数据块
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							_, err := t.port.Write(frame.Bytes())
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("写入串口失败: %w", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						msgID := uuid.New().String()
 | 
				
			||||||
 | 
						return &transport.SendResult{MessageID: msgID}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// handleFrame 处理一个从串口解析出的完整物理帧
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) handleFrame(frame []byte) {
 | 
				
			||||||
 | 
						if len(frame) < 8 {
 | 
				
			||||||
 | 
							t.logger.Warnf("收到了一个无效长度的帧: %d", len(frame))
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						destAddr := binary.BigEndian.Uint16(frame[2:4])
 | 
				
			||||||
 | 
						totalChunks := frame[4]
 | 
				
			||||||
 | 
						currentChunk := frame[5]
 | 
				
			||||||
 | 
						sourceAddr := binary.BigEndian.Uint16(frame[len(frame)-2:])
 | 
				
			||||||
 | 
						chunkData := frame[6 : len(frame)-2]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 如果是单包消息
 | 
				
			||||||
 | 
						if totalChunks == 1 {
 | 
				
			||||||
 | 
							msg := &message{
 | 
				
			||||||
 | 
								SourceAddr: fmt.Sprintf("%04X", sourceAddr),
 | 
				
			||||||
 | 
								DestAddr:   fmt.Sprintf("%04X", destAddr),
 | 
				
			||||||
 | 
								Payload:    chunkData,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							go t.handleUpstreamMessage(msg)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// --- 处理分片消息 ---
 | 
				
			||||||
 | 
						switch t.state {
 | 
				
			||||||
 | 
						case stateIdle:
 | 
				
			||||||
 | 
							if currentChunk == 0 {
 | 
				
			||||||
 | 
								t.state = stateReceiving
 | 
				
			||||||
 | 
								t.currentRecvSource = sourceAddr
 | 
				
			||||||
 | 
								t.reassemblyBuffers[sourceAddr] = &reassemblyBuffer{
 | 
				
			||||||
 | 
									chunks:         make(map[uint8][]byte),
 | 
				
			||||||
 | 
									totalChunks:    totalChunks,
 | 
				
			||||||
 | 
									receivedChunks: 0,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.reassemblyBuffers[sourceAddr].chunks[currentChunk] = chunkData
 | 
				
			||||||
 | 
								t.reassemblyBuffers[sourceAddr].receivedChunks++
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if t.reassemblyTimeout != nil {
 | 
				
			||||||
 | 
									t.reassemblyTimeout.Stop()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.reassemblyTimeout = time.AfterFunc(time.Duration(t.config.ReassemblyTimeout)*time.Second, func() {
 | 
				
			||||||
 | 
									t.reassemblyTimeoutCh <- sourceAddr
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								t.logger.Warnf("在空闲状态下收到了一个来自 0x%04X 的非首包分片,已忽略。", sourceAddr)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						case stateReceiving:
 | 
				
			||||||
 | 
							if sourceAddr != t.currentRecvSource {
 | 
				
			||||||
 | 
								t.logger.Warnf("正在接收来自 0x%04X 的数据时,收到了另一个源 0x%04X 的分片,已忽略。", t.currentRecvSource, sourceAddr)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							buffer, ok := t.reassemblyBuffers[sourceAddr]
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								t.logger.Errorf("内部错误: 处于接收状态,但没有为 0x%04X 找到缓冲区", sourceAddr)
 | 
				
			||||||
 | 
								t.state = stateIdle // 重置状态
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 存入分片并重置超时
 | 
				
			||||||
 | 
							buffer.chunks[currentChunk] = chunkData
 | 
				
			||||||
 | 
							buffer.receivedChunks++
 | 
				
			||||||
 | 
							t.reassemblyTimeout.Reset(time.Duration(t.config.ReassemblyTimeout) * time.Second)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 检查是否已全部收到
 | 
				
			||||||
 | 
							if buffer.receivedChunks == int(buffer.totalChunks) {
 | 
				
			||||||
 | 
								t.reassemblyTimeout.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// 重组消息
 | 
				
			||||||
 | 
								fullPayload := new(bytes.Buffer)
 | 
				
			||||||
 | 
								for i := 0; i < int(buffer.totalChunks); i++ {
 | 
				
			||||||
 | 
									fullPayload.Write(buffer.chunks[uint8(i)])
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								msg := &message{
 | 
				
			||||||
 | 
									SourceAddr: fmt.Sprintf("%04X", sourceAddr),
 | 
				
			||||||
 | 
									DestAddr:   fmt.Sprintf("%04X", destAddr),
 | 
				
			||||||
 | 
									Payload:    fullPayload.Bytes(),
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								go t.handleUpstreamMessage(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// 清理并返回空闲状态
 | 
				
			||||||
 | 
								delete(t.reassemblyBuffers, sourceAddr)
 | 
				
			||||||
 | 
								t.state = stateIdle
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// handleUpstreamMessage 在独立的协程中处理单个上行的、完整的消息。
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) handleUpstreamMessage(msg *message) {
 | 
				
			||||||
 | 
						t.logger.Infof("开始处理来自 %s 的上行消息", msg.SourceAddr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 1. 解析外层 "信封"
 | 
				
			||||||
 | 
						var instruction proto.Instruction
 | 
				
			||||||
 | 
						if err := gproto.Unmarshal(msg.Payload, &instruction); err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, 源地址: %s, 原始数据: %x", err, msg.SourceAddr, msg.Payload)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 2. 使用 type switch 从 oneof payload 中提取 CollectResult
 | 
				
			||||||
 | 
						var collectResp *proto.CollectResult
 | 
				
			||||||
 | 
						switch p := instruction.GetPayload().(type) {
 | 
				
			||||||
 | 
						case *proto.Instruction_CollectResult:
 | 
				
			||||||
 | 
							collectResp = p.CollectResult
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// 如果上行的数据不是采集结果,记录日志并忽略
 | 
				
			||||||
 | 
							t.logger.Infof("收到一个非采集响应的上行指令 (类型: %T),无需处理。源地址: %s", p, msg.SourceAddr)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if collectResp == nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil。源地址: %s", msg.SourceAddr)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						correlationID := collectResp.CorrelationId
 | 
				
			||||||
 | 
						t.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 3. 查找区域主控 (注意:LoRa Mesh 的 SourceAddr 对应于区域主控的 NetworkID)
 | 
				
			||||||
 | 
						regionalController, err := t.areaControllerRepo.FindByNetworkID(msg.SourceAddr)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("处理上行消息失败:无法通过源地址 '%s' 找到区域主控设备: %v", msg.SourceAddr, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := regionalController.SelfCheck(); err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("处理上行消息失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 4. 根据 CorrelationID 查找待处理请求
 | 
				
			||||||
 | 
						pendingReq, err := t.pendingCollectionRepo.FindByCorrelationID(correlationID)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 检查状态,防止重复处理
 | 
				
			||||||
 | 
						if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
 | 
				
			||||||
 | 
							t.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 5. 匹配数据并存入数据库
 | 
				
			||||||
 | 
						deviceIDs := pendingReq.CommandMetadata
 | 
				
			||||||
 | 
						values := collectResp.Values
 | 
				
			||||||
 | 
						if len(deviceIDs) != len(values) {
 | 
				
			||||||
 | 
							t.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
 | 
				
			||||||
 | 
							err = t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now())
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i, deviceID := range deviceIDs {
 | 
				
			||||||
 | 
							rawSensorValue := values[i]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if math.IsNaN(float64(rawSensorValue)) {
 | 
				
			||||||
 | 
								t.logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							dev, err := t.deviceRepo.FindByID(deviceID)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err := dev.SelfCheck(); err != nil {
 | 
				
			||||||
 | 
								t.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err := dev.DeviceTemplate.SelfCheck(); err != nil {
 | 
				
			||||||
 | 
								t.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							var valueDescriptors []*models.ValueDescriptor
 | 
				
			||||||
 | 
							if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
 | 
				
			||||||
 | 
								t.logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if len(valueDescriptors) == 0 {
 | 
				
			||||||
 | 
								t.logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							valueDescriptor := valueDescriptors[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							var dataToRecord interface{}
 | 
				
			||||||
 | 
							switch valueDescriptor.Type {
 | 
				
			||||||
 | 
							case models.SensorTypeTemperature:
 | 
				
			||||||
 | 
								dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
 | 
				
			||||||
 | 
							case models.SensorTypeHumidity:
 | 
				
			||||||
 | 
								dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
 | 
				
			||||||
 | 
							case models.SensorTypeWeight:
 | 
				
			||||||
 | 
								dataToRecord = models.WeightData{WeightKilograms: parsedValue}
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								t.logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
 | 
				
			||||||
 | 
								dataToRecord = map[string]float64{"value": parsedValue}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							t.recordSensorData(regionalController.ID, dev.ID, time.Now(), valueDescriptor.Type, dataToRecord)
 | 
				
			||||||
 | 
							t.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 6. 更新请求状态为“已完成”
 | 
				
			||||||
 | 
						if err := t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now()); err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							t.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) {
 | 
				
			||||||
 | 
						jsonData, err := json.Marshal(data)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						sensorData := &models.SensorData{
 | 
				
			||||||
 | 
							Time:                 eventTime,
 | 
				
			||||||
 | 
							DeviceID:             sensorDeviceID,
 | 
				
			||||||
 | 
							RegionalControllerID: regionalControllerID,
 | 
				
			||||||
 | 
							SensorType:           sensorType,
 | 
				
			||||||
 | 
							Data:                 datatypes.JSON(jsonData),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err := t.sensorDataRepo.Create(sensorData); err != nil {
 | 
				
			||||||
 | 
							t.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// parseCompleteFrame 实现粘包和半包处理
 | 
				
			||||||
 | 
					func (t *LoRaMeshUartPassthroughTransport) parseCompleteFrame(buffer *bytes.Buffer) []byte {
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							headerIndex := bytes.IndexByte(buffer.Bytes(), 0xED)
 | 
				
			||||||
 | 
							if headerIndex == -1 {
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							buffer.Next(headerIndex)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if buffer.Len() < 2 {
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							lengthField := buffer.Bytes()[1]
 | 
				
			||||||
 | 
							frameLength := 1 + 1 + 2 + int(lengthField) + 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if buffer.Len() < frameLength {
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return buffer.Next(frameLength)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// splitPayload 将数据块按最大长度进行切分
 | 
				
			||||||
 | 
					func splitPayload(payload []byte, maxChunkSize int) [][]byte {
 | 
				
			||||||
 | 
						if len(payload) == 0 {
 | 
				
			||||||
 | 
							return [][]byte{{}}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var chunks [][]byte
 | 
				
			||||||
 | 
						for i := 0; i < len(payload); i += maxChunkSize {
 | 
				
			||||||
 | 
							end := i + maxChunkSize
 | 
				
			||||||
 | 
							if end > len(payload) {
 | 
				
			||||||
 | 
								end = len(payload)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							chunks = append(chunks, payload[i:end])
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return chunks
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user