#!/usr/bin/env python # -*- coding: utf-8 -*- """ 根据client.proto生成的解析代码 (V2 - 已修复解码逻辑) 适用于ESP32 MicroPython环境 """ import struct # --- Protobuf基础类型辅助函数 --- def encode_varint(value): buf = bytearray() while value >= 0x80: buf.append((value & 0x7F) | 0x80) value >>= 7 buf.append(value & 0x7F) return buf def decode_varint(buf, pos=0): result = 0 shift = 0 while pos < len(buf): byte = buf[pos] pos += 1 result |= (byte & 0x7F) << shift if not (byte & 0x80): break shift += 7 return result, pos def encode_string(value): value_bytes = value.encode('utf-8') length = encode_varint(len(value_bytes)) return length + value_bytes def decode_string(buf, pos=0): """解码字符串 (已修复)""" length, pos_after_len = decode_varint(buf, pos) end_pos = pos_after_len + length value = buf[pos_after_len:end_pos].decode('utf-8') return value, end_pos # --- 消息编码/解码函数 --- def encode_raw_485_command(bus_number, command_bytes): result = bytearray() result.extend(encode_varint((1 << 3) | 0)) result.extend(encode_varint(bus_number)) result.extend(encode_varint((2 << 3) | 2)) result.extend(encode_varint(len(command_bytes))) result.extend(command_bytes) return result def decode_raw_485_command(buf): """解码Raw485Command消息 (已修复)""" result = {} pos = 0 while pos < len(buf): tag, pos = decode_varint(buf, pos) field_number = tag >> 3 wire_type = tag & 0x07 if field_number == 1: # bus_number value, pos = decode_varint(buf, pos) result['bus_number'] = value elif field_number == 2: # command_bytes length, pos_after_len = decode_varint(buf, pos) end_pos = pos_after_len + length result['command_bytes'] = buf[pos_after_len:end_pos] pos = end_pos else: # 跳过未知字段 if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: length, pos_after_len = decode_varint(buf, pos) pos = pos_after_len + length elif wire_type == 5: pos += 4 else: pos += 1 return result def encode_collect_task(command_msg): result = bytearray() encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes']) result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_varint(len(encoded_command))) result.extend(encoded_command) return result def decode_collect_task(buf): """解码CollectTask消息 (已修复)""" result = {} pos = 0 while pos < len(buf): tag, pos = decode_varint(buf, pos) field_number = tag >> 3 wire_type = tag & 0x07 if field_number == 1: # command length, pos_after_len = decode_varint(buf, pos) end_pos = pos_after_len + length value_buf = buf[pos_after_len:end_pos] result['command'] = decode_raw_485_command(value_buf) pos = end_pos else: if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: length, pos_after_len = decode_varint(buf, pos) pos = pos_after_len + length elif wire_type == 5: pos += 4 else: pos += 1 return result def encode_batch_collect_command(correlation_id, tasks): result = bytearray() result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_string(correlation_id)) for task in tasks: encoded_task = encode_collect_task(task['command']) result.extend(encode_varint((2 << 3) | 2)) result.extend(encode_varint(len(encoded_task))) result.extend(encoded_task) return result def decode_batch_collect_command(buf): """解码BatchCollectCommand消息 (已修复)""" result = {'tasks': []} pos = 0 while pos < len(buf): tag, pos = decode_varint(buf, pos) field_number = tag >> 3 wire_type = tag & 0x07 if field_number == 1: # correlation_id value, pos = decode_string(buf, pos) result['correlation_id'] = value elif field_number == 2: # tasks (repeated) length, pos_after_len = decode_varint(buf, pos) end_pos = pos_after_len + length value_buf = buf[pos_after_len:end_pos] result['tasks'].append(decode_collect_task(value_buf)) pos = end_pos else: if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: length, pos_after_len = decode_varint(buf, pos) pos = pos_after_len + length elif wire_type == 5: pos += 4 else: pos += 1 return result def encode_collect_result(correlation_id, values): result = bytearray() result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_string(correlation_id)) for value in values: result.extend(encode_varint((2 << 3) | 5)) result.extend(struct.pack('> 3 wire_type = tag & 0x07 if field_number == 1: # correlation_id value, pos = decode_string(buf, pos) result['correlation_id'] = value elif field_number == 2: # values (repeated) if wire_type == 5: # fixed32 value = struct.unpack('> 3 wire_type = tag & 0x07 if wire_type == 2: length, pos_after_len = decode_varint(buf, pos) end_pos = pos_after_len + length value_buf = buf[pos_after_len:end_pos] if field_number == 1: result['raw_485_command'] = decode_raw_485_command(value_buf) elif field_number == 2: result['batch_collect_command'] = decode_batch_collect_command(value_buf) elif field_number == 3: result['collect_result'] = decode_collect_result(value_buf) pos = end_pos else: if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 5: pos += 4 else: pos += 1 return result # --- 单元测试与使用范例 --- if __name__ == "__main__": print("--- 测试 Raw485Command ---") raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'} encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes']) decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd) assert decoded_raw_cmd == raw_cmd_data, f"Expected {raw_cmd_data}, got {decoded_raw_cmd}" print("\n--- 测试 CollectTask ---") collect_task_data = {'command': raw_cmd_data} encoded_collect_task = encode_collect_task(collect_task_data['command']) decoded_collect_task = decode_collect_task(encoded_collect_task) assert decoded_collect_task == collect_task_data, f"Expected {collect_task_data}, got {decoded_collect_task}" print("\n--- 测试 BatchCollectCommand ---") batch_collect_data = { 'correlation_id': 'abc-123', 'tasks': [ {'command': {'bus_number': 1, 'command_bytes': b'\x01\x04\x00\x01\x00\x01\x60\x0a'}}, {'command': {'bus_number': 2, 'command_bytes': b'\x02\x03\x00\x01\x00\x01\xd5\xfa'}} ] } encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks']) decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect) assert decoded_batch_collect == batch_collect_data, f"Expected {batch_collect_data}, got {decoded_batch_collect}" print("\n--- 测试 CollectResult ---") collect_result_data = {'correlation_id': 'res-456', 'values': [12.34, 56.78]} encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values']) decoded_collect_result = decode_collect_result(encoded_collect_result) assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id'] for i in range(len(collect_result_data['values'])): assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 print("\n--- 测试 Instruction (内含BatchCollectCommand) ---") instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data) decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect) assert decoded_instruction_batch_collect == {'batch_collect_command': batch_collect_data} print("\n所有测试均已通过!")