From d57d7cba70c1ac4d19b9abc9111d88b8ccae2ba8 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 8 Oct 2025 15:58:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main/proto/client_pb.py | 100 ++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 51 deletions(-) diff --git a/main/proto/client_pb.py b/main/proto/client_pb.py index 3a2af95..09874ab 100644 --- a/main/proto/client_pb.py +++ b/main/proto/client_pb.py @@ -8,10 +8,10 @@ import struct -# --- Helper Functions for Protobuf Basic Types --- +# --- Protobuf基础类型辅助函数 --- def encode_varint(value): - """编码varint值""" + """编码varint整数""" buf = bytearray() while value >= 0x80: buf.append((value & 0x7F) | 0x80) @@ -20,7 +20,7 @@ def encode_varint(value): return buf def decode_varint(buf, pos=0): - """解码varint值""" + """解码varint整数""" result = 0 shift = 0 while pos < len(buf): @@ -45,14 +45,14 @@ def decode_string(buf, pos=0): pos += length return value, pos -# --- Message Encoding/Decoding Functions --- +# --- 消息编码/解码函数 --- def encode_raw_485_command(bus_number, command_bytes): """ 编码Raw485Command消息 Args: - bus_number: 总线号 (int) - command_bytes: 原始485指令的字节数组 (bytes) + bus_number (int): 总线号 + command_bytes (bytes): 原始485指令 Returns: bytearray: 编码后的数据 """ @@ -70,7 +70,7 @@ def decode_raw_485_command(buf): """ 解码Raw485Command消息 Args: - buf: 编码后的数据 (bytes) + buf (bytes): 编码后的数据 Returns: dict: 解码后的消息 """ @@ -102,7 +102,7 @@ def encode_collect_task(command_msg): """ 编码CollectTask消息 Args: - command_msg: Raw485Command消息字典 + command_msg (dict): Raw485Command消息字典 Returns: bytearray: 编码后的数据 """ @@ -118,7 +118,7 @@ def decode_collect_task(buf): """ 解码CollectTask消息 Args: - buf: 编码后的数据 (bytes) + buf (bytes): 编码后的数据 Returns: dict: 解码后的消息 """ @@ -145,8 +145,8 @@ def encode_batch_collect_command(correlation_id, tasks): """ 编码BatchCollectCommand消息 Args: - correlation_id: 关联ID (str) - tasks: CollectTask消息字典列表 + correlation_id (str): 关联ID + tasks (list): CollectTask消息字典列表 Returns: bytearray: 编码后的数据 """ @@ -166,7 +166,7 @@ def decode_batch_collect_command(buf): """ 解码BatchCollectCommand消息 Args: - buf: 编码后的数据 (bytes) + buf (bytes): 编码后的数据 Returns: dict: 解码后的消息 """ @@ -197,8 +197,8 @@ def encode_collect_result(correlation_id, values): """ 编码CollectResult消息 Args: - correlation_id: 关联ID (str) - values: 采集值列表 (list of float) + correlation_id (str): 关联ID + values (list): 采集值列表 (float) Returns: bytearray: 编码后的数据 """ @@ -209,14 +209,14 @@ def encode_collect_result(correlation_id, values): # values (field 2, wire type 5) - repeated fixed32 for value in values: result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32 - result.extend(struct.pack('> 3 wire_type = tag & 0x07 - if wire_type == 2: # Length-delimited type for all oneof fields + if wire_type == 2: # 所有oneof字段都使用长度分隔类型 length, pos = decode_varint(buf, pos) value_buf = buf[pos:pos+length] pos += length @@ -297,34 +297,33 @@ def decode_instruction(buf): result['batch_collect_command'] = decode_batch_collect_command(value_buf) elif field_number == 3: # collect_result result['collect_result'] = decode_collect_result(value_buf) - # else: unknown field, already skipped by default behavior else: - # 跳过未知字段 (或非长度分隔类型,尽管oneof字段通常是长度分隔的) + # 跳过未知字段 if wire_type == 0: _, pos = decode_varint(buf, pos) - elif wire_type == 5: pos += 4 # fixed32 + elif wire_type == 5: pos += 4 elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length else: pos += 1 return result -# --- Usage Example --- +# --- 单元测试与使用范例 --- if __name__ == "__main__": - print("--- Testing Raw485Command ---") + print("--- 测试 Raw485Command ---") raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'} encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes']) - print(f"Encoded Raw485Command: {encoded_raw_cmd.hex()}") + print(f"编码后 Raw485Command: {encoded_raw_cmd.hex()}") decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd) - print(f"Decoded Raw485Command: {decoded_raw_cmd}") + print(f"解码后 Raw485Command: {decoded_raw_cmd}") assert decoded_raw_cmd == raw_cmd_data - print("\n--- Testing CollectTask ---") + print("\n--- 测试 CollectTask ---") collect_task_data = {'command': raw_cmd_data} encoded_collect_task = encode_collect_task(collect_task_data['command']) - print(f"Encoded CollectTask: {encoded_collect_task.hex()}") + print(f"编码后 CollectTask: {encoded_collect_task.hex()}") decoded_collect_task = decode_collect_task(encoded_collect_task) - print(f"Decoded CollectTask: {decoded_collect_task}") + print(f"解码后 CollectTask: {decoded_collect_task}") assert decoded_collect_task == collect_task_data - print("\n--- Testing BatchCollectCommand ---") + print("\n--- 测试 BatchCollectCommand ---") batch_collect_data = { 'correlation_id': 'abc-123', 'tasks': [ @@ -333,48 +332,47 @@ if __name__ == "__main__": ] } encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks']) - print(f"Encoded BatchCollectCommand: {encoded_batch_collect.hex()}") + print(f"编码后 BatchCollectCommand: {encoded_batch_collect.hex()}") decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect) - print(f"Decoded BatchCollectCommand: {decoded_batch_collect}") + print(f"解码后 BatchCollectCommand: {decoded_batch_collect}") assert decoded_batch_collect == batch_collect_data - print("\n--- Testing CollectResult ---") + print("\n--- 测试 CollectResult ---") collect_result_data = { 'correlation_id': 'res-456', 'values': [12.34, 56.78, 90.12] } encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values']) - print(f"Encoded CollectResult: {encoded_collect_result.hex()}") + print(f"编码后 CollectResult: {encoded_collect_result.hex()}") decoded_collect_result = decode_collect_result(encoded_collect_result) - print(f"Decoded CollectResult: {decoded_collect_result}") - # Due to float precision, direct assert might fail. Compare elements. + print(f"解码后 CollectResult: {decoded_collect_result}") + # 由于32位浮点数精度问题,直接比较可能会失败,此处设置一个合理的容忍度 assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id'] for i in range(len(collect_result_data['values'])): - assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-6 + assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 # 已放宽容忍度 - print("\n--- Testing Instruction with Raw485Command ---") + print("\n--- 测试 Instruction (内含Raw485Command) ---") instruction_raw_485 = encode_instruction('raw_485_command', raw_cmd_data) - print(f"Encoded Instruction (Raw485Command): {instruction_raw_485.hex()}") + print(f"编码后 Instruction (Raw485Command): {instruction_raw_485.hex()}") decoded_instruction_raw_485 = decode_instruction(instruction_raw_485) - print(f"Decoded Instruction (Raw485Command): {decoded_instruction_raw_485}") + print(f"解码后 Instruction (Raw485Command): {decoded_instruction_raw_485}") assert decoded_instruction_raw_485['raw_485_command'] == raw_cmd_data - print("\n--- Testing Instruction with BatchCollectCommand ---") + print("\n--- 测试 Instruction (内含BatchCollectCommand) ---") instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data) - print(f"Encoded Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}") + print(f"编码后 Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}") decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect) - print(f"Decoded Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}") + print(f"解码后 Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}") assert decoded_instruction_batch_collect['batch_collect_command']['correlation_id'] == batch_collect_data['correlation_id'] assert len(decoded_instruction_batch_collect['batch_collect_command']['tasks']) == len(batch_collect_data['tasks']) - # More detailed assertion for tasks if needed - print("\n--- Testing Instruction with CollectResult ---") + print("\n--- 测试 Instruction (内含CollectResult) ---") instruction_collect_result = encode_instruction('collect_result', collect_result_data) - print(f"Encoded Instruction (CollectResult): {instruction_collect_result.hex()}") + print(f"编码后 Instruction (CollectResult): {instruction_collect_result.hex()}") decoded_instruction_collect_result = decode_instruction(instruction_collect_result) - print(f"Decoded Instruction (CollectResult): {decoded_instruction_collect_result}") + print(f"解码后 Instruction (CollectResult): {decoded_instruction_collect_result}") assert decoded_instruction_collect_result['collect_result']['correlation_id'] == collect_result_data['correlation_id'] for i in range(len(collect_result_data['values'])): - assert abs(decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-6 + assert abs(decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-5 - print("\nAll tests passed!") + print("\n所有测试均已通过!")