Files
relay/tests/test_relay.py

63 lines
1.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
中继器测试脚本
用于模拟平台向中继器发送指令
"""
import json
import time
import threading
from internal.core import RelayService
def test_relay_commands():
"""测试中继器指令处理"""
print("启动中继器测试")
# 创建中继器服务实例(仅用于访问其方法,不启动完整服务)
relay_service = RelayService()
relay_service.initialize()
# 模拟平台发送各种指令
test_commands = [
# 控制设备指令
{
"type": "command",
"command": "control_device",
"data": {
"device_id": "3",
"action": "on"
},
"timestamp": "2023-01-01T12:00:00Z"
},
# 查询单个设备状态指令
{
"type": "command",
"command": "query_device_status",
"data": {
"device_id": "3"
},
"timestamp": "2023-01-01T12:00:05Z"
},
# 查询所有设备状态指令
{
"type": "command",
"command": "query_all_device_status",
"timestamp": "2023-01-01T12:00:10Z"
}
]
print("开始发送测试指令...")
for i, command in enumerate(test_commands):
print(f"发送指令 {i+1}: {command['command']}")
command_json = json.dumps(command)
relay_service.handle_platform_command(command_json)
time.sleep(1)
print("测试指令发送完成")
if __name__ == "__main__":
test_relay_commands()