实现 LoRaMeshUartPassthroughTransport

This commit is contained in:
2025-10-10 15:58:40 +08:00
parent 8a5f6dc34e
commit 50a843c9ef
8 changed files with 529 additions and 38 deletions

View File

@@ -1,31 +1,103 @@
package lora
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math"
"strconv"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
"github.com/google/uuid"
"github.com/tarm/serial"
gproto "google.golang.org/protobuf/proto"
"gorm.io/datatypes"
)
// LoRaMeshUartPassthroughTransport 实现 transport.Communicator 接口, 用于 LoRa 网状网络 UART 透传
// transportState 定义了传输层的内部状态
type transportState int
const (
stateIdle transportState = iota // 空闲状态
stateReceiving // 接收状态:正在接收一个(可能分片的)消息
stateSending // 发送状态:正在发送一个(可能分片的)消息
)
// message 是一个内部结构,用于封装一个完整的、已重组的消息及其元数据
type message struct {
SourceAddr string // 源地址
DestAddr string // 目标地址
Payload []byte // 有效载荷
}
// LoRaMeshUartPassthroughTransport 实现了 transport.Communicator 和 transport.Listener 接口
type LoRaMeshUartPassthroughTransport struct {
config config.LoraMeshConfig
logger *logs.Logger
mu sync.Mutex // 保护对 LoRa 模块的并发访问
port *serial.Port
mu sync.Mutex // 用于保护对外的公共方法如Send的并发调用
state transportState
stopChan chan struct{} // 用于优雅地停止worker协程
wg sync.WaitGroup // 用于等待worker协程完全退出
sendChan chan *sendRequest // 发送任务的请求通道
// --- 接收与重组相关 ---
reassemblyBuffers map[uint16]*reassemblyBuffer // 键为源地址SourceAddr值为对应的重组缓冲区
currentRecvSource uint16 // 当前正在接收的源地址
reassemblyTimeout *time.Timer // 分片重组的超时定时器
reassemblyTimeoutCh chan uint16 // 当超时触发时,用于传递源地址
// --- 依赖注入的仓库 ---
areaControllerRepo repository.AreaControllerRepository
pendingCollectionRepo repository.PendingCollectionRepository
deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository
}
// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport
func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *logs.Logger) (*LoRaMeshUartPassthroughTransport, error) {
// sendRequest 封装了一次发送请求
type sendRequest struct {
address string
payload []byte
result chan *sendResultTuple
}
// sendResultTuple 用于在通道中安全地传递Send方法的返回值
type sendResultTuple struct {
result *transport.SendResult
err error
}
// reassemblyBuffer 用于缓存和重组来自同一源的分片
type reassemblyBuffer struct {
chunks map[uint8][]byte // 键为当前包序号CurrentChunk
totalChunks uint8
receivedChunks int
}
// NewLoRaMeshUartPassthroughTransport 创建一个新的 LoRaMeshUartPassthroughTransport 实例
func NewLoRaMeshUartPassthroughTransport(
config config.LoraMeshConfig,
logger *logs.Logger,
areaControllerRepo repository.AreaControllerRepository,
pendingCollectionRepo repository.PendingCollectionRepository,
deviceRepo repository.DeviceRepository,
sensorDataRepo repository.SensorDataRepository,
) (*LoRaMeshUartPassthroughTransport, error) {
c := &serial.Config{
Name: config.UARTPort,
Baud: config.BaudRate,
ReadTimeout: time.Second * time.Duration(config.Timeout),
ReadTimeout: time.Millisecond * time.Duration(config.Timeout),
}
port, err := serial.OpenPort(c)
@@ -33,26 +105,441 @@ func NewLoRaMeshUartPassthroughTransport(config config.LoraMeshConfig, logger *l
return nil, fmt.Errorf("无法打开串口 %s: %w", config.UARTPort, err)
}
return &LoRaMeshUartPassthroughTransport{
config: config,
logger: logger,
mu: sync.Mutex{},
port: port,
}, nil
}
// Send 将数据发送到指定的地址
func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) {
// TODO: 实现发送逻辑
return nil, nil
t := &LoRaMeshUartPassthroughTransport{
config: config,
logger: logger,
port: port,
state: stateIdle,
stopChan: make(chan struct{}),
sendChan: make(chan *sendRequest),
reassemblyBuffers: make(map[uint16]*reassemblyBuffer),
reassemblyTimeoutCh: make(chan uint16, 1),
// 注入依赖
areaControllerRepo: areaControllerRepo,
pendingCollectionRepo: pendingCollectionRepo,
deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo,
}
return t, nil
}
// Listen 启动后台监听协程(非阻塞)
func (t *LoRaMeshUartPassthroughTransport) Listen() error {
//TODO implement me
panic("implement me")
t.wg.Add(1)
go t.workerLoop()
t.logger.Info("LoRa传输层工作协程已启动")
return nil
}
func (t *LoRaMeshUartPassthroughTransport) Stop() error {
//TODO implement me
panic("implement me")
// Send 将发送任务提交给worker协程
func (t *LoRaMeshUartPassthroughTransport) Send(address string, payload []byte) (*transport.SendResult, error) {
t.mu.Lock()
defer t.mu.Unlock()
resultChan := make(chan *sendResultTuple, 1)
req := &sendRequest{
address: address,
payload: payload,
result: resultChan,
}
select {
case t.sendChan <- req:
// 等待worker协程处理完毕
res := <-resultChan
return res.result, res.err
case <-t.stopChan:
return nil, fmt.Errorf("传输层正在停止")
}
}
// Stop 停止传输层
func (t *LoRaMeshUartPassthroughTransport) Stop() error {
close(t.stopChan)
t.wg.Wait()
return t.port.Close()
}
// workerLoop 是核心的状态机和调度器
func (t *LoRaMeshUartPassthroughTransport) workerLoop() {
defer t.wg.Done()
readBuffer := make([]byte, 1024)
parserBuffer := new(bytes.Buffer)
for {
// 1. 检查是否需要停止 (优先检查,以便快速退出)
select {
case <-t.stopChan:
if t.reassemblyTimeout != nil {
t.reassemblyTimeout.Stop()
}
t.logger.Info("LoRa传输层工作协程已停止")
return
default:
}
// 2. 尝试从串口读取数据
n, err := t.port.Read(readBuffer)
if n > 0 {
parserBuffer.Write(readBuffer[:n])
}
if err != nil && err != io.EOF {
// 忽略预期的超时错误(io.EOF)只记录真正的IO错误
t.logger.Errorf("从串口读取数据时发生错误: %v", err)
}
// 3. 循环解析缓冲区中的完整物理帧
for {
frame := t.parseCompleteFrame(parserBuffer)
if frame == nil {
break // 缓冲区中没有更多完整帧了
}
t.handleFrame(frame)
}
// 4. 根据当前状态执行主要逻辑
switch t.state {
case stateIdle:
t.runIdleState()
case stateReceiving:
t.runReceivingState()
}
}
}
// runIdleState 处理空闲状态下的逻辑,主要是检查并启动发送任务
func (t *LoRaMeshUartPassthroughTransport) runIdleState() {
select {
case req := <-t.sendChan:
t.state = stateSending
// 此处为阻塞式发送
result, err := t.executeSend(req)
req.result <- &sendResultTuple{result: result, err: err}
t.state = stateIdle
default:
// 没有发送任务,保持空闲
}
}
// runReceivingState 处理接收状态下的逻辑,主要是检查超时
func (t *LoRaMeshUartPassthroughTransport) runReceivingState() {
select {
case sourceAddr := <-t.reassemblyTimeoutCh:
t.logger.Warnf("接收来自 0x%04X 的消息超时", sourceAddr)
delete(t.reassemblyBuffers, sourceAddr)
t.state = stateIdle
default:
// 等待更多分片或超时
}
}
// executeSend 执行完整的发送流程(分片、构建、写入)
func (t *LoRaMeshUartPassthroughTransport) executeSend(req *sendRequest) (*transport.SendResult, error) {
chunks := splitPayload(req.payload, t.config.MaxChunkSize)
totalChunks := uint8(len(chunks))
destAddr, err := strconv.ParseUint(req.address, 16, 16)
if err != nil {
return nil, fmt.Errorf("无效的目标地址: %s", req.address)
}
for i, chunk := range chunks {
currentChunk := uint8(i)
frame := new(bytes.Buffer)
frame.WriteByte(0xED) // 帧头
frame.WriteByte(uint8(len(chunk) + 2)) // 数据长度 = 数据块 + 2 (总包数+当前包序号)
binary.Write(frame, binary.BigEndian, uint16(destAddr)) // 目标地址
frame.WriteByte(totalChunks) // 总包数
frame.WriteByte(currentChunk) // 当前包序号
frame.Write(chunk) // 数据块
_, err := t.port.Write(frame.Bytes())
if err != nil {
return nil, fmt.Errorf("写入串口失败: %w", err)
}
}
msgID := uuid.New().String()
return &transport.SendResult{MessageID: msgID}, nil
}
// handleFrame 处理一个从串口解析出的完整物理帧
func (t *LoRaMeshUartPassthroughTransport) handleFrame(frame []byte) {
if len(frame) < 8 {
t.logger.Warnf("收到了一个无效长度的帧: %d", len(frame))
return
}
destAddr := binary.BigEndian.Uint16(frame[2:4])
totalChunks := frame[4]
currentChunk := frame[5]
sourceAddr := binary.BigEndian.Uint16(frame[len(frame)-2:])
chunkData := frame[6 : len(frame)-2]
// 如果是单包消息
if totalChunks == 1 {
msg := &message{
SourceAddr: fmt.Sprintf("%04X", sourceAddr),
DestAddr: fmt.Sprintf("%04X", destAddr),
Payload: chunkData,
}
go t.handleUpstreamMessage(msg)
return
}
// --- 处理分片消息 ---
switch t.state {
case stateIdle:
if currentChunk == 0 {
t.state = stateReceiving
t.currentRecvSource = sourceAddr
t.reassemblyBuffers[sourceAddr] = &reassemblyBuffer{
chunks: make(map[uint8][]byte),
totalChunks: totalChunks,
receivedChunks: 0,
}
t.reassemblyBuffers[sourceAddr].chunks[currentChunk] = chunkData
t.reassemblyBuffers[sourceAddr].receivedChunks++
if t.reassemblyTimeout != nil {
t.reassemblyTimeout.Stop()
}
t.reassemblyTimeout = time.AfterFunc(time.Duration(t.config.ReassemblyTimeout)*time.Second, func() {
t.reassemblyTimeoutCh <- sourceAddr
})
} else {
t.logger.Warnf("在空闲状态下收到了一个来自 0x%04X 的非首包分片,已忽略。", sourceAddr)
}
case stateReceiving:
if sourceAddr != t.currentRecvSource {
t.logger.Warnf("正在接收来自 0x%04X 的数据时,收到了另一个源 0x%04X 的分片,已忽略。", t.currentRecvSource, sourceAddr)
return
}
buffer, ok := t.reassemblyBuffers[sourceAddr]
if !ok {
t.logger.Errorf("内部错误: 处于接收状态,但没有为 0x%04X 找到缓冲区", sourceAddr)
t.state = stateIdle // 重置状态
return
}
// 存入分片并重置超时
buffer.chunks[currentChunk] = chunkData
buffer.receivedChunks++
t.reassemblyTimeout.Reset(time.Duration(t.config.ReassemblyTimeout) * time.Second)
// 检查是否已全部收到
if buffer.receivedChunks == int(buffer.totalChunks) {
t.reassemblyTimeout.Stop()
// 重组消息
fullPayload := new(bytes.Buffer)
for i := 0; i < int(buffer.totalChunks); i++ {
fullPayload.Write(buffer.chunks[uint8(i)])
}
msg := &message{
SourceAddr: fmt.Sprintf("%04X", sourceAddr),
DestAddr: fmt.Sprintf("%04X", destAddr),
Payload: fullPayload.Bytes(),
}
go t.handleUpstreamMessage(msg)
// 清理并返回空闲状态
delete(t.reassemblyBuffers, sourceAddr)
t.state = stateIdle
}
}
}
// handleUpstreamMessage 在独立的协程中处理单个上行的、完整的消息。
func (t *LoRaMeshUartPassthroughTransport) handleUpstreamMessage(msg *message) {
t.logger.Infof("开始处理来自 %s 的上行消息", msg.SourceAddr)
// 1. 解析外层 "信封"
var instruction proto.Instruction
if err := gproto.Unmarshal(msg.Payload, &instruction); err != nil {
t.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, 源地址: %s, 原始数据: %x", err, msg.SourceAddr, msg.Payload)
return
}
// 2. 使用 type switch 从 oneof payload 中提取 CollectResult
var collectResp *proto.CollectResult
switch p := instruction.GetPayload().(type) {
case *proto.Instruction_CollectResult:
collectResp = p.CollectResult
default:
// 如果上行的数据不是采集结果,记录日志并忽略
t.logger.Infof("收到一个非采集响应的上行指令 (类型: %T),无需处理。源地址: %s", p, msg.SourceAddr)
return
}
if collectResp == nil {
t.logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil。源地址: %s", msg.SourceAddr)
return
}
correlationID := collectResp.CorrelationId
t.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
// 3. 查找区域主控 (注意LoRa Mesh 的 SourceAddr 对应于区域主控的 NetworkID)
regionalController, err := t.areaControllerRepo.FindByNetworkID(msg.SourceAddr)
if err != nil {
t.logger.Errorf("处理上行消息失败:无法通过源地址 '%s' 找到区域主控设备: %v", msg.SourceAddr, err)
return
}
if err := regionalController.SelfCheck(); err != nil {
t.logger.Errorf("处理上行消息失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err)
return
}
// 4. 根据 CorrelationID 查找待处理请求
pendingReq, err := t.pendingCollectionRepo.FindByCorrelationID(correlationID)
if err != nil {
t.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
return
}
// 检查状态,防止重复处理
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
t.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
return
}
// 5. 匹配数据并存入数据库
deviceIDs := pendingReq.CommandMetadata
values := collectResp.Values
if len(deviceIDs) != len(values) {
t.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
err = t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now())
if err != nil {
t.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
}
return
}
for i, deviceID := range deviceIDs {
rawSensorValue := values[i]
if math.IsNaN(float64(rawSensorValue)) {
t.logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
continue
}
dev, err := t.deviceRepo.FindByID(deviceID)
if err != nil {
t.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
continue
}
if err := dev.SelfCheck(); err != nil {
t.logger.Warnf("跳过设备 %d因其未通过自检: %v", dev.ID, err)
continue
}
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
t.logger.Warnf("跳过设备 %d因其设备模板未通过自检: %v", dev.ID, err)
continue
}
var valueDescriptors []*models.ValueDescriptor
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
t.logger.Warnf("跳过设备 %d因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
continue
}
if len(valueDescriptors) == 0 {
t.logger.Warnf("跳过设备 %d因其设备模板缺少 ValueDescriptor 定义", dev.ID)
continue
}
valueDescriptor := valueDescriptors[0]
parsedValue := float64(rawSensorValue)*valueDescriptor.Multiplier + valueDescriptor.Offset
var dataToRecord interface{}
switch valueDescriptor.Type {
case models.SensorTypeTemperature:
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
case models.SensorTypeHumidity:
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
case models.SensorTypeWeight:
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
default:
t.logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
dataToRecord = map[string]float64{"value": parsedValue}
}
t.recordSensorData(regionalController.ID, dev.ID, time.Now(), valueDescriptor.Type, dataToRecord)
t.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
}
// 6. 更新请求状态为“已完成”
if err := t.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, time.Now()); err != nil {
t.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
} else {
t.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
}
}
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
func (t *LoRaMeshUartPassthroughTransport) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) {
jsonData, err := json.Marshal(data)
if err != nil {
t.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
return
}
sensorData := &models.SensorData{
Time: eventTime,
DeviceID: sensorDeviceID,
RegionalControllerID: regionalControllerID,
SensorType: sensorType,
Data: datatypes.JSON(jsonData),
}
if err := t.sensorDataRepo.Create(sensorData); err != nil {
t.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
}
}
// parseCompleteFrame 实现粘包和半包处理
func (t *LoRaMeshUartPassthroughTransport) parseCompleteFrame(buffer *bytes.Buffer) []byte {
for {
headerIndex := bytes.IndexByte(buffer.Bytes(), 0xED)
if headerIndex == -1 {
return nil
}
buffer.Next(headerIndex)
if buffer.Len() < 2 {
return nil
}
lengthField := buffer.Bytes()[1]
frameLength := 1 + 1 + 2 + int(lengthField) + 2
if buffer.Len() < frameLength {
return nil
}
return buffer.Next(frameLength)
}
}
// splitPayload 将数据块按最大长度进行切分
func splitPayload(payload []byte, maxChunkSize int) [][]byte {
if len(payload) == 0 {
return [][]byte{{}}
}
var chunks [][]byte
for i := 0; i < len(payload); i += maxChunkSize {
end := i + maxChunkSize
if end > len(payload) {
end = len(payload)
}
chunks = append(chunks, payload[i:end])
}
return chunks
}