Files
relay/internal/simulation/device_interface.py

397 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备模拟接口模块,用于模拟场内子系统设备
支持四级结构:平台->中继->区域主控->普通设备
"""
import random
import time
import logging
from datetime import datetime
from enum import Enum
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DeviceType(Enum):
"""设备类型枚举"""
AREA_CONTROLLER = "area_controller" # 区域主控
NORMAL_DEVICE = "normal_device" # 普通设备
class SimulatedDevice:
"""模拟设备基类"""
def __init__(self, device_id, device_type, status="stopped"):
"""
初始化模拟设备
Args:
device_id (str): 设备ID
device_type (DeviceType): 设备类型
status (str): 初始状态
"""
self.device_id = device_id
self.device_type = device_type
self.status = status
self.created_at = time.time()
logger.info(f"创建模拟设备: ID={device_id}, 类型={device_type.value}, 状态={status}")
class AreaController(SimulatedDevice):
"""区域主控设备"""
def __init__(self, device_id, lora_address, status="stopped"):
"""
初始化区域主控设备
Args:
device_id (str): 设备ID
lora_address (str): LoRa地址
status (str): 初始状态
"""
super().__init__(device_id, DeviceType.AREA_CONTROLLER, status)
self.lora_address = lora_address
self.devices = {} # 管理的普通设备
logger.info(f"创建区域主控: ID={device_id}, LoRa地址={lora_address}")
def add_device(self, device):
"""
添加普通设备到区域主控
Args:
device (NormalDevice): 普通设备实例
"""
self.devices[device.device_id] = device
logger.info(f"区域主控 {self.device_id} 添加设备: {device.device_id}")
def remove_device(self, device_id):
"""
从区域主控移除普通设备
Args:
device_id (str): 设备ID
"""
if device_id in self.devices:
del self.devices[device_id]
logger.info(f"区域主控 {self.device_id} 移除设备: {device_id}")
def get_device(self, device_id):
"""
获取区域主控下的普通设备
Args:
device_id (str): 设备ID
Returns:
NormalDevice: 普通设备实例如果不存在则返回None
"""
return self.devices.get(device_id)
def get_all_devices(self):
"""
获取区域主控下的所有普通设备
Returns:
list: 所有普通设备实例的列表
"""
return list(self.devices.values())
class NormalDevice(SimulatedDevice):
"""普通设备"""
def __init__(self, device_id, device_type, rs485_bus, rs485_address, controller_id, status="stopped"):
"""
初始化普通设备
Args:
device_id (str): 设备ID
device_type (str): 设备类型(如风机、水帘等)
rs485_bus (str): 485总线号
rs485_address (str): 485总线地址
controller_id (str): 所属区域主控ID
status (str): 初始状态
"""
super().__init__(device_id, DeviceType.NORMAL_DEVICE, status)
self.device_type = device_type
self.rs485_bus = rs485_bus
self.rs485_address = rs485_address
self.controller_id = controller_id
logger.info(f"创建普通设备: ID={device_id}, 类型={device_type}, "
f"485总线={rs485_bus}, 485地址={rs485_address}, 所属主控={controller_id}")
class DeviceManager:
"""设备管理器"""
def __init__(self):
"""初始化设备管理器"""
self.controllers = {} # 区域主控设备
self.devices = {} # 所有普通设备
logger.info("初始化设备管理器")
def add_controller(self, controller):
"""
添加区域主控设备
Args:
controller (AreaController): 区域主控设备实例
"""
self.controllers[controller.device_id] = controller
logger.info(f"添加区域主控到管理器: {controller.device_id}")
def remove_controller(self, controller_id):
"""
移除区域主控设备
Args:
controller_id (str): 区域主控设备ID
"""
if controller_id in self.controllers:
# 同时移除该主控下的所有普通设备
controller = self.controllers[controller_id]
for device_id in list(controller.devices.keys()):
if device_id in self.devices:
del self.devices[device_id]
del self.controllers[controller_id]
logger.info(f"从管理器移除区域主控: {controller_id}")
def get_controller(self, controller_id):
"""
获取区域主控设备
Args:
controller_id (str): 区域主控设备ID
Returns:
AreaController: 区域主控设备实例如果不存在则返回None
"""
controller = self.controllers.get(controller_id)
if controller:
logger.debug(f"获取区域主控: {controller_id}")
else:
logger.warning(f"尝试获取不存在的区域主控: {controller_id}")
return controller
def get_all_controllers(self):
"""
获取所有区域主控设备
Returns:
list: 所有区域主控设备实例的列表
"""
logger.info(f"获取所有区域主控,共 {len(self.controllers)}")
return list(self.controllers.values())
def add_device(self, device):
"""
添加普通设备
Args:
device (NormalDevice): 普通设备实例
"""
self.devices[device.device_id] = device
logger.info(f"添加普通设备到管理器: {device.device_id}")
def remove_device(self, device_id):
"""
移除普通设备
Args:
device_id (str): 设备ID
"""
if device_id in self.devices:
del self.devices[device_id]
logger.info(f"从管理器移除设备: {device_id}")
def get_device(self, device_id):
"""
获取普通设备
Args:
device_id (str): 设备ID
Returns:
NormalDevice: 普通设备实例如果不存在则返回None
"""
device = self.devices.get(device_id)
if device:
logger.debug(f"获取普通设备: {device_id}")
else:
logger.warning(f"尝试获取不存在的普通设备: {device_id}")
return device
def get_all_devices(self):
"""
获取所有普通设备
Returns:
list: 所有普通设备实例的列表
"""
logger.info(f"获取所有普通设备,共 {len(self.devices)}")
return list(self.devices.values())
def reinitialize_devices(self, controllers_config, devices_config):
"""
重新初始化设备(根据新的配置)
Args:
controllers_config (list): 区域主控设备配置列表
devices_config (list): 普通设备配置列表
"""
logger.info("重新初始化设备")
# 清空现有设备
self.controllers.clear()
self.devices.clear()
# 根据新配置创建区域主控设备
controllers = {}
for controller_config in controllers_config:
controller = AreaController(
device_id=controller_config['id'],
lora_address=controller_config['lora_address'],
status=controller_config.get('status', 'stopped')
)
self.add_controller(controller)
controllers[controller.device_id] = controller
# 根据新配置创建普通设备,并关联到区域主控
for device_config in devices_config:
device = NormalDevice(
device_id=device_config['id'],
device_type=device_config['type'],
rs485_bus=device_config['rs485_bus'],
rs485_address=device_config['rs485_address'],
controller_id=device_config['controller_id'],
status=device_config.get('status', 'stopped')
)
self.add_device(device)
# 将设备添加到对应的区域主控中
controller_id = device_config['controller_id']
if controller_id in controllers:
controllers[controller_id].add_device(device)
logger.info(f"设备重新初始化完成,创建了 {len(controllers)} 个区域主控和 {len(devices_config)} 个普通设备")
def control_device(self, device_id, action):
"""
控制指定设备
Args:
device_id (str): 设备ID
action (str): 控制动作
Returns:
dict: 控制结果
"""
logger.info(f"控制设备: ID={device_id}, 动作={action}")
# 查找设备(先在普通设备中查找,再在区域主控中查找)
device = self.get_device(device_id)
if not device:
device = self.get_controller(device_id)
if not device:
return {
"status": "failed",
"message": f"设备不存在: {device_id}"
}
# 执行控制动作
if action in ["on", "off"]:
device.status = "running" if action == "on" else "stopped"
return {
"status": "success",
"message": f"设备控制成功: {action}",
"data": {
"device_id": device_id,
"status": device.status
}
}
else:
return {
"status": "failed",
"message": f"不支持的动作: {action}"
}
def query_device_status(self, device_id):
"""
查询设备状态
Args:
device_id (str): 设备ID
Returns:
dict: 设备状态信息
"""
logger.info(f"查询设备状态: ID={device_id}")
# 查找设备(先在普通设备中查找,再在区域主控中查找)
device = self.get_device(device_id)
if not device:
device = self.get_controller(device_id)
if not device:
return {
"status": "failed",
"message": f"设备不存在: {device_id}"
}
# 返回设备状态
return {
"status": "success",
"message": "查询设备状态成功",
"data": {
"device_id": device_id,
"status": device.status
}
}
def query_all_device_status(self):
"""
查询所有设备状态
Returns:
list: 所有设备状态信息列表
"""
logger.info("查询所有设备状态")
statuses = []
# 添加所有区域主控设备状态
for controller in self.get_all_controllers():
statuses.append({
"device_id": controller.device_id,
"device_type": "area_controller",
"status": controller.status
})
# 添加该控制器下的所有普通设备状态
for device in controller.get_all_devices():
statuses.append({
"device_id": device.device_id,
"device_type": device.device_type,
"status": device.status
})
# 添加未分配到控制器的普通设备状态
controller_device_ids = set()
for controller in self.get_all_controllers():
controller_device_ids.update(controller.devices.keys())
for device in self.get_all_devices():
if device.device_id not in controller_device_ids:
statuses.append({
"device_id": device.device_id,
"device_type": device.device_type,
"status": device.status
})
return statuses