Compare commits
28 Commits
10525dcfcc
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e3167a0144 | |||
| 452aaaeaba | |||
| c0a1925150 | |||
| 6d50e0a810 | |||
| b69805063d | |||
| c025530c25 | |||
| 902c90cf19 | |||
| 961b0c170b | |||
| c3d4531870 | |||
| 5f81314540 | |||
| d5de3a7a2b | |||
| 97a9c778e8 | |||
| dfb50f5c74 | |||
| 7bc7a95379 | |||
| f30d0e0865 | |||
| 46922e8505 | |||
| fe1c307129 | |||
| c370aa9a4a | |||
| d57d7cba70 | |||
| c36411e616 | |||
| 462e100c27 | |||
| 23889b80de | |||
| c117759ac3 | |||
| 5519d43253 | |||
| b80a04bfc1 | |||
| ae2a9e9364 | |||
| 9b13d413c4 | |||
| cd0f51057a |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -174,3 +174,6 @@ cython_debug/
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
|
||||
.idea
|
||||
.vscode
|
||||
|
||||
8
.idea/.gitignore
generated
vendored
Normal file
8
.idea/.gitignore
generated
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
# 默认忽略的文件
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# 基于编辑器的 HTTP 客户端请求
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
43
README.md
43
README.md
@@ -1,3 +1,44 @@
|
||||
# pig-house-controller
|
||||
|
||||
猪舍主控节点,根据上位机的指令控制当前猪舍内所有设备(传感器,阀门,电机等),并汇聚当前猪舍传感器数据统一上报
|
||||
## LoRa通信协议约定
|
||||
|
||||
本项目中的LoRa通信采用自定义的帧格式,以支持精确寻址和大数据包的自动分片与重组。所有数据包均由主控节点主动发起。
|
||||
|
||||
### 1. 物理帧结构
|
||||
|
||||
每个通过LoRa UART模块发送的物理数据包都遵循以下结构:
|
||||
|
||||
| 字段 | 长度 (字节) | 值 (Hex) | 描述 |
|
||||
|:-------------------------| :------------ | :---------------- |:------------------------------|
|
||||
| **帧头 (Header)** | 1 | `0xED` | 固定值,表示一个数据包的开始。 |
|
||||
| **数据长度 (Length)** | 1 | `0x00`-`0xFF` | 从`总包数`字段到`数据块`末尾的总字节数,不包含源地址。 |
|
||||
| **目标地址 (DestAddr)** | 2 | `0x0000`-`0xFFFF` | 接收该数据包的设备地址。 |
|
||||
| **总包数 (TotalChunks)** | 1 | `0x01`-`0xFF` | 表示当前消息被分成了几个包。`0x01`代表这是唯一的包。 |
|
||||
| **当前包序号 (CurrentChunk)** | 1 | `0x00`-`0xFE` | 当前是第几个数据包(从0开始计数)。 |
|
||||
| **数据块 (ChunkData)** | N | - | 实际传输的数据片段。 |
|
||||
| **源地址 (SourceAddr)** | 2 | `0x0000`-`0xFFFF` | 发送该数据包的设备地址,由硬件自动拼接。 |
|
||||
|
||||
**示例:**
|
||||
|
||||
发送一个数据为 `[0x01, 0x02, 0x03]` 的单包消息到地址 `0x1234`,发送方地址为 `0x5678`:
|
||||
`ED 05 12 34 01 00 01 02 03 56 78`
|
||||
- `ED`: 帧头
|
||||
- `05`: 后续长度 (1+1+3 = 5)
|
||||
- `12 34`: 目标地址
|
||||
- `01`: 总包数 (共1包)
|
||||
- `00`: 当前包序号 (第0包)
|
||||
- `01 02 03`: 数据块
|
||||
- `56 78`: 源地址
|
||||
|
||||
### 2. 数据分片 (Fragmentation)
|
||||
|
||||
- LoRa模块的物理层限制单次发送的数据部分**最大为240字节**。
|
||||
- 根据项目约定,为自定义协议头(总包数、当前包序号)预留2字节,地址由模块处理。
|
||||
- 因此,每个物理包中 **`数据块 (ChunkData)` 的最大长度为 `238` 字节**。
|
||||
- `send_packet` 方法会自动处理分片逻辑。
|
||||
|
||||
### 3. 数据重组 (Reassembly)
|
||||
|
||||
- `receive_packet` 方法会缓存收到的分片。
|
||||
- 当一个设备的所有分片都接收完毕后,`receive_packet` 会将它们自动重组成一个完整的消息,并向上层返回。
|
||||
- 由于通信是单向的(仅主控发送),接收端无需管理多个源地址的重组缓冲区。
|
||||
0
app/bus/__init__.py
Normal file
0
app/bus/__init__.py
Normal file
47
app/bus/bus_interface.py
Normal file
47
app/bus/bus_interface.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
总线通信模块的抽象接口定义 (契约)
|
||||
|
||||
此接口定义了面向业务操作的方法,将所有实现细节(包括解析)完全封装。
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
class IBusManager(ABC):
|
||||
"""
|
||||
总线管理器接口。
|
||||
调用方只关心业务,不关心实现。
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def execute_raw_command(self, bus_id: int, command: bytes) -> None:
|
||||
"""
|
||||
【契约】执行一个“发后不理”的原始指令。
|
||||
|
||||
Args:
|
||||
bus_id (int): 目标总线的编号。
|
||||
command (bytes): 要发送的原始命令字节。
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def execute_collect_task(self, task: dict) -> float | None:
|
||||
"""
|
||||
【契约】执行一个完整的采集任务,并直接返回最终的数值。
|
||||
|
||||
一个符合本接口的实现必须自己处理所有细节:
|
||||
- 从task字典中解析出 bus_id, command, parser_type。
|
||||
- 发送指令。
|
||||
- 接收响应。
|
||||
- 根据parser_type选择正确的内部解析器进行解析。
|
||||
- 返回最终的float数值,或在任何失败情况下返回None。
|
||||
|
||||
Args:
|
||||
task (dict): 从Protobuf解码出的单个CollectTask消息字典。
|
||||
|
||||
Returns:
|
||||
float | None: 成功解析则返回数值,否则返回None。
|
||||
"""
|
||||
pass
|
||||
300
app/bus/rs485_manager.py
Normal file
300
app/bus/rs485_manager.py
Normal file
@@ -0,0 +1,300 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
RS485 总线管理器实现
|
||||
|
||||
此模块实现了 IBusManager 接口,用于管理 RS485 总线通信。
|
||||
"""
|
||||
|
||||
from ..logs.logger import log
|
||||
|
||||
# 导入 MicroPython 的 UART 和 Pin 库
|
||||
from machine import UART, Pin
|
||||
import time # 用于添加延时,确保RS485方向切换
|
||||
import _thread # 用于线程同步
|
||||
import struct # 用于浮点数转换
|
||||
|
||||
|
||||
class RS485Manager:
|
||||
"""
|
||||
RS485 总线管理器。
|
||||
负责 RS485 设备的指令发送、响应接收和数据解析。
|
||||
"""
|
||||
|
||||
def __init__(self, bus_config, default_timeouts):
|
||||
"""
|
||||
构造函数,注入配置。
|
||||
根据传入的配置初始化 RS485 总线对应的 UART 管理器。
|
||||
|
||||
Args:
|
||||
bus_config: 包含所有总线配置的字典。
|
||||
键是总线ID,值是该总线的详细配置。
|
||||
default_timeouts: 包含各种默认超时设置的字典。
|
||||
"""
|
||||
self.bus_config = bus_config
|
||||
self.default_timeouts = default_timeouts
|
||||
# 存储以总线号为key的UART管理器实例、RTS引脚和锁
|
||||
self.bus_ports = {}
|
||||
|
||||
log("RS485Manager 已使用配置初始化。")
|
||||
log(f"总线配置: {self.bus_config}")
|
||||
log(f"默认超时设置: {self.default_timeouts}")
|
||||
|
||||
# 遍历 bus_config,初始化 RS485 端口
|
||||
for bus_id, config in bus_config.items():
|
||||
if config.get('protocol') == 'RS485':
|
||||
try:
|
||||
uart_id = config['uart_id']
|
||||
baudrate = config['baudrate']
|
||||
pins = config['pins']
|
||||
tx_pin_num = pins['tx']
|
||||
rx_pin_num = pins['rx']
|
||||
rts_pin_num = pins['rts'] # RS485 的 DE/RE 方向控制引脚
|
||||
|
||||
# 初始化 Pin 对象
|
||||
rts_pin = Pin(rts_pin_num, Pin.OUT) # RTS 引脚设置为输出模式
|
||||
rts_pin.value(0) # 默认设置为接收模式
|
||||
|
||||
# 初始化 UART 对象
|
||||
# 注意:MicroPython 的 UART 构造函数可能不支持直接传入 Pin 对象,而是 Pin 编号
|
||||
# 并且 rts 参数通常用于硬件流控制,RS485 的 DE/RE 需要手动控制
|
||||
uart = UART(uart_id, baudrate=baudrate, tx=tx_pin_num, rx=rx_pin_num,
|
||||
timeout=self.default_timeouts.get('rs485_response', 500))
|
||||
|
||||
self.bus_ports[bus_id] = {
|
||||
'uart': uart,
|
||||
'rts_pin': rts_pin,
|
||||
'lock': _thread.allocate_lock()
|
||||
}
|
||||
log(f"总线 {bus_id} (RS485) 的 UART 管理器初始化成功。UART ID: {uart_id}, 波特率: {baudrate}, TX: {tx_pin_num}, RX: {rx_pin_num}, RTS(DE/RE): {rts_pin_num}")
|
||||
except KeyError as e:
|
||||
log(f"错误: 总线 {bus_id} 的 RS485 配置缺少关键参数: {e}")
|
||||
except Exception as e:
|
||||
log(f"错误: 初始化总线 {bus_id} 的 RS485 管理器失败: {e}")
|
||||
else:
|
||||
log(f"总线 {bus_id} 的协议不是 RS485,跳过初始化。")
|
||||
|
||||
@staticmethod
|
||||
def _calculate_crc16_modbus(data):
|
||||
"""
|
||||
计算 Modbus RTU 的 CRC16 校验码。
|
||||
"""
|
||||
crc = 0xFFFF
|
||||
for byte in data:
|
||||
crc ^= byte
|
||||
for _ in range(8):
|
||||
if crc & 0x0001:
|
||||
crc >>= 1
|
||||
crc ^= 0xA001
|
||||
else:
|
||||
crc >>= 1
|
||||
return crc
|
||||
|
||||
def execute_raw_command(self, bus_id, command):
|
||||
"""
|
||||
【契约】执行一个“发后不理”的原始指令。
|
||||
|
||||
Args:
|
||||
bus_id (int): 目标总线的编号。
|
||||
command (bytes): 要发送的原始命令字节。
|
||||
"""
|
||||
if bus_id not in self.bus_ports:
|
||||
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
|
||||
return
|
||||
|
||||
port_info = self.bus_ports[bus_id]
|
||||
uart = port_info['uart']
|
||||
rts_pin = port_info['rts_pin']
|
||||
lock = port_info['lock']
|
||||
with lock:
|
||||
try:
|
||||
rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH)
|
||||
time.sleep_us(100) # 短暂延时,确保方向切换完成
|
||||
uart.write(command)
|
||||
# 等待所有数据发送完毕
|
||||
uart.flush()
|
||||
time.sleep_us(100) # 短暂延时,确保数据完全发出
|
||||
rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW)
|
||||
log(f"总线 {bus_id} 原始命令发送成功: {command.hex()}")
|
||||
except Exception as e:
|
||||
log(f"错误: 在总线 {bus_id} 上执行原始命令失败: {e}")
|
||||
|
||||
def execute_collect_task(self, task):
|
||||
"""
|
||||
【契约】执行一个完整的采集任务,并直接返回最终的数值。
|
||||
|
||||
一个符合本接口的实现必须自己处理所有细节:
|
||||
- 从task字典中解析出 bus_id, command, parser_type。
|
||||
- 发送指令。
|
||||
- 接收响应。
|
||||
- 根据parser_type选择正确的内部解析器进行解析。
|
||||
- 返回最终的float数值,或在任何失败情况下返回None。
|
||||
|
||||
Args:
|
||||
task: 从Protobuf解码出的单个CollectTask消息字典。
|
||||
期望结构: {"command": {"bus_number": int, "command_bytes": bytes}}
|
||||
|
||||
Returns:
|
||||
成功解析则返回数值,否则返回None。
|
||||
"""
|
||||
# I. 任务参数解析与初步验证
|
||||
try:
|
||||
command_info = task.get("command")
|
||||
if not command_info:
|
||||
log("错误: CollectTask 缺少 'command' 字段。")
|
||||
return None
|
||||
|
||||
bus_id = command_info.get("bus_number")
|
||||
command_bytes = command_info.get("command_bytes")
|
||||
|
||||
# 增加对命令有效性的检查
|
||||
if bus_id is None or not command_bytes or len(command_bytes) < 2:
|
||||
log(f"错误: CollectTask 的 'command' 字段无效。bus_id: {bus_id}, command_bytes: {command_bytes}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
log(f"错误: 解析CollectTask失败: {e}. 任务: {task}")
|
||||
return None
|
||||
|
||||
if bus_id not in self.bus_ports:
|
||||
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
|
||||
return None
|
||||
|
||||
port_info = self.bus_ports[bus_id]
|
||||
uart = port_info['uart']
|
||||
rts_pin = port_info['rts_pin']
|
||||
lock = port_info['lock']
|
||||
|
||||
response_bytes = None # 在锁外部初始化,确保其作用域
|
||||
response_buffer = bytearray()
|
||||
with lock:
|
||||
try:
|
||||
# II. 线程安全与指令发送
|
||||
rts_pin.value(1)
|
||||
time.sleep_us(100)
|
||||
uart.write(command_bytes)
|
||||
uart.flush()
|
||||
time.sleep_us(100)
|
||||
rts_pin.value(0)
|
||||
log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}")
|
||||
|
||||
# III. 接收响应
|
||||
start_time = time.ticks_ms()
|
||||
response_timeout = self.default_timeouts.get('rs485_response', 500)
|
||||
while time.ticks_diff(time.ticks_ms(), start_time) < response_timeout:
|
||||
if uart.any():
|
||||
chunk = uart.read(32)
|
||||
if chunk:
|
||||
response_buffer.extend(chunk)
|
||||
start_time = time.ticks_ms() # 收到数据就重置超时
|
||||
time.sleep_ms(5)
|
||||
|
||||
if response_buffer:
|
||||
# 动态地从请求命令中获取预期的从站ID和功能码
|
||||
expected_slave_id = command_bytes[0]
|
||||
expected_func_code = command_bytes[1]
|
||||
|
||||
found_frame = self._find_modbus_frame(response_buffer, expected_slave_id, expected_func_code)
|
||||
if found_frame:
|
||||
log(f"总线 {bus_id} 收到有效响应: {found_frame.hex()}")
|
||||
response_bytes = found_frame # 将找到的帧赋值给外部变量
|
||||
else:
|
||||
log(f"警告: 总线 {bus_id} 响应中无有效帧。收到响应: {response_buffer.hex()}")
|
||||
else:
|
||||
log(f"警告: 总线 {bus_id} 未收到响应。")
|
||||
|
||||
except Exception as e:
|
||||
log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}")
|
||||
|
||||
# IV. 统一处理和解析
|
||||
# 无论是因为超时、未找到有效帧还是发生异常,只要 response_bytes 仍为 None,就任务失败
|
||||
if response_bytes is None:
|
||||
return None
|
||||
|
||||
# 使用找到的有效帧进行解析
|
||||
parsed_value = RS485Manager._parse_modbus_rtu_default(response_bytes)
|
||||
|
||||
return parsed_value
|
||||
|
||||
def _find_modbus_frame(self, buffer: bytearray, expected_slave: int, func_code: int) -> bytes | None:
|
||||
"""
|
||||
修复版:加调试;优先头检查;CRC 字节序标准 Modbus (低字节在前)。
|
||||
"""
|
||||
log(f"搜索帧: buffer 长度 {len(buffer)}, hex {buffer.hex()}")
|
||||
i = 0
|
||||
while i < len(buffer) - 6: # 最小 7 字节,-6 安全
|
||||
if buffer[i] == expected_slave and buffer[i + 1] == func_code:
|
||||
byte_count = buffer[i + 2]
|
||||
frame_len = 3 + byte_count + 2
|
||||
if len(buffer) - i >= frame_len:
|
||||
frame = bytes(buffer[i:i + frame_len])
|
||||
# CRC 预校验(标准 Modbus:CRC 低字节在前)
|
||||
core = frame[:-2]
|
||||
calc_crc = self._calculate_crc16_modbus(core)
|
||||
low_crc = frame[-2]
|
||||
high_crc = frame[-1]
|
||||
recv_crc = (high_crc << 8) | low_crc # 高<<8 | 低
|
||||
log(f"候选帧 at {i}: {frame.hex()}, calc CRC {calc_crc:04X}, recv {recv_crc:04X}")
|
||||
if calc_crc == recv_crc:
|
||||
log(f"找到有效帧: {frame.hex()}")
|
||||
return frame
|
||||
else:
|
||||
log(f"CRC 不匹配,跳过 (calc {calc_crc:04X} != recv {recv_crc:04X})")
|
||||
i += 1
|
||||
log("无有效帧")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_modbus_rtu_default(response_bytes): # 改名,支持整数/浮点
|
||||
"""
|
||||
修复版:动态数据长;CRC 只用核心。
|
||||
"""
|
||||
if not response_bytes or len(response_bytes) < 7:
|
||||
log(f"警告: 响应过短。响应: {response_bytes.hex() if response_bytes else 'None'}")
|
||||
return None
|
||||
|
||||
# CRC 校验(只核心)
|
||||
data_for_crc = response_bytes[:-2]
|
||||
received_crc = (response_bytes[-1] << 8) | response_bytes[-2]
|
||||
|
||||
calculated_crc = RS485Manager._calculate_crc16_modbus(data_for_crc)
|
||||
if calculated_crc != received_crc:
|
||||
log(f"错误: CRC失败。接收: {received_crc:04X}, 计算: {calculated_crc:04X}. 响应: {response_bytes.hex()}")
|
||||
return None
|
||||
|
||||
function_code = response_bytes[1]
|
||||
byte_count = response_bytes[2]
|
||||
data_bytes = response_bytes[3:3 + byte_count]
|
||||
|
||||
if function_code not in [0x03, 0x04]:
|
||||
log(f"警告: 功能码 {function_code:02X} 不符。")
|
||||
return None
|
||||
|
||||
if len(data_bytes) != byte_count:
|
||||
log(f"错误: 数据长 {len(data_bytes)} != {byte_count}")
|
||||
return None
|
||||
|
||||
# 动态解析
|
||||
if byte_count == 2:
|
||||
# 整数 (e.g., 温度)
|
||||
try:
|
||||
value = int.from_bytes(data_bytes, 'big') # 或 signed '>h'
|
||||
parsed_value = value
|
||||
log(f"成功解析整数: {parsed_value}")
|
||||
return parsed_value
|
||||
except Exception as e:
|
||||
log(f"整数解析失败: {e}")
|
||||
return None
|
||||
elif byte_count == 4:
|
||||
# 浮点
|
||||
try:
|
||||
parsed_value = struct.unpack('>f', data_bytes)[0]
|
||||
log(f"成功解析浮点: {parsed_value}")
|
||||
return parsed_value
|
||||
except Exception as e:
|
||||
log(f"浮点失败: {e}")
|
||||
return None
|
||||
else:
|
||||
log(f"警告: 未知字节数 {byte_count}")
|
||||
return None
|
||||
99
app/config/config.py
Normal file
99
app/config/config.py
Normal file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
项目全局配置文件
|
||||
|
||||
集中管理所有硬件引脚、通信参数和软件配置,
|
||||
便于统一修改和适配不同的硬件版本。
|
||||
"""
|
||||
|
||||
# --- LoRa 模块配置 ---
|
||||
# 假设LoRa模块使用独立的UART进行通信
|
||||
LORA_CONFIG = {
|
||||
# 平台LoRa地址
|
||||
'master_address': 0x01,
|
||||
|
||||
# LoRa模块连接的UART总线ID (0, 1, or 2 on ESP32)
|
||||
'uart_id': 2,
|
||||
|
||||
# LoRa模块的通信波特率
|
||||
'baudrate': 9600,
|
||||
|
||||
# LoRa模块连接的GPIO引脚
|
||||
'pins': {
|
||||
'tx': 5, # UART TX
|
||||
'rx': 4, # UART RX
|
||||
},
|
||||
|
||||
# LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
|
||||
# e.g.
|
||||
# EC: 接收端只会接收到消息, 不会接收到请求头
|
||||
# e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
|
||||
# (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
|
||||
# 接收: 48 65 6c 6c 6f ("Hello")
|
||||
# ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。
|
||||
# e.g. 发送: ED 05 12 34 01 00 01 02 03
|
||||
# (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块))
|
||||
# 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾)
|
||||
'lora_mesh_mode': 'ED',
|
||||
|
||||
# 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
|
||||
'max_chunk_size': 238
|
||||
}
|
||||
|
||||
# --- 总线配置 ---
|
||||
# 使用字典来定义项目中的所有通信总线
|
||||
# key是总线ID (bus_id),value是该总线的详细配置字典。
|
||||
# 这种结构使得 command_processor 可以通过 bus_id 动态获取其配置。
|
||||
BUS_CONFIG = {
|
||||
# --- 总线 1 ---
|
||||
1: {
|
||||
# 总线协议类型,用于程序动态选择不同的处理逻辑
|
||||
'protocol': 'RS485',
|
||||
|
||||
# 该总线使用的硬件UART ID
|
||||
'uart_id': 1,
|
||||
|
||||
# 该总线的通信波特率
|
||||
'baudrate': 9600,
|
||||
|
||||
# 该总线使用的GPIO引脚
|
||||
'pins': {
|
||||
'tx': 16, # RS485 TX
|
||||
'rx': 17, # RS485 RX
|
||||
'rts': 15, # RS485 DE/RE 方向控制引脚
|
||||
}
|
||||
},
|
||||
|
||||
# 如果未来有第二条总线,或不同协议的总线,可以直接在这里添加
|
||||
# 2: {
|
||||
# 'protocol': 'RS485',
|
||||
# 'uart_id': 0,
|
||||
# 'baudrate': 19200, # 这条总线可以有不同的波特率
|
||||
# 'pins': {
|
||||
# 'tx': 25,
|
||||
# 'rx': 26,
|
||||
# 'rts': 27,
|
||||
# }
|
||||
# },
|
||||
}
|
||||
|
||||
# --- 全局超时设置 (毫秒) ---
|
||||
DEFAULT_TIMEOUTS = {
|
||||
'rs485_response': 500, # 等待RS485设备响应的默认超时时间
|
||||
'lora_at_command': 300, # 等待LoRa模块AT指令响应的超时时间
|
||||
}
|
||||
|
||||
# --- 系统参数配置 ---
|
||||
SYSTEM_PARAMS = {
|
||||
# 任务队列的最大长度。用于主线程和工作线程之间的缓冲。
|
||||
# 如果LoRa指令瞬间并发量大,可以适当调高此值。
|
||||
# 如果内存紧张,可以适当调低。
|
||||
'task_queue_max_size': 10,
|
||||
|
||||
# 全局调试日志开关
|
||||
# True: 所有 logger.log() 的信息都会被打印到串口。
|
||||
# False: logger.log() 将不执行任何操作,用于发布产品。
|
||||
'debug_enabled': True,
|
||||
}
|
||||
0
app/logs/__init__.py
Normal file
0
app/logs/__init__.py
Normal file
28
app/logs/logger.py
Normal file
28
app/logs/logger.py
Normal file
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
一个简单的、可配置的日志记录器模块。
|
||||
"""
|
||||
import _thread
|
||||
from app.config.config import *
|
||||
|
||||
# 创建一个锁,用于在多线程环境中同步对print的调用
|
||||
log_lock = _thread.allocate_lock()
|
||||
|
||||
|
||||
def log(message: str):
|
||||
"""
|
||||
打印一条日志消息,是否实际输出取决于配置文件。
|
||||
使用锁来确保多线程环境下的输出不会混乱。
|
||||
|
||||
Args:
|
||||
message (str): 要打印的日志消息。
|
||||
"""
|
||||
# 从配置文件中获取调试开关的状态
|
||||
# .get()方法可以安全地获取值,如果键不存在,则返回默认值False
|
||||
if SYSTEM_PARAMS.get('debug_enabled', False):
|
||||
with log_lock:
|
||||
print(message)
|
||||
|
||||
# 如果开关为False,此函数会立即返回,不执行任何操作。
|
||||
0
app/lora/__init__.py
Normal file
0
app/lora/__init__.py
Normal file
53
app/lora/lora_interface.py
Normal file
53
app/lora/lora_interface.py
Normal file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
LoRa通信模块的抽象接口定义 (契约)
|
||||
|
||||
这个文件定义了一个LoRa处理器应该具备哪些功能,
|
||||
但不包含任何具体的实现代码。任何具体的LoRa处理器,
|
||||
无论是UART的还是SPI的,都必须实现这里定义的所有方法。
|
||||
"""
|
||||
|
||||
# abc (Abstract Base Class) 是Python定义接口的标准方式
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
class ILoraManager(ABC):
|
||||
"""
|
||||
LoRa处理器接口。
|
||||
它规定了所有LoRa处理器实现类必须提供的功能。
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def receive_packet(self):
|
||||
"""
|
||||
【契约】非阻塞地检查并接收一个数据包。
|
||||
|
||||
一个符合本接口的实现必须:
|
||||
- 检查是否有新的数据包。
|
||||
- 如果有,读取、解析并返回负载数据。
|
||||
- 如果没有,必须立刻返回None,不得阻塞。
|
||||
|
||||
Returns:
|
||||
bytes: 如果成功接收到一个数据包,返回该数据包的字节。
|
||||
None: 如果当前没有可读的数据包。
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def send_packet(self, data_bytes: bytes) -> bool:
|
||||
"""
|
||||
【契约】发送一个数据包。
|
||||
|
||||
一个符合本接口的实现必须:
|
||||
- 接收一个bytes类型的参数。
|
||||
- 将这些数据通过LoRa模块发送出去。
|
||||
- 返回一个布尔值表示发送指令是否成功提交。
|
||||
|
||||
Args:
|
||||
data_bytes (bytes): 需要发送的字节数据。
|
||||
|
||||
Returns:
|
||||
bool: True表示发送指令已成功提交,False表示因任何原因失败。
|
||||
"""
|
||||
pass
|
||||
203
app/lora/lora_mesh_uart_passthrough_manager.py
Normal file
203
app/lora/lora_mesh_uart_passthrough_manager.py
Normal file
@@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
LoRa模块的具体实现 (UART Passthrough for LoRa Mesh)
|
||||
|
||||
负责与LoRa模块进行底层通信,并向上层提供标准化的数据包收发接口。
|
||||
这个实现针对的是通过UART进行透传的LoRa Mesh模块。
|
||||
"""
|
||||
|
||||
from ..logs.logger import log
|
||||
from machine import UART
|
||||
import time
|
||||
|
||||
|
||||
class LoRaMeshUartPassthroughManager:
|
||||
"""
|
||||
通过UART与LoRa Mesh模块通信的处理器实现 (ED模式)。
|
||||
实现了自动分片与重组逻辑。
|
||||
"""
|
||||
|
||||
def __init__(self, lora_config: dict):
|
||||
"""
|
||||
初始化LoRa处理器。
|
||||
|
||||
Args:
|
||||
lora_config (dict): 来自全局配置文件的LoRa配置字典。
|
||||
"""
|
||||
log("LoRaMeshUartPassthroughManager: 初始化...")
|
||||
|
||||
# --- 配置注入 ---
|
||||
self.master_address = lora_config.get('master_address')
|
||||
self.uart_id = lora_config.get('uart_id')
|
||||
self.baudrate = lora_config.get('baudrate')
|
||||
self.pins = lora_config.get('pins')
|
||||
self.max_chunk_size = lora_config.get('max_chunk_size')
|
||||
self.lora_mesh_mode = b'\xed'
|
||||
# TODO 目前这个配置没用, 完全按ED处理的
|
||||
if lora_config.get('lora_mesh_mode') == 'EC':
|
||||
self.lora_mesh_mode = b'\xec'
|
||||
|
||||
# --- 硬件初始化 ---
|
||||
self.uart = UART(self.uart_id, self.baudrate, tx=self.pins['tx'], rx=self.pins['rx'])
|
||||
|
||||
# --- 内部状态变量 ---
|
||||
self._rx_buffer = bytearray() # UART接收缓冲区
|
||||
self._reassembly_cache = {} # 分片重组缓冲区 { chunk_index: chunk_data }
|
||||
self._expected_chunks = 0 # 当前会话期望的总分片数
|
||||
|
||||
log(f"LoRaMeshUartPassthroughManager: 配置加载完成. UART ID: {self.uart_id}, Baudrate: {self.baudrate}, 针脚: {self.pins}")
|
||||
|
||||
def send_packet(self, payload: bytes) -> bool:
|
||||
"""
|
||||
【实现】发送一个数据包,自动处理分片。
|
||||
|
||||
Args:
|
||||
payload (bytes): 需要发送的完整业务数据。
|
||||
|
||||
Returns:
|
||||
bool: True表示所有分片都已成功提交发送,False表示失败。
|
||||
"""
|
||||
max_chunk_size = self.max_chunk_size
|
||||
if not payload:
|
||||
total_chunks = 1
|
||||
else:
|
||||
total_chunks = (len(payload) + max_chunk_size - 1) // max_chunk_size
|
||||
|
||||
try:
|
||||
for i in range(total_chunks):
|
||||
chunk_index = i
|
||||
start = i * max_chunk_size
|
||||
end = start + max_chunk_size
|
||||
chunk_data = payload[start:end]
|
||||
|
||||
# --- 组装物理包 ---
|
||||
header = b'\xed'
|
||||
dest_addr_bytes = self.master_address.to_bytes(2, 'big')
|
||||
total_chunks_bytes = total_chunks.to_bytes(1, 'big')
|
||||
current_chunk_bytes = chunk_index.to_bytes(1, 'big')
|
||||
|
||||
# 计算后续长度(总包数和当前包序号是自定义包头, 各占一位, 标准包头算在长度内)
|
||||
length_val = 2 + len(chunk_data)
|
||||
length_bytes = length_val.to_bytes(1, 'big')
|
||||
|
||||
# 拼接成最终的数据包
|
||||
packet_to_send = header + length_bytes + dest_addr_bytes + total_chunks_bytes + current_chunk_bytes + chunk_data
|
||||
|
||||
self.uart.write(packet_to_send)
|
||||
log(f"LoRa: 发送分片 {chunk_index + 1}/{total_chunks} 到地址 {self.master_address}")
|
||||
|
||||
# 让出CPU, 模块将缓存区的数据发出去本身也需要时间
|
||||
time.sleep_ms(10)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
log(f"LoRa: 发送数据包失败: {e}")
|
||||
return False
|
||||
|
||||
def receive_packet(self) -> bytes | None:
|
||||
"""
|
||||
【实现】非阻塞地检查、解析并重组一个完整的数据包。
|
||||
"""
|
||||
# 1. 从硬件读取数据到缓冲区
|
||||
if self.uart.any():
|
||||
new_data = self.uart.read()
|
||||
if new_data:
|
||||
log(f"LoRa: UART收到原始数据 (长度 {len(new_data)}): {new_data.hex()}")
|
||||
self._rx_buffer.extend(new_data)
|
||||
|
||||
# 如果缓冲区为空,没有必要继续处理
|
||||
if not self._rx_buffer:
|
||||
return None
|
||||
|
||||
# 2. 只要缓冲区有数据就持续尝试从缓冲区解析包
|
||||
while len(self._rx_buffer) > 0:
|
||||
log(f"LoRa: --- 开始新一轮解析, 缓冲区 (长度 {len(self._rx_buffer)}): {self._rx_buffer.hex()} ---")
|
||||
|
||||
# 2.1 检查头部和长度字段是否存在
|
||||
if len(self._rx_buffer) < 2:
|
||||
log("LoRa: 缓冲区数据不足 (小于2字节),无法读取包头。等待更多数据...")
|
||||
return None # 数据不足,无法读取长度
|
||||
|
||||
# 2.2 检查帧头是否正确
|
||||
if self._rx_buffer[0] != 0xED:
|
||||
log(f"LoRa: 接收到错误帧头: {hex(self._rx_buffer[0])},正在寻找下一个ED...")
|
||||
next_ed = self._rx_buffer.find(b'\xed', 1)
|
||||
if next_ed == -1:
|
||||
log("LoRa: 缓冲区无有效帧头,已清空。")
|
||||
self._rx_buffer[:] = b''
|
||||
return None # 清空后没有数据了, 直接返回
|
||||
else:
|
||||
log(f"LoRa: 在位置 {next_ed} 找到下一个有效帧头,丢弃之前的数据。")
|
||||
self._rx_buffer = self._rx_buffer[next_ed:]
|
||||
continue # 继续循环,用新的缓冲区数据重新开始解析
|
||||
|
||||
# 2.3 检查包是否完整
|
||||
payload_len = self._rx_buffer[1]
|
||||
# 物理层在末尾又加了2字节的源地址,所以完整包长需要+2。
|
||||
total_packet_len = 1 + 1 + payload_len + 2
|
||||
log(f"LoRa: 帧头正确(ED)。声明的后续包长(payload_len): {payload_len}。计算出的总包长: {total_packet_len}。")
|
||||
|
||||
if len(self._rx_buffer) < total_packet_len:
|
||||
log(f"LoRa: '半包'情况,需要 {total_packet_len} 字节,但缓冲区只有 {len(self._rx_buffer)} 字节。等待更多数据...")
|
||||
return None # "半包"情况,等待更多数据
|
||||
|
||||
# 3. 提取和解析一个完整的物理包
|
||||
log(f"LoRa: 发现完整物理包 (长度 {total_packet_len}),正在提取...")
|
||||
packet = self._rx_buffer[:total_packet_len]
|
||||
self._rx_buffer = self._rx_buffer[total_packet_len:]
|
||||
log(f"LoRa: 提取的包: {packet.hex()}。剩余缓冲区 (长度 {len(self._rx_buffer)}): {self._rx_buffer.hex()}")
|
||||
|
||||
# --- 包结构解析 ---
|
||||
if len(packet) < 8:
|
||||
log(f"LoRa: 包长度 {len(packet)} 小于协议最小长度8, 判定为坏包,已丢弃。")
|
||||
continue
|
||||
|
||||
addr = int.from_bytes(packet[2:4], 'big')
|
||||
total_chunks = packet[4]
|
||||
current_chunk = packet[5]
|
||||
# 提取数据块
|
||||
chunk_data = packet[6:]
|
||||
source_addr = int.from_bytes(packet[-2:], 'big')
|
||||
log(f"LoRa: 解析包: 源地址={source_addr}, 目标地址={addr}, 总分片={total_chunks}, 当前分片={current_chunk}, 数据块长度={len(chunk_data)}")
|
||||
|
||||
# 4. 重组逻辑
|
||||
if total_chunks == 1:
|
||||
log(f"LoRa: 收到单包消息,来自地址 {source_addr},长度 {len(chunk_data)}")
|
||||
self._reassembly_cache.clear()
|
||||
self._expected_chunks = 0
|
||||
return chunk_data
|
||||
|
||||
# 对于多包消息,只有当收到第一个分片时才清空缓存并设置期望分片数
|
||||
if current_chunk == 0:
|
||||
log(f"LoRa: 开始接收新的多包会话 ({total_chunks}个分片) from {source_addr}...")
|
||||
self._reassembly_cache.clear()
|
||||
self._expected_chunks = total_chunks
|
||||
elif not self._reassembly_cache and self._expected_chunks == 0:
|
||||
# 如果不是第一个分片,但缓存是空的,说明错过了第一个分片,丢弃当前分片
|
||||
log(f"LoRa: 收到非首个分片 {current_chunk} from {source_addr},但未检测到会话开始,已丢弃。")
|
||||
continue
|
||||
|
||||
self._reassembly_cache[current_chunk] = chunk_data
|
||||
log(f"LoRa: 收到分片 {current_chunk + 1}/{self._expected_chunks} from {source_addr},已缓存 {len(self._reassembly_cache)} 个")
|
||||
|
||||
if len(self._reassembly_cache) == self._expected_chunks:
|
||||
log(f"LoRa: 所有分片已集齐 (from {source_addr}),正在重组...")
|
||||
full_payload = bytearray()
|
||||
for i in range(self._expected_chunks):
|
||||
if i not in self._reassembly_cache:
|
||||
log(f"LoRa: 重组失败!缺少分片 {i}。")
|
||||
self._reassembly_cache.clear()
|
||||
self._expected_chunks = 0
|
||||
return None
|
||||
full_payload.extend(self._reassembly_cache[i])
|
||||
|
||||
log(f"LoRa: 重组完成,总长度 {len(full_payload)}")
|
||||
self._reassembly_cache.clear()
|
||||
self._expected_chunks = 0
|
||||
return bytes(full_payload)
|
||||
|
||||
# while 循环结束,意味着缓冲区被处理完毕但没有返回一个完整的包
|
||||
return None
|
||||
117
app/processor.py
Normal file
117
app/processor.py
Normal file
@@ -0,0 +1,117 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
核心业务逻辑处理器 (V3 - 面向业务接口)
|
||||
|
||||
职责:
|
||||
- 编排业务流程:解码指令,并将业务任务分发给相应的管理器。
|
||||
- 完全不关心总线通信和数据解析的技术实现细节。
|
||||
"""
|
||||
from app.lora.lora_mesh_uart_passthrough_manager import LoRaMeshUartPassthroughManager
|
||||
from app.bus.rs485_manager import RS485Manager
|
||||
# 导入Protobuf解析代码
|
||||
from app.proto import client_pb
|
||||
|
||||
from app.logs.logger import log
|
||||
|
||||
|
||||
class Processor:
|
||||
"""
|
||||
命令处理器类,项目的“大脑”。
|
||||
它依赖于抽象的、面向业务的接口。
|
||||
"""
|
||||
|
||||
def __init__(self, lora_handler: LoRaMeshUartPassthroughManager, bus_manager: RS485Manager):
|
||||
"""
|
||||
构造函数 (依赖注入)。
|
||||
|
||||
Args:
|
||||
lora_handler (ILoraManager): 一个实现了LoRa接口的对象。
|
||||
bus_manager (IBusManager): 一个实现了总线接口的对象。
|
||||
"""
|
||||
self.lora = lora_handler
|
||||
self.bus = bus_manager
|
||||
log("业务处理器已初始化,准备就绪。")
|
||||
|
||||
def handle_packet(self, packet_bytes: bytes):
|
||||
"""
|
||||
处理单个LoRa数据包的入口函数。
|
||||
"""
|
||||
log(f"收到待处理数据包: {packet_bytes.hex()}")
|
||||
|
||||
try:
|
||||
instruction = client_pb.decode_instruction(packet_bytes)
|
||||
log(f"解析指令成功: {instruction}")
|
||||
except Exception as e:
|
||||
log(f"错误:解码指令失败: {e}")
|
||||
return
|
||||
|
||||
# 根据指令类型,分发到不同的业务处理方法
|
||||
if 'raw_485_command' in instruction:
|
||||
cmd = instruction['raw_485_command']
|
||||
if cmd:
|
||||
self._process_exec_command(cmd)
|
||||
else:
|
||||
log("警告:'raw_485_command' 指令内容为空。")
|
||||
|
||||
elif 'batch_collect_command' in instruction:
|
||||
cmd = instruction['batch_collect_command']
|
||||
if cmd:
|
||||
self._process_collect_command(cmd)
|
||||
else:
|
||||
log("警告:'batch_collect_command' 指令内容为空。")
|
||||
|
||||
else:
|
||||
log(f"警告:收到未知或不适用于此设备的指令类型: {instruction}")
|
||||
|
||||
def _process_exec_command(self, cmd: dict):
|
||||
"""
|
||||
处理“执行命令”业务。
|
||||
"""
|
||||
bus_id = cmd['bus_number']
|
||||
command_bytes = cmd['command_bytes']
|
||||
log(f"处理[执行命令]业务:向总线 {bus_id} 下发指令。")
|
||||
|
||||
# 直接调用总线接口的业务方法,不关心实现
|
||||
self.bus.execute_raw_command(bus_id, command_bytes)
|
||||
log("执行指令已下发。")
|
||||
|
||||
def _process_collect_command(self, cmd: dict):
|
||||
"""
|
||||
处理“采集命令”业务。
|
||||
"""
|
||||
correlation_id = cmd['correlation_id']
|
||||
tasks = cmd['tasks']
|
||||
log(f"处理[采集命令]业务 (ID: {correlation_id}):共 {len(tasks)} 个任务。")
|
||||
|
||||
sensor_values = []
|
||||
for i, task in enumerate(tasks):
|
||||
log(f" - 执行任务 {i + 1}...")
|
||||
|
||||
# 调用总线接口的业务方法,直接获取最终结果
|
||||
# 我们不再关心task的具体内容,也不关心解析过程
|
||||
value = self.bus.execute_collect_task(task)
|
||||
|
||||
if value is not None:
|
||||
sensor_values.append(value)
|
||||
log(f" => 成功,获取值为: {value}")
|
||||
else:
|
||||
# 如果返回None,表示任务失败(超时或解析错误)
|
||||
sensor_values.append(float('nan')) # 使用NaN表示无效值
|
||||
log(" => 失败,任务未返回有效值。")
|
||||
|
||||
# 所有任务执行完毕,构建并发送响应
|
||||
log(f"所有采集任务完成,准备发送响应。采集到的值: {sensor_values}")
|
||||
try:
|
||||
response_payload = {
|
||||
'correlation_id': correlation_id,
|
||||
'values': sensor_values
|
||||
}
|
||||
response_packet = client_pb.encode_instruction('collect_result', response_payload)
|
||||
|
||||
# 通过LoRa接口发送出去
|
||||
self.lora.send_packet(response_packet)
|
||||
log("采集结果已通过LoRa发送。")
|
||||
except Exception as e:
|
||||
log(f"错误:编码或发送采集结果失败: {e}")
|
||||
51
app/proto/client.proto
Normal file
51
app/proto/client.proto
Normal file
@@ -0,0 +1,51 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package device;
|
||||
|
||||
// import "google/protobuf/any.proto"; // REMOVED: Not suitable for embedded systems.
|
||||
|
||||
option go_package = "internal/domain/device/proto";
|
||||
|
||||
// --- Concrete Command & Data Structures ---
|
||||
|
||||
// 平台生成的原始485指令,单片机直接发送到总线
|
||||
message Raw485Command {
|
||||
int32 bus_number = 1; // 总线号,用于指示单片机将指令发送到哪个总线
|
||||
bytes command_bytes = 2; // 原始485指令的字节数组
|
||||
}
|
||||
|
||||
// BatchCollectCommand
|
||||
// 一个完整的、包含所有元数据的批量采集任务。
|
||||
message BatchCollectCommand {
|
||||
string correlation_id = 1; // 用于关联请求和响应的唯一ID
|
||||
repeated CollectTask tasks = 2; // 采集任务列表
|
||||
}
|
||||
|
||||
// CollectTask
|
||||
// 定义了单个采集任务的“意图”。
|
||||
message CollectTask {
|
||||
Raw485Command command = 1; // 平台生成的原始485指令
|
||||
}
|
||||
|
||||
// CollectResult
|
||||
// 这是设备响应的、极致精简的数据包。
|
||||
message CollectResult {
|
||||
string correlation_id = 1; // 从下行指令中原样返回的关联ID
|
||||
repeated float values = 2; // 按预定顺序排列的采集值
|
||||
}
|
||||
|
||||
|
||||
// --- Main Downlink Instruction Wrapper ---
|
||||
|
||||
// 指令 (所有从平台下发到设备的数据都应该被包装在这里面)
|
||||
// 使用 oneof 来替代 google.protobuf.Any,这是嵌入式环境下的标准做法。
|
||||
// 它高效、类型安全,且只解码一次。
|
||||
message Instruction {
|
||||
oneof payload {
|
||||
Raw485Command raw_485_command = 1;
|
||||
BatchCollectCommand batch_collect_command = 2;
|
||||
CollectResult collect_result = 3; // ADDED:用于上行数据
|
||||
// 如果未来有其他指令类型,比如开关控制,可以直接在这里添加
|
||||
// SwitchCommand switch_command = 3;
|
||||
}
|
||||
}
|
||||
305
app/proto/client_pb.py
Normal file
305
app/proto/client_pb.py
Normal file
@@ -0,0 +1,305 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
根据client.proto生成的解析代码 (V2 - 已修复解码逻辑)
|
||||
适用于ESP32 MicroPython环境
|
||||
"""
|
||||
|
||||
import struct
|
||||
|
||||
|
||||
# --- Protobuf基础类型辅助函数 ---
|
||||
|
||||
def encode_varint(value):
|
||||
buf = bytearray()
|
||||
while value >= 0x80:
|
||||
buf.append((value & 0x7F) | 0x80)
|
||||
value >>= 7
|
||||
buf.append(value & 0x7F)
|
||||
return buf
|
||||
|
||||
|
||||
def decode_varint(buf, pos=0):
|
||||
result = 0
|
||||
shift = 0
|
||||
while pos < len(buf):
|
||||
byte = buf[pos]
|
||||
pos += 1
|
||||
result |= (byte & 0x7F) << shift
|
||||
if not (byte & 0x80):
|
||||
break
|
||||
shift += 7
|
||||
return result, pos
|
||||
|
||||
|
||||
def encode_string(value):
|
||||
value_bytes = value.encode('utf-8')
|
||||
length = encode_varint(len(value_bytes))
|
||||
return length + value_bytes
|
||||
|
||||
|
||||
def decode_string(buf, pos=0):
|
||||
"""解码字符串 (已修复)"""
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
end_pos = pos_after_len + length
|
||||
value = buf[pos_after_len:end_pos].decode('utf-8')
|
||||
return value, end_pos
|
||||
|
||||
|
||||
# --- 消息编码/解码函数 ---
|
||||
|
||||
def encode_raw_485_command(bus_number, command_bytes):
|
||||
result = bytearray()
|
||||
result.extend(encode_varint((1 << 3) | 0))
|
||||
result.extend(encode_varint(bus_number))
|
||||
result.extend(encode_varint((2 << 3) | 2))
|
||||
result.extend(encode_varint(len(command_bytes)))
|
||||
result.extend(command_bytes)
|
||||
return result
|
||||
|
||||
|
||||
def decode_raw_485_command(buf):
|
||||
"""解码Raw485Command消息 (已修复)"""
|
||||
result = {}
|
||||
pos = 0
|
||||
while pos < len(buf):
|
||||
tag, pos = decode_varint(buf, pos)
|
||||
field_number = tag >> 3
|
||||
wire_type = tag & 0x07
|
||||
|
||||
if field_number == 1: # bus_number
|
||||
value, pos = decode_varint(buf, pos)
|
||||
result['bus_number'] = value
|
||||
elif field_number == 2: # command_bytes
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
end_pos = pos_after_len + length
|
||||
result['command_bytes'] = buf[pos_after_len:end_pos]
|
||||
pos = end_pos
|
||||
else:
|
||||
# 跳过未知字段
|
||||
if wire_type == 0:
|
||||
_, pos = decode_varint(buf, pos)
|
||||
elif wire_type == 2:
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
pos = pos_after_len + length
|
||||
elif wire_type == 5:
|
||||
pos += 4
|
||||
else:
|
||||
pos += 1
|
||||
return result
|
||||
|
||||
|
||||
def encode_collect_task(command_msg):
|
||||
result = bytearray()
|
||||
encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes'])
|
||||
result.extend(encode_varint((1 << 3) | 2))
|
||||
result.extend(encode_varint(len(encoded_command)))
|
||||
result.extend(encoded_command)
|
||||
return result
|
||||
|
||||
|
||||
def decode_collect_task(buf):
|
||||
"""解码CollectTask消息 (已修复)"""
|
||||
result = {}
|
||||
pos = 0
|
||||
while pos < len(buf):
|
||||
tag, pos = decode_varint(buf, pos)
|
||||
field_number = tag >> 3
|
||||
wire_type = tag & 0x07
|
||||
|
||||
if field_number == 1: # command
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
end_pos = pos_after_len + length
|
||||
value_buf = buf[pos_after_len:end_pos]
|
||||
result['command'] = decode_raw_485_command(value_buf)
|
||||
pos = end_pos
|
||||
else:
|
||||
if wire_type == 0:
|
||||
_, pos = decode_varint(buf, pos)
|
||||
elif wire_type == 2:
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
pos = pos_after_len + length
|
||||
elif wire_type == 5:
|
||||
pos += 4
|
||||
else:
|
||||
pos += 1
|
||||
return result
|
||||
|
||||
|
||||
def encode_batch_collect_command(correlation_id, tasks):
|
||||
result = bytearray()
|
||||
result.extend(encode_varint((1 << 3) | 2))
|
||||
result.extend(encode_string(correlation_id))
|
||||
for task in tasks:
|
||||
encoded_task = encode_collect_task(task['command'])
|
||||
result.extend(encode_varint((2 << 3) | 2))
|
||||
result.extend(encode_varint(len(encoded_task)))
|
||||
result.extend(encoded_task)
|
||||
return result
|
||||
|
||||
|
||||
def decode_batch_collect_command(buf):
|
||||
"""解码BatchCollectCommand消息 (已修复)"""
|
||||
result = {'tasks': []}
|
||||
pos = 0
|
||||
while pos < len(buf):
|
||||
tag, pos = decode_varint(buf, pos)
|
||||
field_number = tag >> 3
|
||||
wire_type = tag & 0x07
|
||||
|
||||
if field_number == 1: # correlation_id
|
||||
value, pos = decode_string(buf, pos)
|
||||
result['correlation_id'] = value
|
||||
elif field_number == 2: # tasks (repeated)
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
end_pos = pos_after_len + length
|
||||
value_buf = buf[pos_after_len:end_pos]
|
||||
result['tasks'].append(decode_collect_task(value_buf))
|
||||
pos = end_pos
|
||||
else:
|
||||
if wire_type == 0:
|
||||
_, pos = decode_varint(buf, pos)
|
||||
elif wire_type == 2:
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
pos = pos_after_len + length
|
||||
elif wire_type == 5:
|
||||
pos += 4
|
||||
else:
|
||||
pos += 1
|
||||
return result
|
||||
|
||||
|
||||
def encode_collect_result(correlation_id, values):
|
||||
result = bytearray()
|
||||
result.extend(encode_varint((1 << 3) | 2))
|
||||
result.extend(encode_string(correlation_id))
|
||||
for value in values:
|
||||
result.extend(encode_varint((2 << 3) | 5))
|
||||
result.extend(struct.pack('<f', value))
|
||||
return result
|
||||
|
||||
|
||||
def decode_collect_result(buf):
|
||||
"""解码CollectResult消息 (已修复)"""
|
||||
result = {'values': []}
|
||||
pos = 0
|
||||
while pos < len(buf):
|
||||
tag, pos = decode_varint(buf, pos)
|
||||
field_number = tag >> 3
|
||||
wire_type = tag & 0x07
|
||||
|
||||
if field_number == 1: # correlation_id
|
||||
value, pos = decode_string(buf, pos)
|
||||
result['correlation_id'] = value
|
||||
elif field_number == 2: # values (repeated)
|
||||
if wire_type == 5: # fixed32
|
||||
value = struct.unpack('<f', buf[pos:pos + 4])[0]
|
||||
pos += 4
|
||||
result['values'].append(value)
|
||||
else:
|
||||
if wire_type == 0:
|
||||
_, pos = decode_varint(buf, pos)
|
||||
elif wire_type == 2:
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
pos = pos_after_len + length
|
||||
elif wire_type == 5:
|
||||
pos += 4
|
||||
else:
|
||||
pos += 1
|
||||
return result
|
||||
|
||||
|
||||
def encode_instruction(payload_type, payload_data):
|
||||
result = bytearray()
|
||||
encoded_payload = bytearray()
|
||||
|
||||
if payload_type == 'raw_485_command':
|
||||
encoded_payload = encode_raw_485_command(payload_data['bus_number'], payload_data['command_bytes'])
|
||||
result.extend(encode_varint((1 << 3) | 2))
|
||||
elif payload_type == 'batch_collect_command':
|
||||
encoded_payload = encode_batch_collect_command(payload_data['correlation_id'], payload_data['tasks'])
|
||||
result.extend(encode_varint((2 << 3) | 2))
|
||||
elif payload_type == 'collect_result':
|
||||
encoded_payload = encode_collect_result(payload_data['correlation_id'], payload_data['values'])
|
||||
result.extend(encode_varint((3 << 3) | 2))
|
||||
else:
|
||||
raise ValueError("未知的指令负载类型")
|
||||
|
||||
result.extend(encode_varint(len(encoded_payload)))
|
||||
result.extend(encoded_payload)
|
||||
return result
|
||||
|
||||
|
||||
def decode_instruction(buf):
|
||||
"""解码Instruction消息 (已修复)"""
|
||||
result = {}
|
||||
pos = 0
|
||||
while pos < len(buf):
|
||||
tag, pos = decode_varint(buf, pos)
|
||||
field_number = tag >> 3
|
||||
wire_type = tag & 0x07
|
||||
|
||||
if wire_type == 2:
|
||||
length, pos_after_len = decode_varint(buf, pos)
|
||||
end_pos = pos_after_len + length
|
||||
value_buf = buf[pos_after_len:end_pos]
|
||||
|
||||
if field_number == 1:
|
||||
result['raw_485_command'] = decode_raw_485_command(value_buf)
|
||||
elif field_number == 2:
|
||||
result['batch_collect_command'] = decode_batch_collect_command(value_buf)
|
||||
elif field_number == 3:
|
||||
result['collect_result'] = decode_collect_result(value_buf)
|
||||
|
||||
pos = end_pos
|
||||
else:
|
||||
if wire_type == 0:
|
||||
_, pos = decode_varint(buf, pos)
|
||||
elif wire_type == 5:
|
||||
pos += 4
|
||||
else:
|
||||
pos += 1
|
||||
return result
|
||||
|
||||
|
||||
# --- 单元测试与使用范例 ---
|
||||
if __name__ == "__main__":
|
||||
print("--- 测试 Raw485Command ---")
|
||||
raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}
|
||||
encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes'])
|
||||
decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd)
|
||||
assert decoded_raw_cmd == raw_cmd_data, f"Expected {raw_cmd_data}, got {decoded_raw_cmd}"
|
||||
|
||||
print("\n--- 测试 CollectTask ---")
|
||||
collect_task_data = {'command': raw_cmd_data}
|
||||
encoded_collect_task = encode_collect_task(collect_task_data['command'])
|
||||
decoded_collect_task = decode_collect_task(encoded_collect_task)
|
||||
assert decoded_collect_task == collect_task_data, f"Expected {collect_task_data}, got {decoded_collect_task}"
|
||||
|
||||
print("\n--- 测试 BatchCollectCommand ---")
|
||||
batch_collect_data = {
|
||||
'correlation_id': 'abc-123',
|
||||
'tasks': [
|
||||
{'command': {'bus_number': 1, 'command_bytes': b'\x01\x04\x00\x01\x00\x01\x60\x0a'}},
|
||||
{'command': {'bus_number': 2, 'command_bytes': b'\x02\x03\x00\x01\x00\x01\xd5\xfa'}}
|
||||
]
|
||||
}
|
||||
encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks'])
|
||||
decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect)
|
||||
assert decoded_batch_collect == batch_collect_data, f"Expected {batch_collect_data}, got {decoded_batch_collect}"
|
||||
|
||||
print("\n--- 测试 CollectResult ---")
|
||||
collect_result_data = {'correlation_id': 'res-456', 'values': [12.34, 56.78]}
|
||||
encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values'])
|
||||
decoded_collect_result = decode_collect_result(encoded_collect_result)
|
||||
assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id']
|
||||
for i in range(len(collect_result_data['values'])):
|
||||
assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5
|
||||
|
||||
print("\n--- 测试 Instruction (内含BatchCollectCommand) ---")
|
||||
instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data)
|
||||
decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect)
|
||||
assert decoded_instruction_batch_collect == {'batch_collect_command': batch_collect_data}
|
||||
|
||||
print("\n所有测试均已通过!")
|
||||
81
app/uqueue.py
Normal file
81
app/uqueue.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
一个适用于MicroPython的、简单的线程安全队列实现。
|
||||
|
||||
这个模块提供了一个与标准库 `queue.Queue` 类似的类,
|
||||
确保在多线程环境下的数据操作是安全的。
|
||||
"""
|
||||
|
||||
import _thread
|
||||
from collections import deque
|
||||
|
||||
class Queue:
|
||||
"""一个简单的、线程安全的队列。"""
|
||||
|
||||
def __init__(self, maxsize=0):
|
||||
self.maxsize = maxsize
|
||||
self._lock = _thread.allocate_lock()
|
||||
# 使用deque可以实现更高效的头部弹出操作 (O(1))
|
||||
self._items = deque((), maxsize if maxsize > 0 else 1024)
|
||||
|
||||
def qsize(self):
|
||||
"""返回队列中的项目数。"""
|
||||
with self._lock:
|
||||
return len(self._items)
|
||||
|
||||
def empty(self):
|
||||
"""如果队列为空,返回True,否则返回False。"""
|
||||
return self.qsize() == 0
|
||||
|
||||
def full(self):
|
||||
"""如果队列已满,返回True,否则返回False。"""
|
||||
if self.maxsize <= 0:
|
||||
return False
|
||||
return self.qsize() >= self.maxsize
|
||||
|
||||
def put(self, item, block=True, timeout=None):
|
||||
"""将一个项目放入队列。"""
|
||||
if not block:
|
||||
return self.put_nowait(item)
|
||||
|
||||
# 阻塞式put的简单实现 (在实际应用中更复杂的实现会使用信号量)
|
||||
while True:
|
||||
with self._lock:
|
||||
if not self.full():
|
||||
self._items.append(item)
|
||||
return
|
||||
# 如果队列是满的,短暂休眠后重试
|
||||
import time
|
||||
time.sleep_ms(5)
|
||||
|
||||
def put_nowait(self, item):
|
||||
"""等同于 put(item, block=False)。"""
|
||||
if self.full():
|
||||
raise OSError("Queue full")
|
||||
|
||||
with self._lock:
|
||||
self._items.append(item)
|
||||
|
||||
def get(self, block=True, timeout=None):
|
||||
"""从队列中移除并返回一个项目。"""
|
||||
if not block:
|
||||
return self.get_nowait()
|
||||
|
||||
# 阻塞式get的简单实现
|
||||
while True:
|
||||
with self._lock:
|
||||
if self._items:
|
||||
return self._items.popleft()
|
||||
# 如果队列是空的,短暂休眠后重试
|
||||
import time
|
||||
time.sleep_ms(5)
|
||||
|
||||
def get_nowait(self):
|
||||
"""等同于 get(item, block=False)。"""
|
||||
if self.empty():
|
||||
raise OSError("Queue empty")
|
||||
|
||||
with self._lock:
|
||||
return self._items.popleft()
|
||||
44
app/worker.py
Normal file
44
app/worker.py
Normal file
@@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
工作线程模块
|
||||
|
||||
职责:
|
||||
- 作为一个独立的线程运行。
|
||||
- 阻塞式地等待任务队列。
|
||||
- 从队列中取出任务,并交给Processor进行耗时处理。
|
||||
"""
|
||||
|
||||
from app.uqueue import Queue
|
||||
from app.processor import Processor
|
||||
from app.logs.logger import log
|
||||
|
||||
|
||||
def worker_task(task_queue: Queue, processor: Processor):
|
||||
"""
|
||||
工作线程的主函数。
|
||||
|
||||
Args:
|
||||
task_queue (uqueue.Queue): 共享的任务队列。
|
||||
processor (Processor): 业务处理器实例。
|
||||
"""
|
||||
log("工作线程已启动,等待任务...")
|
||||
|
||||
while True:
|
||||
try:
|
||||
# 1. 阻塞式地从队列中获取任务
|
||||
# 如果队列为空,程序会在这里自动挂起,不消耗CPU
|
||||
# get()方法是线程安全的
|
||||
packet_bytes = task_queue.get()
|
||||
|
||||
log(f"工作线程:收到新任务,开始处理... 数据: {packet_bytes.hex()}")
|
||||
|
||||
# 2. 调用processor进行耗时的、阻塞式的处理
|
||||
# 这个处理过程不会影响主线程的LoRa监听
|
||||
processor.handle_packet(packet_bytes)
|
||||
|
||||
log("工作线程:任务处理完毕,继续等待下一个任务。")
|
||||
|
||||
except Exception as e:
|
||||
log(f"错误:工作线程在处理任务时发生异常: {e}")
|
||||
85
main.py
85
main.py
@@ -2,5 +2,88 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
猪舍主控系统主程序入口
|
||||
程序主入口 (双线程生产者-消费者模型)
|
||||
|
||||
主线程 (生产者):
|
||||
- 职责:以最高优先级不间断监听LoRa数据,并将数据包放入任务队列。
|
||||
- 特点:永远不执行耗时操作,保证LoRa数据接收的实时性。
|
||||
|
||||
工作线程 (消费者):
|
||||
- 职责:从任务队列中取出数据包,并进行耗时的业务处理。
|
||||
- 特点:可能会长时间阻塞,但不影响主线程的数据接收。
|
||||
"""
|
||||
|
||||
import time
|
||||
import _thread
|
||||
from app.config import config
|
||||
from app.uqueue import Queue # 导入我们自己创建的本地uqueue模块
|
||||
|
||||
# 导入接口和实现
|
||||
from app.lora.lora_mesh_uart_passthrough_manager import LoRaMeshUartPassthroughManager
|
||||
from app.bus.rs485_manager import RS485Manager
|
||||
from app.processor import Processor
|
||||
|
||||
# 导入工作线程的执行函数
|
||||
from app.worker import worker_task
|
||||
from app.logs.logger import log
|
||||
|
||||
# --- 模块级变量定义 (带有类型提示) ---
|
||||
lora_manager: LoRaMeshUartPassthroughManager | None = None
|
||||
bus_manager: RS485Manager | None = None
|
||||
processor: Processor | None = None
|
||||
task_queue: Queue | None = None
|
||||
|
||||
|
||||
def setup():
|
||||
"""
|
||||
初始化函数,负责创建所有对象实例、共享队列,并启动工作线程。
|
||||
"""
|
||||
global lora_manager, bus_manager, processor, task_queue
|
||||
|
||||
log("--- 系统初始化开始 ---")
|
||||
|
||||
# 1. 初始化硬件驱动和业务处理器
|
||||
lora_manager = LoRaMeshUartPassthroughManager(config.LORA_CONFIG)
|
||||
bus_manager = RS485Manager(config.BUS_CONFIG, config.DEFAULT_TIMEOUTS)
|
||||
|
||||
processor = Processor(lora_handler=lora_manager, bus_manager=bus_manager)
|
||||
|
||||
# 2. 从配置文件读取队列长度,并创建线程安全的队列
|
||||
queue_size = config.SYSTEM_PARAMS.get('task_queue_max_size', 10)
|
||||
task_queue = Queue(maxsize=queue_size)
|
||||
log(f"任务队列已创建,最大容量: {queue_size}")
|
||||
|
||||
# 3. 启动工作线程
|
||||
_thread.start_new_thread(worker_task, (task_queue, processor))
|
||||
|
||||
log("--- 系统初始化完成 ---")
|
||||
|
||||
|
||||
def loop():
|
||||
"""
|
||||
主线程循环函数 (生产者)。
|
||||
只负责监听LoRa,并将数据放入队列。
|
||||
"""
|
||||
packet = lora_manager.receive_packet()
|
||||
if packet:
|
||||
log(f"主线程:收到新LoRa数据包: {packet.hex()}")
|
||||
if task_queue.full():
|
||||
log("警告:任务队列已满,新的LoRa数据包被丢弃!")
|
||||
return
|
||||
|
||||
try:
|
||||
task_queue.put_nowait(packet)
|
||||
log(f"主线程:新LoRa数据包已入队。当前队列大小: {task_queue.qsize()}")
|
||||
except Exception as e:
|
||||
log(f"错误:数据包入队失败: {e}")
|
||||
|
||||
time.sleep_ms(10)
|
||||
|
||||
|
||||
# --- 程序主执行区 ---
|
||||
if __name__ == "__main__":
|
||||
setup()
|
||||
|
||||
log("--- 主线程进入循环 (LoRa监听) ---")
|
||||
while True:
|
||||
loop()
|
||||
|
||||
Reference in New Issue
Block a user