From c370aa9a4a05d67907fd4d3d648ca4e5c6391351 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 8 Oct 2025 17:44:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=BB=E9=80=BB=E8=BE=91(=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=E6=9A=82=E6=97=B6=E5=8F=AA=E6=9C=89=E6=8A=BD=E8=B1=A1=E5=AE=9A?= =?UTF-8?q?=E4=B9=89)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main/bus/bus_interface.py | 47 +++++++++++++++ main/command_processor.py | 0 main/config.py | 81 ++++++++++++++++++++++++++ main/logger.py | 22 +++++++ main/lora/lora_interface.py | 53 +++++++++++++++++ main/main.py | 94 ++++++++++++++++++++++++------ main/processor.py | 111 ++++++++++++++++++++++++++++++++++++ main/uqueue.py | 81 ++++++++++++++++++++++++++ main/worker.py | 44 ++++++++++++++ 9 files changed, 516 insertions(+), 17 deletions(-) create mode 100644 main/bus/bus_interface.py delete mode 100644 main/command_processor.py create mode 100644 main/logger.py create mode 100644 main/lora/lora_interface.py create mode 100644 main/processor.py create mode 100644 main/uqueue.py create mode 100644 main/worker.py diff --git a/main/bus/bus_interface.py b/main/bus/bus_interface.py new file mode 100644 index 0000000..0b8aece --- /dev/null +++ b/main/bus/bus_interface.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +总线通信模块的抽象接口定义 (契约) + +此接口定义了面向业务操作的方法,将所有实现细节(包括解析)完全封装。 +""" + +from abc import ABC, abstractmethod + +class IBusManager(ABC): + """ + 总线管理器接口。 + 调用方只关心业务,不关心实现。 + """ + + @abstractmethod + def execute_raw_command(self, bus_id: int, command: bytes) -> None: + """ + 【契约】执行一个“发后不理”的原始指令。 + + Args: + bus_id (int): 目标总线的编号。 + command (bytes): 要发送的原始命令字节。 + """ + pass + + @abstractmethod + def execute_collect_task(self, task: dict) -> float | None: + """ + 【契约】执行一个完整的采集任务,并直接返回最终的数值。 + + 一个符合本接口的实现必须自己处理所有细节: + - 从task字典中解析出 bus_id, command, parser_type。 + - 发送指令。 + - 接收响应。 + - 根据parser_type选择正确的内部解析器进行解析。 + - 返回最终的float数值,或在任何失败情况下返回None。 + + Args: + task (dict): 从Protobuf解码出的单个CollectTask消息字典。 + + Returns: + float | None: 成功解析则返回数值,否则返回None。 + """ + pass diff --git a/main/command_processor.py b/main/command_processor.py deleted file mode 100644 index e69de29..0000000 diff --git a/main/config.py b/main/config.py index e69de29..c005458 100644 --- a/main/config.py +++ b/main/config.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +项目全局配置文件 + +集中管理所有硬件引脚、通信参数和软件配置, +便于统一修改和适配不同的硬件版本。 +""" + +# --- LoRa 模块配置 --- +# 假设LoRa模块使用独立的UART进行通信 +LORA_CONFIG = { + # LoRa模块连接的UART总线ID (0, 1, or 2 on ESP32) + 'uart_id': 2, + + # LoRa模块的通信波特率 + 'baudrate': 9600, + + # LoRa模块连接的GPIO引脚 + 'pins': { + 'tx': 17, # UART TX + 'rx': 16, # UART RX + } +} + +# --- 总线配置 --- +# 使用字典来定义项目中的所有通信总线 +# key是总线ID (bus_id),value是该总线的详细配置字典。 +# 这种结构使得 command_processor 可以通过 bus_id 动态获取其配置。 +BUS_CONFIG = { + # --- 总线 1 --- + 1: { + # 总线协议类型,用于程序动态选择不同的处理逻辑 + 'protocol': 'RS485', + + # 该总线使用的硬件UART ID + 'uart_id': 1, + + # 该总线的通信波特率 + 'baudrate': 9600, + + # 该总线使用的GPIO引脚 + 'pins': { + 'tx': 4, # RS485 TX + 'rx': 5, # RS485 RX + 'rts': 2, # RS485 DE/RE 方向控制引脚 + } + }, + + # 如果未来有第二条总线,或不同协议的总线,可以直接在这里添加 + 2: { + 'protocol': 'RS485', + 'uart_id': 0, + 'baudrate': 19200, # 这条总线可以有不同的波特率 + 'pins': { + 'tx': 25, + 'rx': 26, + 'rts': 27, + } + }, +} + +# --- 全局超时设置 (毫秒) --- +DEFAULT_TIMEOUTS = { + 'rs485_response': 500, # 等待RS485设备响应的默认超时时间 + 'lora_at_command': 300, # 等待LoRa模块AT指令响应的超时时间 +} + +# --- 系统参数配置 --- +SYSTEM_PARAMS = { + # 任务队列的最大长度。用于主线程和工作线程之间的缓冲。 + # 如果LoRa指令瞬间并发量大,可以适当调高此值。 + # 如果内存紧张,可以适当调低。 + 'task_queue_max_size': 10, + + # 全局调试日志开关 + # True: 所有 logger.log() 的信息都会被打印到串口。 + # False: logger.log() 将不执行任何操作,用于发布产品。 + 'debug_enabled': True, +} diff --git a/main/logger.py b/main/logger.py new file mode 100644 index 0000000..0daa969 --- /dev/null +++ b/main/logger.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +一个简单的、可配置的日志记录器模块。 +""" + +import config + +def log(message: str): + """ + 打印一条日志消息,是否实际输出取决于配置文件。 + + Args: + message (str): 要打印的日志消息。 + """ + # 从配置文件中获取调试开关的状态 + # .get()方法可以安全地获取值,如果键不存在,则返回默认值False + if config.SYSTEM_PARAMS.get('debug_enabled', False): + print(message) + + # 如果开关为False,此函数会立即返回,不执行任何操作。 diff --git a/main/lora/lora_interface.py b/main/lora/lora_interface.py new file mode 100644 index 0000000..5bacfc4 --- /dev/null +++ b/main/lora/lora_interface.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +LoRa通信模块的抽象接口定义 (契约) + +这个文件定义了一个LoRa处理器应该具备哪些功能, +但不包含任何具体的实现代码。任何具体的LoRa处理器, +无论是UART的还是SPI的,都必须实现这里定义的所有方法。 +""" + +# abc (Abstract Base Class) 是Python定义接口的标准方式 +from abc import ABC, abstractmethod + +class ILoraHandler(ABC): + """ + LoRa处理器接口。 + 它规定了所有LoRa处理器实现类必须提供的功能。 + """ + + @abstractmethod + def receive_packet(self): + """ + 【契约】非阻塞地检查并接收一个数据包。 + + 一个符合本接口的实现必须: + - 检查是否有新的数据包。 + - 如果有,读取、解析并返回负载数据。 + - 如果没有,必须立刻返回None,不得阻塞。 + + Returns: + bytes: 如果成功接收到一个数据包,返回该数据包的字节。 + None: 如果当前没有可读的数据包。 + """ + pass + + @abstractmethod + def send_packet(self, data_bytes: bytes) -> bool: + """ + 【契约】发送一个数据包。 + + 一个符合本接口的实现必须: + - 接收一个bytes类型的参数。 + - 将这些数据通过LoRa模块发送出去。 + - 返回一个布尔值表示发送指令是否成功提交。 + + Args: + data_bytes (bytes): 需要发送的字节数据。 + + Returns: + bool: True表示发送指令已成功提交,False表示因任何原因失败。 + """ + pass diff --git a/main/main.py b/main/main.py index c4c05bd..222a0ae 100644 --- a/main/main.py +++ b/main/main.py @@ -2,29 +2,89 @@ # -*- coding: utf-8 -*- """ -猪舍主控系统主程序入口 +程序主入口 (双线程生产者-消费者模型) + +主线程 (生产者): +- 职责:以最高优先级不间断监听LoRa数据,并将数据包放入任务队列。 +- 特点:永远不执行耗时操作,保证LoRa数据接收的实时性。 + +工作线程 (消费者): +- 职责:从任务队列中取出数据包,并进行耗时的业务处理。 +- 特点:可能会长时间阻塞,但不影响主线程的数据接收。 """ import time -# import struct # 根据需要保留或删除 -# import client_pb # 根据需要保留或删除 +import _thread +import config +import uqueue # 导入我们自己创建的本地uqueue模块 + +# 导入接口和实现 +from lora.lora_interface import ILoraHandler +from bus.bus_interface import IBusManager +from lora.lora_handler import LoRaHandler +from bus.rs485_manager import RS485Manager +from processor import Processor + +# 导入工作线程的执行函数 +from worker import worker_task +from logger import log + +# --- 模块级变量定义 (带有类型提示) --- +lora_controller: ILoraHandler | None = None +bus_manager: IBusManager | None = None +processor: Processor | None = None +task_queue: uqueue.Queue | None = None -def main_loop(): +def setup(): """ - 主循环 + 初始化函数,负责创建所有对象实例、共享队列,并启动工作线程。 """ - print("猪舍控制系统启动...") - - while True: - # 在这里添加你的逻辑 - time.sleep(1) # 避免空循环占用过多CPU + global lora_controller, bus_manager, processor, task_queue -# 程序入口 + log("--- 系统初始化开始 ---") + + # 1. 初始化硬件驱动和业务处理器 + lora_controller = LoRaHandler() + bus_manager = RS485Manager() + processor = Processor(lora_handler=lora_controller, bus_manager=bus_manager) + + # 2. 从配置文件读取队列长度,并创建线程安全的队列 + queue_size = config.SYSTEM_PARAMS.get('task_queue_max_size', 10) + task_queue = uqueue.Queue(maxsize=queue_size) + log(f"任务队列已创建,最大容量: {queue_size}") + + # 3. 启动工作线程 + _thread.start_new_thread(worker_task, (task_queue, processor)) + + log("--- 系统初始化完成 ---") + + +def loop(): + """ + 主线程循环函数 (生产者)。 + 只负责监听LoRa,并将数据放入队列。 + """ + packet = lora_controller.receive_packet() + + if packet: + if task_queue.full(): + log("警告:任务队列已满,新的LoRa数据包被丢弃!") + return + + try: + task_queue.put_nowait(packet) + log(f"主线程:新LoRa数据包已入队。当前队列大小: {task_queue.qsize()}") + except Exception as e: + log(f"错误:数据包入队失败: {e}") + + time.sleep_ms(10) + + +# --- 程序主执行区 --- if __name__ == "__main__": - try: - main_loop() - except KeyboardInterrupt: - print("程序被中断") - except Exception as e: - print(f"程序异常: {e}") + setup() + + log("--- 主线程进入循环 (LoRa监听) ---") + while True: + loop() diff --git a/main/processor.py b/main/processor.py new file mode 100644 index 0000000..b5b5a51 --- /dev/null +++ b/main/processor.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +核心业务逻辑处理器 (V3 - 面向业务接口) + +职责: +- 编排业务流程:解码指令,并将业务任务分发给相应的管理器。 +- 完全不关心总线通信和数据解析的技术实现细节。 +""" + +# 导入我们定义的“契约”(接口) +from lora.lora_interface import ILoraHandler +from bus.bus_interface import IBusManager + +# 导入Protobuf解析代码 +from proto import client_pb + +from logger import log + + +class Processor: + """ + 命令处理器类,项目的“大脑”。 + 它依赖于抽象的、面向业务的接口。 + """ + + def __init__(self, lora_handler: ILoraHandler, bus_manager: IBusManager): + """ + 构造函数 (依赖注入)。 + + Args: + lora_handler (ILoraHandler): 一个实现了LoRa接口的对象。 + bus_manager (IBusManager): 一个实现了总线接口的对象。 + """ + self.lora = lora_handler + self.bus = bus_manager + log("业务处理器已初始化,准备就绪。") + + def handle_packet(self, packet_bytes: bytes): + """ + 处理单个LoRa数据包的入口函数。 + """ + log(f"收到待处理数据包: {packet_bytes.hex()}") + + try: + instruction = client_pb.decode_instruction(packet_bytes) + except Exception as e: + log(f"错误:解码指令失败: {e}") + return + + # 根据指令类型,分发到不同的业务处理方法 + if 'raw_485_command' in instruction: + self._process_exec_command(instruction['raw_485_command']) + + elif 'batch_collect_command' in instruction: + self._process_collect_command(instruction['batch_collect_command']) + + else: + log(f"警告:收到未知或不适用于此设备的指令类型: {instruction}") + + def _process_exec_command(self, cmd: dict): + """ + 处理“执行命令”业务。 + """ + bus_id = cmd['bus_number'] + command_bytes = cmd['command_bytes'] + log(f"处理[执行命令]业务:向总线 {bus_id} 下发指令。") + + # 直接调用总线接口的业务方法,不关心实现 + self.bus.execute_raw_command(bus_id, command_bytes) + log("执行指令已下发。") + + def _process_collect_command(self, cmd: dict): + """ + 处理“采集命令”业务。 + """ + correlation_id = cmd['correlation_id'] + tasks = cmd['tasks'] + log(f"处理[采集命令]业务 (ID: {correlation_id}):共 {len(tasks)} 个任务。") + + sensor_values = [] + for i, task in enumerate(tasks): + log(f" - 执行任务 {i + 1}...") + + # 调用总线接口的业务方法,直接获取最终结果 + # 我们不再关心task的具体内容,也不关心解析过程 + value = self.bus.execute_collect_task(task) + + if value is not None: + sensor_values.append(value) + log(f" => 成功,获取值为: {value}") + else: + # 如果返回None,表示任务失败(超时或解析错误) + sensor_values.append(-1) # 添加一个默认/错误值 + log(" => 失败,任务未返回有效值。") + + # 所有任务执行完毕,构建并发送响应 + log(f"所有采集任务完成,准备发送响应。采集到的值: {sensor_values}") + try: + response_payload = { + 'correlation_id': correlation_id, + 'values': sensor_values + } + response_packet = client_pb.encode_instruction('collect_result', response_payload) + + # 通过LoRa接口发送出去 + self.lora.send_packet(response_packet) + log("采集结果已通过LoRa发送。") + except Exception as e: + log(f"错误:编码或发送采集结果失败: {e}") diff --git a/main/uqueue.py b/main/uqueue.py new file mode 100644 index 0000000..c915ea4 --- /dev/null +++ b/main/uqueue.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +一个适用于MicroPython的、简单的线程安全队列实现。 + +这个模块提供了一个与标准库 `queue.Queue` 类似的类, +确保在多线程环境下的数据操作是安全的。 +""" + +import _thread +from collections import deque + +class Queue: + """一个简单的、线程安全的队列。""" + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._lock = _thread.allocate_lock() + # 使用deque可以实现更高效的头部弹出操作 (O(1)) + self._items = deque((), maxsize if maxsize > 0 else 1024) + + def qsize(self): + """返回队列中的项目数。""" + with self._lock: + return len(self._items) + + def empty(self): + """如果队列为空,返回True,否则返回False。""" + return self.qsize() == 0 + + def full(self): + """如果队列已满,返回True,否则返回False。""" + if self.maxsize <= 0: + return False + return self.qsize() >= self.maxsize + + def put(self, item, block=True, timeout=None): + """将一个项目放入队列。""" + if not block: + return self.put_nowait(item) + + # 阻塞式put的简单实现 (在实际应用中更复杂的实现会使用信号量) + while True: + with self._lock: + if not self.full(): + self._items.append(item) + return + # 如果队列是满的,短暂休眠后重试 + import time + time.sleep_ms(5) + + def put_nowait(self, item): + """等同于 put(item, block=False)。""" + if self.full(): + raise OSError("Queue full") + + with self._lock: + self._items.append(item) + + def get(self, block=True, timeout=None): + """从队列中移除并返回一个项目。""" + if not block: + return self.get_nowait() + + # 阻塞式get的简单实现 + while True: + with self._lock: + if self._items: + return self._items.popleft() + # 如果队列是空的,短暂休眠后重试 + import time + time.sleep_ms(5) + + def get_nowait(self): + """等同于 get(item, block=False)。""" + if self.empty(): + raise OSError("Queue empty") + + with self._lock: + return self._items.popleft() diff --git a/main/worker.py b/main/worker.py new file mode 100644 index 0000000..9d0faf9 --- /dev/null +++ b/main/worker.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +工作线程模块 + +职责: +- 作为一个独立的线程运行。 +- 阻塞式地等待任务队列。 +- 从队列中取出任务,并交给Processor进行耗时处理。 +""" + +import uqueue +from processor import Processor +from logger import log + + +def worker_task(task_queue: uqueue.Queue, processor: Processor): + """ + 工作线程的主函数。 + + Args: + task_queue (uqueue.Queue): 共享的任务队列。 + processor (Processor): 业务处理器实例。 + """ + log("工作线程已启动,等待任务...") + + while True: + try: + # 1. 阻塞式地从队列中获取任务 + # 如果队列为空,程序会在这里自动挂起,不消耗CPU + # get()方法是线程安全的 + packet_bytes = task_queue.get() + + log(f"工作线程:收到新任务,开始处理... 数据: {packet_bytes.hex()}") + + # 2. 调用processor进行耗时的、阻塞式的处理 + # 这个处理过程不会影响主线程的LoRa监听 + processor.handle_packet(packet_bytes) + + log("工作线程:任务处理完毕,继续等待下一个任务。") + + except Exception as e: + log(f"错误:工作线程在处理任务时发生异常: {e}")