#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 设备模拟接口模块,用于模拟场内子系统设备 支持四级结构:平台->中继->区域主控->普通设备 """ import random import time import logging from datetime import datetime from enum import Enum # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DeviceType(Enum): """设备类型枚举""" AREA_CONTROLLER = "area_controller" # 区域主控 NORMAL_DEVICE = "normal_device" # 普通设备 class SimulatedDevice: """模拟设备基类""" def __init__(self, device_id, device_type, status="stopped"): """ 初始化模拟设备 Args: device_id (str): 设备ID device_type (DeviceType): 设备类型 status (str): 初始状态 """ self.device_id = device_id self.device_type = device_type self.status = status self.created_at = time.time() logger.info(f"创建模拟设备: ID={device_id}, 类型={device_type.value}, 状态={status}") class AreaController(SimulatedDevice): """区域主控设备""" def __init__(self, device_id, lora_address, status="stopped"): """ 初始化区域主控设备 Args: device_id (str): 设备ID lora_address (str): LoRa地址 status (str): 初始状态 """ super().__init__(device_id, DeviceType.AREA_CONTROLLER, status) self.lora_address = lora_address self.devices = {} # 管理的普通设备 logger.info(f"创建区域主控: ID={device_id}, LoRa地址={lora_address}") def add_device(self, device): """ 添加普通设备到区域主控 Args: device (NormalDevice): 普通设备实例 """ self.devices[device.device_id] = device logger.info(f"区域主控 {self.device_id} 添加设备: {device.device_id}") def remove_device(self, device_id): """ 从区域主控移除普通设备 Args: device_id (str): 设备ID """ if device_id in self.devices: del self.devices[device_id] logger.info(f"区域主控 {self.device_id} 移除设备: {device_id}") def get_device(self, device_id): """ 获取区域主控下的普通设备 Args: device_id (str): 设备ID Returns: NormalDevice: 普通设备实例,如果不存在则返回None """ return self.devices.get(device_id) def get_all_devices(self): """ 获取区域主控下的所有普通设备 Returns: list: 所有普通设备实例的列表 """ return list(self.devices.values()) class NormalDevice(SimulatedDevice): """普通设备""" def __init__(self, device_id, device_type, rs485_bus, rs485_address, controller_id, status="stopped"): """ 初始化普通设备 Args: device_id (str): 设备ID device_type (str): 设备类型(如风机、水帘等) rs485_bus (str): 485总线号 rs485_address (str): 485总线地址 controller_id (str): 所属区域主控ID status (str): 初始状态 """ super().__init__(device_id, DeviceType.NORMAL_DEVICE, status) self.device_type = device_type self.rs485_bus = rs485_bus self.rs485_address = rs485_address self.controller_id = controller_id logger.info(f"创建普通设备: ID={device_id}, 类型={device_type}, " f"485总线={rs485_bus}, 485地址={rs485_address}, 所属主控={controller_id}") class DeviceManager: """设备管理器""" def __init__(self): """初始化设备管理器""" self.controllers = {} # 区域主控设备 self.devices = {} # 所有普通设备 logger.info("初始化设备管理器") def add_controller(self, controller): """ 添加区域主控设备 Args: controller (AreaController): 区域主控设备实例 """ self.controllers[controller.device_id] = controller logger.info(f"添加区域主控到管理器: {controller.device_id}") def remove_controller(self, controller_id): """ 移除区域主控设备 Args: controller_id (str): 区域主控设备ID """ if controller_id in self.controllers: # 同时移除该主控下的所有普通设备 controller = self.controllers[controller_id] for device_id in list(controller.devices.keys()): if device_id in self.devices: del self.devices[device_id] del self.controllers[controller_id] logger.info(f"从管理器移除区域主控: {controller_id}") def get_controller(self, controller_id): """ 获取区域主控设备 Args: controller_id (str): 区域主控设备ID Returns: AreaController: 区域主控设备实例,如果不存在则返回None """ controller = self.controllers.get(controller_id) if controller: logger.debug(f"获取区域主控: {controller_id}") else: logger.warning(f"尝试获取不存在的区域主控: {controller_id}") return controller def get_all_controllers(self): """ 获取所有区域主控设备 Returns: list: 所有区域主控设备实例的列表 """ logger.info(f"获取所有区域主控,共 {len(self.controllers)} 个") return list(self.controllers.values()) def add_device(self, device): """ 添加普通设备 Args: device (NormalDevice): 普通设备实例 """ self.devices[device.device_id] = device logger.info(f"添加普通设备到管理器: {device.device_id}") def remove_device(self, device_id): """ 移除普通设备 Args: device_id (str): 设备ID """ if device_id in self.devices: del self.devices[device_id] logger.info(f"从管理器移除普通设备: {device_id}") def get_device(self, device_id): """ 获取普通设备 Args: device_id (str): 设备ID Returns: NormalDevice: 普通设备实例,如果不存在则返回None """ device = self.devices.get(device_id) if device: logger.debug(f"获取普通设备: {device_id}") else: logger.warning(f"尝试获取不存在的普通设备: {device_id}") return device def get_all_devices(self): """ 获取所有普通设备 Returns: list: 所有普通设备实例的列表 """ logger.info(f"获取所有普通设备,共 {len(self.devices)} 个") return list(self.devices.values()) def control_device(self, device_id, action): """ 控制指定设备 Args: device_id (str): 设备ID action (str): 控制动作 Returns: dict: 控制结果 """ logger.info(f"控制设备: ID={device_id}, 动作={action}") # 先尝试查找普通设备 device = self.get_device(device_id) if device: return self._control_normal_device(device, action) # 再尝试查找区域主控设备 controller = self.get_controller(device_id) if controller: return self._control_area_controller(controller, action) result = { "device_id": device_id, "status": "failed", "message": f"设备 {device_id} 不存在" } logger.error(f"控制设备失败: 设备 {device_id} 不存在") return result def _control_normal_device(self, device, action): """ 控制普通设备 Args: device (NormalDevice): 普通设备实例 action (str): 控制动作 Returns: dict: 控制结果 """ logger.info(f"控制普通设备 {device.device_id}: 动作={action}") if action == "on": device.status = "running" result = { "device_id": device.device_id, "status": "success", "message": f"设备 {device.device_id} 已开启" } logger.info(f"普通设备 {device.device_id} 开启成功") return result elif action == "off": device.status = "stopped" result = { "device_id": device.device_id, "status": "success", "message": f"设备 {device.device_id} 已关闭" } logger.info(f"普通设备 {device.device_id} 关闭成功") return result else: result = { "device_id": device.device_id, "status": "failed", "message": f"不支持的操作: {action}" } logger.warning(f"普通设备 {device.device_id} 不支持的操作: {action}") return result def _control_area_controller(self, controller, action): """ 控制区域主控设备 Args: controller (AreaController): 区域主控设备实例 action (str): 控制动作 Returns: dict: 控制结果 """ logger.info(f"控制区域主控 {controller.device_id}: 动作={action}") if action == "on": controller.status = "running" result = { "device_id": controller.device_id, "status": "success", "message": f"区域主控 {controller.device_id} 已开启" } logger.info(f"区域主控 {controller.device_id} 开启成功") return result elif action == "off": controller.status = "stopped" result = { "device_id": controller.device_id, "status": "success", "message": f"区域主控 {controller.device_id} 已关闭" } logger.info(f"区域主控 {controller.device_id} 关闭成功") return result else: result = { "device_id": controller.device_id, "status": "failed", "message": f"不支持的操作: {action}" } logger.warning(f"区域主控 {controller.device_id} 不支持的操作: {action}") return result def query_device_status(self, device_id): """ 查询指定设备状态 Args: device_id (str): 设备ID Returns: dict: 设备状态信息 """ logger.info(f"查询设备状态: ID={device_id}") # 先尝试查找普通设备 device = self.get_device(device_id) if device: return self._query_normal_device_status(device) # 再尝试查找区域主控设备 controller = self.get_controller(device_id) if controller: return self._query_area_controller_status(controller) result = { "device_id": device_id, "status": "failed", "message": f"设备 {device_id} 不存在" } logger.error(f"查询设备状态失败: 设备 {device_id} 不存在") return result def _query_normal_device_status(self, device): """ 查询普通设备状态 Args: device (NormalDevice): 普通设备实例 Returns: dict: 设备状态信息 """ logger.info(f"查询普通设备 {device.device_id} 状态") # 模拟一些随机的设备数据 if device.status == "running": power = random.randint(200, 240) current = random.uniform(4.0, 6.0) result = { "device_id": device.device_id, "device_type": device.device_type, "status": device.status, "rs485_bus": device.rs485_bus, "rs485_address": device.rs485_address, "controller_id": device.controller_id, "power": power, "current": round(current, 2) } logger.info(f"普通设备 {device.device_id} 状态: 运行中, 功率={power}V, 电流={round(current, 2)}A") return result else: result = { "device_id": device.device_id, "device_type": device.device_type, "status": device.status, "rs485_bus": device.rs485_bus, "rs485_address": device.rs485_address, "controller_id": device.controller_id, "power": 0, "current": 0.0 } logger.info(f"普通设备 {device.device_id} 状态: 已停止") return result def _query_area_controller_status(self, controller): """ 查询区域主控状态 Args: controller (AreaController): 区域主控设备实例 Returns: dict: 设备状态信息 """ logger.info(f"查询区域主控 {controller.device_id} 状态") result = { "device_id": controller.device_id, "device_type": "area_controller", "status": controller.status, "lora_address": controller.lora_address, "managed_devices": len(controller.devices) } logger.info(f"区域主控 {controller.device_id} 状态: {controller.status}, 管理设备数: {len(controller.devices)}") return result def query_all_device_status(self): """ 查询所有设备状态 Returns: list: 所有设备状态信息列表 """ logger.info("查询所有设备状态") statuses = [] # 添加区域主控状态 for controller in self.controllers.values(): status_info = { "device_id": controller.device_id, "device_type": "area_controller", "status": controller.status, "lora_address": controller.lora_address, "managed_devices": len(controller.devices) } statuses.append(status_info) logger.debug(f"区域主控 {controller.device_id} 状态: {controller.status}") # 添加普通设备状态 for device in self.devices.values(): status_info = { "device_id": device.device_id, "device_type": device.device_type, "status": device.status, "rs485_bus": device.rs485_bus, "rs485_address": device.rs485_address, "controller_id": device.controller_id } statuses.append(status_info) logger.debug(f"普通设备 {device.device_id} 状态: {device.status}") logger.info(f"查询所有设备状态完成,共 {len(statuses)} 个设备") return statuses