定义 LoRaMeshUartPassthroughManager

This commit is contained in:
2025-10-09 17:01:52 +08:00
parent f30d0e0865
commit 7bc7a95379
6 changed files with 91 additions and 12 deletions

View File

@@ -11,6 +11,9 @@
# --- LoRa 模块配置 ---
# 假设LoRa模块使用独立的UART进行通信
LORA_CONFIG = {
# # 平台LoRa地址
# 'master_address': 0x01, // 目前的LoRa模块主从模式下 从节点发送广播包=点对点发送给主节点, 所有从节点都会忽略其他从节点发的广播
# LoRa模块连接的UART总线ID (0, 1, or 2 on ESP32)
'uart_id': 2,
@@ -21,7 +24,19 @@ LORA_CONFIG = {
'pins': {
'tx': 17, # UART TX
'rx': 16, # UART RX
}
},
# LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
# e.g.
# EC: 接收端只会接收到消息, 不会接收到请求头
# e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
# (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
# 接收: 48 65 6c 6c 6f ("Hello")
# ED: 接收端会接收完整数据包,包含请求头
# e.g. 发送: ED 05 02 01 48 65 6c 6c 6f
# (ED + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
# 接收: ED 05 02 01 48 65 6c 6c 6f
'lora_mesh_mode': 'EC',
}
# --- 总线配置 ---

View File

@@ -12,7 +12,7 @@ LoRa通信模块的抽象接口定义 (契约)
# abc (Abstract Base Class) 是Python定义接口的标准方式
from abc import ABC, abstractmethod
class ILoraHandler(ABC):
class ILoraManager(ABC):
"""
LoRa处理器接口。
它规定了所有LoRa处理器实现类必须提供的功能。

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LoRa模块的具体实现 (UART Passthrough for LoRa Mesh)
负责与LoRa模块进行底层通信并向上层提供标准化的数据包收发接口。
这个实现针对的是通过UART进行透传的LoRa Mesh模块。
"""
from .lora_interface import ILoraManager
from main.logs.logger import log
class LoRaMeshUartPassthroughManager(ILoraManager):
"""
通过UART与LoRa Mesh模块通信的处理器实现 (透传模式)。
"""
def __init__(self, lora_config: dict):
"""
初始化LoRa处理器。
Args:
lora_config (dict): 来自全局配置文件的LoRa配置字典。
"""
log("LoRaMeshUartPassthroughHandler: 初始化...")
# --- 将配置注入到实例变量 ---
self.master_address = lora_config.get('master_address')
self.uart_id = lora_config.get('uart_id')
self.baudrate = lora_config.get('baudrate')
self.pins = lora_config.get('pins')
self.lora_mesh_mode = lora_config.get('lora_mesh_mode')
# 在这里可以添加真实的硬件初始化代码例如初始化UART
# self.uart = UART(self.uart_id, self.baudrate, tx=self.pins['tx'], rx=self.pins['rx'])
log(f"LoRaMeshUartPassthroughHandler: 配置加载完成. UART ID: {self.uart_id}, Baudrate: {self.baudrate}")
def receive_packet(self):
"""
【实现】非阻塞地检查并接收一个数据包。
(当前为存根实现)
"""
# 具体的实现将在这里...
# e.g. self.uart.read()
pass
def send_packet(self, data_bytes: bytes) -> bool:
"""
【实现】发送一个数据包。
(当前为存根实现)
Args:
data_bytes (bytes): 需要发送的字节数据。
Returns:
bool: True表示发送成功False表示失败。
"""
# 具体的实现将在这里...
# e.g. self.uart.write(data_bytes)
log(f"LoRaMeshUartPassthroughHandler: 模拟发送数据 -> {data_bytes}")
return True

View File

@@ -19,9 +19,9 @@ from config import config
import uqueue # 导入我们自己创建的本地uqueue模块
# 导入接口和实现
from lora.lora_interface import ILoraHandler
from lora.lora_interface import ILoraManager
from bus.bus_interface import IBusManager
from lora.lora_handler import LoRaHandler
from lora.lora_mesh_uart_passthrough_manager import LoRaMeshUartPassthroughManager
from bus.rs485_manager import RS485Manager
from processor import Processor
@@ -30,7 +30,7 @@ from worker import worker_task
from logs.logger import log
# --- 模块级变量定义 (带有类型提示) ---
lora_controller: ILoraHandler | None = None
lora_manager: ILoraManager | None = None
bus_manager: IBusManager | None = None
processor: Processor | None = None
task_queue: uqueue.Queue | None = None
@@ -40,15 +40,15 @@ def setup():
"""
初始化函数,负责创建所有对象实例、共享队列,并启动工作线程。
"""
global lora_controller, bus_manager, processor, task_queue
global lora_manager, bus_manager, processor, task_queue
log("--- 系统初始化开始 ---")
# 1. 初始化硬件驱动和业务处理器
lora_controller = LoRaHandler()
lora_manager = LoRaMeshUartPassthroughManager(config.LORA_CONFIG)
bus_manager = RS485Manager(config.BUS_CONFIG, config.DEFAULT_TIMEOUTS)
processor = Processor(lora_handler=lora_controller, bus_manager=bus_manager)
processor = Processor(lora_handler=lora_manager, bus_manager=bus_manager)
# 2. 从配置文件读取队列长度,并创建线程安全的队列
queue_size = config.SYSTEM_PARAMS.get('task_queue_max_size', 10)
@@ -66,7 +66,7 @@ def loop():
主线程循环函数 (生产者)。
只负责监听LoRa并将数据放入队列。
"""
packet = lora_controller.receive_packet()
packet = lora_manager.receive_packet()
if packet:
if task_queue.full():

View File

@@ -10,7 +10,7 @@
"""
# 导入我们定义的“契约”(接口)
from lora.lora_interface import ILoraHandler
from lora.lora_interface import ILoraManager
from bus.bus_interface import IBusManager
# 导入Protobuf解析代码
@@ -25,12 +25,12 @@ class Processor:
它依赖于抽象的、面向业务的接口。
"""
def __init__(self, lora_handler: ILoraHandler, bus_manager: IBusManager):
def __init__(self, lora_handler: ILoraManager, bus_manager: IBusManager):
"""
构造函数 (依赖注入)。
Args:
lora_handler (ILoraHandler): 一个实现了LoRa接口的对象。
lora_handler (ILoraManager): 一个实现了LoRa接口的对象。
bus_manager (IBusManager): 一个实现了总线接口的对象。
"""
self.lora = lora_handler