Files
pig-house-controller/main/bus/rs485_manager.py
2025-10-08 19:36:48 +08:00

255 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
RS485 总线管理器实现
此模块实现了 IBusManager 接口,用于管理 RS485 总线通信。
"""
from .bus_interface import IBusManager
from typing import Dict, Any
from main.logs.logger import log
# 导入 MicroPython 的 UART 和 Pin 库
from machine import UART, Pin
import time # 用于添加延时确保RS485方向切换
import _thread # 用于线程同步
import struct # 用于浮点数转换
class RS485Manager(IBusManager):
"""
RS485 总线管理器。
负责 RS485 设备的指令发送、响应接收和数据解析。
"""
def __init__(self, bus_config: Dict[int, Dict[str, Any]], default_timeouts: Dict[str, int]):
"""
构造函数,注入配置。
根据传入的配置初始化 RS485 总线对应的 UART 管理器。
Args:
bus_config (Dict[int, Dict[str, Any]]): 包含所有总线配置的字典。
键是总线ID值是该总线的详细配置。
default_timeouts (Dict[str, int]): 包含各种默认超时设置的字典。
"""
self.bus_config = bus_config
self.default_timeouts = default_timeouts
# 存储以总线号为key的UART管理器实例、RTS引脚和锁
self.bus_ports: Dict[int, Dict[str, Any]] = {}
log("RS485Manager 已使用配置初始化。")
log(f"总线配置: {self.bus_config}")
log(f"默认超时设置: {self.default_timeouts}")
# 遍历 bus_config初始化 RS485 端口
for bus_id, config in bus_config.items():
if config.get('protocol') == 'RS485':
try:
uart_id = config['uart_id']
baudrate = config['baudrate']
pins = config['pins']
tx_pin_num = pins['tx']
rx_pin_num = pins['rx']
rts_pin_num = pins['rts'] # RS485 的 DE/RE 方向控制引脚
# 初始化 Pin 对象
rts_pin = Pin(rts_pin_num, Pin.OUT) # RTS 引脚设置为输出模式
rts_pin.value(0) # 默认设置为接收模式
# 初始化 UART 对象
# 注意MicroPython 的 UART 构造函数可能不支持直接传入 Pin 对象,而是 Pin 编号
# 并且 rts 参数通常用于硬件流控制RS485 的 DE/RE 需要手动控制
uart = UART(uart_id, baudrate=baudrate, tx=tx_pin_num, rx=rx_pin_num,
timeout=self.default_timeouts.get('rs485_response', 500))
self.bus_ports[bus_id] = {
'uart': uart,
'rts_pin': rts_pin,
'lock': _thread.allocate_lock()
}
log(f"总线 {bus_id} (RS485) 的 UART 管理器初始化成功。UART ID: {uart_id}, 波特率: {baudrate}, TX: {tx_pin_num}, RX: {rx_pin_num}, RTS(DE/RE): {rts_pin_num}")
except KeyError as e:
log(f"错误: 总线 {bus_id} 的 RS485 配置缺少关键参数: {e}")
except Exception as e:
log(f"错误: 初始化总线 {bus_id} 的 RS485 管理器失败: {e}")
else:
log(f"总线 {bus_id} 的协议不是 RS485跳过初始化。")
def _calculate_crc16_modbus(self, data: bytes) -> int:
"""
计算 Modbus RTU 的 CRC16 校验码。
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if (crc & 0x0001):
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc
def _parse_modbus_rtu_float_default(self, response_bytes: bytes) -> float | None:
"""
默认解析 Modbus RTU 响应中的 32 位 IEEE 754 单精度浮点数。
假定为大端 (ABCD) 字节序。
"""
# 最小预期长度: 从站ID(1) + 功能码(1) + 字节计数(1) + 4字节数据 + CRC(2) = 9字节
MIN_RESPONSE_LEN = 9
EXPECTED_DATA_BYTE_COUNT = 4 # 32位浮点数占用4字节
if not response_bytes or len(response_bytes) < MIN_RESPONSE_LEN:
log(f"警告: 响应字节过短或为空,无法解析为浮点数。响应: {response_bytes.hex() if response_bytes else 'None'}")
return None
# 提取响应组件
# 注意: Modbus RTU CRC是LSB在前所以这里需要调整
# response_bytes[:-2] 是用于CRC计算的数据部分
# response_bytes[-2:] 是CRC本身
data_for_crc = response_bytes[:-2]
received_crc = (response_bytes[-1] << 8) | response_bytes[-2] # CRC的低字节在前高字节在后
# 1. CRC 校验
calculated_crc = self._calculate_crc16_modbus(data_for_crc)
if calculated_crc != received_crc:
log(f"错误: CRC校验失败。接收CRC: {received_crc:04X}, 计算CRC: {calculated_crc:04X}. 响应: {response_bytes.hex()}")
return None
slave_id = response_bytes[0]
function_code = response_bytes[1]
byte_count = response_bytes[2]
data_bytes = response_bytes[3:3 + EXPECTED_DATA_BYTE_COUNT]
# 2. 功能码检查 (假设读取保持寄存器0x03或输入寄存器0x04)
if function_code not in [0x03, 0x04]:
log(f"警告: 响应功能码 {function_code:02X} 不符合预期 (期望0x03或0x04)。响应: {response_bytes.hex()}")
return None
# 3. 字节计数检查
if byte_count != EXPECTED_DATA_BYTE_COUNT:
log(f"警告: 响应字节计数 {byte_count} 不符合预期 (期望{EXPECTED_DATA_BYTE_COUNT})。响应: {response_bytes.hex()}")
return None
# 4. 提取的数据字节长度检查 (与字节计数检查有重叠,但更安全)
if len(data_bytes) != EXPECTED_DATA_BYTE_COUNT:
log(f"错误: 提取的数据字节长度不正确。期望{EXPECTED_DATA_BYTE_COUNT}, 实际{len(data_bytes)}. 响应: {response_bytes.hex()}")
return None
# 5. 转换为浮点数 (大端, ABCD)
try:
parsed_float = struct.unpack('>f', data_bytes)[0]
log(f"成功解析浮点数: {parsed_float}")
return parsed_float
except Exception as e:
log(f"错误: 浮点数转换失败: {e}. 数据字节: {data_bytes.hex()}. 响应: {response_bytes.hex()}")
return None
def execute_raw_command(self, bus_id: int, command: bytes) -> None:
"""
【契约】执行一个“发后不理”的原始指令。
Args:
bus_id (int): 目标总线的编号。
command (bytes): 要发送的原始命令字节。
"""
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
with lock:
try:
rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH)
time.sleep_us(100) # 短暂延时,确保方向切换完成
uart.write(command)
# 等待所有数据发送完毕
uart.flush()
time.sleep_us(100) # 短暂延时,确保数据完全发出
rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW)
log(f"总线 {bus_id} 原始命令发送成功: {command.hex()}")
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行原始命令失败: {e}")
def execute_collect_task(self, task: dict) -> float | None:
"""
【契约】执行一个完整的采集任务,并直接返回最终的数值。
一个符合本接口的实现必须自己处理所有细节:
- 从task字典中解析出 bus_id, command, parser_type。
- 发送指令。
- 接收响应。
- 根据parser_type选择正确的内部解析器进行解析。
- 返回最终的float数值或在任何失败情况下返回None。
Args:
task (dict): 从Protobuf解码出的单个CollectTask消息字典。
期望结构: {"command": {"bus_number": int, "command_bytes": bytes}}
Returns:
float | None: 成功解析则返回数值否则返回None。
"""
# I. 任务参数解析与初步验证
try:
command_info = task.get("command")
if not command_info:
log("错误: CollectTask 缺少 'command' 字段。")
return None
bus_id = command_info.get("bus_number")
command_bytes = command_info.get("command_bytes")
if bus_id is None or command_bytes is None:
log("错误: Raw485Command 缺少 'bus_number''command_bytes' 字段。")
return None
except Exception as e:
log(f"错误: 解析CollectTask失败: {e}. 任务: {task}")
return None
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return None
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
response_bytes = None
with lock:
try:
# II. 线程安全与指令发送
rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH)
time.sleep_us(100) # 短暂延时,确保方向切换完成
uart.write(command_bytes)
uart.flush()
time.sleep_us(100) # 短暂延时,确保数据完全发出
rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW)
log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}")
# III. 接收响应
response_bytes = uart.read()
if response_bytes:
log(f"总线 {bus_id} 收到响应: {response_bytes.hex()}")
else:
log(f"警告: 总线 {bus_id} 未收到响应或响应超时。")
return None
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}")
return None
# IV. 响应解析与数据提取 (默认 Modbus RTU 浮点数)
# TODO: 根据CollectTask的Protobuf定义此处需要根据parser_type来选择具体的解析逻辑和类型。
# 目前默认使用Modbus RTU大端浮点数解析。
parsed_value = self._parse_modbus_rtu_float_default(response_bytes)
# V. 返回结果
return parsed_value