63 lines
1.6 KiB
Python
63 lines
1.6 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
中继器测试脚本
|
|
用于模拟平台向中继器发送指令
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
from internal.core import RelayService
|
|
|
|
def test_relay_commands():
|
|
"""测试中继器指令处理"""
|
|
print("启动中继器测试")
|
|
|
|
# 创建中继器服务实例(仅用于访问其方法,不启动完整服务)
|
|
relay_service = RelayService()
|
|
relay_service.initialize()
|
|
|
|
# 模拟平台发送各种指令
|
|
test_commands = [
|
|
# 控制设备指令
|
|
{
|
|
"type": "command",
|
|
"command": "control_device",
|
|
"data": {
|
|
"device_id": "3",
|
|
"action": "on"
|
|
},
|
|
"timestamp": "2023-01-01T12:00:00Z"
|
|
},
|
|
|
|
# 查询单个设备状态指令
|
|
{
|
|
"type": "command",
|
|
"command": "query_device_status",
|
|
"data": {
|
|
"device_id": "3"
|
|
},
|
|
"timestamp": "2023-01-01T12:00:05Z"
|
|
},
|
|
|
|
# 查询所有设备状态指令
|
|
{
|
|
"type": "command",
|
|
"command": "query_all_device_status",
|
|
"timestamp": "2023-01-01T12:00:10Z"
|
|
}
|
|
]
|
|
|
|
print("开始发送测试指令...")
|
|
for i, command in enumerate(test_commands):
|
|
print(f"发送指令 {i+1}: {command['command']}")
|
|
command_json = json.dumps(command)
|
|
relay_service.handle_platform_command(command_json)
|
|
time.sleep(1)
|
|
|
|
print("测试指令发送完成")
|
|
|
|
if __name__ == "__main__":
|
|
test_relay_commands() |