Compare commits

...

17 Commits

Author SHA1 Message Date
e3167a0144 优化代码 2025-10-17 15:36:54 +08:00
452aaaeaba 修bug 2025-10-17 10:32:52 +08:00
c0a1925150 修bug 2025-10-13 16:20:50 +08:00
6d50e0a810 md 2025-10-13 16:12:23 +08:00
b69805063d 增加日志 2025-10-13 16:11:55 +08:00
c025530c25 调整以适应esp32 2025-10-13 14:36:49 +08:00
902c90cf19 调整以适应esp32 2025-10-13 14:35:20 +08:00
961b0c170b 更新lora逻辑 2025-10-10 15:00:01 +08:00
c3d4531870 更新协议 2025-10-10 14:36:26 +08:00
5f81314540 更新 README.md 2025-10-10 14:28:43 +08:00
d5de3a7a2b 更新 main/config/config.py 2025-10-09 22:53:15 +08:00
97a9c778e8 实现 lora 2025-10-09 20:10:23 +08:00
dfb50f5c74 readme 2025-10-09 17:39:04 +08:00
7bc7a95379 定义 LoRaMeshUartPassthroughManager 2025-10-09 17:01:52 +08:00
f30d0e0865 更新proto 2025-10-09 14:00:38 +08:00
46922e8505 实现 RS485Manager 2025-10-08 19:36:48 +08:00
fe1c307129 采集失败传NaN 2025-10-08 17:50:49 +08:00
17 changed files with 932 additions and 623 deletions

216
README.md
View File

@@ -1,194 +1,44 @@
# 猪舍主控
# pig-house-controller
## 简介
## LoRa通信协议约定
猪舍主控系统根据上位机的指令控制当前猪舍内所有设备(传感器,阀门,电机等),并汇聚当前猪舍传感器数据统一上报。本系统作为猪场智能化管理的重要组成部分,实现了猪舍环境的自动化监控与调节
本项目中的LoRa通信采用自定义的帧格式以支持精确寻址和大数据包的自动分片与重组。所有数据包均由主控节点主动发起
## 功能概述
### 1. 物理帧结构
### 与上位机交互
1. 根据上位机指令定期采集栏内所有传感器的数据,并统一上报
2. 根据上位机指令启动或关闭栏内设备,上位机发送的启动指令分两种:
- 常开指令:收到后启动设备,直到收到关闭指令后关闭设备
- 短暂开启指令:收到后启动设备,但需要上位机每过两秒发送一次指令,超过五秒没收到下一个开启指令或受到关闭指令将会关闭设备
3. 定期检查栏内设备状态,发现异常立即上报上位机
4. 定期向上位机发送心跳包
5. 接收上位机发送的总线上各机器的位置和类型
6. 根据上位机指令调整设备功率大小
7. 接收上位机批量控制指令并执行
8. 接收上位机发送的配置信息
每个通过LoRa UART模块发送的物理数据包都遵循以下结构
### 与设备交互
1. 控制栏内设备启停
2. 调整风机等功率可调设备的功率
3. 定时检查栏内设备状态
4. 定时采集栏内数据
| 字段 | 长度 (字节) | 值 (Hex) | 描述 |
|:-------------------------| :------------ | :---------------- |:------------------------------|
| **帧头 (Header)** | 1 | `0xED` | 固定值,表示一个数据包的开始。 |
| **数据长度 (Length)** | 1 | `0x00`-`0xFF` | 从`总包数`字段到`数据块`末尾的总字节数,不包含源地址。 |
| **目标地址 (DestAddr)** | 2 | `0x0000`-`0xFFFF` | 接收该数据包的设备地址。 |
| **总包数 (TotalChunks)** | 1 | `0x01`-`0xFF` | 表示当前消息被分成了几个包。`0x01`代表这是唯一的包。 |
| **当前包序号 (CurrentChunk)** | 1 | `0x00`-`0xFE` | 当前是第几个数据包从0开始计数。 |
| **数据块 (ChunkData)** | N | - | 实际传输的数据片段。 |
| **源地址 (SourceAddr)** | 2 | `0x0000`-`0xFFFF` | 发送该数据包的设备地址,由硬件自动拼接。 |
### 数据管理
1. 保存总线上各机器的位置和类型
2. 临时保存上位机发送的指令
3. 保存上位机发送的配置信息
4. 汇总栏内所有传感器数据
5. 临时保存栏内设备故障信息,直到上报成功后清除
6. 根据批量指令控制对应设备工作
**示例:**
# 猪舍控制器
发送一个数据为 `[0x01, 0x02, 0x03]` 的单包消息到地址 `0x1234`,发送方地址为 `0x5678`
`ED 05 12 34 01 00 01 02 03 56 78`
- `ED`: 帧头
- `05`: 后续长度 (1+1+3 = 5)
- `12 34`: 目标地址
- `01`: 总包数 (共1包)
- `00`: 当前包序号 (第0包)
- `01 02 03`: 数据块
- `56 78`: 源地址
猪舍控制器是一个用于监控和控制猪舍环境的系统。它可以通过LoRa与上位机通信并通过RS485总线控制传感器和执行器设备。
### 2. 数据分片 (Fragmentation)
## 协议栈和技术选型
- LoRa模块的物理层限制单次发送的数据部分**最大为240字节**。
- 根据项目约定为自定义协议头总包数、当前包序号预留2字节地址由模块处理。
- 因此,每个物理包中 **`数据块 (ChunkData)` 的最大长度为 `238` 字节**。
- `send_packet` 方法会自动处理分片逻辑。
本系统采用以下物联网标准协议栈:
### 3. 数据重组 (Reassembly)
### 物理层
- **LoRa**:低功耗广域网物理层技术,提供远距离无线传输能力
### 数据链路层和网络层
- **LoRaWAN**基于LoRa物理层的广域网协议提供设备认证、加密和网络管理
### 传输层
- **CoAP**受限应用协议轻量级的RESTful协议适用于资源受限设备
### 应用层
- **LwM2M**:轻量级机器到机器协议,提供设备管理、固件更新等功能
### 数据格式
- **SenML**:传感器标记语言,标准化的传感器数据表示格式
这种协议栈选择具有以下优势:
1. **低功耗**:适合电池供电或节能要求高的场景
2. **远距离传输**LoRa技术可实现数公里覆盖
3. **标准化**:采用业界标准协议,便于系统集成和扩展
4. **安全性**LoRaWAN和CoAP均提供安全机制
5. **互操作性**:基于标准协议,便于与不同厂商设备集成
## 系统架构
```
猪舍控制器
├── 通信层 (LoRa)
├── 控制层 (核心逻辑)
├── 设备层 (传感器和执行器)
└── 存储层 (数据存储)
```
## 抽象接口设计
为了提高系统的可扩展性和可维护性,我们定义了以下抽象接口:
### 通信接口 (BaseComm)
定义了通信模块的基本操作,包括连接、断开连接、发送和接收数据等方法。
### 设备接口 (BaseDevice)
定义了设备的基本操作,包括连接、断开连接、读取数据、写入数据和获取状态等方法。
### 存储接口 (BaseStorage)
定义了存储模块的基本操作,包括保存、加载、删除数据等方法。
### 命令处理器接口 (BaseHandler)
定义了命令处理的基本操作,包括处理命令、注册和注销命令处理函数等方法。
## 设计理念
按照功能区分将传感器和执行器分别连接到不同的RS485总线上可以带来以下优势
1. **减少总线负载**:传感器通常需要频繁读取数据,而执行器可能需要较大的电流,分离可以避免相互干扰。
2. **提高响应速度**:控制命令可以直接发送到执行器总线,无需等待传感器数据采集完成。
3. **增强系统稳定性**:一条总线故障不会影响另一条总线上的设备。
4. **便于维护**:可以根据需要单独重启或维护某一总线。
## 配置说明
系统配置文件为 `config.json`,如果不存在,系统会根据默认配置创建。配置项包括:
### LoRa通信配置
- `lora.address`: 本机LoRa地址
- `lora.frequency`: 工作频率(MHz)
- `lora.bandwidth`: 带宽(kHz)
- `lora.spreading_factor`: 扩频因子
- `lora.coding_rate`: 编码率
- `lora.encryption_key`: 加密密钥
### 上位机配置
- `master.lora_address`: 上位机LoRa地址
- `master.protocol`: 与上位机通信协议
### 总线配置
- `bus.sensor.port`: 传感器总线串口
- `bus.sensor.baudrate`: 传感器总线波特率
- `bus.actuator.port`: 执行器总线串口
- `bus.actuator.baudrate`: 执行器总线波特率
### 日志配置
- `log.level`: 日志级别 (DEBUG, INFO, WARNING, ERROR)
- `log.file_path`: 日志文件路径
- `log.max_size`: 日志文件最大大小
- `log.backup_count`: 保留的日志文件数量
- `log.report_errors`: 是否上报错误信息
- `log.terminate_on_report_failure`: 错误上报失败时是否终止程序
### 系统参数
- `system.heartbeat_interval`: 心跳包发送间隔(秒)
- `system.data_collection_interval`: 数据采集间隔(秒)
- `system.command_timeout`: 命令超时时间(秒)
- `system.retry_count`: 命令重试次数
- `system.error_handling`: 错误处理策略
### 设备配置
- `devices`: 设备列表(包括传感器和执行器)
每个设备包含以下属性:
- `id`: 设备唯一标识
- `type`: 设备类型
- `address`: 设备地址
- `bus`: 所在总线(sensor/actuator)
- `location`: 设备位置(可选)
- `unit`: 单位(仅传感器需要,如温度单位、湿度单位等)
参考示例配置文件 `config.json.example` 创建您的配置文件。
## 枚举类型定义
为了提高代码的可读性和维护性,系统定义了以下枚举类型:
1. `LogLevel`: 日志等级枚举 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
2. `DeviceType`: 设备类型枚举 (包括温度、湿度等传感器类型和喂料口、阀门等执行器类型)
3. `BusType`: 总线类型枚举 (SENSOR, ACTUATOR)
4. `ErrorHandlingStrategy`: 错误处理策略枚举 (RETRY, SKIP, ALERT)
## 日志和错误处理机制
考虑到树莓派等嵌入式设备的存储空间限制,系统采用以下策略:
1. **限制日志文件大小**默认将日志文件大小限制为1MB仅保留一个备份文件
2. **错误上报机制**当发生错误时系统会尝试通过LoRa将错误信息上报给上位机
3. **上报成功处理**:错误信息上报成功后,系统会删除本地日志中的该条目
4. **上报失败处理**:如果错误信息上报失败,说明与上位机之间的通信不稳定,系统将根据配置决定是否终止程序运行
这种机制既节省了本地存储空间,又能确保关键错误信息能够及时传递给上位机。
## 开发顺序建议
当然,从对其他模块依赖最小的模块开始开发是一个明智的策略,以便逐步构建项目的基础。以下是建议的开发顺序:
1. **配置模块config.py**:首先实现配置模块,以定义应用程序所需的基本配置。这将为其他模块提供必要的设置。
2. **实用程序模块utils/**:开发实用程序函数,这些函数可以在整个项目中被重复使用。这样可以为其他模块提供基本的辅助功能。
3. **数据存储模块storage/**:实现数据存储逻辑,包括存储传感器数据、设备状态和配置信息。这一模块可以在独立于其他模块的情况下开发。
4. **设备交互模块devices/**:实现与设备交互的模块,包括传感器和执行器。这将为之后的通信和核心逻辑提供基础。
5. **通信模块comms/**:开发通信模块,以处理与上位机和设备的通信协议。此模块可能需要依赖设备交互模块。
6. **核心逻辑模块core/**:实现核心逻辑,包括处理命令、控制设备和管理传感器数据。这一模块将利用之前开发的模块。
7. **测试模块tests/**:在开发过程中,逐步添加测试用例以验证每个模块的功能。
8. **主程序main.py**:最后实现主程序,作为应用程序的入口点,将所有模块整合在一起。
这种顺序将帮助你逐步构建项目,并确保每个模块在开发过程中得到充分的测试和验证。
## 许可证
禁止未经授权使用本项目代码,否则后果自负。
- `receive_packet` 方法会缓存收到的分片。
- 当一个设备的所有分片都接收完毕后,`receive_packet` 会将它们自动重组成一个完整的消息,并向上层返回。
- 由于通信是单向的(仅主控发送),接收端无需管理多个源地址的重组缓冲区。

300
app/bus/rs485_manager.py Normal file
View File

@@ -0,0 +1,300 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
RS485 总线管理器实现
此模块实现了 IBusManager 接口,用于管理 RS485 总线通信。
"""
from ..logs.logger import log
# 导入 MicroPython 的 UART 和 Pin 库
from machine import UART, Pin
import time # 用于添加延时确保RS485方向切换
import _thread # 用于线程同步
import struct # 用于浮点数转换
class RS485Manager:
"""
RS485 总线管理器。
负责 RS485 设备的指令发送、响应接收和数据解析。
"""
def __init__(self, bus_config, default_timeouts):
"""
构造函数,注入配置。
根据传入的配置初始化 RS485 总线对应的 UART 管理器。
Args:
bus_config: 包含所有总线配置的字典。
键是总线ID值是该总线的详细配置。
default_timeouts: 包含各种默认超时设置的字典。
"""
self.bus_config = bus_config
self.default_timeouts = default_timeouts
# 存储以总线号为key的UART管理器实例、RTS引脚和锁
self.bus_ports = {}
log("RS485Manager 已使用配置初始化。")
log(f"总线配置: {self.bus_config}")
log(f"默认超时设置: {self.default_timeouts}")
# 遍历 bus_config初始化 RS485 端口
for bus_id, config in bus_config.items():
if config.get('protocol') == 'RS485':
try:
uart_id = config['uart_id']
baudrate = config['baudrate']
pins = config['pins']
tx_pin_num = pins['tx']
rx_pin_num = pins['rx']
rts_pin_num = pins['rts'] # RS485 的 DE/RE 方向控制引脚
# 初始化 Pin 对象
rts_pin = Pin(rts_pin_num, Pin.OUT) # RTS 引脚设置为输出模式
rts_pin.value(0) # 默认设置为接收模式
# 初始化 UART 对象
# 注意MicroPython 的 UART 构造函数可能不支持直接传入 Pin 对象,而是 Pin 编号
# 并且 rts 参数通常用于硬件流控制RS485 的 DE/RE 需要手动控制
uart = UART(uart_id, baudrate=baudrate, tx=tx_pin_num, rx=rx_pin_num,
timeout=self.default_timeouts.get('rs485_response', 500))
self.bus_ports[bus_id] = {
'uart': uart,
'rts_pin': rts_pin,
'lock': _thread.allocate_lock()
}
log(f"总线 {bus_id} (RS485) 的 UART 管理器初始化成功。UART ID: {uart_id}, 波特率: {baudrate}, TX: {tx_pin_num}, RX: {rx_pin_num}, RTS(DE/RE): {rts_pin_num}")
except KeyError as e:
log(f"错误: 总线 {bus_id} 的 RS485 配置缺少关键参数: {e}")
except Exception as e:
log(f"错误: 初始化总线 {bus_id} 的 RS485 管理器失败: {e}")
else:
log(f"总线 {bus_id} 的协议不是 RS485跳过初始化。")
@staticmethod
def _calculate_crc16_modbus(data):
"""
计算 Modbus RTU 的 CRC16 校验码。
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc
def execute_raw_command(self, bus_id, command):
"""
【契约】执行一个“发后不理”的原始指令。
Args:
bus_id (int): 目标总线的编号。
command (bytes): 要发送的原始命令字节。
"""
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
with lock:
try:
rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH)
time.sleep_us(100) # 短暂延时,确保方向切换完成
uart.write(command)
# 等待所有数据发送完毕
uart.flush()
time.sleep_us(100) # 短暂延时,确保数据完全发出
rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW)
log(f"总线 {bus_id} 原始命令发送成功: {command.hex()}")
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行原始命令失败: {e}")
def execute_collect_task(self, task):
"""
【契约】执行一个完整的采集任务,并直接返回最终的数值。
一个符合本接口的实现必须自己处理所有细节:
- 从task字典中解析出 bus_id, command, parser_type。
- 发送指令。
- 接收响应。
- 根据parser_type选择正确的内部解析器进行解析。
- 返回最终的float数值或在任何失败情况下返回None。
Args:
task: 从Protobuf解码出的单个CollectTask消息字典。
期望结构: {"command": {"bus_number": int, "command_bytes": bytes}}
Returns:
成功解析则返回数值否则返回None。
"""
# I. 任务参数解析与初步验证
try:
command_info = task.get("command")
if not command_info:
log("错误: CollectTask 缺少 'command' 字段。")
return None
bus_id = command_info.get("bus_number")
command_bytes = command_info.get("command_bytes")
# 增加对命令有效性的检查
if bus_id is None or not command_bytes or len(command_bytes) < 2:
log(f"错误: CollectTask 的 'command' 字段无效。bus_id: {bus_id}, command_bytes: {command_bytes}")
return None
except Exception as e:
log(f"错误: 解析CollectTask失败: {e}. 任务: {task}")
return None
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return None
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
response_bytes = None # 在锁外部初始化,确保其作用域
response_buffer = bytearray()
with lock:
try:
# II. 线程安全与指令发送
rts_pin.value(1)
time.sleep_us(100)
uart.write(command_bytes)
uart.flush()
time.sleep_us(100)
rts_pin.value(0)
log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}")
# III. 接收响应
start_time = time.ticks_ms()
response_timeout = self.default_timeouts.get('rs485_response', 500)
while time.ticks_diff(time.ticks_ms(), start_time) < response_timeout:
if uart.any():
chunk = uart.read(32)
if chunk:
response_buffer.extend(chunk)
start_time = time.ticks_ms() # 收到数据就重置超时
time.sleep_ms(5)
if response_buffer:
# 动态地从请求命令中获取预期的从站ID和功能码
expected_slave_id = command_bytes[0]
expected_func_code = command_bytes[1]
found_frame = self._find_modbus_frame(response_buffer, expected_slave_id, expected_func_code)
if found_frame:
log(f"总线 {bus_id} 收到有效响应: {found_frame.hex()}")
response_bytes = found_frame # 将找到的帧赋值给外部变量
else:
log(f"警告: 总线 {bus_id} 响应中无有效帧。收到响应: {response_buffer.hex()}")
else:
log(f"警告: 总线 {bus_id} 未收到响应。")
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}")
# IV. 统一处理和解析
# 无论是因为超时、未找到有效帧还是发生异常,只要 response_bytes 仍为 None就任务失败
if response_bytes is None:
return None
# 使用找到的有效帧进行解析
parsed_value = RS485Manager._parse_modbus_rtu_default(response_bytes)
return parsed_value
def _find_modbus_frame(self, buffer: bytearray, expected_slave: int, func_code: int) -> bytes | None:
"""
修复版加调试优先头检查CRC 字节序标准 Modbus (低字节在前)。
"""
log(f"搜索帧: buffer 长度 {len(buffer)}, hex {buffer.hex()}")
i = 0
while i < len(buffer) - 6: # 最小 7 字节,-6 安全
if buffer[i] == expected_slave and buffer[i + 1] == func_code:
byte_count = buffer[i + 2]
frame_len = 3 + byte_count + 2
if len(buffer) - i >= frame_len:
frame = bytes(buffer[i:i + frame_len])
# CRC 预校验(标准 ModbusCRC 低字节在前)
core = frame[:-2]
calc_crc = self._calculate_crc16_modbus(core)
low_crc = frame[-2]
high_crc = frame[-1]
recv_crc = (high_crc << 8) | low_crc # 高<<8 | 低
log(f"候选帧 at {i}: {frame.hex()}, calc CRC {calc_crc:04X}, recv {recv_crc:04X}")
if calc_crc == recv_crc:
log(f"找到有效帧: {frame.hex()}")
return frame
else:
log(f"CRC 不匹配,跳过 (calc {calc_crc:04X} != recv {recv_crc:04X})")
i += 1
log("无有效帧")
return None
@staticmethod
def _parse_modbus_rtu_default(response_bytes): # 改名,支持整数/浮点
"""
修复版动态数据长CRC 只用核心。
"""
if not response_bytes or len(response_bytes) < 7:
log(f"警告: 响应过短。响应: {response_bytes.hex() if response_bytes else 'None'}")
return None
# CRC 校验(只核心)
data_for_crc = response_bytes[:-2]
received_crc = (response_bytes[-1] << 8) | response_bytes[-2]
calculated_crc = RS485Manager._calculate_crc16_modbus(data_for_crc)
if calculated_crc != received_crc:
log(f"错误: CRC失败。接收: {received_crc:04X}, 计算: {calculated_crc:04X}. 响应: {response_bytes.hex()}")
return None
function_code = response_bytes[1]
byte_count = response_bytes[2]
data_bytes = response_bytes[3:3 + byte_count]
if function_code not in [0x03, 0x04]:
log(f"警告: 功能码 {function_code:02X} 不符。")
return None
if len(data_bytes) != byte_count:
log(f"错误: 数据长 {len(data_bytes)} != {byte_count}")
return None
# 动态解析
if byte_count == 2:
# 整数 (e.g., 温度)
try:
value = int.from_bytes(data_bytes, 'big') # 或 signed '>h'
parsed_value = value
log(f"成功解析整数: {parsed_value}")
return parsed_value
except Exception as e:
log(f"整数解析失败: {e}")
return None
elif byte_count == 4:
# 浮点
try:
parsed_value = struct.unpack('>f', data_bytes)[0]
log(f"成功解析浮点: {parsed_value}")
return parsed_value
except Exception as e:
log(f"浮点失败: {e}")
return None
else:
log(f"警告: 未知字节数 {byte_count}")
return None

View File

@@ -11,6 +11,9 @@
# --- LoRa 模块配置 ---
# 假设LoRa模块使用独立的UART进行通信
LORA_CONFIG = {
# 平台LoRa地址
'master_address': 0x01,
# LoRa模块连接的UART总线ID (0, 1, or 2 on ESP32)
'uart_id': 2,
@@ -19,9 +22,24 @@ LORA_CONFIG = {
# LoRa模块连接的GPIO引脚
'pins': {
'tx': 17, # UART TX
'rx': 16, # UART RX
}
'tx': 5, # UART TX
'rx': 4, # UART RX
},
# LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
# e.g.
# EC: 接收端只会接收到消息, 不会接收到请求头
# e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
# (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
# 接收: 48 65 6c 6c 6f ("Hello")
# ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。
# e.g. 发送: ED 05 12 34 01 00 01 02 03
# (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块))
# 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾)
'lora_mesh_mode': 'ED',
# 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
'max_chunk_size': 238
}
# --- 总线配置 ---
@@ -42,23 +60,23 @@ BUS_CONFIG = {
# 该总线使用的GPIO引脚
'pins': {
'tx': 4, # RS485 TX
'rx': 5, # RS485 RX
'rts': 2, # RS485 DE/RE 方向控制引脚
'tx': 16, # RS485 TX
'rx': 17, # RS485 RX
'rts': 15, # RS485 DE/RE 方向控制引脚
}
},
# 如果未来有第二条总线,或不同协议的总线,可以直接在这里添加
2: {
'protocol': 'RS485',
'uart_id': 0,
'baudrate': 19200, # 这条总线可以有不同的波特率
'pins': {
'tx': 25,
'rx': 26,
'rts': 27,
}
},
# 2: {
# 'protocol': 'RS485',
# 'uart_id': 0,
# 'baudrate': 19200, # 这条总线可以有不同的波特率
# 'pins': {
# 'tx': 25,
# 'rx': 26,
# 'rts': 27,
# }
# },
}
# --- 全局超时设置 (毫秒) ---

View File

@@ -4,19 +4,25 @@
"""
一个简单的可配置的日志记录器模块
"""
import _thread
from app.config.config import *
# 创建一个锁用于在多线程环境中同步对print的调用
log_lock = _thread.allocate_lock()
import config
def log(message: str):
"""
打印一条日志消息是否实际输出取决于配置文件
使用锁来确保多线程环境下的输出不会混乱
Args:
message (str): 要打印的日志消息
"""
# 从配置文件中获取调试开关的状态
# .get()方法可以安全地获取值如果键不存在则返回默认值False
if config.SYSTEM_PARAMS.get('debug_enabled', False):
print(message)
if SYSTEM_PARAMS.get('debug_enabled', False):
with log_lock:
print(message)
# 如果开关为False此函数会立即返回不执行任何操作。

0
app/lora/__init__.py Normal file
View File

View File

@@ -12,7 +12,7 @@ LoRa通信模块的抽象接口定义 (契约)
# abc (Abstract Base Class) 是Python定义接口的标准方式
from abc import ABC, abstractmethod
class ILoraHandler(ABC):
class ILoraManager(ABC):
"""
LoRa处理器接口
它规定了所有LoRa处理器实现类必须提供的功能

View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LoRa模块的具体实现 (UART Passthrough for LoRa Mesh)
负责与LoRa模块进行底层通信并向上层提供标准化的数据包收发接口。
这个实现针对的是通过UART进行透传的LoRa Mesh模块。
"""
from ..logs.logger import log
from machine import UART
import time
class LoRaMeshUartPassthroughManager:
"""
通过UART与LoRa Mesh模块通信的处理器实现 (ED模式)。
实现了自动分片与重组逻辑。
"""
def __init__(self, lora_config: dict):
"""
初始化LoRa处理器。
Args:
lora_config (dict): 来自全局配置文件的LoRa配置字典。
"""
log("LoRaMeshUartPassthroughManager: 初始化...")
# --- 配置注入 ---
self.master_address = lora_config.get('master_address')
self.uart_id = lora_config.get('uart_id')
self.baudrate = lora_config.get('baudrate')
self.pins = lora_config.get('pins')
self.max_chunk_size = lora_config.get('max_chunk_size')
self.lora_mesh_mode = b'\xed'
# TODO 目前这个配置没用, 完全按ED处理的
if lora_config.get('lora_mesh_mode') == 'EC':
self.lora_mesh_mode = b'\xec'
# --- 硬件初始化 ---
self.uart = UART(self.uart_id, self.baudrate, tx=self.pins['tx'], rx=self.pins['rx'])
# --- 内部状态变量 ---
self._rx_buffer = bytearray() # UART接收缓冲区
self._reassembly_cache = {} # 分片重组缓冲区 { chunk_index: chunk_data }
self._expected_chunks = 0 # 当前会话期望的总分片数
log(f"LoRaMeshUartPassthroughManager: 配置加载完成. UART ID: {self.uart_id}, Baudrate: {self.baudrate}, 针脚: {self.pins}")
def send_packet(self, payload: bytes) -> bool:
"""
【实现】发送一个数据包,自动处理分片。
Args:
payload (bytes): 需要发送的完整业务数据。
Returns:
bool: True表示所有分片都已成功提交发送False表示失败。
"""
max_chunk_size = self.max_chunk_size
if not payload:
total_chunks = 1
else:
total_chunks = (len(payload) + max_chunk_size - 1) // max_chunk_size
try:
for i in range(total_chunks):
chunk_index = i
start = i * max_chunk_size
end = start + max_chunk_size
chunk_data = payload[start:end]
# --- 组装物理包 ---
header = b'\xed'
dest_addr_bytes = self.master_address.to_bytes(2, 'big')
total_chunks_bytes = total_chunks.to_bytes(1, 'big')
current_chunk_bytes = chunk_index.to_bytes(1, 'big')
# 计算后续长度(总包数和当前包序号是自定义包头, 各占一位, 标准包头算在长度内)
length_val = 2 + len(chunk_data)
length_bytes = length_val.to_bytes(1, 'big')
# 拼接成最终的数据包
packet_to_send = header + length_bytes + dest_addr_bytes + total_chunks_bytes + current_chunk_bytes + chunk_data
self.uart.write(packet_to_send)
log(f"LoRa: 发送分片 {chunk_index + 1}/{total_chunks} 到地址 {self.master_address}")
# 让出CPU, 模块将缓存区的数据发出去本身也需要时间
time.sleep_ms(10)
return True
except Exception as e:
log(f"LoRa: 发送数据包失败: {e}")
return False
def receive_packet(self) -> bytes | None:
"""
【实现】非阻塞地检查、解析并重组一个完整的数据包。
"""
# 1. 从硬件读取数据到缓冲区
if self.uart.any():
new_data = self.uart.read()
if new_data:
log(f"LoRa: UART收到原始数据 (长度 {len(new_data)}): {new_data.hex()}")
self._rx_buffer.extend(new_data)
# 如果缓冲区为空,没有必要继续处理
if not self._rx_buffer:
return None
# 2. 只要缓冲区有数据就持续尝试从缓冲区解析包
while len(self._rx_buffer) > 0:
log(f"LoRa: --- 开始新一轮解析, 缓冲区 (长度 {len(self._rx_buffer)}): {self._rx_buffer.hex()} ---")
# 2.1 检查头部和长度字段是否存在
if len(self._rx_buffer) < 2:
log("LoRa: 缓冲区数据不足 (小于2字节),无法读取包头。等待更多数据...")
return None # 数据不足,无法读取长度
# 2.2 检查帧头是否正确
if self._rx_buffer[0] != 0xED:
log(f"LoRa: 接收到错误帧头: {hex(self._rx_buffer[0])}正在寻找下一个ED...")
next_ed = self._rx_buffer.find(b'\xed', 1)
if next_ed == -1:
log("LoRa: 缓冲区无有效帧头,已清空。")
self._rx_buffer[:] = b''
return None # 清空后没有数据了, 直接返回
else:
log(f"LoRa: 在位置 {next_ed} 找到下一个有效帧头,丢弃之前的数据。")
self._rx_buffer = self._rx_buffer[next_ed:]
continue # 继续循环,用新的缓冲区数据重新开始解析
# 2.3 检查包是否完整
payload_len = self._rx_buffer[1]
# 物理层在末尾又加了2字节的源地址所以完整包长需要+2。
total_packet_len = 1 + 1 + payload_len + 2
log(f"LoRa: 帧头正确(ED)。声明的后续包长(payload_len): {payload_len}。计算出的总包长: {total_packet_len}")
if len(self._rx_buffer) < total_packet_len:
log(f"LoRa: '半包'情况,需要 {total_packet_len} 字节,但缓冲区只有 {len(self._rx_buffer)} 字节。等待更多数据...")
return None # "半包"情况,等待更多数据
# 3. 提取和解析一个完整的物理包
log(f"LoRa: 发现完整物理包 (长度 {total_packet_len}),正在提取...")
packet = self._rx_buffer[:total_packet_len]
self._rx_buffer = self._rx_buffer[total_packet_len:]
log(f"LoRa: 提取的包: {packet.hex()}。剩余缓冲区 (长度 {len(self._rx_buffer)}): {self._rx_buffer.hex()}")
# --- 包结构解析 ---
if len(packet) < 8:
log(f"LoRa: 包长度 {len(packet)} 小于协议最小长度8, 判定为坏包,已丢弃。")
continue
addr = int.from_bytes(packet[2:4], 'big')
total_chunks = packet[4]
current_chunk = packet[5]
# 提取数据块
chunk_data = packet[6:]
source_addr = int.from_bytes(packet[-2:], 'big')
log(f"LoRa: 解析包: 源地址={source_addr}, 目标地址={addr}, 总分片={total_chunks}, 当前分片={current_chunk}, 数据块长度={len(chunk_data)}")
# 4. 重组逻辑
if total_chunks == 1:
log(f"LoRa: 收到单包消息,来自地址 {source_addr},长度 {len(chunk_data)}")
self._reassembly_cache.clear()
self._expected_chunks = 0
return chunk_data
# 对于多包消息,只有当收到第一个分片时才清空缓存并设置期望分片数
if current_chunk == 0:
log(f"LoRa: 开始接收新的多包会话 ({total_chunks}个分片) from {source_addr}...")
self._reassembly_cache.clear()
self._expected_chunks = total_chunks
elif not self._reassembly_cache and self._expected_chunks == 0:
# 如果不是第一个分片,但缓存是空的,说明错过了第一个分片,丢弃当前分片
log(f"LoRa: 收到非首个分片 {current_chunk} from {source_addr},但未检测到会话开始,已丢弃。")
continue
self._reassembly_cache[current_chunk] = chunk_data
log(f"LoRa: 收到分片 {current_chunk + 1}/{self._expected_chunks} from {source_addr},已缓存 {len(self._reassembly_cache)}")
if len(self._reassembly_cache) == self._expected_chunks:
log(f"LoRa: 所有分片已集齐 (from {source_addr}),正在重组...")
full_payload = bytearray()
for i in range(self._expected_chunks):
if i not in self._reassembly_cache:
log(f"LoRa: 重组失败!缺少分片 {i}")
self._reassembly_cache.clear()
self._expected_chunks = 0
return None
full_payload.extend(self._reassembly_cache[i])
log(f"LoRa: 重组完成,总长度 {len(full_payload)}")
self._reassembly_cache.clear()
self._expected_chunks = 0
return bytes(full_payload)
# while 循环结束,意味着缓冲区被处理完毕但没有返回一个完整的包
return None

View File

@@ -8,15 +8,12 @@
- 编排业务流程解码指令并将业务任务分发给相应的管理器
- 完全不关心总线通信和数据解析的技术实现细节
"""
# 导入我们定义的“契约”(接口)
from lora.lora_interface import ILoraHandler
from bus.bus_interface import IBusManager
from app.lora.lora_mesh_uart_passthrough_manager import LoRaMeshUartPassthroughManager
from app.bus.rs485_manager import RS485Manager
# 导入Protobuf解析代码
from proto import client_pb
from app.proto import client_pb
from logger import log
from app.logs.logger import log
class Processor:
@@ -25,12 +22,12 @@ class Processor:
它依赖于抽象的面向业务的接口
"""
def __init__(self, lora_handler: ILoraHandler, bus_manager: IBusManager):
def __init__(self, lora_handler: LoRaMeshUartPassthroughManager, bus_manager: RS485Manager):
"""
构造函数 (依赖注入)
Args:
lora_handler (ILoraHandler): 一个实现了LoRa接口的对象
lora_handler (ILoraManager): 一个实现了LoRa接口的对象
bus_manager (IBusManager): 一个实现了总线接口的对象
"""
self.lora = lora_handler
@@ -45,16 +42,25 @@ class Processor:
try:
instruction = client_pb.decode_instruction(packet_bytes)
log(f"解析指令成功: {instruction}")
except Exception as e:
log(f"错误:解码指令失败: {e}")
return
# 根据指令类型,分发到不同的业务处理方法
if 'raw_485_command' in instruction:
self._process_exec_command(instruction['raw_485_command'])
cmd = instruction['raw_485_command']
if cmd:
self._process_exec_command(cmd)
else:
log("警告:'raw_485_command' 指令内容为空。")
elif 'batch_collect_command' in instruction:
self._process_collect_command(instruction['batch_collect_command'])
cmd = instruction['batch_collect_command']
if cmd:
self._process_collect_command(cmd)
else:
log("警告:'batch_collect_command' 指令内容为空。")
else:
log(f"警告:收到未知或不适用于此设备的指令类型: {instruction}")
@@ -92,7 +98,7 @@ class Processor:
log(f" => 成功,获取值为: {value}")
else:
# 如果返回None表示任务失败超时或解析错误
sensor_values.append(-1) # 添加一个默认/错误
sensor_values.append(float('nan')) # 使用NaN表示无效
log(" => 失败,任务未返回有效值。")
# 所有任务执行完毕,构建并发送响应

View File

@@ -24,7 +24,7 @@ message BatchCollectCommand {
// CollectTask
//
message CollectTask {
Raw485Command command = 2; // 485
Raw485Command command = 1; // 485
}
// CollectResult

305
app/proto/client_pb.py Normal file
View File

@@ -0,0 +1,305 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
根据client.proto生成的解析代码 (V2 - 已修复解码逻辑)
适用于ESP32 MicroPython环境
"""
import struct
# --- Protobuf基础类型辅助函数 ---
def encode_varint(value):
buf = bytearray()
while value >= 0x80:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)
return buf
def decode_varint(buf, pos=0):
result = 0
shift = 0
while pos < len(buf):
byte = buf[pos]
pos += 1
result |= (byte & 0x7F) << shift
if not (byte & 0x80):
break
shift += 7
return result, pos
def encode_string(value):
value_bytes = value.encode('utf-8')
length = encode_varint(len(value_bytes))
return length + value_bytes
def decode_string(buf, pos=0):
"""解码字符串 (已修复)"""
length, pos_after_len = decode_varint(buf, pos)
end_pos = pos_after_len + length
value = buf[pos_after_len:end_pos].decode('utf-8')
return value, end_pos
# --- 消息编码/解码函数 ---
def encode_raw_485_command(bus_number, command_bytes):
result = bytearray()
result.extend(encode_varint((1 << 3) | 0))
result.extend(encode_varint(bus_number))
result.extend(encode_varint((2 << 3) | 2))
result.extend(encode_varint(len(command_bytes)))
result.extend(command_bytes)
return result
def decode_raw_485_command(buf):
"""解码Raw485Command消息 (已修复)"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # bus_number
value, pos = decode_varint(buf, pos)
result['bus_number'] = value
elif field_number == 2: # command_bytes
length, pos_after_len = decode_varint(buf, pos)
end_pos = pos_after_len + length
result['command_bytes'] = buf[pos_after_len:end_pos]
pos = end_pos
else:
# 跳过未知字段
if wire_type == 0:
_, pos = decode_varint(buf, pos)
elif wire_type == 2:
length, pos_after_len = decode_varint(buf, pos)
pos = pos_after_len + length
elif wire_type == 5:
pos += 4
else:
pos += 1
return result
def encode_collect_task(command_msg):
result = bytearray()
encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes'])
result.extend(encode_varint((1 << 3) | 2))
result.extend(encode_varint(len(encoded_command)))
result.extend(encoded_command)
return result
def decode_collect_task(buf):
"""解码CollectTask消息 (已修复)"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # command
length, pos_after_len = decode_varint(buf, pos)
end_pos = pos_after_len + length
value_buf = buf[pos_after_len:end_pos]
result['command'] = decode_raw_485_command(value_buf)
pos = end_pos
else:
if wire_type == 0:
_, pos = decode_varint(buf, pos)
elif wire_type == 2:
length, pos_after_len = decode_varint(buf, pos)
pos = pos_after_len + length
elif wire_type == 5:
pos += 4
else:
pos += 1
return result
def encode_batch_collect_command(correlation_id, tasks):
result = bytearray()
result.extend(encode_varint((1 << 3) | 2))
result.extend(encode_string(correlation_id))
for task in tasks:
encoded_task = encode_collect_task(task['command'])
result.extend(encode_varint((2 << 3) | 2))
result.extend(encode_varint(len(encoded_task)))
result.extend(encoded_task)
return result
def decode_batch_collect_command(buf):
"""解码BatchCollectCommand消息 (已修复)"""
result = {'tasks': []}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # correlation_id
value, pos = decode_string(buf, pos)
result['correlation_id'] = value
elif field_number == 2: # tasks (repeated)
length, pos_after_len = decode_varint(buf, pos)
end_pos = pos_after_len + length
value_buf = buf[pos_after_len:end_pos]
result['tasks'].append(decode_collect_task(value_buf))
pos = end_pos
else:
if wire_type == 0:
_, pos = decode_varint(buf, pos)
elif wire_type == 2:
length, pos_after_len = decode_varint(buf, pos)
pos = pos_after_len + length
elif wire_type == 5:
pos += 4
else:
pos += 1
return result
def encode_collect_result(correlation_id, values):
result = bytearray()
result.extend(encode_varint((1 << 3) | 2))
result.extend(encode_string(correlation_id))
for value in values:
result.extend(encode_varint((2 << 3) | 5))
result.extend(struct.pack('<f', value))
return result
def decode_collect_result(buf):
"""解码CollectResult消息 (已修复)"""
result = {'values': []}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # correlation_id
value, pos = decode_string(buf, pos)
result['correlation_id'] = value
elif field_number == 2: # values (repeated)
if wire_type == 5: # fixed32
value = struct.unpack('<f', buf[pos:pos + 4])[0]
pos += 4
result['values'].append(value)
else:
if wire_type == 0:
_, pos = decode_varint(buf, pos)
elif wire_type == 2:
length, pos_after_len = decode_varint(buf, pos)
pos = pos_after_len + length
elif wire_type == 5:
pos += 4
else:
pos += 1
return result
def encode_instruction(payload_type, payload_data):
result = bytearray()
encoded_payload = bytearray()
if payload_type == 'raw_485_command':
encoded_payload = encode_raw_485_command(payload_data['bus_number'], payload_data['command_bytes'])
result.extend(encode_varint((1 << 3) | 2))
elif payload_type == 'batch_collect_command':
encoded_payload = encode_batch_collect_command(payload_data['correlation_id'], payload_data['tasks'])
result.extend(encode_varint((2 << 3) | 2))
elif payload_type == 'collect_result':
encoded_payload = encode_collect_result(payload_data['correlation_id'], payload_data['values'])
result.extend(encode_varint((3 << 3) | 2))
else:
raise ValueError("未知的指令负载类型")
result.extend(encode_varint(len(encoded_payload)))
result.extend(encoded_payload)
return result
def decode_instruction(buf):
"""解码Instruction消息 (已修复)"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if wire_type == 2:
length, pos_after_len = decode_varint(buf, pos)
end_pos = pos_after_len + length
value_buf = buf[pos_after_len:end_pos]
if field_number == 1:
result['raw_485_command'] = decode_raw_485_command(value_buf)
elif field_number == 2:
result['batch_collect_command'] = decode_batch_collect_command(value_buf)
elif field_number == 3:
result['collect_result'] = decode_collect_result(value_buf)
pos = end_pos
else:
if wire_type == 0:
_, pos = decode_varint(buf, pos)
elif wire_type == 5:
pos += 4
else:
pos += 1
return result
# --- 单元测试与使用范例 ---
if __name__ == "__main__":
print("--- 测试 Raw485Command ---")
raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}
encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes'])
decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd)
assert decoded_raw_cmd == raw_cmd_data, f"Expected {raw_cmd_data}, got {decoded_raw_cmd}"
print("\n--- 测试 CollectTask ---")
collect_task_data = {'command': raw_cmd_data}
encoded_collect_task = encode_collect_task(collect_task_data['command'])
decoded_collect_task = decode_collect_task(encoded_collect_task)
assert decoded_collect_task == collect_task_data, f"Expected {collect_task_data}, got {decoded_collect_task}"
print("\n--- 测试 BatchCollectCommand ---")
batch_collect_data = {
'correlation_id': 'abc-123',
'tasks': [
{'command': {'bus_number': 1, 'command_bytes': b'\x01\x04\x00\x01\x00\x01\x60\x0a'}},
{'command': {'bus_number': 2, 'command_bytes': b'\x02\x03\x00\x01\x00\x01\xd5\xfa'}}
]
}
encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks'])
decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect)
assert decoded_batch_collect == batch_collect_data, f"Expected {batch_collect_data}, got {decoded_batch_collect}"
print("\n--- 测试 CollectResult ---")
collect_result_data = {'correlation_id': 'res-456', 'values': [12.34, 56.78]}
encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values'])
decoded_collect_result = decode_collect_result(encoded_collect_result)
assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id']
for i in range(len(collect_result_data['values'])):
assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5
print("\n--- 测试 Instruction (内含BatchCollectCommand) ---")
instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data)
decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect)
assert decoded_instruction_batch_collect == {'batch_collect_command': batch_collect_data}
print("\n所有测试均已通过!")

View File

@@ -10,12 +10,12 @@
- 从队列中取出任务并交给Processor进行耗时处理
"""
import uqueue
from processor import Processor
from logger import log
from app.uqueue import Queue
from app.processor import Processor
from app.logs.logger import log
def worker_task(task_queue: uqueue.Queue, processor: Processor):
def worker_task(task_queue: Queue, processor: Processor):
"""
工作线程的主函数
@@ -24,21 +24,21 @@ def worker_task(task_queue: uqueue.Queue, processor: Processor):
processor (Processor): 业务处理器实例
"""
log("工作线程已启动,等待任务...")
while True:
try:
# 1. 阻塞式地从队列中获取任务
# 如果队列为空程序会在这里自动挂起不消耗CPU
# get()方法是线程安全的
packet_bytes = task_queue.get()
log(f"工作线程:收到新任务,开始处理... 数据: {packet_bytes.hex()}")
# 2. 调用processor进行耗时的、阻塞式的处理
# 这个处理过程不会影响主线程的LoRa监听
processor.handle_packet(packet_bytes)
log("工作线程:任务处理完毕,继续等待下一个任务。")
except Exception as e:
log(f"错误:工作线程在处理任务时发生异常: {e}")

View File

@@ -15,43 +15,42 @@
import time
import _thread
import config
import uqueue # 导入我们自己创建的本地uqueue模块
from app.config import config
from app.uqueue import Queue # 导入我们自己创建的本地uqueue模块
# 导入接口和实现
from lora.lora_interface import ILoraHandler
from bus.bus_interface import IBusManager
from lora.lora_handler import LoRaHandler
from bus.rs485_manager import RS485Manager
from processor import Processor
from app.lora.lora_mesh_uart_passthrough_manager import LoRaMeshUartPassthroughManager
from app.bus.rs485_manager import RS485Manager
from app.processor import Processor
# 导入工作线程的执行函数
from worker import worker_task
from logger import log
from app.worker import worker_task
from app.logs.logger import log
# --- 模块级变量定义 (带有类型提示) ---
lora_controller: ILoraHandler | None = None
bus_manager: IBusManager | None = None
lora_manager: LoRaMeshUartPassthroughManager | None = None
bus_manager: RS485Manager | None = None
processor: Processor | None = None
task_queue: uqueue.Queue | None = None
task_queue: Queue | None = None
def setup():
"""
初始化函数负责创建所有对象实例共享队列并启动工作线程
"""
global lora_controller, bus_manager, processor, task_queue
global lora_manager, bus_manager, processor, task_queue
log("--- 系统初始化开始 ---")
# 1. 初始化硬件驱动和业务处理器
lora_controller = LoRaHandler()
bus_manager = RS485Manager()
processor = Processor(lora_handler=lora_controller, bus_manager=bus_manager)
lora_manager = LoRaMeshUartPassthroughManager(config.LORA_CONFIG)
bus_manager = RS485Manager(config.BUS_CONFIG, config.DEFAULT_TIMEOUTS)
processor = Processor(lora_handler=lora_manager, bus_manager=bus_manager)
# 2. 从配置文件读取队列长度,并创建线程安全的队列
queue_size = config.SYSTEM_PARAMS.get('task_queue_max_size', 10)
task_queue = uqueue.Queue(maxsize=queue_size)
task_queue = Queue(maxsize=queue_size)
log(f"任务队列已创建,最大容量: {queue_size}")
# 3. 启动工作线程
@@ -65,9 +64,9 @@ def loop():
主线程循环函数 (生产者)
只负责监听LoRa并将数据放入队列
"""
packet = lora_controller.receive_packet()
packet = lora_manager.receive_packet()
if packet:
log(f"主线程收到新LoRa数据包: {packet.hex()}")
if task_queue.full():
log("警告任务队列已满新的LoRa数据包被丢弃")
return

View File

@@ -1,378 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
根据client.proto生成的解析代码
适用于ESP32 MicroPython环境
"""
import struct
# --- Protobuf基础类型辅助函数 ---
def encode_varint(value):
"""编码varint整数"""
buf = bytearray()
while value >= 0x80:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)
return buf
def decode_varint(buf, pos=0):
"""解码varint整数"""
result = 0
shift = 0
while pos < len(buf):
byte = buf[pos]
pos += 1
result |= (byte & 0x7F) << shift
if not (byte & 0x80):
break
shift += 7
return result, pos
def encode_string(value):
"""编码字符串"""
value_bytes = value.encode('utf-8')
length = encode_varint(len(value_bytes))
return length + value_bytes
def decode_string(buf, pos=0):
"""解码字符串"""
length, pos = decode_varint(buf, pos)
value = buf[pos:pos+length].decode('utf-8')
pos += length
return value, pos
# --- 消息编码/解码函数 ---
def encode_raw_485_command(bus_number, command_bytes):
"""
编码Raw485Command消息
Args:
bus_number (int): 总线号
command_bytes (bytes): 原始485指令
Returns:
bytearray: 编码后的数据
"""
result = bytearray()
# bus_number (field 1, wire type 0)
result.extend(encode_varint((1 << 3) | 0))
result.extend(encode_varint(bus_number))
# command_bytes (field 2, wire type 2)
result.extend(encode_varint((2 << 3) | 2))
result.extend(encode_varint(len(command_bytes)))
result.extend(command_bytes)
return result
def decode_raw_485_command(buf):
"""
解码Raw485Command消息
Args:
buf (bytes): 编码后的数据
Returns:
dict: 解码后的消息
"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # bus_number
if wire_type == 0:
value, pos = decode_varint(buf, pos)
result['bus_number'] = value
elif field_number == 2: # command_bytes
if wire_type == 2:
length, pos = decode_varint(buf, pos)
value = buf[pos:pos+length]
pos += length
result['command_bytes'] = value
else:
# 跳过未知字段
if wire_type == 0: _, pos = decode_varint(buf, pos)
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
else: pos += 1
return result
def encode_collect_task(command_msg):
"""
编码CollectTask消息
Args:
command_msg (dict): Raw485Command消息字典
Returns:
bytearray: 编码后的数据
"""
result = bytearray()
# command (field 2, wire type 2)
encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes'])
result.extend(encode_varint((2 << 3) | 2))
result.extend(encode_varint(len(encoded_command)))
result.extend(encoded_command)
return result
def decode_collect_task(buf):
"""
解码CollectTask消息
Args:
buf (bytes): 编码后的数据
Returns:
dict: 解码后的消息
"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 2: # command
if wire_type == 2:
length, pos = decode_varint(buf, pos)
value_buf = buf[pos:pos+length]
pos += length
result['command'] = decode_raw_485_command(value_buf)
else:
if wire_type == 0: _, pos = decode_varint(buf, pos)
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
else: pos += 1
return result
def encode_batch_collect_command(correlation_id, tasks):
"""
编码BatchCollectCommand消息
Args:
correlation_id (str): 关联ID
tasks (list): CollectTask消息字典列表
Returns:
bytearray: 编码后的数据
"""
result = bytearray()
# correlation_id (field 1, wire type 2)
result.extend(encode_varint((1 << 3) | 2))
result.extend(encode_string(correlation_id))
# tasks (field 2, wire type 2) - repeated
for task in tasks:
encoded_task = encode_collect_task(task['command'])
result.extend(encode_varint((2 << 3) | 2))
result.extend(encode_varint(len(encoded_task)))
result.extend(encoded_task)
return result
def decode_batch_collect_command(buf):
"""
解码BatchCollectCommand消息
Args:
buf (bytes): 编码后的数据
Returns:
dict: 解码后的消息
"""
result = {'tasks': []}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # correlation_id
if wire_type == 2:
value, pos = decode_string(buf, pos)
result['correlation_id'] = value
elif field_number == 2: # tasks (repeated)
if wire_type == 2:
length, pos = decode_varint(buf, pos)
value_buf = buf[pos:pos+length]
pos += length
result['tasks'].append(decode_collect_task(value_buf))
else:
if wire_type == 0: _, pos = decode_varint(buf, pos)
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
else: pos += 1
return result
def encode_collect_result(correlation_id, values):
"""
编码CollectResult消息
Args:
correlation_id (str): 关联ID
values (list): 采集值列表 (float)
Returns:
bytearray: 编码后的数据
"""
result = bytearray()
# correlation_id (field 1, wire type 2)
result.extend(encode_varint((1 << 3) | 2))
result.extend(encode_string(correlation_id))
# values (field 2, wire type 5) - repeated fixed32
for value in values:
result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32
result.extend(struct.pack('<f', value)) # 小端序浮点数
return result
def decode_collect_result(buf):
"""
解码CollectResult消息
Args:
buf (bytes): 编码后的数据
Returns:
dict: 解码后的消息
"""
result = {'values': []}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 1: # correlation_id
if wire_type == 2:
value, pos = decode_string(buf, pos)
result['correlation_id'] = value
elif field_number == 2: # values (repeated)
if wire_type == 5: # fixed32
value = struct.unpack('<f', buf[pos:pos+4])[0]
pos += 4
result['values'].append(value)
else:
if wire_type == 0: _, pos = decode_varint(buf, pos)
elif wire_type == 5: pos += 4 # fixed32
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
else: pos += 1
return result
def encode_instruction(payload_type, payload_data):
"""
编码Instruction消息 (包含oneof字段)
Args:
payload_type (str): oneof字段的类型 ('raw_485_command', 'batch_collect_command', 'collect_result')
payload_data (dict): 对应类型的消息字典
Returns:
bytearray: 编码后的数据
"""
result = bytearray()
encoded_payload = bytearray()
if payload_type == 'raw_485_command':
encoded_payload = encode_raw_485_command(payload_data['bus_number'], payload_data['command_bytes'])
result.extend(encode_varint((1 << 3) | 2)) # field 1, wire type 2
elif payload_type == 'batch_collect_command':
encoded_payload = encode_batch_collect_command(payload_data['correlation_id'], payload_data['tasks'])
result.extend(encode_varint((2 << 3) | 2)) # field 2, wire type 2
elif payload_type == 'collect_result':
encoded_payload = encode_collect_result(payload_data['correlation_id'], payload_data['values'])
result.extend(encode_varint((3 << 3) | 2)) # field 3, wire type 2
else:
raise ValueError("未知的指令负载类型")
result.extend(encode_varint(len(encoded_payload)))
result.extend(encoded_payload)
return result
def decode_instruction(buf):
"""
解码Instruction消息
Args:
buf (bytes): 编码后的数据
Returns:
dict: 解码后的消息
"""
result = {}
pos = 0
while pos < len(buf):
tag, pos = decode_varint(buf, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if wire_type == 2: # 所有oneof字段都使用长度分隔类型
length, pos = decode_varint(buf, pos)
value_buf = buf[pos:pos+length]
pos += length
if field_number == 1: # raw_485_command
result['raw_485_command'] = decode_raw_485_command(value_buf)
elif field_number == 2: # batch_collect_command
result['batch_collect_command'] = decode_batch_collect_command(value_buf)
elif field_number == 3: # collect_result
result['collect_result'] = decode_collect_result(value_buf)
else:
# 跳过未知字段
if wire_type == 0: _, pos = decode_varint(buf, pos)
elif wire_type == 5: pos += 4
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
else: pos += 1
return result
# --- 单元测试与使用范例 ---
if __name__ == "__main__":
print("--- 测试 Raw485Command ---")
raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}
encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes'])
print(f"编码后 Raw485Command: {encoded_raw_cmd.hex()}")
decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd)
print(f"解码后 Raw485Command: {decoded_raw_cmd}")
assert decoded_raw_cmd == raw_cmd_data
print("\n--- 测试 CollectTask ---")
collect_task_data = {'command': raw_cmd_data}
encoded_collect_task = encode_collect_task(collect_task_data['command'])
print(f"编码后 CollectTask: {encoded_collect_task.hex()}")
decoded_collect_task = decode_collect_task(encoded_collect_task)
print(f"解码后 CollectTask: {decoded_collect_task}")
assert decoded_collect_task == collect_task_data
print("\n--- 测试 BatchCollectCommand ---")
batch_collect_data = {
'correlation_id': 'abc-123',
'tasks': [
{'command': {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}},
{'command': {'bus_number': 2, 'command_bytes': b'\x02\x03\x00\x01\x00\x01\xd5\xfa'}}
]
}
encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks'])
print(f"编码后 BatchCollectCommand: {encoded_batch_collect.hex()}")
decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect)
print(f"解码后 BatchCollectCommand: {decoded_batch_collect}")
assert decoded_batch_collect == batch_collect_data
print("\n--- 测试 CollectResult ---")
collect_result_data = {
'correlation_id': 'res-456',
'values': [12.34, 56.78, 90.12]
}
encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values'])
print(f"编码后 CollectResult: {encoded_collect_result.hex()}")
decoded_collect_result = decode_collect_result(encoded_collect_result)
print(f"解码后 CollectResult: {decoded_collect_result}")
# 由于32位浮点数精度问题直接比较可能会失败此处设置一个合理的容忍度
assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id']
for i in range(len(collect_result_data['values'])):
assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 # 已放宽容忍度
print("\n--- 测试 Instruction (内含Raw485Command) ---")
instruction_raw_485 = encode_instruction('raw_485_command', raw_cmd_data)
print(f"编码后 Instruction (Raw485Command): {instruction_raw_485.hex()}")
decoded_instruction_raw_485 = decode_instruction(instruction_raw_485)
print(f"解码后 Instruction (Raw485Command): {decoded_instruction_raw_485}")
assert decoded_instruction_raw_485['raw_485_command'] == raw_cmd_data
print("\n--- 测试 Instruction (内含BatchCollectCommand) ---")
instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data)
print(f"编码后 Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}")
decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect)
print(f"解码后 Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}")
assert decoded_instruction_batch_collect['batch_collect_command']['correlation_id'] == batch_collect_data['correlation_id']
assert len(decoded_instruction_batch_collect['batch_collect_command']['tasks']) == len(batch_collect_data['tasks'])
print("\n--- 测试 Instruction (内含CollectResult) ---")
instruction_collect_result = encode_instruction('collect_result', collect_result_data)
print(f"编码后 Instruction (CollectResult): {instruction_collect_result.hex()}")
decoded_instruction_collect_result = decode_instruction(instruction_collect_result)
print(f"解码后 Instruction (CollectResult): {decoded_instruction_collect_result}")
assert decoded_instruction_collect_result['collect_result']['correlation_id'] == collect_result_data['correlation_id']
for i in range(len(collect_result_data['values'])):
assert abs(decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-5
print("\n所有测试均已通过!")