重写代码

This commit is contained in:
2025-09-25 19:14:15 +08:00
parent b80a04bfc1
commit 5519d43253
28 changed files with 0 additions and 920 deletions

View File

@@ -1,173 +0,0 @@
# 开发环境搭建指南
## 系统要求
- Python 3.7+
- Raspberry Pi 或其他兼容的嵌入式Linux系统
- LoRa通信模块如SX1278
- RS485通信接口
## 协议栈架构
本系统采用标准物联网协议栈:
```
应用层: LwM2M
传输层: CoAP (Constrained Application Protocol)
网络层: LoRaWAN
数据链路层: LoRa
物理层: LoRa
```
## 安装步骤
### 1. 克隆项目
```bash
git clone <项目地址>
cd pig-house-controller
```
### 2. 创建虚拟环境
```bash
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows
```
### 3. 安装依赖
```bash
pip install -r requirements.txt
```
### 4. LwM2M支持的特殊处理
由于 `lwm2m-client` 包在 PyPI 上不可用,我们需要手动安装或使用替代方案:
#### 方案一:使用 wakaama-python (推荐)
```bash
# 克隆 wakaama-python 仓库
git clone https://github.com/djarek/wakaama-python.git
cd wakaama-python
# 按照项目说明进行安装
```
#### 方案二:使用 Eclipse Wakaama
Eclipse Wakaama 是一个成熟的 LwM2M 实现,可以作为 C 扩展集成到 Python 项目中。
#### 方案三:自行实现 LwM2M 客户端
根据 LwM2M 规范自行实现必要的功能。
### 5. 配置系统
复制示例配置文件并根据实际环境进行修改:
```bash
cp config.json.example config.json
```
修改 [config.json](file:///C:/Users/divano/Desktop/work/AA-Pig/pig-house-controller/config.json) 文件中的参数,包括:
- LoRa通信参数
- 总线配置
- 设备配置
- 日志配置
### 6. 硬件连接
1. 连接LoRa模块到树莓派的SPI接口
2. 连接传感器总线RS485模块到树莓派的UART接口
3. 连接执行器总线RS485模块到树莓派的另一个UART接口
### 7. 运行系统
```bash
python main.py
```
## 开发工具推荐
- **IDE**: PyCharm 或 VS Code
- **版本控制**: Git
- **调试工具**: Python内置pdb或IDE调试器
- **日志查看**: tail命令或专业日志查看工具
## 测试环境
建议在实际部署前进行充分测试:
1. 单元测试: 验证各模块功能
2. 集成测试: 验证模块间协作
3. 系统测试: 验证完整功能流程
4. 硬件测试: 验证实际硬件连接和通信
## 协议栈实现说明
### LoRaWAN层
使用SX1278等LoRa芯片通过SPI接口与树莓派通信。实现基本的LoRaWAN功能
- OTAA/ABP入网
- 数据加解密
- 数据包重传机制
### CoAP层
实现CoAP协议栈支持
- GET/POST/PUT/DELETE方法
- 资源发现
- 块传输
- 观察者模式
### LwM2M层
实现LwM2M客户端功能
- 设备注册与管理
- 固件更新
- 参数配置
- 数据上报
注意:由于 `lwm2m-client` 包不可用,需要使用替代方案实现 LwM2M 功能。
### SenML数据格式
所有传感器数据使用SenML格式进行编码和传输确保数据标准化和互操作性。
## 抽象接口开发说明
系统定义了以下抽象接口,开发者在实现具体功能时需要继承这些基类:
### 通信接口 (BaseComm)
位于 [comms/base_comm.py](file:///C:/Users/divano/Desktop/work/AA-Pig/pig-house-controller/comms/base_comm.py),定义了通信模块的基本操作:
- `connect()`: 建立通信连接
- `disconnect()`: 断开通信连接
- `send()`: 发送数据
- `receive()`: 接收数据
- `is_connected()`: 检查连接状态
### 设备接口 (BaseDevice)
位于 [devices/base_device.py](file:///C:/Users/divano/Desktop/work/AA-Pig/pig-house-controller/devices/base_device.py),定义了设备的基本操作:
- `connect()`: 连接设备
- `disconnect()`: 断开设备连接
- `read_data()`: 读取设备数据
- `write_data()`: 向设备写入数据
- `get_status()`: 获取设备状态
### 存储接口 (BaseStorage)
位于 [storage/base_storage.py](file:///C:/Users/divano/Desktop/work/AA-Pig/pig-house-controller/storage/base_storage.py),定义了存储模块的基本操作:
- `save()`: 保存数据
- `load()`: 加载数据
- `delete()`: 删除数据
- `exists()`: 检查键是否存在
- `list_keys()`: 列出所有键
### 命令处理器接口 (BaseHandler)
位于 [core/base_handler.py](file:///C:/Users/divano/Desktop/work/AA-Pig/pig-house-controller/core/base_handler.py),定义了命令处理的基本操作:
- `handle_command()`: 处理命令
- `register_command()`: 注册命令处理函数
- `unregister_command()`: 注销命令处理函数
## 注意事项
1. 确保硬件连接正确特别是UART接口不要接反
2. 根据实际硬件调整配置文件中的串口设备路径
3. 注意LoRa频段的合法性遵守当地无线电管理规定
4. 建议在开发阶段使用DEBUG日志级别生产环境使用INFO或更高
5. LwM2M 功能需要特殊处理,因为标准包不可用

View File

@@ -1 +0,0 @@
# 通信层

View File

@@ -1,72 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional
class BaseComm(ABC):
"""
通信接口抽象基类
定义所有通信模块需要实现的基本方法
"""
@abstractmethod
def connect(self) -> bool:
"""
建立通信连接
Returns:
bool: 连接是否成功
"""
pass
@abstractmethod
def disconnect(self) -> None:
"""
断开通信连接
"""
pass
@abstractmethod
def send(self, data: bytes, address: Optional[str] = None) -> bool:
"""
发送数据
Args:
data: 要发送的数据
address: 目标地址(可选)
Returns:
bool: 发送是否成功
"""
pass
@abstractmethod
def receive(self, timeout: Optional[float] = None) -> Optional[bytes]:
"""
接收数据
Args:
timeout: 超时时间(秒)
Returns:
bytes: 接收到的数据如果没有数据则返回None
"""
pass
@abstractmethod
def is_connected(self) -> bool:
"""
检查通信连接状态
Returns:
bool: 是否已连接
"""
pass
def set_callback(self, callback: Callable[[bytes], None]) -> None:
"""
设置数据接收回调函数
Args:
callback: 接收数据时调用的回调函数
"""
pass

View File

@@ -1 +0,0 @@
# lora实现

View File

@@ -1,73 +0,0 @@
{
"lora": {
"address": "0x1234",
"frequency": 433,
"bandwidth": 125,
"spreading_factor": 7,
"coding_rate": 5,
"encryption_key": "your_encryption_key_here"
},
"master": {
"lora_address": "0xABCD",
"protocol": "modbus"
},
"bus": {
"sensor": {
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"devices": []
},
"actuator": {
"port": "/dev/ttyUSB1",
"baudrate": 9600,
"devices": []
}
},
"log": {
"level": "INFO",
"file_path": "logs/pig_house.log",
"max_size": "1MB",
"backup_count": 1,
"report_errors": true,
"terminate_on_report_failure": true
},
"system": {
"heartbeat_interval": 60,
"data_collection_interval": 30,
"command_timeout": 10,
"retry_count": 3,
"error_handling": "retry"
},
"devices": [
{
"id": "temp_01",
"type": "temperature",
"address": "0x01",
"bus": "sensor",
"unit": "celsius",
"location": "main_hall"
},
{
"id": "humidity_01",
"type": "humidity",
"address": "0x02",
"bus": "sensor",
"unit": "%",
"location": "main_hall"
},
{
"id": "feed_01",
"type": "feed_port",
"address": "0x10",
"bus": "actuator",
"location": "feeding_area_1"
},
{
"id": "water_01",
"type": "water_valve",
"address": "0x11",
"bus": "actuator",
"location": "watering_area_1"
}
]
}

165
config.py
View File

@@ -1,165 +0,0 @@
import json
import os
class Config:
def __init__(self, config_file="config.json"):
self.config_file = config_file
self.default_config = {
# LoRa通信配置
"lora": {
"address": "0x1234",
"frequency": 433,
"bandwidth": 125,
"spreading_factor": 7,
"coding_rate": 5,
"encryption_key": "default_key"
},
# 上位机配置
"master": {
"lora_address": "0x5678",
"protocol": "modbus"
},
# 总线配置
"bus": {
"sensor": {
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"devices": []
},
"actuator": {
"port": "/dev/ttyUSB1",
"baudrate": 9600,
"devices": []
}
},
# 日志配置
"log": {
"level": "INFO",
"file_path": "logs/pig_house.log",
"max_size": "1KB", # 减小日志文件大小
"backup_count": 1, # 只保留一个备份文件
"report_errors": True, # 是否上报错误
"terminate_on_report_failure": True # 上报失败时是否终止程序
},
# 系统参数
"system": {
"heartbeat_interval": 60, # 心跳包间隔(秒)
"data_collection_interval": 300, # 数据采集间隔(秒)
"command_timeout": 10, # 命令超时时间(秒)
"retry_count": 3, # 重试次数
"error_handling": "retry"
},
# 设备配置
"devices": [
{
"id": "temp_01",
"type": "temperature",
"address": "0x01",
"bus": "sensor",
"unit": "celsius"
},
{
"id": "humidity_01",
"type": "humidity",
"address": "0x02",
"bus": "sensor",
"unit": "%"
},
{
"id": "feed_01",
"type": "feed_port",
"address": "0x10",
"bus": "actuator"
},
{
"id": "water_01",
"type": "water_valve",
"address": "0x11",
"bus": "actuator"
}
]
}
self.config = {}
self.load_config()
def load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, "r", encoding="utf-8") as file:
config_data = json.load(file)
# 合并默认配置和文件配置
self.config = self._merge_dict(self.default_config, config_data)
except Exception as e:
print(f"加载配置文件出错: {e},使用默认配置")
self.config = self.default_config
else:
print("配置文件不存在,使用默认配置")
self.config = self.default_config
self.save_config()
def save_config(self):
"""保存配置到文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.config_file), exist_ok=True)
except:
pass
with open(self.config_file, "w", encoding="utf-8") as file:
json.dump(self.config, file, indent=4, ensure_ascii=False)
def get(self, key_path, default=None):
"""根据路径获取配置项,例如: get('lora.address')"""
keys = key_path.split('.')
value = self.config
try:
for key in keys:
value = value[key]
return value
except (KeyError, TypeError):
return default
def set(self, key_path, value):
"""根据路径设置配置项"""
keys = key_path.split('.')
config_ref = self.config
for key in keys[:-1]:
if key not in config_ref:
config_ref[key] = {}
config_ref = config_ref[key]
config_ref[keys[-1]] = value
def _merge_dict(self, default, override):
"""合并两个字典,保留默认值"""
merged = default.copy()
for key, value in override.items():
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = self._merge_dict(merged[key], value)
else:
merged[key] = value
return merged
# 便捷访问方法
def lora_config(self):
return self.config.get('lora', {})
def master_config(self):
return self.config.get('master', {})
def bus_config(self):
return self.config.get('bus', {})
def log_config(self):
return self.config.get('log', {})
def system_config(self):
return self.config.get('system', {})
def devices_config(self):
return self.config.get('devices', [])

View File

@@ -1,50 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class BaseHandler(ABC):
"""
命令处理器抽象基类
定义所有命令处理器需要实现的基本方法
"""
@abstractmethod
def handle_command(self, command: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
处理命令
Args:
command: 命令类型
data: 命令数据
Returns:
Dict[str, Any]: 处理结果
"""
pass
@abstractmethod
def register_command(self, command: str, handler_func) -> bool:
"""
注册命令处理函数
Args:
command: 命令类型
handler_func: 处理函数
Returns:
bool: 注册是否成功
"""
pass
@abstractmethod
def unregister_command(self, command: str) -> bool:
"""
注销命令处理函数
Args:
command: 命令类型
Returns:
bool: 注销是否成功
"""
pass

View File

@@ -1 +0,0 @@
# 处理接收的指令

View File

@@ -1,43 +0,0 @@
from enum import Enum
class LogLevel(Enum):
"""日志等级枚举"""
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class DeviceType(Enum):
"""设备类型枚举"""
# 传感器类型
TEMPERATURE = "temperature"
HUMIDITY = "humidity"
PRESSURE = "pressure"
LIGHT = "light"
CO2 = "co2"
NH3 = "nh3" # 氨气
H2S = "h2s" # 硫化氢
# 执行器类型
FEED_PORT = "feed_port"
WATER_VALVE = "water_valve"
FAN = "fan"
HEATER = "heater"
COOLER = "cooler"
LIGHT_CONTROLLER = "light_controller"
class BusType(Enum):
"""总线类型枚举"""
SENSOR = "sensor"
ACTUATOR = "actuator"
class ErrorHandlingStrategy(Enum):
"""错误处理策略枚举"""
RETRY = "retry"
SKIP = "skip"
ALERT = "alert"

View File

@@ -1 +0,0 @@
# 心跳包

View File

@@ -1 +0,0 @@
# 定时任务

View File

@@ -1 +0,0 @@
# 设备驱动

View File

@@ -1,100 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from enum import Enum
class DeviceStatus(Enum):
"""设备状态枚举"""
UNKNOWN = "unknown"
ONLINE = "online"
OFFLINE = "offline"
ERROR = "error"
BUSY = "busy"
class BaseDevice(ABC):
"""
设备接口抽象基类
定义所有设备需要实现的基本方法
"""
def __init__(self, device_id: str, device_type: str, address: str, bus: str):
"""
初始化设备
Args:
device_id: 设备唯一标识
device_type: 设备类型
address: 设备地址
bus: 所在总线
"""
self.device_id = device_id
self.device_type = device_type
self.address = address
self.bus = bus
self.status = DeviceStatus.UNKNOWN
@abstractmethod
def connect(self) -> bool:
"""
连接设备
Returns:
bool: 连接是否成功
"""
pass
@abstractmethod
def disconnect(self) -> None:
"""
断开设备连接
"""
pass
@abstractmethod
def read_data(self) -> Optional[Dict[str, Any]]:
"""
读取设备数据
Returns:
dict: 设备数据,格式为 {数据名: 数据值}失败时返回None
"""
pass
@abstractmethod
def write_data(self, data: Dict[str, Any]) -> bool:
"""
向设备写入数据
Args:
data: 要写入的数据,格式为 {数据名: 数据值}
Returns:
bool: 写入是否成功
"""
pass
@abstractmethod
def get_status(self) -> DeviceStatus:
"""
获取设备状态
Returns:
DeviceStatus: 设备状态
"""
pass
def get_info(self) -> Dict[str, Any]:
"""
获取设备信息
Returns:
dict: 设备信息
"""
return {
"device_id": self.device_id,
"device_type": self.device_type,
"address": self.address,
"bus": self.bus,
"status": self.status.value
}

View File

@@ -1 +0,0 @@
# 下料口

View File

@@ -1 +0,0 @@
# 温度传感器

View File

@@ -1,21 +0,0 @@
# 项目依赖包
# LoRa通信相关
pyserial>=3.5
paho-mqtt>=1.6.1
# CoAP协议支持
aiocoap>=0.4.5
# LwM2M支持 (使用wakaama-python作为替代)
# 注意lwm2m-client在PyPI上不可用需要手动安装或使用其他实现
# wakaama-python>=0.1.0
# 数据格式(SenML)
senml>=1.0.0
# 其他工具
jsonschema>=4.0.0
# 通用依赖
typing-extensions>=3.10.0; python_version < "3.10"

View File

@@ -1 +0,0 @@
# 持久化存储

View File

@@ -1,109 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class BaseStorage(ABC):
"""
存储接口抽象基类
定义所有存储模块需要实现的基本方法
"""
@abstractmethod
def save(self, key: str, value: Any) -> bool:
"""
保存数据
Args:
key: 键
value: 值
Returns:
bool: 保存是否成功
"""
raise NotImplementedError
@abstractmethod
def load(self, key: str, default: Any = None) -> Any:
"""
加载数据
Args:
key: 键
default: 默认值
Returns:
Any: 加载的数据
"""
raise NotImplementedError
@abstractmethod
def delete(self, key: str) -> bool:
"""
删除数据
Args:
key: 要删除的键
Returns:
bool: 删除是否成功
"""
raise NotImplementedError
@abstractmethod
def exists(self, key: str) -> bool:
"""
检查键是否存在
Args:
key: 键
Returns:
bool: 键是否存在
"""
raise NotImplementedError
@abstractmethod
def list_keys(self) -> List[str]:
"""
列出所有键
Returns:
List[str]: 所有键的列表
"""
raise NotImplementedError
@abstractmethod
def clear(self) -> bool:
"""
清空所有数据
Returns:
bool: 清空是否成功
"""
raise NotImplementedError
@abstractmethod
def save_batch(self, data: Dict[str, Any]) -> bool:
"""
批量保存数据
Args:
data: 要保存的数据字典
Returns:
bool: 保存是否成功
"""
raise NotImplementedError
@abstractmethod
def load_batch(self, keys: List[str]) -> Dict[str, Any]:
"""
批量加载数据
Args:
keys: 要加载的键列表
Returns:
Dict[str, Any]: 加载的数据字典
"""
raise NotImplementedError

View File

@@ -1,40 +0,0 @@
# json 文件实现
import json
from storage.base_storage import BaseStorage
class JSONStorage(BaseStorage):
def __init__(self, filename="data.json"):
self.filename = filename
# 如果文件不存在,先创建空字典
try:
with open(self.filename, "r") as f:
pass
except OSError:
with open(self.filename, "w") as f:
f.write("{}")
def _read_all(self):
with open(self.filename, "r") as f:
return json.load(f)
def _write_all(self, data):
with open(self.filename, "w") as f:
json.dump(data, f)
def save(self, key, value):
data = self._read_all()
data[key] = value
self._write_all(data)
def load(self, key, default=None):
data = self._read_all()
return data.get(key, default)
def delete(self, key):
data = self._read_all()
if key in data:
del data[key]
self._write_all(data)

View File

@@ -1 +0,0 @@
# 内存实现

View File

@@ -1 +0,0 @@
#

View File

@@ -1 +0,0 @@
# 核心逻辑测试

View File

@@ -1 +0,0 @@
#

View File

@@ -1 +0,0 @@
#

View File

@@ -1 +0,0 @@
# 校验工具

View File

@@ -1,57 +0,0 @@
# 兼容PC和MicroPython的文件操作
# compat_fs.py
try:
import uos as os # MicroPython
MICROPYTHON = True
except ImportError:
import os # CPython
MICROPYTHON = False
def list_dir(path="."):
"""列出目录内容"""
return os.listdir(path)
def make_dir(path):
"""创建目录"""
if MICROPYTHON:
os.mkdir(path)
else:
os.makedirs(path, exist_ok=True)
def remove_file(path):
"""删除文件"""
os.remove(path)
def read_file(path, mode="r"):
"""读取文件内容"""
with open(path, mode) as f:
return f.read()
def write_file(path, data, mode="w"):
"""写入文件内容"""
with open(path, mode) as f:
f.write(data)
def is_file(path):
"""判断是否是文件"""
try:
st = os.stat(path)
# MicroPython: stat()[0] >> 14 & 0xF == 8 表示普通文件
if MICROPYTHON:
return (st[0] >> 14) & 0xF == 8
else:
return os.path.isfile(path)
except OSError:
return False
def is_dir(path):
"""判断是否是目录"""
try:
st = os.stat(path)
if MICROPYTHON:
return (st[0] >> 14) & 0xF == 2
else:
return os.path.isdir(path)
except OSError:
return False

View File

@@ -1 +0,0 @@
# 日志

View File

@@ -1 +0,0 @@
# 数据解析工具