Files
pig-house-controller/config.py
2025-09-07 15:46:10 +08:00

165 lines
5.3 KiB
Python

import json
import os
class Config:
def __init__(self, config_file="config.json"):
self.config_file = config_file
self.default_config = {
# LoRa通信配置
"lora": {
"address": "0x1234",
"frequency": 433,
"bandwidth": 125,
"spreading_factor": 7,
"coding_rate": 5,
"encryption_key": "default_key"
},
# 上位机配置
"master": {
"lora_address": "0x5678",
"protocol": "modbus"
},
# 总线配置
"bus": {
"sensor": {
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"devices": []
},
"actuator": {
"port": "/dev/ttyUSB1",
"baudrate": 9600,
"devices": []
}
},
# 日志配置
"log": {
"level": "INFO",
"file_path": "logs/pig_house.log",
"max_size": "1KB", # 减小日志文件大小
"backup_count": 1, # 只保留一个备份文件
"report_errors": True, # 是否上报错误
"terminate_on_report_failure": True # 上报失败时是否终止程序
},
# 系统参数
"system": {
"heartbeat_interval": 60, # 心跳包间隔(秒)
"data_collection_interval": 300, # 数据采集间隔(秒)
"command_timeout": 10, # 命令超时时间(秒)
"retry_count": 3, # 重试次数
"error_handling": "retry"
},
# 设备配置
"devices": [
{
"id": "temp_01",
"type": "temperature",
"address": "0x01",
"bus": "sensor",
"unit": "celsius"
},
{
"id": "humidity_01",
"type": "humidity",
"address": "0x02",
"bus": "sensor",
"unit": "%"
},
{
"id": "feed_01",
"type": "feed_port",
"address": "0x10",
"bus": "actuator"
},
{
"id": "water_01",
"type": "water_valve",
"address": "0x11",
"bus": "actuator"
}
]
}
self.config = {}
self.load_config()
def load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, "r", encoding="utf-8") as file:
config_data = json.load(file)
# 合并默认配置和文件配置
self.config = self._merge_dict(self.default_config, config_data)
except Exception as e:
print(f"加载配置文件出错: {e},使用默认配置")
self.config = self.default_config
else:
print("配置文件不存在,使用默认配置")
self.config = self.default_config
self.save_config()
def save_config(self):
"""保存配置到文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.config_file), exist_ok=True)
except:
pass
with open(self.config_file, "w", encoding="utf-8") as file:
json.dump(self.config, file, indent=4, ensure_ascii=False)
def get(self, key_path, default=None):
"""根据路径获取配置项,例如: get('lora.address')"""
keys = key_path.split('.')
value = self.config
try:
for key in keys:
value = value[key]
return value
except (KeyError, TypeError):
return default
def set(self, key_path, value):
"""根据路径设置配置项"""
keys = key_path.split('.')
config_ref = self.config
for key in keys[:-1]:
if key not in config_ref:
config_ref[key] = {}
config_ref = config_ref[key]
config_ref[keys[-1]] = value
def _merge_dict(self, default, override):
"""合并两个字典,保留默认值"""
merged = default.copy()
for key, value in override.items():
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = self._merge_dict(merged[key], value)
else:
merged[key] = value
return merged
# 便捷访问方法
def lora_config(self):
return self.config.get('lora', {})
def master_config(self):
return self.config.get('master', {})
def bus_config(self):
return self.config.get('bus', {})
def log_config(self):
return self.config.get('log', {})
def system_config(self):
return self.config.get('system', {})
def devices_config(self):
return self.config.get('devices', [])