#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 中继器测试脚本 用于模拟平台向中继器发送指令 """ import json import time import threading from internal.core import RelayService def test_relay_commands(): """测试中继器指令处理""" print("启动中继器测试") # 创建中继器服务实例(仅用于访问其方法,不启动完整服务) relay_service = RelayService() relay_service.initialize() # 模拟平台发送各种指令 test_commands = [ # 控制设备指令 { "type": "command", "command": "control_device", "data": { "device_id": "3", "action": "on" }, "timestamp": "2023-01-01T12:00:00Z" }, # 查询单个设备状态指令 { "type": "command", "command": "query_device_status", "data": { "device_id": "3" }, "timestamp": "2023-01-01T12:00:05Z" }, # 查询所有设备状态指令 { "type": "command", "command": "query_all_device_status", "timestamp": "2023-01-01T12:00:10Z" } ] print("开始发送测试指令...") for i, command in enumerate(test_commands): print(f"发送指令 {i+1}: {command['command']}") command_json = json.dumps(command) relay_service.handle_platform_command(command_json) time.sleep(1) print("测试指令发送完成") if __name__ == "__main__": test_relay_commands()