From c117759ac3d1c2099f8ece816a5f777014ff3e4e Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Thu, 25 Sep 2025 20:14:48 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E6=98=93=E7=89=88=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 2 + client_pb.py | 306 ++++++++++++++++++++++++++++++++++++++++++++ main.py | 311 +++++++++++++++++++++++++++++++++++++++++++++ proto/client.proto | 32 +++++ 4 files changed, 651 insertions(+) create mode 100644 Makefile create mode 100644 client_pb.py create mode 100644 proto/client.proto diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..20d3d66 --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +swag: + python -m grpc_tools.protoc -I./proto --python_out=./proto ./proto/client.proto diff --git a/client_pb.py b/client_pb.py new file mode 100644 index 0000000..16f5f20 --- /dev/null +++ b/client_pb.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +根据client.proto生成的解析代码 +适用于ESP32 MicroPython环境 +""" + +import struct + +# MethodType枚举 +METHOD_TYPE_SWITCH = 0 +METHOD_TYPE_COLLECT = 1 + +def encode_varint(value): + """编码varint值""" + buf = bytearray() + while value >= 0x80: + buf.append((value & 0x7F) | 0x80) + value >>= 7 + buf.append(value & 0x7F) + return buf + +def decode_varint(buf, pos=0): + """解码varint值""" + result = 0 + shift = 0 + while pos < len(buf): + byte = buf[pos] + pos += 1 + result |= (byte & 0x7F) << shift + if not (byte & 0x80): + break + shift += 7 + return result, pos + +def encode_string(value): + """编码字符串""" + value_bytes = value.encode('utf-8') + length = encode_varint(len(value_bytes)) + return length + value_bytes + +def decode_string(buf, pos=0): + """解码字符串""" + length, pos = decode_varint(buf, pos) + value = buf[pos:pos+length].decode('utf-8') + pos += length + return value, pos + +def encode_instruction(method, data): + """ + 编码Instruction消息 + + Args: + method: 方法类型 (int) + data: 数据 (bytes) + + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + + # 编码method字段 (field_number=1, wire_type=0) + result.extend(encode_varint((1 << 3) | 0)) # tag + result.extend(encode_varint(method)) # value + + # 编码data字段 (field_number=2, wire_type=2) + result.extend(encode_varint((2 << 3) | 2)) # tag + result.extend(encode_varint(len(data))) # length + result.extend(data) # value + + return result + +def decode_instruction(buf): + """ + 解码Instruction消息 + + Args: + buf: 编码后的数据 (bytes) + + Returns: + dict: 解码后的消息 + """ + result = {} + pos = 0 + + while pos < len(buf): + # 读取标签 + tag, pos = decode_varint(buf, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if field_number == 1: # method字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['method'] = value + elif field_number == 2: # data字段 + if wire_type == 2: # 长度分隔类型 + length, pos = decode_varint(buf, pos) + value = buf[pos:pos+length] + pos += length + result['data'] = value + else: + # 跳过未知字段 + if wire_type == 0: # varint + _, pos = decode_varint(buf, pos) + elif wire_type == 2: # 长度分隔 + length, pos = decode_varint(buf, pos) + pos += length + else: + pos += 1 + + return result + +def encode_switch(device_action, bus_number, bus_address, relay_channel): + """ + 编码Switch消息 + + Args: + device_action: 设备动作指令 (str) + bus_number: 总线号 (int) + bus_address: 总线地址 (int) + relay_channel: 继电器通道号 (int) + + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + + # 编码device_action字段 (field_number=1, wire_type=2) + result.extend(encode_varint((1 << 3) | 2)) # tag + action_bytes = encode_string(device_action) # value (length + string) + result.extend(action_bytes) + + # 编码bus_number字段 (field_number=2, wire_type=0) + result.extend(encode_varint((2 << 3) | 0)) # tag + result.extend(encode_varint(bus_number)) # value + + # 编码bus_address字段 (field_number=3, wire_type=0) + result.extend(encode_varint((3 << 3) | 0)) # tag + result.extend(encode_varint(bus_address)) # value + + # 编码relay_channel字段 (field_number=4, wire_type=0) + result.extend(encode_varint((4 << 3) | 0)) # tag + result.extend(encode_varint(relay_channel)) # value + + return result + +def decode_switch(buf): + """ + 解码Switch消息 + + Args: + buf: 编码后的数据 (bytes) + + Returns: + dict: 解码后的消息 + """ + result = {} + pos = 0 + + while pos < len(buf): + # 读取标签 + tag, pos = decode_varint(buf, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if field_number == 1: # device_action字段 + if wire_type == 2: # 字符串类型 + value, pos = decode_string(buf, pos) + result['device_action'] = value + elif field_number == 2: # bus_number字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['bus_number'] = value + elif field_number == 3: # bus_address字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['bus_address'] = value + elif field_number == 4: # relay_channel字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['relay_channel'] = value + else: + # 跳过未知字段 + if wire_type == 0: # varint + _, pos = decode_varint(buf, pos) + elif wire_type == 2: # 长度分隔 + length, pos = decode_varint(buf, pos) + pos += length + else: + pos += 1 + + return result + +def encode_collect(bus_number, bus_address, value): + """ + 编码Collect消息 + + Args: + bus_number: 总线号 (int) + bus_address: 总线地址 (int) + value: 采集值 (float) + + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + + # 编码bus_number字段 (field_number=1, wire_type=0) + result.extend(encode_varint((1 << 3) | 0)) # tag + result.extend(encode_varint(bus_number)) # value + + # 编码bus_address字段 (field_number=2, wire_type=0) + result.extend(encode_varint((2 << 3) | 0)) # tag + result.extend(encode_varint(bus_address)) # value + + # 编码value字段 (field_number=3, wire_type=5) + result.extend(encode_varint((3 << 3) | 5)) # tag + # 将float转换为little-endian的4字节 + result.extend(struct.pack('> 3 + wire_type = tag & 0x07 + + if field_number == 1: # bus_number字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['bus_number'] = value + elif field_number == 2: # bus_address字段 + if wire_type == 0: # varint类型 + value, pos = decode_varint(buf, pos) + result['bus_address'] = value + elif field_number == 3: # value字段 + if wire_type == 5: # 32位浮点类型 + # 从little-endian的4字节解析float + value = struct.unpack('= 3 and buffer[0] == 0xAA and buffer[-1] == 0x55: + # 检查地址是否匹配 + if len(buffer) >= 3 and buffer[2] == ESP32_ADDRESS: + # 提取有效数据(去掉帧头、地址、校验和帧尾) + return buffer[3:-2] + else: + break # 没有更多数据可读 + + return None + +def send_lora_message(data): + """ + 通过LoRaWAN模块发送消息 + + 参数: + data: 要发送的字节数据 + """ + # 切换到发送模式 + rs485_re_de_pin.value(1) + time.sleep_ms(10) + + try: + # 构造发送给LoRaWAN模块的RS485帧 + frame = bytearray() + frame.append(0xAA) # 帧头 + frame.append(LORA_MODULE_ADDRESS & 0xFF) # LoRaWAN模块地址 + frame.append(ESP32_ADDRESS & 0xFF) # 本设备地址(作为源地址) + + # 添加数据 + frame.extend(data) + + # 计算校验和 + checksum = sum(frame[1:]) & 0xFF + frame.append(checksum) + frame.append(0x55) # 帧尾 + + # 发送命令 + rs485_uart.write(frame) + print(f"通过LoRa发送数据: {frame.hex()}") + + finally: + # 切换回接收模式 + time.sleep_ms(10) + rs485_re_de_pin.value(0) + +def send_rs485_command(bus_number, bus_address, command, channel=None): + """ + 发送命令到RS485总线上的设备 + + 参数: + bus_number: 总线号 + bus_address: 设备地址 + command: 命令内容 + channel: 通道号(可选) + """ + # 切换到发送模式 + rs485_re_de_pin.value(1) + time.sleep_ms(10) + + try: + # 构造RS485命令帧 + frame = bytearray() + frame.append(0xAA) # 帧头 + frame.append(bus_number & 0xFF) # 总线号 + frame.append(bus_address & 0xFF) # 设备地址 + + # 添加命令数据 + if isinstance(command, str): + frame.extend(command.encode('utf-8')) + elif isinstance(command, bytes): + frame.extend(command) + elif isinstance(command, int): + frame.append(command & 0xFF) + + # 如果有通道号,则添加 + if channel is not None: + frame.append(channel & 0xFF) + + # 计算校验和 + checksum = sum(frame[1:]) & 0xFF + frame.append(checksum) + frame.append(0x55) # 帧尾 + + # 发送命令 + rs485_uart.write(frame) + print(f"已发送RS485命令: {frame.hex()}") + + finally: + # 切换回接收模式 + time.sleep_ms(10) + rs485_re_de_pin.value(0) + +def collect_sensor_data(bus_number, bus_address): + """ + 从传感器收集数据 + + 参数: + bus_number: 总线号 + bus_address: 传感器地址 + + 返回: + 传感器数据 + """ + # 切换到发送模式 + rs485_re_de_pin.value(1) + time.sleep_ms(10) + + try: + # 构造读取传感器数据的命令 + frame = bytearray([0xAA, bus_number & 0xFF, bus_address & 0xFF, 0x01, 0x00, 0x55]) + rs485_uart.write(frame) + print(f"已发送传感器读取命令: {frame.hex()}") + + # 等待响应 + time.sleep_ms(50) # 短暂等待响应 + + # 读取传感器返回的数据 + buffer = bytearray() + + while rs485_uart.any(): + data = rs485_uart.read() + if data: + buffer.extend(data) + # 简单的帧检测逻辑 + if len(buffer) >= 3 and buffer[0] == 0xAA and buffer[-1] == 0x55: + # 检查地址是否匹配 + if len(buffer) >= 3 and buffer[2] == ESP32_ADDRESS: + # 提取有效数据 + if len(buffer) >= 5: + # 模拟一个浮点数值 + value = float(buffer[3] + (buffer[4] << 8)) / 100.0 + return value + else: + break + + if len(buffer) > 0: + print(f"传感器响应不完整: {buffer.hex()}") + else: + print("传感器无响应") + return None + + finally: + # 切换回接收模式 + time.sleep_ms(10) + rs485_re_de_pin.value(0) + +def parse_instruction(data): + """ + 解析来自LoRaWAN的指令 + + 参数: + data: protobuf编码的指令数据 + + 返回: + 解析后的指令对象,失败时返回None + """ + try: + instruction = client_pb.decode_instruction(data) + return instruction + except Exception as e: + print(f"解析指令失败: {e}") + return None + +def handle_switch_instruction(switch_msg): + """ + 处理开关指令 + + 参数: + switch_msg: Switch消息字典 + """ + action = switch_msg.get('device_action', '') + bus_number = switch_msg.get('bus_number', 0) + bus_address = switch_msg.get('bus_address', 0) + channel = switch_msg.get('relay_channel', 0) + + print(f"处理开关指令: 动作={action}, 总线={bus_number}, 地址={bus_address}, 通道={channel}") + + if action.upper() == "ON": + # 发送开启设备命令 + send_rs485_command(bus_number, bus_address, 0x01, channel) + elif action.upper() == "OFF": + # 发送关闭设备命令 + send_rs485_command(bus_number, bus_address, 0x00, channel) + else: + # 其他自定义命令 + send_rs485_command(bus_number, bus_address, action, channel) + +def handle_collect_instruction(collect_msg): + """ + 处理采集指令 + + 参数: + collect_msg: Collect消息字典 + """ + bus_number = collect_msg.get('bus_number', 0) + bus_address = collect_msg.get('bus_address', 0) + + print(f"处理采集指令: 总线={bus_number}, 地址={bus_address}") + + # 从传感器采集数据 + value = collect_sensor_data(bus_number, bus_address) + + if value is not None: + # 构造Collect响应消息 + collect_data = client_pb.encode_collect(bus_number, bus_address, value) + + # 构造Instruction消息包装Collect数据 + instruction_data = client_pb.encode_instruction(client_pb.METHOD_TYPE_COLLECT, collect_data) + + # 发送回上位机 + send_lora_message(instruction_data) + else: + print("采集数据失败") + +def process_instruction(instruction): + """ + 处理解析后的指令 + + 参数: + instruction: 解析后的指令字典 + """ + method = instruction.get('method', -1) + + if method == client_pb.METHOD_TYPE_SWITCH: + # 处理开关指令 + if 'data' in instruction: + switch_msg = client_pb.decode_switch(instruction['data']) + handle_switch_instruction(switch_msg) + else: + print("开关指令缺少data字段") + elif method == client_pb.METHOD_TYPE_COLLECT: + # 处理采集指令 + if 'data' in instruction: + collect_msg = client_pb.decode_collect(instruction['data']) + handle_collect_instruction(collect_msg) + else: + print("采集指令缺少data字段") + else: + print(f"不支持的指令类型: {method}") + +def main_loop(): + """ + 主循环 + """ + print("猪舍控制系统启动...") + print(f"设备地址: {ESP32_ADDRESS}") + + while True: + # 接收LoRaWAN消息 + lora_data = receive_lora_message() + + if lora_data: + print(f"收到LoRaWAN消息: {lora_data.hex()}") + + # 解析指令 + instruction = parse_instruction(lora_data) + + if instruction: + # 处理指令 + process_instruction(instruction) + else: + print("无效的指令数据") + + # 其他周期性任务可以放在这里 + # 例如定时采集传感器数据等 + time.sleep(0.01) # 短暂休眠避免过度占用CPU + +# 程序入口 +if __name__ == "__main__": + try: + main_loop() + except KeyboardInterrupt: + print("程序被中断") + except Exception as e: + print(f"程序异常: {e}") \ No newline at end of file diff --git a/proto/client.proto b/proto/client.proto new file mode 100644 index 0000000..8a98e32 --- /dev/null +++ b/proto/client.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package device; + +import "google/protobuf/any.proto"; + +option go_package = "internal/app/service/device/proto"; + +// 指令类型 +enum MethodType{ + SWITCH = 0; // 启停 + COLLECT = 1; // 采集 +} + +// 指令 +message Instruction{ + MethodType method = 1; + google.protobuf.Any data = 2; +} + +message Switch{ + string device_action = 1; // 指令 + int32 bus_number = 2; // 总线号 + int32 bus_address = 3; // 总线地址 + int32 relay_channel = 4; // 继电器通道号 +} + +message Collect{ + int32 bus_number = 1; // 总线号 + int32 bus_address = 2; // 总线地址 + float value = 3; // 采集值 +} \ No newline at end of file