diff --git a/RELAY_API.md b/RELAY_API.md index 96e26b3..fb799f5 100644 --- a/RELAY_API.md +++ b/RELAY_API.md @@ -103,6 +103,29 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id] - `command`: 指令名称,固定为"query_all_device_status" - `timestamp`: 指令发送时间 +### 3.4 心跳包指令 + +平台向中继设备发送心跳包指令,用于检测设备连接状态并获取下级设备状态信息。 + +**请求格式** +```json +{ + "type": "command", + "command": "heartbeat", + "data": { + "timestamp": 1672545600 + }, + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"command" +- `command`: 指令名称,固定为"heartbeat" +- `data`: 指令数据 + - `timestamp`: 时间戳(Unix时间戳格式) +- `timestamp`: 指令发送时间 + ## 4. 响应接口 ### 4.1 设备控制响应 @@ -167,6 +190,48 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id] } ``` +### 4.4 心跳包响应 + +中继设备响应心跳包指令,返回自身及下级设备的状态信息。 + +**响应格式** +```json +{ + "type": "response", + "command": "heartbeat", + "data": { + "devices": [ + { + "device_id": "relay-001", + "device_type": "relay", + "status": "running" + }, + { + "device_id": "fan-001", + "device_type": "fan", + "status": "running" + }, + { + "device_id": "curtain-001", + "device_type": "water_curtain", + "status": "stopped" + } + ] + }, + "timestamp": "2023-01-01T12:00:05Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"response" +- `command`: 指令名称,固定为"heartbeat" +- `data`: 响应数据 + - `devices`: 设备列表 + - `device_id`: 设备唯一标识符 + - `device_type`: 设备类型 + - `status`: 设备状态(如: running, stopped, online, offline等) +- `timestamp`: 平台发送的时间戳, 需要原封不动的返回 + ## 5. 请求-响应机制 平台在发送指令后会等待中继设备的响应,超时时间由配置文件决定,默认为5秒。 @@ -206,6 +271,8 @@ websocket: ## 7. 响应结构定义 +平台提供统一的响应结构定义,用于处理中继设备返回的响应: + ### 7.1 CommandResponse 结构体 ```go @@ -230,7 +297,31 @@ type CommandResponse struct { } ``` -### 7.2 响应处理规则 +### 7.2 ParseData 方法 + +CommandResponse结构体提供了ParseData方法,用于将响应数据解析到指定的结构体中: + +```go +func (cr *CommandResponse) ParseData(target interface{}) error +``` + +使用示例: +```go +// 定义目标结构体 +type DeviceStatus struct { + DeviceID string `json:"device_id"` + Status string `json:"status"` + Message string `json:"message"` +} + +// 解析响应数据 +var status DeviceStatus +if err := response.ParseData(&status); err != nil { + // 处理错误 +} +``` + +### 7.3 响应处理规则 1. `Status` 字段:表示操作的整体状态,如 "success"、"failed" 等 2. `Message` 字段:提供人类可读的操作结果描述 @@ -251,6 +342,7 @@ type CommandResponse struct { |------|------| | fan | 风机设备 | | water_curtain | 水帘设备 | +| relay | 中继设备 | ## 10. 动作说明 @@ -266,4 +358,7 @@ type CommandResponse struct { | success | 操作成功 | | failed | 操作失败 | | running | 设备运行中 | -| stopped | 设备已停止 | \ No newline at end of file +| stopped | 设备已停止 | +| online | 设备在线 | +| offline | 设备离线 | +| active | 设备激活 | \ No newline at end of file diff --git a/config/config.py b/config/config.py index 097631e..51825e6 100644 --- a/config/config.py +++ b/config/config.py @@ -20,12 +20,22 @@ class Config: config_file (str): 配置文件路径 """ # 获取项目根目录 - project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - config_path = os.path.join(project_root, config_file) + self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + self.config_file = config_file + self.config_path = os.path.join(self.project_root, config_file) # 加载配置文件 - with open(config_path, 'r', encoding='utf-8') as f: - self._config = yaml.safe_load(f) + self._config = self._load_config() + + def _load_config(self): + """加载配置文件""" + with open(self.config_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + + def reload_config(self): + """重新加载配置文件""" + self._config = self._load_config() + return self._config @property def simulation_enabled(self): @@ -48,14 +58,9 @@ class Config: return self._config.get('websocket', {}) @property - def websocket_address(self): - """获取WebSocket服务器地址""" - return self.websocket_config.get('address', '127.0.0.1') - - @property - def websocket_port(self): - """获取WebSocket服务器端口""" - return self.websocket_config.get('port', 8080) + def relay_config(self): + """获取中继器配置""" + return self._config.get('relay', {}) # 全局配置实例 diff --git a/internal/core/relay.py b/internal/core/relay.py index a099712..d6ef214 100644 --- a/internal/core/relay.py +++ b/internal/core/relay.py @@ -92,36 +92,24 @@ class RelayService: # 创建WebSocket客户端 self.websocket_client = WebSocketClient(self) - # 连接到平台 - if await self.websocket_client.connect(): - # 启动心跳任务 - heartbeat_task = asyncio.create_task(self._heartbeat_routine()) - - # 监听平台消息 - await self.websocket_client.listen() - - # 取消心跳任务 - heartbeat_task.cancel() - else: - logger.error("无法连接到平台") - - async def _heartbeat_routine(self): - """心跳协程""" - logger.info("心跳协程启动") - - try: - while self.running: - if self.websocket_client and self.websocket_client.is_connected(): - await self.websocket_client.send_heartbeat() + # 持续尝试连接到平台 + while self.running: + # 连接到平台 + if await self.websocket_client.connect(): + # 监听平台消息 + await self.websocket_client.listen() - # 每30秒发送一次心跳 - await asyncio.sleep(30) - except asyncio.CancelledError: - logger.info("心跳协程被取消") - except Exception as e: - logger.error(f"心跳协程发生错误: {e}") + # 如果连接断开但服务仍在运行,则尝试重新连接 + if self.running and not self.websocket_client.is_connected(): + logger.info("连接已断开,尝试重新连接...") + await self.websocket_client.reconnect() + else: + # 连接失败,等待后重试 + logger.info("连接失败,5秒后重试...") + await asyncio.sleep(5) - logger.info("心跳协程停止") + # 服务停止时断开连接 + await self.websocket_client.disconnect() def _main_loop(self): """主循环""" @@ -174,11 +162,13 @@ class RelayService: self._handle_query_device_status(command_data) elif command_name == 'query_all_device_status': self._handle_query_all_device_status(command_data) + elif command_name == 'heartbeat': + self._handle_heartbeat_command(command_data) else: logger.warning(f"未知指令: {command_name}") self._send_error_response(command_data, f"未知指令: {command_name}") elif command_type == 'heartbeat': - self._handle_heartbeat(command_data) + self._handle_heartbeat_message(command_data) elif command_type == 'system': # 处理系统消息 logger.info(f"收到系统消息: {command_name}") @@ -224,21 +214,59 @@ class RelayService: # 发送响应到平台 self._send_command_response(command_data, result) - def _handle_heartbeat(self, command_data): - """处理心跳消息""" - logger.info("收到平台心跳消息") + def _handle_heartbeat_command(self, command_data): + """处理心跳指令""" + logger.info("收到平台心跳指令") + + # 获取所有设备状态 + devices_status = [] + + # 添加中继器自身状态 + devices_status.append({ + "device_id": self.device_id, + "device_type": "relay", + "status": "running" + }) + + # 如果启用了模拟模式,获取模拟设备状态 + if self.lora_handler and config.simulation_enabled and self.lora_handler.device_manager: + # 获取所有区域主控设备状态 + controllers = self.lora_handler.device_manager.get_all_controllers() + for controller in controllers: + devices_status.append({ + "device_id": controller.device_id, + "device_type": "area_controller", + "status": controller.status + }) + + # 获取该控制器下的所有普通设备状态 + devices = controller.get_all_devices() + for device in devices: + devices_status.append({ + "device_id": device.device_id, + "device_type": device.device_type, + "status": device.status + }) - # 发送心跳响应 response = { "type": "response", "command": "heartbeat", - "status": "success", - "message": "心跳响应", - "timestamp": datetime.utcnow().isoformat() + 'Z' + "data": { + "devices": devices_status + }, + "timestamp": command_data.get("timestamp") # 原封不动地返回平台发送的时间戳 } self._send_response(response) + def _handle_heartbeat_message(self, command_data): + """处理心跳消息""" + logger.info("收到平台心跳消息") + + # 根据RELAY_API.md,平台发送的心跳消息不需要特殊处理 + # 设备的心跳响应已经在_handle_heartbeat_command中处理 + pass + def _send_command_response(self, command_data, result_data): """发送指令响应""" response = { diff --git a/internal/protocol/lora/handler.py b/internal/protocol/lora/handler.py index 78a70cd..a0a8ab2 100644 --- a/internal/protocol/lora/handler.py +++ b/internal/protocol/lora/handler.py @@ -10,6 +10,7 @@ LoRa协议处理模块 import logging import time import random +import os from ..base import ProtocolHandler # 配置日志 @@ -29,6 +30,7 @@ class LoRaHandler(ProtocolHandler): """ super().__init__(config) self.initialized = False + self.last_config_mtime = None # 上次配置文件修改时间 logger.info("初始化LoRa协议处理器") def initialize(self): @@ -41,43 +43,46 @@ class LoRaHandler(ProtocolHandler): # 如果启用模拟模式,初始化设备管理器 if self.config.simulation_enabled: - from internal.simulation import DeviceManager, AreaController, NormalDevice - self.device_manager = DeviceManager() - - # 根据配置创建模拟区域主控设备 - controllers = {} - for controller_config in self.config.simulation_controllers: - controller = AreaController( - device_id=controller_config['id'], - lora_address=controller_config['lora_address'], - status=controller_config.get('status', 'stopped') - ) - self.device_manager.add_controller(controller) - controllers[controller.device_id] = controller - - # 根据配置创建模拟普通设备,并关联到区域主控 - for device_config in self.config.simulation_devices: - device = NormalDevice( - device_id=device_config['id'], - device_type=device_config['type'], - rs485_bus=device_config['rs485_bus'], - rs485_address=device_config['rs485_address'], - controller_id=device_config['controller_id'], - status=device_config.get('status', 'stopped') - ) - self.device_manager.add_device(device) - - # 将设备添加到对应的区域主控中 - controller_id = device_config['controller_id'] - if controller_id in controllers: - controllers[controller_id].add_device(device) - - logger.info(f"模拟模式已启用,创建了 {len(controllers)} 个区域主控和 {len(self.config.simulation_devices)} 个普通设备") + self._initialize_simulation() else: logger.info("真实模式,需要连接LoRa硬件") logger.info("LoRa协议处理器初始化完成") + def _initialize_simulation(self): + """初始化模拟模式""" + from internal.simulation import DeviceManager, AreaController, NormalDevice + + # 如果设备管理器尚未创建,则创建它 + if not hasattr(self, 'device_manager') or self.device_manager is None: + self.device_manager = DeviceManager() + + # 检查是否需要重新加载配置(在模拟模式下) + self._check_and_reload_config() + + # 重新初始化设备(根据最新的配置) + self.device_manager.reinitialize_devices( + self.config.simulation_controllers, + self.config.simulation_devices + ) + + def _check_and_reload_config(self): + """检查并重新加载配置文件(仅在模拟模式下)""" + if not self.config.simulation_enabled: + return + + try: + # 获取配置文件的修改时间 + current_mtime = os.path.getmtime(self.config.config_path) + + # 如果是第一次检查或者文件已被修改,则重新加载配置 + if self.last_config_mtime is None or current_mtime > self.last_config_mtime: + logger.info("检测到配置文件发生变化,重新加载配置") + self.config.reload_config() + self.last_config_mtime = current_mtime + except Exception as e: + logger.error(f"检查配置文件修改时间时发生错误: {e}") + def send_command(self, command, data): """ 通过LoRa发送命令到设备 @@ -100,6 +105,8 @@ class LoRaHandler(ProtocolHandler): # 如果启用模拟模式,直接处理命令 if self.config.simulation_enabled and self.device_manager: + # 在模拟模式下,每次都检查配置文件是否有更新 + self._check_and_reload_config() result = self._handle_simulated_command(command, data) logger.info(f"模拟命令处理结果: {result}") return result diff --git a/internal/protocol/websocket/client.py b/internal/protocol/websocket/client.py index 37e8a8e..04a599c 100644 --- a/internal/protocol/websocket/client.py +++ b/internal/protocol/websocket/client.py @@ -93,6 +93,8 @@ class WebSocketClient: return True except Exception as e: logger.error(f"发送消息失败: {e}") + # 连接可能已断开,更新状态 + self.connected = False return False async def listen(self): @@ -125,18 +127,26 @@ class WebSocketClient: logger.error(f"监听平台消息时发生错误: {e}") self.connected = False - async def send_heartbeat(self): - """发送心跳消息""" - if not self.connected: - return False + async def reconnect(self): + """尝试重新连接到平台""" + logger.info("尝试重新连接到平台...") + reconnect_delay = 5 # 初始重连延迟(秒) + max_delay = 60 # 最大重连延迟(秒) - heartbeat_msg = { - "type": "heartbeat", - "device_id": self.device_id, - "timestamp": datetime.utcnow().isoformat() + 'Z' - } - - return await self.send_message(heartbeat_msg) + while self.running and not self.connected: + try: + await self.connect() + if self.connected: + logger.info("重新连接成功") + return True + else: + logger.warning(f"重新连接失败,{reconnect_delay}秒后重试...") + await asyncio.sleep(reconnect_delay) + # 指数退避,但不超过最大延迟 + reconnect_delay = min(reconnect_delay * 2, max_delay) + except Exception as e: + logger.error(f"重新连接时发生错误: {e}") + await asyncio.sleep(reconnect_delay) def is_connected(self): """检查是否已连接""" diff --git a/internal/simulation/device_interface.py b/internal/simulation/device_interface.py index 04eecb7..448532f 100644 --- a/internal/simulation/device_interface.py +++ b/internal/simulation/device_interface.py @@ -207,7 +207,7 @@ class DeviceManager: """ if device_id in self.devices: del self.devices[device_id] - logger.info(f"从管理器移除普通设备: {device_id}") + logger.info(f"从管理器移除设备: {device_id}") def get_device(self, device_id): """ @@ -236,6 +236,50 @@ class DeviceManager: logger.info(f"获取所有普通设备,共 {len(self.devices)} 个") return list(self.devices.values()) + def reinitialize_devices(self, controllers_config, devices_config): + """ + 重新初始化设备(根据新的配置) + + Args: + controllers_config (list): 区域主控设备配置列表 + devices_config (list): 普通设备配置列表 + """ + logger.info("重新初始化设备") + + # 清空现有设备 + self.controllers.clear() + self.devices.clear() + + # 根据新配置创建区域主控设备 + controllers = {} + for controller_config in controllers_config: + controller = AreaController( + device_id=controller_config['id'], + lora_address=controller_config['lora_address'], + status=controller_config.get('status', 'stopped') + ) + self.add_controller(controller) + controllers[controller.device_id] = controller + + # 根据新配置创建普通设备,并关联到区域主控 + for device_config in devices_config: + device = NormalDevice( + device_id=device_config['id'], + device_type=device_config['type'], + rs485_bus=device_config['rs485_bus'], + rs485_address=device_config['rs485_address'], + controller_id=device_config['controller_id'], + status=device_config.get('status', 'stopped') + ) + self.add_device(device) + + # 将设备添加到对应的区域主控中 + controller_id = device_config['controller_id'] + if controller_id in controllers: + controllers[controller_id].add_device(device) + + logger.info(f"设备重新初始化完成,创建了 {len(controllers)} 个区域主控和 {len(devices_config)} 个普通设备") + def control_device(self, device_id, action): """ 控制指定设备 @@ -249,107 +293,37 @@ class DeviceManager: """ logger.info(f"控制设备: ID={device_id}, 动作={action}") - # 先尝试查找普通设备 + # 查找设备(先在普通设备中查找,再在区域主控中查找) device = self.get_device(device_id) - if device: - return self._control_normal_device(device, action) + if not device: + device = self.get_controller(device_id) - # 再尝试查找区域主控设备 - controller = self.get_controller(device_id) - if controller: - return self._control_area_controller(controller, action) - - result = { - "device_id": device_id, - "status": "failed", - "message": f"设备 {device_id} 不存在" - } - logger.error(f"控制设备失败: 设备 {device_id} 不存在") - return result - - def _control_normal_device(self, device, action): - """ - 控制普通设备 - - Args: - device (NormalDevice): 普通设备实例 - action (str): 控制动作 - - Returns: - dict: 控制结果 - """ - logger.info(f"控制普通设备 {device.device_id}: 动作={action}") - - if action == "on": - device.status = "running" - result = { - "device_id": device.device_id, - "status": "success", - "message": f"设备 {device.device_id} 已开启" - } - logger.info(f"普通设备 {device.device_id} 开启成功") - return result - elif action == "off": - device.status = "stopped" - result = { - "device_id": device.device_id, - "status": "success", - "message": f"设备 {device.device_id} 已关闭" - } - logger.info(f"普通设备 {device.device_id} 关闭成功") - return result - else: - result = { - "device_id": device.device_id, + if not device: + return { "status": "failed", - "message": f"不支持的操作: {action}" + "message": f"设备不存在: {device_id}" } - logger.warning(f"普通设备 {device.device_id} 不支持的操作: {action}") - return result - - def _control_area_controller(self, controller, action): - """ - 控制区域主控设备 - Args: - controller (AreaController): 区域主控设备实例 - action (str): 控制动作 - - Returns: - dict: 控制结果 - """ - logger.info(f"控制区域主控 {controller.device_id}: 动作={action}") - - if action == "on": - controller.status = "running" - result = { - "device_id": controller.device_id, + # 执行控制动作 + if action in ["on", "off"]: + device.status = "running" if action == "on" else "stopped" + return { "status": "success", - "message": f"区域主控 {controller.device_id} 已开启" + "message": f"设备控制成功: {action}", + "data": { + "device_id": device_id, + "status": device.status + } } - logger.info(f"区域主控 {controller.device_id} 开启成功") - return result - elif action == "off": - controller.status = "stopped" - result = { - "device_id": controller.device_id, - "status": "success", - "message": f"区域主控 {controller.device_id} 已关闭" - } - logger.info(f"区域主控 {controller.device_id} 关闭成功") - return result else: - result = { - "device_id": controller.device_id, + return { "status": "failed", - "message": f"不支持的操作: {action}" + "message": f"不支持的动作: {action}" } - logger.warning(f"区域主控 {controller.device_id} 不支持的操作: {action}") - return result def query_device_status(self, device_id): """ - 查询指定设备状态 + 查询设备状态 Args: device_id (str): 设备ID @@ -359,87 +333,26 @@ class DeviceManager: """ logger.info(f"查询设备状态: ID={device_id}") - # 先尝试查找普通设备 + # 查找设备(先在普通设备中查找,再在区域主控中查找) device = self.get_device(device_id) - if device: - return self._query_normal_device_status(device) + if not device: + device = self.get_controller(device_id) - # 再尝试查找区域主控设备 - controller = self.get_controller(device_id) - if controller: - return self._query_area_controller_status(controller) - - result = { - "device_id": device_id, - "status": "failed", - "message": f"设备 {device_id} 不存在" - } - logger.error(f"查询设备状态失败: 设备 {device_id} 不存在") - return result - - def _query_normal_device_status(self, device): - """ - 查询普通设备状态 - - Args: - device (NormalDevice): 普通设备实例 - - Returns: - dict: 设备状态信息 - """ - logger.info(f"查询普通设备 {device.device_id} 状态") - - # 模拟一些随机的设备数据 - if device.status == "running": - power = random.randint(200, 240) - current = random.uniform(4.0, 6.0) - result = { - "device_id": device.device_id, - "device_type": device.device_type, - "status": device.status, - "rs485_bus": device.rs485_bus, - "rs485_address": device.rs485_address, - "controller_id": device.controller_id, - "power": power, - "current": round(current, 2) + if not device: + return { + "status": "failed", + "message": f"设备不存在: {device_id}" } - logger.info(f"普通设备 {device.device_id} 状态: 运行中, 功率={power}V, 电流={round(current, 2)}A") - return result - else: - result = { - "device_id": device.device_id, - "device_type": device.device_type, - "status": device.status, - "rs485_bus": device.rs485_bus, - "rs485_address": device.rs485_address, - "controller_id": device.controller_id, - "power": 0, - "current": 0.0 + + # 返回设备状态 + return { + "status": "success", + "message": "查询设备状态成功", + "data": { + "device_id": device_id, + "status": device.status } - logger.info(f"普通设备 {device.device_id} 状态: 已停止") - return result - - def _query_area_controller_status(self, controller): - """ - 查询区域主控状态 - - Args: - controller (AreaController): 区域主控设备实例 - - Returns: - dict: 设备状态信息 - """ - logger.info(f"查询区域主控 {controller.device_id} 状态") - - result = { - "device_id": controller.device_id, - "device_type": "area_controller", - "status": controller.status, - "lora_address": controller.lora_address, - "managed_devices": len(controller.devices) } - logger.info(f"区域主控 {controller.device_id} 状态: {controller.status}, 管理设备数: {len(controller.devices)}") - return result def query_all_device_status(self): """ @@ -449,32 +362,36 @@ class DeviceManager: list: 所有设备状态信息列表 """ logger.info("查询所有设备状态") + statuses = [] - # 添加区域主控状态 - for controller in self.controllers.values(): - status_info = { + # 添加所有区域主控设备状态 + for controller in self.get_all_controllers(): + statuses.append({ "device_id": controller.device_id, "device_type": "area_controller", - "status": controller.status, - "lora_address": controller.lora_address, - "managed_devices": len(controller.devices) - } - statuses.append(status_info) - logger.debug(f"区域主控 {controller.device_id} 状态: {controller.status}") + "status": controller.status + }) + + # 添加该控制器下的所有普通设备状态 + for device in controller.get_all_devices(): + statuses.append({ + "device_id": device.device_id, + "device_type": device.device_type, + "status": device.status + }) - # 添加普通设备状态 - for device in self.devices.values(): - status_info = { - "device_id": device.device_id, - "device_type": device.device_type, - "status": device.status, - "rs485_bus": device.rs485_bus, - "rs485_address": device.rs485_address, - "controller_id": device.controller_id - } - statuses.append(status_info) - logger.debug(f"普通设备 {device.device_id} 状态: {device.status}") + # 添加未分配到控制器的普通设备状态 + controller_device_ids = set() + for controller in self.get_all_controllers(): + controller_device_ids.update(controller.devices.keys()) + + for device in self.get_all_devices(): + if device.device_id not in controller_device_ids: + statuses.append({ + "device_id": device.device_id, + "device_type": device.device_type, + "status": device.status + }) - logger.info(f"查询所有设备状态完成,共 {len(statuses)} 个设备") return statuses \ No newline at end of file