Files
pig-house-controller/app/lora/lora_mesh_uart_passthrough_manager.py
2025-10-13 14:36:49 +08:00

180 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LoRa模块的具体实现 (UART Passthrough for LoRa Mesh)
负责与LoRa模块进行底层通信并向上层提供标准化的数据包收发接口。
这个实现针对的是通过UART进行透传的LoRa Mesh模块。
"""
from ..logs.logger import log
from machine import UART
import time
class LoRaMeshUartPassthroughManager:
"""
通过UART与LoRa Mesh模块通信的处理器实现 (ED模式)。
实现了自动分片与重组逻辑。
"""
def __init__(self, lora_config: dict):
"""
初始化LoRa处理器。
Args:
lora_config (dict): 来自全局配置文件的LoRa配置字典。
"""
log("LoRaMeshUartPassthroughManager: 初始化...")
# --- 配置注入 ---
self.master_address = lora_config.get('master_address')
self.uart_id = lora_config.get('uart_id')
self.baudrate = lora_config.get('baudrate')
self.pins = lora_config.get('pins')
self.max_chunk_size = lora_config.get('max_chunk_size')
self.lora_mesh_mode = b'\xed'
# TODO 目前这个配置没用, 完全按ED处理的
if lora_config.get('lora_mesh_mode') == 'EC':
self.lora_mesh_mode = b'\xec'
# --- 硬件初始化 ---
self.uart = UART(self.uart_id, self.baudrate, tx=self.pins['tx'], rx=self.pins['rx'])
# --- 内部状态变量 ---
self._rx_buffer = bytearray() # UART接收缓冲区
self._reassembly_cache = {} # 分片重组缓冲区 { chunk_index: chunk_data }
self._expected_chunks = 0 # 当前会话期望的总分片数
log(f"LoRaMeshUartPassthroughManager: 配置加载完成. UART ID: {self.uart_id}, Baudrate: {self.baudrate}")
def send_packet(self, payload: bytes) -> bool:
"""
【实现】发送一个数据包,自动处理分片。
Args:
payload (bytes): 需要发送的完整业务数据。
Returns:
bool: True表示所有分片都已成功提交发送False表示失败。
"""
max_chunk_size = self.max_chunk_size
if not payload:
total_chunks = 1
else:
total_chunks = (len(payload) + max_chunk_size - 1) // max_chunk_size
try:
for i in range(total_chunks):
chunk_index = i
start = i * max_chunk_size
end = start + max_chunk_size
chunk_data = payload[start:end]
# --- 组装物理包 ---
header = b'\xed'
dest_addr_bytes = self.master_address.to_bytes(2, 'big')
total_chunks_bytes = total_chunks.to_bytes(1, 'big')
current_chunk_bytes = chunk_index.to_bytes(1, 'big')
# 计算后续长度(总包数和当前包序号是自定义包头, 各占一位, 标准包头算在长度内)
length_val = 2 + len(chunk_data)
length_bytes = length_val.to_bytes(1, 'big')
# 拼接成最终的数据包
packet_to_send = header + length_bytes + dest_addr_bytes + total_chunks_bytes + current_chunk_bytes + chunk_data
self.uart.write(packet_to_send)
log(f"LoRa: 发送分片 {chunk_index + 1}/{total_chunks} 到地址 {self.master_address}")
# 让出CPU, 模块将缓存区的数据发出去本身也需要时间
time.sleep_ms(10)
return True
except Exception as e:
log(f"LoRa: 发送数据包失败: {e}")
return False
def receive_packet(self) -> bytes | None:
"""
【实现】非阻塞地检查、解析并重组一个完整的数据包。
"""
# 1. 从硬件读取数据到缓冲区
if self.uart.any():
self._rx_buffer.extend(self.uart.read())
# 2. 循环尝试从缓冲区解析包
while True:
# 2.1 检查头部和长度字段是否存在
if len(self._rx_buffer) < 2:
return None # 数据不足,无法读取长度
# 2.2 检查帧头是否正确
if self._rx_buffer[0] != 0xED:
log(f"LoRa: 接收到错误帧头: {hex(self._rx_buffer[0])}正在寻找下一个ED...")
next_ed = self._rx_buffer.find(b'\xed', 1)
if next_ed == -1:
self._rx_buffer[:] = b''
else:
self._rx_buffer = self._rx_buffer[next_ed:]
continue
# 2.3 检查包是否完整
payload_len = self._rx_buffer[1]
total_packet_len = 1 + 1 + payload_len
if len(self._rx_buffer) < total_packet_len:
return None # "半包"情况,等待更多数据
# 3. 提取和解析一个完整的物理包
packet = self._rx_buffer[:total_packet_len]
self._rx_buffer = self._rx_buffer[total_packet_len:]
addr = int.from_bytes(packet[2:4], 'big')
total_chunks = packet[4]
current_chunk = packet[5]
# 提取数据块排除末尾的2字节源地址
chunk_data = packet[6:-2]
# --- 长度反向校验 ---
# 根据协议Length字段 = 2 (自定义头) + N (数据块)
expected_payload_len = 2 + len(chunk_data)
if payload_len != expected_payload_len:
log(f"LoRa: 收到损坏的数据包!声明长度 {payload_len} 与实际计算长度 {expected_payload_len} 不符。已丢弃。")
# 包已从缓冲区移除直接continue进入下一次循环尝试解析缓冲区的后续内容
continue
# --- 校验结束 ---
# 4. 重组逻辑
if total_chunks == 1:
log(f"LoRa: 收到单包消息,来自地址 {addr},长度 {len(chunk_data)}")
return chunk_data
# 对于多包消息,只有当收到第一个分片时才清空缓存并设置期望分片数
if current_chunk == 0:
log(f"LoRa: 开始接收新的多包会话 ({total_chunks}个分片)...")
self._reassembly_cache.clear()
self._expected_chunks = total_chunks
elif not self._reassembly_cache and self._expected_chunks == 0:
# 如果不是第一个分片,但缓存是空的,说明错过了第一个分片,丢弃当前分片
log(f"LoRa: 收到非首个分片 {current_chunk},但未检测到会话开始,已丢弃。")
continue
self._reassembly_cache[current_chunk] = chunk_data
log(f"LoRa: 收到分片 {current_chunk + 1}/{self._expected_chunks},已缓存 {len(self._reassembly_cache)}")
if len(self._reassembly_cache) == self._expected_chunks:
log("LoRa: 所有分片已集齐,正在重组...")
full_payload = bytearray()
for i in range(self._expected_chunks):
if i not in self._reassembly_cache:
log(f"LoRa: 重组失败!缺少分片 {i}")
self._reassembly_cache.clear()
return None
full_payload.extend(self._reassembly_cache[i])
log(f"LoRa: 重组完成,总长度 {len(full_payload)}")
self._reassembly_cache.clear()
return bytes(full_payload)