From 23889b80de13714d3f6f10789ff674b8ed290ac2 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 7 Oct 2025 20:11:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0proto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 10 +- client_pb.py | 542 ++++++++++++++++++++++++++------------------- main.py | 296 +------------------------ proto/client.proto | 59 +++-- 4 files changed, 360 insertions(+), 547 deletions(-) diff --git a/Makefile b/Makefile index 20d3d66..f69cb84 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,8 @@ -swag: - python -m grpc_tools.protoc -I./proto --python_out=./proto ./proto/client.proto + +.PHONY: proto + +proto: proto-standard + +proto-standard: + cmd /c python -m grpc_tools.protoc -I./proto --python_out=./proto ./proto/client.proto && move proto\client_pb2.py proto\client_pb.py + diff --git a/client_pb.py b/client_pb.py index 16f5f20..3a2af95 100644 --- a/client_pb.py +++ b/client_pb.py @@ -8,9 +8,7 @@ import struct -# MethodType枚举 -METHOD_TYPE_SWITCH = 0 -METHOD_TYPE_COLLECT = 1 +# --- Helper Functions for Protobuf Basic Types --- def encode_varint(value): """编码varint值""" @@ -47,260 +45,336 @@ def decode_string(buf, pos=0): pos += length return value, pos -def encode_instruction(method, data): +# --- Message Encoding/Decoding Functions --- + +def encode_raw_485_command(bus_number, command_bytes): """ - 编码Instruction消息 - + 编码Raw485Command消息 Args: - method: 方法类型 (int) - data: 数据 (bytes) - + bus_number: 总线号 (int) + command_bytes: 原始485指令的字节数组 (bytes) Returns: bytearray: 编码后的数据 """ result = bytearray() - - # 编码method字段 (field_number=1, wire_type=0) - result.extend(encode_varint((1 << 3) | 0)) # tag - result.extend(encode_varint(method)) # value - - # 编码data字段 (field_number=2, wire_type=2) - result.extend(encode_varint((2 << 3) | 2)) # tag - result.extend(encode_varint(len(data))) # length - result.extend(data) # value - + # bus_number (field 1, wire type 0) + result.extend(encode_varint((1 << 3) | 0)) + result.extend(encode_varint(bus_number)) + # command_bytes (field 2, wire type 2) + result.extend(encode_varint((2 << 3) | 2)) + result.extend(encode_varint(len(command_bytes))) + result.extend(command_bytes) + return result + +def decode_raw_485_command(buf): + """ + 解码Raw485Command消息 + Args: + buf: 编码后的数据 (bytes) + Returns: + dict: 解码后的消息 + """ + result = {} + pos = 0 + while pos < len(buf): + tag, pos = decode_varint(buf, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if field_number == 1: # bus_number + if wire_type == 0: + value, pos = decode_varint(buf, pos) + result['bus_number'] = value + elif field_number == 2: # command_bytes + if wire_type == 2: + length, pos = decode_varint(buf, pos) + value = buf[pos:pos+length] + pos += length + result['command_bytes'] = value + else: + # 跳过未知字段 + if wire_type == 0: _, pos = decode_varint(buf, pos) + elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length + else: pos += 1 + return result + +def encode_collect_task(command_msg): + """ + 编码CollectTask消息 + Args: + command_msg: Raw485Command消息字典 + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + # command (field 2, wire type 2) + encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes']) + result.extend(encode_varint((2 << 3) | 2)) + result.extend(encode_varint(len(encoded_command))) + result.extend(encoded_command) + return result + +def decode_collect_task(buf): + """ + 解码CollectTask消息 + Args: + buf: 编码后的数据 (bytes) + Returns: + dict: 解码后的消息 + """ + result = {} + pos = 0 + while pos < len(buf): + tag, pos = decode_varint(buf, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if field_number == 2: # command + if wire_type == 2: + length, pos = decode_varint(buf, pos) + value_buf = buf[pos:pos+length] + pos += length + result['command'] = decode_raw_485_command(value_buf) + else: + if wire_type == 0: _, pos = decode_varint(buf, pos) + elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length + else: pos += 1 + return result + +def encode_batch_collect_command(correlation_id, tasks): + """ + 编码BatchCollectCommand消息 + Args: + correlation_id: 关联ID (str) + tasks: CollectTask消息字典列表 + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + # correlation_id (field 1, wire type 2) + result.extend(encode_varint((1 << 3) | 2)) + result.extend(encode_string(correlation_id)) + # tasks (field 2, wire type 2) - repeated + for task in tasks: + encoded_task = encode_collect_task(task['command']) + result.extend(encode_varint((2 << 3) | 2)) + result.extend(encode_varint(len(encoded_task))) + result.extend(encoded_task) + return result + +def decode_batch_collect_command(buf): + """ + 解码BatchCollectCommand消息 + Args: + buf: 编码后的数据 (bytes) + Returns: + dict: 解码后的消息 + """ + result = {'tasks': []} + pos = 0 + while pos < len(buf): + tag, pos = decode_varint(buf, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if field_number == 1: # correlation_id + if wire_type == 2: + value, pos = decode_string(buf, pos) + result['correlation_id'] = value + elif field_number == 2: # tasks (repeated) + if wire_type == 2: + length, pos = decode_varint(buf, pos) + value_buf = buf[pos:pos+length] + pos += length + result['tasks'].append(decode_collect_task(value_buf)) + else: + if wire_type == 0: _, pos = decode_varint(buf, pos) + elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length + else: pos += 1 + return result + +def encode_collect_result(correlation_id, values): + """ + 编码CollectResult消息 + Args: + correlation_id: 关联ID (str) + values: 采集值列表 (list of float) + Returns: + bytearray: 编码后的数据 + """ + result = bytearray() + # correlation_id (field 1, wire type 2) + result.extend(encode_varint((1 << 3) | 2)) + result.extend(encode_string(correlation_id)) + # values (field 2, wire type 5) - repeated fixed32 + for value in values: + result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32 + result.extend(struct.pack('> 3 + wire_type = tag & 0x07 + + if field_number == 1: # correlation_id + if wire_type == 2: + value, pos = decode_string(buf, pos) + result['correlation_id'] = value + elif field_number == 2: # values (repeated) + if wire_type == 5: # fixed32 + value = struct.unpack('> 3 wire_type = tag & 0x07 - - if field_number == 1: # method字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['method'] = value - elif field_number == 2: # data字段 - if wire_type == 2: # 长度分隔类型 - length, pos = decode_varint(buf, pos) - value = buf[pos:pos+length] - pos += length - result['data'] = value + + if wire_type == 2: # Length-delimited type for all oneof fields + length, pos = decode_varint(buf, pos) + value_buf = buf[pos:pos+length] + pos += length + + if field_number == 1: # raw_485_command + result['raw_485_command'] = decode_raw_485_command(value_buf) + elif field_number == 2: # batch_collect_command + result['batch_collect_command'] = decode_batch_collect_command(value_buf) + elif field_number == 3: # collect_result + result['collect_result'] = decode_collect_result(value_buf) + # else: unknown field, already skipped by default behavior else: - # 跳过未知字段 - if wire_type == 0: # varint - _, pos = decode_varint(buf, pos) - elif wire_type == 2: # 长度分隔 - length, pos = decode_varint(buf, pos) - pos += length - else: - pos += 1 - + # 跳过未知字段 (或非长度分隔类型,尽管oneof字段通常是长度分隔的) + if wire_type == 0: _, pos = decode_varint(buf, pos) + elif wire_type == 5: pos += 4 # fixed32 + elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length + else: pos += 1 return result -def encode_switch(device_action, bus_number, bus_address, relay_channel): - """ - 编码Switch消息 - - Args: - device_action: 设备动作指令 (str) - bus_number: 总线号 (int) - bus_address: 总线地址 (int) - relay_channel: 继电器通道号 (int) - - Returns: - bytearray: 编码后的数据 - """ - result = bytearray() - - # 编码device_action字段 (field_number=1, wire_type=2) - result.extend(encode_varint((1 << 3) | 2)) # tag - action_bytes = encode_string(device_action) # value (length + string) - result.extend(action_bytes) - - # 编码bus_number字段 (field_number=2, wire_type=0) - result.extend(encode_varint((2 << 3) | 0)) # tag - result.extend(encode_varint(bus_number)) # value - - # 编码bus_address字段 (field_number=3, wire_type=0) - result.extend(encode_varint((3 << 3) | 0)) # tag - result.extend(encode_varint(bus_address)) # value - - # 编码relay_channel字段 (field_number=4, wire_type=0) - result.extend(encode_varint((4 << 3) | 0)) # tag - result.extend(encode_varint(relay_channel)) # value - - return result - -def decode_switch(buf): - """ - 解码Switch消息 - - Args: - buf: 编码后的数据 (bytes) - - Returns: - dict: 解码后的消息 - """ - result = {} - pos = 0 - - while pos < len(buf): - # 读取标签 - tag, pos = decode_varint(buf, pos) - field_number = tag >> 3 - wire_type = tag & 0x07 - - if field_number == 1: # device_action字段 - if wire_type == 2: # 字符串类型 - value, pos = decode_string(buf, pos) - result['device_action'] = value - elif field_number == 2: # bus_number字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['bus_number'] = value - elif field_number == 3: # bus_address字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['bus_address'] = value - elif field_number == 4: # relay_channel字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['relay_channel'] = value - else: - # 跳过未知字段 - if wire_type == 0: # varint - _, pos = decode_varint(buf, pos) - elif wire_type == 2: # 长度分隔 - length, pos = decode_varint(buf, pos) - pos += length - else: - pos += 1 - - return result - -def encode_collect(bus_number, bus_address, value): - """ - 编码Collect消息 - - Args: - bus_number: 总线号 (int) - bus_address: 总线地址 (int) - value: 采集值 (float) - - Returns: - bytearray: 编码后的数据 - """ - result = bytearray() - - # 编码bus_number字段 (field_number=1, wire_type=0) - result.extend(encode_varint((1 << 3) | 0)) # tag - result.extend(encode_varint(bus_number)) # value - - # 编码bus_address字段 (field_number=2, wire_type=0) - result.extend(encode_varint((2 << 3) | 0)) # tag - result.extend(encode_varint(bus_address)) # value - - # 编码value字段 (field_number=3, wire_type=5) - result.extend(encode_varint((3 << 3) | 5)) # tag - # 将float转换为little-endian的4字节 - result.extend(struct.pack('> 3 - wire_type = tag & 0x07 - - if field_number == 1: # bus_number字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['bus_number'] = value - elif field_number == 2: # bus_address字段 - if wire_type == 0: # varint类型 - value, pos = decode_varint(buf, pos) - result['bus_address'] = value - elif field_number == 3: # value字段 - if wire_type == 5: # 32位浮点类型 - # 从little-endian的4字节解析float - value = struct.unpack('= 3 and buffer[0] == 0xAA and buffer[-1] == 0x55: - # 检查地址是否匹配 - if len(buffer) >= 3 and buffer[2] == ESP32_ADDRESS: - # 提取有效数据(去掉帧头、地址、校验和帧尾) - return buffer[3:-2] - else: - break # 没有更多数据可读 - - return None - -def send_lora_message(data): - """ - 通过LoRaWAN模块发送消息 - - 参数: - data: 要发送的字节数据 - """ - # 切换到发送模式 - rs485_re_de_pin.value(1) - time.sleep_ms(10) - - try: - # 构造发送给LoRaWAN模块的RS485帧 - frame = bytearray() - frame.append(0xAA) # 帧头 - frame.append(LORA_MODULE_ADDRESS & 0xFF) # LoRaWAN模块地址 - frame.append(ESP32_ADDRESS & 0xFF) # 本设备地址(作为源地址) - - # 添加数据 - frame.extend(data) - - # 计算校验和 - checksum = sum(frame[1:]) & 0xFF - frame.append(checksum) - frame.append(0x55) # 帧尾 - - # 发送命令 - rs485_uart.write(frame) - print(f"通过LoRa发送数据: {frame.hex()}") - - finally: - # 切换回接收模式 - time.sleep_ms(10) - rs485_re_de_pin.value(0) - -def send_rs485_command(bus_number, bus_address, command, channel=None): - """ - 发送命令到RS485总线上的设备 - - 参数: - bus_number: 总线号 - bus_address: 设备地址 - command: 命令内容 - channel: 通道号(可选) - """ - # 切换到发送模式 - rs485_re_de_pin.value(1) - time.sleep_ms(10) - - try: - # 构造RS485命令帧 - frame = bytearray() - frame.append(0xAA) # 帧头 - frame.append(bus_number & 0xFF) # 总线号 - frame.append(bus_address & 0xFF) # 设备地址 - - # 添加命令数据 - if isinstance(command, str): - frame.extend(command.encode('utf-8')) - elif isinstance(command, bytes): - frame.extend(command) - elif isinstance(command, int): - frame.append(command & 0xFF) - - # 如果有通道号,则添加 - if channel is not None: - frame.append(channel & 0xFF) - - # 计算校验和 - checksum = sum(frame[1:]) & 0xFF - frame.append(checksum) - frame.append(0x55) # 帧尾 - - # 发送命令 - rs485_uart.write(frame) - print(f"已发送RS485命令: {frame.hex()}") - - finally: - # 切换回接收模式 - time.sleep_ms(10) - rs485_re_de_pin.value(0) - -def collect_sensor_data(bus_number, bus_address): - """ - 从传感器收集数据 - - 参数: - bus_number: 总线号 - bus_address: 传感器地址 - - 返回: - 传感器数据 - """ - # 切换到发送模式 - rs485_re_de_pin.value(1) - time.sleep_ms(10) - - try: - # 构造读取传感器数据的命令 - frame = bytearray([0xAA, bus_number & 0xFF, bus_address & 0xFF, 0x01, 0x00, 0x55]) - rs485_uart.write(frame) - print(f"已发送传感器读取命令: {frame.hex()}") - - # 等待响应 - time.sleep_ms(50) # 短暂等待响应 - - # 读取传感器返回的数据 - buffer = bytearray() - - while rs485_uart.any(): - data = rs485_uart.read() - if data: - buffer.extend(data) - # 简单的帧检测逻辑 - if len(buffer) >= 3 and buffer[0] == 0xAA and buffer[-1] == 0x55: - # 检查地址是否匹配 - if len(buffer) >= 3 and buffer[2] == ESP32_ADDRESS: - # 提取有效数据 - if len(buffer) >= 5: - # 模拟一个浮点数值 - value = float(buffer[3] + (buffer[4] << 8)) / 100.0 - return value - else: - break - - if len(buffer) > 0: - print(f"传感器响应不完整: {buffer.hex()}") - else: - print("传感器无响应") - return None - - finally: - # 切换回接收模式 - time.sleep_ms(10) - rs485_re_de_pin.value(0) - -def parse_instruction(data): - """ - 解析来自LoRaWAN的指令 - - 参数: - data: protobuf编码的指令数据 - - 返回: - 解析后的指令对象,失败时返回None - """ - try: - instruction = client_pb.decode_instruction(data) - return instruction - except Exception as e: - print(f"解析指令失败: {e}") - return None - -def handle_switch_instruction(switch_msg): - """ - 处理开关指令 - - 参数: - switch_msg: Switch消息字典 - """ - action = switch_msg.get('device_action', '') - bus_number = switch_msg.get('bus_number', 0) - bus_address = switch_msg.get('bus_address', 0) - channel = switch_msg.get('relay_channel', 0) - - print(f"处理开关指令: 动作={action}, 总线={bus_number}, 地址={bus_address}, 通道={channel}") - - if action.upper() == "ON": - # 发送开启设备命令 - send_rs485_command(bus_number, bus_address, 0x01, channel) - elif action.upper() == "OFF": - # 发送关闭设备命令 - send_rs485_command(bus_number, bus_address, 0x00, channel) - else: - # 其他自定义命令 - send_rs485_command(bus_number, bus_address, action, channel) - -def handle_collect_instruction(collect_msg): - """ - 处理采集指令 - - 参数: - collect_msg: Collect消息字典 - """ - bus_number = collect_msg.get('bus_number', 0) - bus_address = collect_msg.get('bus_address', 0) - - print(f"处理采集指令: 总线={bus_number}, 地址={bus_address}") - - # 从传感器采集数据 - value = collect_sensor_data(bus_number, bus_address) - - if value is not None: - # 构造Collect响应消息 - collect_data = client_pb.encode_collect(bus_number, bus_address, value) - - # 构造Instruction消息包装Collect数据 - instruction_data = client_pb.encode_instruction(client_pb.METHOD_TYPE_COLLECT, collect_data) - - # 发送回上位机 - send_lora_message(instruction_data) - else: - print("采集数据失败") - -def process_instruction(instruction): - """ - 处理解析后的指令 - - 参数: - instruction: 解析后的指令字典 - """ - method = instruction.get('method', -1) - - if method == client_pb.METHOD_TYPE_SWITCH: - # 处理开关指令 - if 'data' in instruction: - switch_msg = client_pb.decode_switch(instruction['data']) - handle_switch_instruction(switch_msg) - else: - print("开关指令缺少data字段") - elif method == client_pb.METHOD_TYPE_COLLECT: - # 处理采集指令 - if 'data' in instruction: - collect_msg = client_pb.decode_collect(instruction['data']) - handle_collect_instruction(collect_msg) - else: - print("采集指令缺少data字段") - else: - print(f"不支持的指令类型: {method}") def main_loop(): """ 主循环 """ print("猪舍控制系统启动...") - print(f"设备地址: {ESP32_ADDRESS}") while True: - # 接收LoRaWAN消息 - lora_data = receive_lora_message() - - if lora_data: - print(f"收到LoRaWAN消息: {lora_data.hex()}") - - # 解析指令 - instruction = parse_instruction(lora_data) - - if instruction: - # 处理指令 - process_instruction(instruction) - else: - print("无效的指令数据") - - # 其他周期性任务可以放在这里 - # 例如定时采集传感器数据等 - time.sleep(0.01) # 短暂休眠避免过度占用CPU + # 在这里添加你的逻辑 + time.sleep(1) # 避免空循环占用过多CPU # 程序入口 if __name__ == "__main__": @@ -314,4 +28,4 @@ if __name__ == "__main__": except KeyboardInterrupt: print("程序被中断") except Exception as e: - print(f"程序异常: {e}") \ No newline at end of file + print(f"程序异常: {e}") diff --git a/proto/client.proto b/proto/client.proto index 8a98e32..666f68a 100644 --- a/proto/client.proto +++ b/proto/client.proto @@ -2,31 +2,50 @@ syntax = "proto3"; package device; -import "google/protobuf/any.proto"; +// import "google/protobuf/any.proto"; // REMOVED: Not suitable for embedded systems. -option go_package = "internal/app/service/device/proto"; +option go_package = "internal/domain/device/proto"; -// 指令类型 -enum MethodType{ - SWITCH = 0; // 启停 - COLLECT = 1; // 采集 +// --- Concrete Command & Data Structures --- + +// 平台生成的原始485指令,单片机直接发送到总线 +message Raw485Command { + int32 bus_number = 1; // 总线号,用于指示单片机将指令发送到哪个总线 + bytes command_bytes = 2; // 原始485指令的字节数组 } -// 指令 -message Instruction{ - MethodType method = 1; - google.protobuf.Any data = 2; +// BatchCollectCommand +// 一个完整的、包含所有元数据的批量采集任务。 +message BatchCollectCommand { + string correlation_id = 1; // 用于关联请求和响应的唯一ID + repeated CollectTask tasks = 2; // 采集任务列表 } -message Switch{ - string device_action = 1; // 指令 - int32 bus_number = 2; // 总线号 - int32 bus_address = 3; // 总线地址 - int32 relay_channel = 4; // 继电器通道号 +// CollectTask +// 定义了单个采集任务的“意图”。 +message CollectTask { + Raw485Command command = 2; // 平台生成的原始485指令 } -message Collect{ - int32 bus_number = 1; // 总线号 - int32 bus_address = 2; // 总线地址 - float value = 3; // 采集值 -} \ No newline at end of file +// CollectResult +// 这是设备响应的、极致精简的数据包。 +message CollectResult { + string correlation_id = 1; // 从下行指令中原样返回的关联ID + repeated float values = 2; // 按预定顺序排列的采集值 +} + + +// --- Main Downlink Instruction Wrapper --- + +// 指令 (所有从平台下发到设备的数据都应该被包装在这里面) +// 使用 oneof 来替代 google.protobuf.Any,这是嵌入式环境下的标准做法。 +// 它高效、类型安全,且只解码一次。 +message Instruction { + oneof payload { + Raw485Command raw_485_command = 1; + BatchCollectCommand batch_collect_command = 2; + CollectResult collect_result = 3; // ADDED:用于上行数据 + // 如果未来有其他指令类型,比如开关控制,可以直接在这里添加 + // SwitchCommand switch_command = 3; + } +}