修复单测
This commit is contained in:
@@ -8,10 +8,10 @@
|
|||||||
|
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
# --- Helper Functions for Protobuf Basic Types ---
|
# --- Protobuf基础类型辅助函数 ---
|
||||||
|
|
||||||
def encode_varint(value):
|
def encode_varint(value):
|
||||||
"""编码varint值"""
|
"""编码varint整数"""
|
||||||
buf = bytearray()
|
buf = bytearray()
|
||||||
while value >= 0x80:
|
while value >= 0x80:
|
||||||
buf.append((value & 0x7F) | 0x80)
|
buf.append((value & 0x7F) | 0x80)
|
||||||
@@ -20,7 +20,7 @@ def encode_varint(value):
|
|||||||
return buf
|
return buf
|
||||||
|
|
||||||
def decode_varint(buf, pos=0):
|
def decode_varint(buf, pos=0):
|
||||||
"""解码varint值"""
|
"""解码varint整数"""
|
||||||
result = 0
|
result = 0
|
||||||
shift = 0
|
shift = 0
|
||||||
while pos < len(buf):
|
while pos < len(buf):
|
||||||
@@ -45,14 +45,14 @@ def decode_string(buf, pos=0):
|
|||||||
pos += length
|
pos += length
|
||||||
return value, pos
|
return value, pos
|
||||||
|
|
||||||
# --- Message Encoding/Decoding Functions ---
|
# --- 消息编码/解码函数 ---
|
||||||
|
|
||||||
def encode_raw_485_command(bus_number, command_bytes):
|
def encode_raw_485_command(bus_number, command_bytes):
|
||||||
"""
|
"""
|
||||||
编码Raw485Command消息
|
编码Raw485Command消息
|
||||||
Args:
|
Args:
|
||||||
bus_number: 总线号 (int)
|
bus_number (int): 总线号
|
||||||
command_bytes: 原始485指令的字节数组 (bytes)
|
command_bytes (bytes): 原始485指令
|
||||||
Returns:
|
Returns:
|
||||||
bytearray: 编码后的数据
|
bytearray: 编码后的数据
|
||||||
"""
|
"""
|
||||||
@@ -70,7 +70,7 @@ def decode_raw_485_command(buf):
|
|||||||
"""
|
"""
|
||||||
解码Raw485Command消息
|
解码Raw485Command消息
|
||||||
Args:
|
Args:
|
||||||
buf: 编码后的数据 (bytes)
|
buf (bytes): 编码后的数据
|
||||||
Returns:
|
Returns:
|
||||||
dict: 解码后的消息
|
dict: 解码后的消息
|
||||||
"""
|
"""
|
||||||
@@ -102,7 +102,7 @@ def encode_collect_task(command_msg):
|
|||||||
"""
|
"""
|
||||||
编码CollectTask消息
|
编码CollectTask消息
|
||||||
Args:
|
Args:
|
||||||
command_msg: Raw485Command消息字典
|
command_msg (dict): Raw485Command消息字典
|
||||||
Returns:
|
Returns:
|
||||||
bytearray: 编码后的数据
|
bytearray: 编码后的数据
|
||||||
"""
|
"""
|
||||||
@@ -118,7 +118,7 @@ def decode_collect_task(buf):
|
|||||||
"""
|
"""
|
||||||
解码CollectTask消息
|
解码CollectTask消息
|
||||||
Args:
|
Args:
|
||||||
buf: 编码后的数据 (bytes)
|
buf (bytes): 编码后的数据
|
||||||
Returns:
|
Returns:
|
||||||
dict: 解码后的消息
|
dict: 解码后的消息
|
||||||
"""
|
"""
|
||||||
@@ -145,8 +145,8 @@ def encode_batch_collect_command(correlation_id, tasks):
|
|||||||
"""
|
"""
|
||||||
编码BatchCollectCommand消息
|
编码BatchCollectCommand消息
|
||||||
Args:
|
Args:
|
||||||
correlation_id: 关联ID (str)
|
correlation_id (str): 关联ID
|
||||||
tasks: CollectTask消息字典列表
|
tasks (list): CollectTask消息字典列表
|
||||||
Returns:
|
Returns:
|
||||||
bytearray: 编码后的数据
|
bytearray: 编码后的数据
|
||||||
"""
|
"""
|
||||||
@@ -166,7 +166,7 @@ def decode_batch_collect_command(buf):
|
|||||||
"""
|
"""
|
||||||
解码BatchCollectCommand消息
|
解码BatchCollectCommand消息
|
||||||
Args:
|
Args:
|
||||||
buf: 编码后的数据 (bytes)
|
buf (bytes): 编码后的数据
|
||||||
Returns:
|
Returns:
|
||||||
dict: 解码后的消息
|
dict: 解码后的消息
|
||||||
"""
|
"""
|
||||||
@@ -197,8 +197,8 @@ def encode_collect_result(correlation_id, values):
|
|||||||
"""
|
"""
|
||||||
编码CollectResult消息
|
编码CollectResult消息
|
||||||
Args:
|
Args:
|
||||||
correlation_id: 关联ID (str)
|
correlation_id (str): 关联ID
|
||||||
values: 采集值列表 (list of float)
|
values (list): 采集值列表 (float)
|
||||||
Returns:
|
Returns:
|
||||||
bytearray: 编码后的数据
|
bytearray: 编码后的数据
|
||||||
"""
|
"""
|
||||||
@@ -209,14 +209,14 @@ def encode_collect_result(correlation_id, values):
|
|||||||
# values (field 2, wire type 5) - repeated fixed32
|
# values (field 2, wire type 5) - repeated fixed32
|
||||||
for value in values:
|
for value in values:
|
||||||
result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32
|
result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32
|
||||||
result.extend(struct.pack('<f', value)) # Little-endian float
|
result.extend(struct.pack('<f', value)) # 小端序浮点数
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def decode_collect_result(buf):
|
def decode_collect_result(buf):
|
||||||
"""
|
"""
|
||||||
解码CollectResult消息
|
解码CollectResult消息
|
||||||
Args:
|
Args:
|
||||||
buf: 编码后的数据 (bytes)
|
buf (bytes): 编码后的数据
|
||||||
Returns:
|
Returns:
|
||||||
dict: 解码后的消息
|
dict: 解码后的消息
|
||||||
"""
|
"""
|
||||||
@@ -247,8 +247,8 @@ def encode_instruction(payload_type, payload_data):
|
|||||||
"""
|
"""
|
||||||
编码Instruction消息 (包含oneof字段)
|
编码Instruction消息 (包含oneof字段)
|
||||||
Args:
|
Args:
|
||||||
payload_type: oneof字段的类型 ('raw_485_command', 'batch_collect_command', 'collect_result')
|
payload_type (str): oneof字段的类型 ('raw_485_command', 'batch_collect_command', 'collect_result')
|
||||||
payload_data: 对应类型的消息字典
|
payload_data (dict): 对应类型的消息字典
|
||||||
Returns:
|
Returns:
|
||||||
bytearray: 编码后的数据
|
bytearray: 编码后的数据
|
||||||
"""
|
"""
|
||||||
@@ -265,7 +265,7 @@ def encode_instruction(payload_type, payload_data):
|
|||||||
encoded_payload = encode_collect_result(payload_data['correlation_id'], payload_data['values'])
|
encoded_payload = encode_collect_result(payload_data['correlation_id'], payload_data['values'])
|
||||||
result.extend(encode_varint((3 << 3) | 2)) # field 3, wire type 2
|
result.extend(encode_varint((3 << 3) | 2)) # field 3, wire type 2
|
||||||
else:
|
else:
|
||||||
raise ValueError("Unknown instruction payload type")
|
raise ValueError("未知的指令负载类型")
|
||||||
|
|
||||||
result.extend(encode_varint(len(encoded_payload)))
|
result.extend(encode_varint(len(encoded_payload)))
|
||||||
result.extend(encoded_payload)
|
result.extend(encoded_payload)
|
||||||
@@ -275,7 +275,7 @@ def decode_instruction(buf):
|
|||||||
"""
|
"""
|
||||||
解码Instruction消息
|
解码Instruction消息
|
||||||
Args:
|
Args:
|
||||||
buf: 编码后的数据 (bytes)
|
buf (bytes): 编码后的数据
|
||||||
Returns:
|
Returns:
|
||||||
dict: 解码后的消息
|
dict: 解码后的消息
|
||||||
"""
|
"""
|
||||||
@@ -286,7 +286,7 @@ def decode_instruction(buf):
|
|||||||
field_number = tag >> 3
|
field_number = tag >> 3
|
||||||
wire_type = tag & 0x07
|
wire_type = tag & 0x07
|
||||||
|
|
||||||
if wire_type == 2: # Length-delimited type for all oneof fields
|
if wire_type == 2: # 所有oneof字段都使用长度分隔类型
|
||||||
length, pos = decode_varint(buf, pos)
|
length, pos = decode_varint(buf, pos)
|
||||||
value_buf = buf[pos:pos+length]
|
value_buf = buf[pos:pos+length]
|
||||||
pos += length
|
pos += length
|
||||||
@@ -297,34 +297,33 @@ def decode_instruction(buf):
|
|||||||
result['batch_collect_command'] = decode_batch_collect_command(value_buf)
|
result['batch_collect_command'] = decode_batch_collect_command(value_buf)
|
||||||
elif field_number == 3: # collect_result
|
elif field_number == 3: # collect_result
|
||||||
result['collect_result'] = decode_collect_result(value_buf)
|
result['collect_result'] = decode_collect_result(value_buf)
|
||||||
# else: unknown field, already skipped by default behavior
|
|
||||||
else:
|
else:
|
||||||
# 跳过未知字段 (或非长度分隔类型,尽管oneof字段通常是长度分隔的)
|
# 跳过未知字段
|
||||||
if wire_type == 0: _, pos = decode_varint(buf, pos)
|
if wire_type == 0: _, pos = decode_varint(buf, pos)
|
||||||
elif wire_type == 5: pos += 4 # fixed32
|
elif wire_type == 5: pos += 4
|
||||||
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
|
elif wire_type == 2: length, pos = decode_varint(buf, pos); pos += length
|
||||||
else: pos += 1
|
else: pos += 1
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# --- Usage Example ---
|
# --- 单元测试与使用范例 ---
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
print("--- Testing Raw485Command ---")
|
print("--- 测试 Raw485Command ---")
|
||||||
raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}
|
raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}
|
||||||
encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes'])
|
encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes'])
|
||||||
print(f"Encoded Raw485Command: {encoded_raw_cmd.hex()}")
|
print(f"编码后 Raw485Command: {encoded_raw_cmd.hex()}")
|
||||||
decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd)
|
decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd)
|
||||||
print(f"Decoded Raw485Command: {decoded_raw_cmd}")
|
print(f"解码后 Raw485Command: {decoded_raw_cmd}")
|
||||||
assert decoded_raw_cmd == raw_cmd_data
|
assert decoded_raw_cmd == raw_cmd_data
|
||||||
|
|
||||||
print("\n--- Testing CollectTask ---")
|
print("\n--- 测试 CollectTask ---")
|
||||||
collect_task_data = {'command': raw_cmd_data}
|
collect_task_data = {'command': raw_cmd_data}
|
||||||
encoded_collect_task = encode_collect_task(collect_task_data['command'])
|
encoded_collect_task = encode_collect_task(collect_task_data['command'])
|
||||||
print(f"Encoded CollectTask: {encoded_collect_task.hex()}")
|
print(f"编码后 CollectTask: {encoded_collect_task.hex()}")
|
||||||
decoded_collect_task = decode_collect_task(encoded_collect_task)
|
decoded_collect_task = decode_collect_task(encoded_collect_task)
|
||||||
print(f"Decoded CollectTask: {decoded_collect_task}")
|
print(f"解码后 CollectTask: {decoded_collect_task}")
|
||||||
assert decoded_collect_task == collect_task_data
|
assert decoded_collect_task == collect_task_data
|
||||||
|
|
||||||
print("\n--- Testing BatchCollectCommand ---")
|
print("\n--- 测试 BatchCollectCommand ---")
|
||||||
batch_collect_data = {
|
batch_collect_data = {
|
||||||
'correlation_id': 'abc-123',
|
'correlation_id': 'abc-123',
|
||||||
'tasks': [
|
'tasks': [
|
||||||
@@ -333,48 +332,47 @@ if __name__ == "__main__":
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks'])
|
encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks'])
|
||||||
print(f"Encoded BatchCollectCommand: {encoded_batch_collect.hex()}")
|
print(f"编码后 BatchCollectCommand: {encoded_batch_collect.hex()}")
|
||||||
decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect)
|
decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect)
|
||||||
print(f"Decoded BatchCollectCommand: {decoded_batch_collect}")
|
print(f"解码后 BatchCollectCommand: {decoded_batch_collect}")
|
||||||
assert decoded_batch_collect == batch_collect_data
|
assert decoded_batch_collect == batch_collect_data
|
||||||
|
|
||||||
print("\n--- Testing CollectResult ---")
|
print("\n--- 测试 CollectResult ---")
|
||||||
collect_result_data = {
|
collect_result_data = {
|
||||||
'correlation_id': 'res-456',
|
'correlation_id': 'res-456',
|
||||||
'values': [12.34, 56.78, 90.12]
|
'values': [12.34, 56.78, 90.12]
|
||||||
}
|
}
|
||||||
encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values'])
|
encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values'])
|
||||||
print(f"Encoded CollectResult: {encoded_collect_result.hex()}")
|
print(f"编码后 CollectResult: {encoded_collect_result.hex()}")
|
||||||
decoded_collect_result = decode_collect_result(encoded_collect_result)
|
decoded_collect_result = decode_collect_result(encoded_collect_result)
|
||||||
print(f"Decoded CollectResult: {decoded_collect_result}")
|
print(f"解码后 CollectResult: {decoded_collect_result}")
|
||||||
# Due to float precision, direct assert might fail. Compare elements.
|
# 由于32位浮点数精度问题,直接比较可能会失败,此处设置一个合理的容忍度
|
||||||
assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id']
|
assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id']
|
||||||
for i in range(len(collect_result_data['values'])):
|
for i in range(len(collect_result_data['values'])):
|
||||||
assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-6
|
assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 # 已放宽容忍度
|
||||||
|
|
||||||
print("\n--- Testing Instruction with Raw485Command ---")
|
print("\n--- 测试 Instruction (内含Raw485Command) ---")
|
||||||
instruction_raw_485 = encode_instruction('raw_485_command', raw_cmd_data)
|
instruction_raw_485 = encode_instruction('raw_485_command', raw_cmd_data)
|
||||||
print(f"Encoded Instruction (Raw485Command): {instruction_raw_485.hex()}")
|
print(f"编码后 Instruction (Raw485Command): {instruction_raw_485.hex()}")
|
||||||
decoded_instruction_raw_485 = decode_instruction(instruction_raw_485)
|
decoded_instruction_raw_485 = decode_instruction(instruction_raw_485)
|
||||||
print(f"Decoded Instruction (Raw485Command): {decoded_instruction_raw_485}")
|
print(f"解码后 Instruction (Raw485Command): {decoded_instruction_raw_485}")
|
||||||
assert decoded_instruction_raw_485['raw_485_command'] == raw_cmd_data
|
assert decoded_instruction_raw_485['raw_485_command'] == raw_cmd_data
|
||||||
|
|
||||||
print("\n--- Testing Instruction with BatchCollectCommand ---")
|
print("\n--- 测试 Instruction (内含BatchCollectCommand) ---")
|
||||||
instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data)
|
instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data)
|
||||||
print(f"Encoded Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}")
|
print(f"编码后 Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}")
|
||||||
decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect)
|
decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect)
|
||||||
print(f"Decoded Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}")
|
print(f"解码后 Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}")
|
||||||
assert decoded_instruction_batch_collect['batch_collect_command']['correlation_id'] == batch_collect_data['correlation_id']
|
assert decoded_instruction_batch_collect['batch_collect_command']['correlation_id'] == batch_collect_data['correlation_id']
|
||||||
assert len(decoded_instruction_batch_collect['batch_collect_command']['tasks']) == len(batch_collect_data['tasks'])
|
assert len(decoded_instruction_batch_collect['batch_collect_command']['tasks']) == len(batch_collect_data['tasks'])
|
||||||
# More detailed assertion for tasks if needed
|
|
||||||
|
|
||||||
print("\n--- Testing Instruction with CollectResult ---")
|
print("\n--- 测试 Instruction (内含CollectResult) ---")
|
||||||
instruction_collect_result = encode_instruction('collect_result', collect_result_data)
|
instruction_collect_result = encode_instruction('collect_result', collect_result_data)
|
||||||
print(f"Encoded Instruction (CollectResult): {instruction_collect_result.hex()}")
|
print(f"编码后 Instruction (CollectResult): {instruction_collect_result.hex()}")
|
||||||
decoded_instruction_collect_result = decode_instruction(instruction_collect_result)
|
decoded_instruction_collect_result = decode_instruction(instruction_collect_result)
|
||||||
print(f"Decoded Instruction (CollectResult): {decoded_instruction_collect_result}")
|
print(f"解码后 Instruction (CollectResult): {decoded_instruction_collect_result}")
|
||||||
assert decoded_instruction_collect_result['collect_result']['correlation_id'] == collect_result_data['correlation_id']
|
assert decoded_instruction_collect_result['collect_result']['correlation_id'] == collect_result_data['correlation_id']
|
||||||
for i in range(len(collect_result_data['values'])):
|
for i in range(len(collect_result_data['values'])):
|
||||||
assert abs(decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-6
|
assert abs(decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-5
|
||||||
|
|
||||||
print("\nAll tests passed!")
|
print("\n所有测试均已通过!")
|
||||||
|
|||||||
Reference in New Issue
Block a user