diff --git a/main/bus/__init__.py b/main/bus/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main/bus/rs485_manager.py b/main/bus/rs485_manager.py index e69de29..df3ddc4 100644 --- a/main/bus/rs485_manager.py +++ b/main/bus/rs485_manager.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +RS485 总线管理器实现 + +此模块实现了 IBusManager 接口,用于管理 RS485 总线通信。 +""" + +from .bus_interface import IBusManager +from typing import Dict, Any +from main.logs.logger import log + +# 导入 MicroPython 的 UART 和 Pin 库 +from machine import UART, Pin +import time # 用于添加延时,确保RS485方向切换 +import _thread # 用于线程同步 +import struct # 用于浮点数转换 + + +class RS485Manager(IBusManager): + """ + RS485 总线管理器。 + 负责 RS485 设备的指令发送、响应接收和数据解析。 + """ + + def __init__(self, bus_config: Dict[int, Dict[str, Any]], default_timeouts: Dict[str, int]): + """ + 构造函数,注入配置。 + 根据传入的配置初始化 RS485 总线对应的 UART 管理器。 + + Args: + bus_config (Dict[int, Dict[str, Any]]): 包含所有总线配置的字典。 + 键是总线ID,值是该总线的详细配置。 + default_timeouts (Dict[str, int]): 包含各种默认超时设置的字典。 + """ + self.bus_config = bus_config + self.default_timeouts = default_timeouts + # 存储以总线号为key的UART管理器实例、RTS引脚和锁 + self.bus_ports: Dict[int, Dict[str, Any]] = {} + + log("RS485Manager 已使用配置初始化。") + log(f"总线配置: {self.bus_config}") + log(f"默认超时设置: {self.default_timeouts}") + + # 遍历 bus_config,初始化 RS485 端口 + for bus_id, config in bus_config.items(): + if config.get('protocol') == 'RS485': + try: + uart_id = config['uart_id'] + baudrate = config['baudrate'] + pins = config['pins'] + tx_pin_num = pins['tx'] + rx_pin_num = pins['rx'] + rts_pin_num = pins['rts'] # RS485 的 DE/RE 方向控制引脚 + + # 初始化 Pin 对象 + rts_pin = Pin(rts_pin_num, Pin.OUT) # RTS 引脚设置为输出模式 + rts_pin.value(0) # 默认设置为接收模式 + + # 初始化 UART 对象 + # 注意:MicroPython 的 UART 构造函数可能不支持直接传入 Pin 对象,而是 Pin 编号 + # 并且 rts 参数通常用于硬件流控制,RS485 的 DE/RE 需要手动控制 + uart = UART(uart_id, baudrate=baudrate, tx=tx_pin_num, rx=rx_pin_num, + timeout=self.default_timeouts.get('rs485_response', 500)) + + self.bus_ports[bus_id] = { + 'uart': uart, + 'rts_pin': rts_pin, + 'lock': _thread.allocate_lock() + } + log(f"总线 {bus_id} (RS485) 的 UART 管理器初始化成功。UART ID: {uart_id}, 波特率: {baudrate}, TX: {tx_pin_num}, RX: {rx_pin_num}, RTS(DE/RE): {rts_pin_num}") + except KeyError as e: + log(f"错误: 总线 {bus_id} 的 RS485 配置缺少关键参数: {e}") + except Exception as e: + log(f"错误: 初始化总线 {bus_id} 的 RS485 管理器失败: {e}") + else: + log(f"总线 {bus_id} 的协议不是 RS485,跳过初始化。") + + def _calculate_crc16_modbus(self, data: bytes) -> int: + """ + 计算 Modbus RTU 的 CRC16 校验码。 + """ + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if (crc & 0x0001): + crc >>= 1 + crc ^= 0xA001 + else: + crc >>= 1 + return crc + + def _parse_modbus_rtu_float_default(self, response_bytes: bytes) -> float | None: + """ + 默认解析 Modbus RTU 响应中的 32 位 IEEE 754 单精度浮点数。 + 假定为大端 (ABCD) 字节序。 + """ + # 最小预期长度: 从站ID(1) + 功能码(1) + 字节计数(1) + 4字节数据 + CRC(2) = 9字节 + MIN_RESPONSE_LEN = 9 + EXPECTED_DATA_BYTE_COUNT = 4 # 32位浮点数占用4字节 + + if not response_bytes or len(response_bytes) < MIN_RESPONSE_LEN: + log(f"警告: 响应字节过短或为空,无法解析为浮点数。响应: {response_bytes.hex() if response_bytes else 'None'}") + return None + + # 提取响应组件 + # 注意: Modbus RTU CRC是LSB在前,所以这里需要调整 + # response_bytes[:-2] 是用于CRC计算的数据部分 + # response_bytes[-2:] 是CRC本身 + data_for_crc = response_bytes[:-2] + received_crc = (response_bytes[-1] << 8) | response_bytes[-2] # CRC的低字节在前,高字节在后 + + # 1. CRC 校验 + calculated_crc = self._calculate_crc16_modbus(data_for_crc) + if calculated_crc != received_crc: + log(f"错误: CRC校验失败。接收CRC: {received_crc:04X}, 计算CRC: {calculated_crc:04X}. 响应: {response_bytes.hex()}") + return None + + slave_id = response_bytes[0] + function_code = response_bytes[1] + byte_count = response_bytes[2] + data_bytes = response_bytes[3:3 + EXPECTED_DATA_BYTE_COUNT] + + # 2. 功能码检查 (假设读取保持寄存器0x03或输入寄存器0x04) + if function_code not in [0x03, 0x04]: + log(f"警告: 响应功能码 {function_code:02X} 不符合预期 (期望0x03或0x04)。响应: {response_bytes.hex()}") + return None + + # 3. 字节计数检查 + if byte_count != EXPECTED_DATA_BYTE_COUNT: + log(f"警告: 响应字节计数 {byte_count} 不符合预期 (期望{EXPECTED_DATA_BYTE_COUNT})。响应: {response_bytes.hex()}") + return None + + # 4. 提取的数据字节长度检查 (与字节计数检查有重叠,但更安全) + if len(data_bytes) != EXPECTED_DATA_BYTE_COUNT: + log(f"错误: 提取的数据字节长度不正确。期望{EXPECTED_DATA_BYTE_COUNT}, 实际{len(data_bytes)}. 响应: {response_bytes.hex()}") + return None + + # 5. 转换为浮点数 (大端, ABCD) + try: + parsed_float = struct.unpack('>f', data_bytes)[0] + log(f"成功解析浮点数: {parsed_float}") + return parsed_float + except Exception as e: + log(f"错误: 浮点数转换失败: {e}. 数据字节: {data_bytes.hex()}. 响应: {response_bytes.hex()}") + return None + + def execute_raw_command(self, bus_id: int, command: bytes) -> None: + """ + 【契约】执行一个“发后不理”的原始指令。 + + Args: + bus_id (int): 目标总线的编号。 + command (bytes): 要发送的原始命令字节。 + """ + if bus_id not in self.bus_ports: + log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。") + return + + port_info = self.bus_ports[bus_id] + uart = port_info['uart'] + rts_pin = port_info['rts_pin'] + lock = port_info['lock'] + with lock: + try: + rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH) + time.sleep_us(100) # 短暂延时,确保方向切换完成 + uart.write(command) + # 等待所有数据发送完毕 + uart.flush() + time.sleep_us(100) # 短暂延时,确保数据完全发出 + rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW) + log(f"总线 {bus_id} 原始命令发送成功: {command.hex()}") + except Exception as e: + log(f"错误: 在总线 {bus_id} 上执行原始命令失败: {e}") + + def execute_collect_task(self, task: dict) -> float | None: + """ + 【契约】执行一个完整的采集任务,并直接返回最终的数值。 + + 一个符合本接口的实现必须自己处理所有细节: + - 从task字典中解析出 bus_id, command, parser_type。 + - 发送指令。 + - 接收响应。 + - 根据parser_type选择正确的内部解析器进行解析。 + - 返回最终的float数值,或在任何失败情况下返回None。 + + Args: + task (dict): 从Protobuf解码出的单个CollectTask消息字典。 + 期望结构: {"command": {"bus_number": int, "command_bytes": bytes}} + + Returns: + float | None: 成功解析则返回数值,否则返回None。 + """ + # I. 任务参数解析与初步验证 + try: + command_info = task.get("command") + if not command_info: + log("错误: CollectTask 缺少 'command' 字段。") + return None + + bus_id = command_info.get("bus_number") + command_bytes = command_info.get("command_bytes") + + if bus_id is None or command_bytes is None: + log("错误: Raw485Command 缺少 'bus_number' 或 'command_bytes' 字段。") + return None + + except Exception as e: + log(f"错误: 解析CollectTask失败: {e}. 任务: {task}") + return None + + if bus_id not in self.bus_ports: + log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。") + return None + + port_info = self.bus_ports[bus_id] + uart = port_info['uart'] + rts_pin = port_info['rts_pin'] + lock = port_info['lock'] + + response_bytes = None + with lock: + try: + # II. 线程安全与指令发送 + rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH) + time.sleep_us(100) # 短暂延时,确保方向切换完成 + uart.write(command_bytes) + uart.flush() + time.sleep_us(100) # 短暂延时,确保数据完全发出 + rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW) + log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}") + + # III. 接收响应 + response_bytes = uart.read() + if response_bytes: + log(f"总线 {bus_id} 收到响应: {response_bytes.hex()}") + else: + log(f"警告: 总线 {bus_id} 未收到响应或响应超时。") + return None + + except Exception as e: + log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}") + return None + + # IV. 响应解析与数据提取 (默认 Modbus RTU 浮点数) + # TODO: 根据CollectTask的Protobuf定义,此处需要根据parser_type来选择具体的解析逻辑和类型。 + # 目前默认使用Modbus RTU大端浮点数解析。 + parsed_value = self._parse_modbus_rtu_float_default(response_bytes) + + # V. 返回结果 + return parsed_value diff --git a/main/config.py b/main/config/config.py similarity index 89% rename from main/config.py rename to main/config/config.py index c005458..db79f15 100644 --- a/main/config.py +++ b/main/config/config.py @@ -49,16 +49,16 @@ BUS_CONFIG = { }, # 如果未来有第二条总线,或不同协议的总线,可以直接在这里添加 - 2: { - 'protocol': 'RS485', - 'uart_id': 0, - 'baudrate': 19200, # 这条总线可以有不同的波特率 - 'pins': { - 'tx': 25, - 'rx': 26, - 'rts': 27, - } - }, + # 2: { + # 'protocol': 'RS485', + # 'uart_id': 0, + # 'baudrate': 19200, # 这条总线可以有不同的波特率 + # 'pins': { + # 'tx': 25, + # 'rx': 26, + # 'rts': 27, + # } + # }, } # --- 全局超时设置 (毫秒) --- diff --git a/main/logs/__init__.py b/main/logs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main/logger.py b/main/logs/logger.py similarity index 86% rename from main/logger.py rename to main/logs/logger.py index 0daa969..8f3122e 100644 --- a/main/logger.py +++ b/main/logs/logger.py @@ -5,7 +5,8 @@ 一个简单的、可配置的日志记录器模块。 """ -import config +from main.config.config import * + def log(message: str): """ @@ -16,7 +17,7 @@ def log(message: str): """ # 从配置文件中获取调试开关的状态 # .get()方法可以安全地获取值,如果键不存在,则返回默认值False - if config.SYSTEM_PARAMS.get('debug_enabled', False): + if SYSTEM_PARAMS.get('debug_enabled', False): print(message) - + # 如果开关为False,此函数会立即返回,不执行任何操作。 diff --git a/main/lora/__init__.py b/main/lora/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main/main.py b/main/main.py index 222a0ae..89c87b0 100644 --- a/main/main.py +++ b/main/main.py @@ -15,7 +15,7 @@ import time import _thread -import config +from config import config import uqueue # 导入我们自己创建的本地uqueue模块 # 导入接口和实现 @@ -27,7 +27,7 @@ from processor import Processor # 导入工作线程的执行函数 from worker import worker_task -from logger import log +from logs.logger import log # --- 模块级变量定义 (带有类型提示) --- lora_controller: ILoraHandler | None = None @@ -46,7 +46,8 @@ def setup(): # 1. 初始化硬件驱动和业务处理器 lora_controller = LoRaHandler() - bus_manager = RS485Manager() + bus_manager = RS485Manager(config.BUS_CONFIG, config.DEFAULT_TIMEOUTS) + processor = Processor(lora_handler=lora_controller, bus_manager=bus_manager) # 2. 从配置文件读取队列长度,并创建线程安全的队列 diff --git a/main/processor.py b/main/processor.py index f8cbd57..9cf18d2 100644 --- a/main/processor.py +++ b/main/processor.py @@ -16,7 +16,7 @@ from bus.bus_interface import IBusManager # 导入Protobuf解析代码 from proto import client_pb -from logger import log +from logs.logger import log class Processor: diff --git a/main/worker.py b/main/worker.py index 9d0faf9..b682593 100644 --- a/main/worker.py +++ b/main/worker.py @@ -12,7 +12,7 @@ import uqueue from processor import Processor -from logger import log +from logs.logger import log def worker_task(task_queue: uqueue.Queue, processor: Processor): @@ -24,21 +24,21 @@ def worker_task(task_queue: uqueue.Queue, processor: Processor): processor (Processor): 业务处理器实例。 """ log("工作线程已启动,等待任务...") - + while True: try: # 1. 阻塞式地从队列中获取任务 # 如果队列为空,程序会在这里自动挂起,不消耗CPU # get()方法是线程安全的 packet_bytes = task_queue.get() - + log(f"工作线程:收到新任务,开始处理... 数据: {packet_bytes.hex()}") - + # 2. 调用processor进行耗时的、阻塞式的处理 # 这个处理过程不会影响主线程的LoRa监听 processor.handle_packet(packet_bytes) - + log("工作线程:任务处理完毕,继续等待下一个任务。") - + except Exception as e: log(f"错误:工作线程在处理任务时发生异常: {e}")