#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LoRa协议处理模块 处理LoRa物理层通信相关功能 支持四级结构:平台->中继->区域主控->普通设备 """ import logging import time import random import os from ..base import ProtocolHandler # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class LoRaHandler(ProtocolHandler): """LoRa协议处理器""" def __init__(self, config): """ 初始化LoRa协议处理器 Args: config: 配置对象 """ super().__init__(config) self.initialized = False self.last_config_mtime = None # 上次配置文件修改时间 logger.info("初始化LoRa协议处理器") def initialize(self): """初始化LoRa协议处理器""" logger.info("开始初始化LoRa通信") # 模拟LoRa硬件初始化 time.sleep(0.5) self.initialized = True # 如果启用模拟模式,初始化设备管理器 if self.config.simulation_enabled: self._initialize_simulation() else: logger.info("真实模式,需要连接LoRa硬件") logger.info("LoRa协议处理器初始化完成") def _initialize_simulation(self): """初始化模拟模式""" from internal.simulation import DeviceManager, AreaController, NormalDevice # 如果设备管理器尚未创建,则创建它 if not hasattr(self, 'device_manager') or self.device_manager is None: self.device_manager = DeviceManager() # 检查是否需要重新加载配置(在模拟模式下) self._check_and_reload_config() # 重新初始化设备(根据最新的配置) self.device_manager.reinitialize_devices( self.config.simulation_controllers, self.config.simulation_devices ) def _check_and_reload_config(self): """检查并重新加载配置文件(仅在模拟模式下)""" if not self.config.simulation_enabled: return try: # 获取配置文件的修改时间 current_mtime = os.path.getmtime(self.config.config_path) # 如果是第一次检查或者文件已被修改,则重新加载配置 if self.last_config_mtime is None or current_mtime > self.last_config_mtime: logger.info("检测到配置文件发生变化,重新加载配置") self.config.reload_config() self.last_config_mtime = current_mtime except Exception as e: logger.error(f"检查配置文件修改时间时发生错误: {e}") def send_command(self, command, data): """ 通过LoRa发送命令到设备 Args: command (str): 命令类型 data (dict): 命令数据 Returns: dict: 命令执行结果 """ if not self.initialized: logger.error("LoRa协议处理器未初始化") return { "status": "failed", "message": "LoRa协议处理器未初始化" } logger.info(f"通过LoRa发送命令: {command}, 数据: {data}") # 如果启用模拟模式,直接处理命令 if self.config.simulation_enabled and self.device_manager: # 在模拟模式下,每次都检查配置文件是否有更新 self._check_and_reload_config() result = self._handle_simulated_command(command, data) logger.info(f"模拟命令处理结果: {result}") return result # 真实模式下需要实际发送命令到LoRa设备 logger.info("真实模式下发送命令到LoRa设备") # 这里应该是实际的LoRa通信代码 # 暂时返回模拟响应 time.sleep(0.1) # 模拟通信延迟 return { "status": "success" if random.choice([True, False]) else "failed", "message": "命令发送成功" if random.choice([True, False]) else "命令发送失败", "data": { "device_id": data.get("device_id", "unknown"), "timestamp": time.time() } } def receive_response(self): """ 从LoRa接收设备响应 Returns: dict: 设备响应数据 """ if not self.initialized: logger.error("LoRa协议处理器未初始化") return { "status": "failed", "message": "LoRa协议处理器未初始化" } logger.info("从LoRa接收设备响应") # 如果启用模拟模式,返回模拟响应 if self.config.simulation_enabled and self.device_manager: # 在模拟模式下,我们假设命令已经直接处理完成,不需要单独接收响应 logger.info("模拟模式下命令已直接处理完成") return { "status": "success", "message": "模拟响应", "data": {} } # 真实模式下需要实际从LoRa设备接收响应 logger.info("真实模式下从LoRa设备接收响应") # 这里应该是实际的LoRa通信代码 # 暂时返回模拟响应 time.sleep(0.1) # 模拟通信延迟 return { "status": "success", "message": "接收响应成功", "data": { "response_data": "sample_data", "timestamp": time.time() } } def _handle_simulated_command(self, command, data): """ 处理模拟命令 Args: command (str): 命令类型 data (dict): 命令数据 Returns: dict: 命令执行结果 """ logger.info(f"处理模拟命令: {command}") if command == "control_device": device_id = data.get("device_id") action = data.get("action") return self.device_manager.control_device(device_id, action) elif command == "query_device_status": device_id = data.get("device_id") return self.device_manager.query_device_status(device_id) elif command == "query_all_device_status": statuses = self.device_manager.query_all_device_status() return { "status": "success", "message": "查询所有设备状态成功", "data": statuses } else: return { "status": "failed", "message": f"不支持的命令: {command}" }