diff --git a/app/bus/rs485_manager.py b/app/bus/rs485_manager.py index f47b4c9..26be648 100644 --- a/app/bus/rs485_manager.py +++ b/app/bus/rs485_manager.py @@ -220,7 +220,7 @@ class RS485Manager: rts_pin = port_info['rts_pin'] lock = port_info['lock'] - response_bytes = None + response_buffer = bytearray() # 改:用缓冲累积 with lock: try: # II. 线程安全与指令发送 @@ -232,22 +232,115 @@ class RS485Manager: rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW) log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}") - # III. 接收响应 - response_bytes = uart.read() - if response_bytes: - log(f"总线 {bus_id} 收到响应: {response_bytes.hex()}") + # III. 接收响应(修复:超时循环 + 小块读) + start_time = time.ticks_ms() + response_timeout = self.default_timeouts.get('rs485_response', 500) + while time.ticks_diff(time.ticks_ms(), start_time) < response_timeout: + if uart.any(): + chunk = uart.read(32) # 小块读 + if chunk: + response_buffer.extend(chunk) + start_time = time.ticks_ms() # 重置超时 + time.sleep_ms(5) + + if response_buffer: + # 新增:搜索有效帧,跳前缀 + response_bytes = self._find_modbus_frame(response_buffer, bus_id, 4) + if response_bytes: + log(f"总线 {bus_id} 收到有效响应: {response_bytes.hex()}") + else: + log(f"警告: 总线 {bus_id} 响应中无有效帧。收到响应: {response_buffer.hex()}") + return None else: - log(f"警告: 总线 {bus_id} 未收到响应或响应超时。") - return None + log(f"警告: 总线 {bus_id} 未收到响应。") except Exception as e: log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}") return None - # IV. 响应解析与数据提取 (默认 Modbus RTU 浮点数) - # TODO: 根据CollectTask的Protobuf定义,此处需要根据parser_type来选择具体的解析逻辑和类型。 - # 目前默认使用Modbus RTU大端浮点数解析。 - parsed_value = RS485Manager._parse_modbus_rtu_float_default(response_bytes) + # IV. 解析(用修复版) + parsed_value = RS485Manager._parse_modbus_rtu_default(response_bytes) # 改名,支持动态 - # V. 返回结果 return parsed_value + + def _find_modbus_frame(self, buffer: bytearray, expected_slave: int, func_code: int) -> bytes | None: + """ + 修复版:加调试;优先头检查;CRC 字节序标准 Modbus (低字节在前)。 + """ + log(f"搜索帧: buffer 长度 {len(buffer)}, hex {buffer.hex()}") + i = 0 + while i < len(buffer) - 6: # 最小 7 字节,-6 安全 + if buffer[i] == expected_slave and buffer[i + 1] == func_code: + byte_count = buffer[i + 2] + frame_len = 3 + byte_count + 2 + if len(buffer) - i >= frame_len: + frame = bytes(buffer[i:i + frame_len]) + # CRC 预校验(标准 Modbus:CRC 低字节在前) + core = frame[:-2] + calc_crc = self._calculate_crc16_modbus(core) + low_crc = frame[-2] + high_crc = frame[-1] + recv_crc = (high_crc << 8) | low_crc # 高<<8 | 低 + log(f"候选帧 at {i}: {frame.hex()}, calc CRC {calc_crc:04X}, recv {recv_crc:04X}") + if calc_crc == recv_crc: + log(f"找到有效帧: {frame.hex()}") + return frame + else: + log(f"CRC 不匹配,跳过 (calc {calc_crc:04X} != recv {recv_crc:04X})") + i += 1 + log("无有效帧") + return None + + @staticmethod + def _parse_modbus_rtu_default(response_bytes): # 改名,支持整数/浮点 + """ + 修复版:动态数据长;CRC 只用核心。 + """ + if not response_bytes or len(response_bytes) < 7: + log(f"警告: 响应过短。响应: {response_bytes.hex() if response_bytes else 'None'}") + return None + + # CRC 校验(只核心) + data_for_crc = response_bytes[:-2] + received_crc = (response_bytes[-1] << 8) | response_bytes[-2] + + calculated_crc = RS485Manager._calculate_crc16_modbus(data_for_crc) + if calculated_crc != received_crc: + log(f"错误: CRC失败。接收: {received_crc:04X}, 计算: {calculated_crc:04X}. 响应: {response_bytes.hex()}") + return None + + function_code = response_bytes[1] + byte_count = response_bytes[2] + data_bytes = response_bytes[3:3 + byte_count] + + if function_code not in [0x03, 0x04]: + log(f"警告: 功能码 {function_code:02X} 不符。") + return None + + if len(data_bytes) != byte_count: + log(f"错误: 数据长 {len(data_bytes)} != {byte_count}") + return None + + # 动态解析 + if byte_count == 2: + # 整数 (e.g., 温度) + try: + value = int.from_bytes(data_bytes, 'big') # 或 signed '>h' + parsed_value = value + log(f"成功解析整数: {parsed_value}") + return parsed_value + except Exception as e: + log(f"整数解析失败: {e}") + return None + elif byte_count == 4: + # 浮点 + try: + parsed_value = struct.unpack('>f', data_bytes)[0] + log(f"成功解析浮点: {parsed_value}") + return parsed_value + except Exception as e: + log(f"浮点失败: {e}") + return None + else: + log(f"警告: 未知字节数 {byte_count}") + return None diff --git a/app/logs/logger.py b/app/logs/logger.py index 5e81fd7..c5867b9 100644 --- a/app/logs/logger.py +++ b/app/logs/logger.py @@ -4,13 +4,17 @@ """ 一个简单的、可配置的日志记录器模块。 """ - +import _thread from app.config.config import * +# 创建一个锁,用于在多线程环境中同步对print的调用 +log_lock = _thread.allocate_lock() + def log(message: str): """ 打印一条日志消息,是否实际输出取决于配置文件。 + 使用锁来确保多线程环境下的输出不会混乱。 Args: message (str): 要打印的日志消息。 @@ -18,6 +22,7 @@ def log(message: str): # 从配置文件中获取调试开关的状态 # .get()方法可以安全地获取值,如果键不存在,则返回默认值False if SYSTEM_PARAMS.get('debug_enabled', False): - print(message) + with log_lock: + print(message) # 如果开关为False,此函数会立即返回,不执行任何操作。 diff --git a/app/lora/lora_mesh_uart_passthrough_manager.py b/app/lora/lora_mesh_uart_passthrough_manager.py index f2f79e4..c74050d 100644 --- a/app/lora/lora_mesh_uart_passthrough_manager.py +++ b/app/lora/lora_mesh_uart_passthrough_manager.py @@ -151,9 +151,6 @@ class LoRaMeshUartPassthroughManager: log(f"LoRa: 提取的包: {packet.hex()}。剩余缓冲区 (长度 {len(self._rx_buffer)}): {self._rx_buffer.hex()}") # --- 包结构解析 --- - # 根据代码 `chunk_data = packet[6:-2]` 推断,包结构为: - # 1 (帧头) + 1 (长度) + 2 (目标地址) + 1 (总分片) + 1 (当前分片) + N (数据) + 2 (源地址) - # 因此,一个合法的包至少需要 1+1+2+1+1+2 = 8个字节 if len(packet) < 8: log(f"LoRa: 包长度 {len(packet)} 小于协议最小长度8, 判定为坏包,已丢弃。") continue @@ -161,8 +158,8 @@ class LoRaMeshUartPassthroughManager: addr = int.from_bytes(packet[2:4], 'big') total_chunks = packet[4] current_chunk = packet[5] - # 提取数据块,排除末尾的2字节源地址 - chunk_data = packet[6:-2] + # 提取数据块 + chunk_data = packet[6:] source_addr = int.from_bytes(packet[-2:], 'big') log(f"LoRa: 解析包: 源地址={source_addr}, 目标地址={addr}, 总分片={total_chunks}, 当前分片={current_chunk}, 数据块长度={len(chunk_data)}") diff --git a/app/proto/client_pb.py b/app/proto/client_pb.py index 3cb7b00..5af641b 100644 --- a/app/proto/client_pb.py +++ b/app/proto/client_pb.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ -根据client.proto生成的解析代码 +根据client.proto生成的解析代码 (V2 - 已修复解码逻辑) 适用于ESP32 MicroPython环境 """ @@ -12,7 +12,6 @@ import struct # --- Protobuf基础类型辅助函数 --- def encode_varint(value): - """编码varint整数""" buf = bytearray() while value >= 0x80: buf.append((value & 0x7F) | 0x80) @@ -22,7 +21,6 @@ def encode_varint(value): def decode_varint(buf, pos=0): - """解码varint整数""" result = 0 shift = 0 while pos < len(buf): @@ -36,36 +34,25 @@ def decode_varint(buf, pos=0): def encode_string(value): - """编码字符串""" value_bytes = value.encode('utf-8') length = encode_varint(len(value_bytes)) return length + value_bytes def decode_string(buf, pos=0): - """解码字符串""" - length, pos = decode_varint(buf, pos) - value = buf[pos:pos + length].decode('utf-8') - pos += length - return value, pos + """解码字符串 (已修复)""" + length, pos_after_len = decode_varint(buf, pos) + end_pos = pos_after_len + length + value = buf[pos_after_len:end_pos].decode('utf-8') + return value, end_pos # --- 消息编码/解码函数 --- def encode_raw_485_command(bus_number, command_bytes): - """ - 编码Raw485Command消息 - Args: - bus_number (int): 总线号 - command_bytes (bytes): 原始485指令 - Returns: - bytearray: 编码后的数据 - """ result = bytearray() - # bus_number (field 1, wire type 0) result.extend(encode_varint((1 << 3) | 0)) result.extend(encode_varint(bus_number)) - # command_bytes (field 2, wire type 2) result.extend(encode_varint((2 << 3) | 2)) result.extend(encode_varint(len(command_bytes))) result.extend(command_bytes) @@ -73,13 +60,7 @@ def encode_raw_485_command(bus_number, command_bytes): def decode_raw_485_command(buf): - """ - 解码Raw485Command消息 - Args: - buf (bytes): 编码后的数据 - Returns: - dict: 解码后的消息 - """ + """解码Raw485Command消息 (已修复)""" result = {} pos = 0 while pos < len(buf): @@ -88,52 +69,38 @@ def decode_raw_485_command(buf): wire_type = tag & 0x07 if field_number == 1: # bus_number - if wire_type == 0: - value, pos = decode_varint(buf, pos) - result['bus_number'] = value + value, pos = decode_varint(buf, pos) + result['bus_number'] = value elif field_number == 2: # command_bytes - if wire_type == 2: - length, pos = decode_varint(buf, pos) - value = buf[pos:pos + length] - pos += length - result['command_bytes'] = value + length, pos_after_len = decode_varint(buf, pos) + end_pos = pos_after_len + length + result['command_bytes'] = buf[pos_after_len:end_pos] + pos = end_pos else: # 跳过未知字段 if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: - length, pos = decode_varint(buf, pos); - pos += length + length, pos_after_len = decode_varint(buf, pos) + pos = pos_after_len + length + elif wire_type == 5: + pos += 4 else: pos += 1 return result def encode_collect_task(command_msg): - """ - 编码CollectTask消息 - Args: - command_msg (dict): Raw485Command消息字典 - Returns: - bytearray: 编码后的数据 - """ result = bytearray() - # command (field 1, wire type 2) encoded_command = encode_raw_485_command(command_msg['bus_number'], command_msg['command_bytes']) - result.extend(encode_varint((1 << 3) | 2)) # 字段编号已改为1 + result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_varint(len(encoded_command))) result.extend(encoded_command) return result def decode_collect_task(buf): - """ - 解码CollectTask消息 - Args: - buf (bytes): 编码后的数据 - Returns: - dict: 解码后的消息 - """ + """解码CollectTask消息 (已修复)""" result = {} pos = 0 while pos < len(buf): @@ -142,36 +109,28 @@ def decode_collect_task(buf): wire_type = tag & 0x07 if field_number == 1: # command - if wire_type == 2: - length, pos = decode_varint(buf, pos) - value_buf = buf[pos:pos + length] - pos += length - result['command'] = decode_raw_485_command(value_buf) + length, pos_after_len = decode_varint(buf, pos) + end_pos = pos_after_len + length + value_buf = buf[pos_after_len:end_pos] + result['command'] = decode_raw_485_command(value_buf) + pos = end_pos else: if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: - length, pos = decode_varint(buf, pos); - pos += length + length, pos_after_len = decode_varint(buf, pos) + pos = pos_after_len + length + elif wire_type == 5: + pos += 4 else: pos += 1 return result def encode_batch_collect_command(correlation_id, tasks): - """ - 编码BatchCollectCommand消息 - Args: - correlation_id (str): 关联ID - tasks (list): CollectTask消息字典列表 - Returns: - bytearray: 编码后的数据 - """ result = bytearray() - # correlation_id (field 1, wire type 2) result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_string(correlation_id)) - # tasks (field 2, wire type 2) - repeated for task in tasks: encoded_task = encode_collect_task(task['command']) result.extend(encode_varint((2 << 3) | 2)) @@ -181,13 +140,7 @@ def encode_batch_collect_command(correlation_id, tasks): def decode_batch_collect_command(buf): - """ - 解码BatchCollectCommand消息 - Args: - buf (bytes): 编码后的数据 - Returns: - dict: 解码后的消息 - """ + """解码BatchCollectCommand消息 (已修复)""" result = {'tasks': []} pos = 0 while pos < len(buf): @@ -196,54 +149,39 @@ def decode_batch_collect_command(buf): wire_type = tag & 0x07 if field_number == 1: # correlation_id - if wire_type == 2: - value, pos = decode_string(buf, pos) - result['correlation_id'] = value + value, pos = decode_string(buf, pos) + result['correlation_id'] = value elif field_number == 2: # tasks (repeated) - if wire_type == 2: - length, pos = decode_varint(buf, pos) - value_buf = buf[pos:pos + length] - pos += length - result['tasks'].append(decode_collect_task(value_buf)) + length, pos_after_len = decode_varint(buf, pos) + end_pos = pos_after_len + length + value_buf = buf[pos_after_len:end_pos] + result['tasks'].append(decode_collect_task(value_buf)) + pos = end_pos else: if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 2: - length, pos = decode_varint(buf, pos); - pos += length + length, pos_after_len = decode_varint(buf, pos) + pos = pos_after_len + length + elif wire_type == 5: + pos += 4 else: pos += 1 return result def encode_collect_result(correlation_id, values): - """ - 编码CollectResult消息 - Args: - correlation_id (str): 关联ID - values (list): 采集值列表 (float) - Returns: - bytearray: 编码后的数据 - """ result = bytearray() - # correlation_id (field 1, wire type 2) result.extend(encode_varint((1 << 3) | 2)) result.extend(encode_string(correlation_id)) - # values (field 2, wire type 5) - repeated fixed32 for value in values: - result.extend(encode_varint((2 << 3) | 5)) # Tag for fixed32 - result.extend(struct.pack('> 3 wire_type = tag & 0x07 - if wire_type == 2: # 所有oneof字段都使用长度分隔类型 - length, pos = decode_varint(buf, pos) - value_buf = buf[pos:pos + length] - pos += length + if wire_type == 2: + length, pos_after_len = decode_varint(buf, pos) + end_pos = pos_after_len + length + value_buf = buf[pos_after_len:end_pos] - if field_number == 1: # raw_485_command + if field_number == 1: result['raw_485_command'] = decode_raw_485_command(value_buf) - elif field_number == 2: # batch_collect_command + elif field_number == 2: result['batch_collect_command'] = decode_batch_collect_command(value_buf) - elif field_number == 3: # collect_result + elif field_number == 3: result['collect_result'] = decode_collect_result(value_buf) + + pos = end_pos else: - # 跳过未知字段 if wire_type == 0: _, pos = decode_varint(buf, pos) elif wire_type == 5: pos += 4 - elif wire_type == 2: - length, pos = decode_varint(buf, pos); - pos += length else: pos += 1 return result @@ -347,73 +268,38 @@ if __name__ == "__main__": print("--- 测试 Raw485Command ---") raw_cmd_data = {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'} encoded_raw_cmd = encode_raw_485_command(raw_cmd_data['bus_number'], raw_cmd_data['command_bytes']) - print(f"编码后 Raw485Command: {encoded_raw_cmd.hex()}") decoded_raw_cmd = decode_raw_485_command(encoded_raw_cmd) - print(f"解码后 Raw485Command: {decoded_raw_cmd}") - assert decoded_raw_cmd == raw_cmd_data + assert decoded_raw_cmd == raw_cmd_data, f"Expected {raw_cmd_data}, got {decoded_raw_cmd}" print("\n--- 测试 CollectTask ---") collect_task_data = {'command': raw_cmd_data} encoded_collect_task = encode_collect_task(collect_task_data['command']) - print(f"编码后 CollectTask: {encoded_collect_task.hex()}") decoded_collect_task = decode_collect_task(encoded_collect_task) - print(f"解码后 CollectTask: {decoded_collect_task}") - assert decoded_collect_task == collect_task_data + assert decoded_collect_task == collect_task_data, f"Expected {collect_task_data}, got {decoded_collect_task}" print("\n--- 测试 BatchCollectCommand ---") batch_collect_data = { 'correlation_id': 'abc-123', 'tasks': [ - {'command': {'bus_number': 1, 'command_bytes': b'\x01\x03\x00\x00\x00\x02\xc4\x0b'}}, + {'command': {'bus_number': 1, 'command_bytes': b'\x01\x04\x00\x01\x00\x01\x60\x0a'}}, {'command': {'bus_number': 2, 'command_bytes': b'\x02\x03\x00\x01\x00\x01\xd5\xfa'}} ] } - encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], - batch_collect_data['tasks']) - print(f"编码后 BatchCollectCommand: {encoded_batch_collect.hex()}") + encoded_batch_collect = encode_batch_collect_command(batch_collect_data['correlation_id'], batch_collect_data['tasks']) decoded_batch_collect = decode_batch_collect_command(encoded_batch_collect) - print(f"解码后 BatchCollectCommand: {decoded_batch_collect}") - assert decoded_batch_collect == batch_collect_data + assert decoded_batch_collect == batch_collect_data, f"Expected {batch_collect_data}, got {decoded_batch_collect}" print("\n--- 测试 CollectResult ---") - collect_result_data = { - 'correlation_id': 'res-456', - 'values': [12.34, 56.78, 90.12] - } + collect_result_data = {'correlation_id': 'res-456', 'values': [12.34, 56.78]} encoded_collect_result = encode_collect_result(collect_result_data['correlation_id'], collect_result_data['values']) - print(f"编码后 CollectResult: {encoded_collect_result.hex()}") decoded_collect_result = decode_collect_result(encoded_collect_result) - print(f"解码后 CollectResult: {decoded_collect_result}") - # 由于32位浮点数精度问题,直接比较可能会失败,此处设置一个合理的容忍度 assert decoded_collect_result['correlation_id'] == collect_result_data['correlation_id'] for i in range(len(collect_result_data['values'])): - assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 # 已放宽容忍度 - - print("\n--- 测试 Instruction (内含Raw485Command) ---") - instruction_raw_485 = encode_instruction('raw_485_command', raw_cmd_data) - print(f"编码后 Instruction (Raw485Command): {instruction_raw_485.hex()}") - decoded_instruction_raw_485 = decode_instruction(instruction_raw_485) - print(f"解码后 Instruction (Raw485Command): {decoded_instruction_raw_485}") - assert decoded_instruction_raw_485['raw_485_command'] == raw_cmd_data + assert abs(decoded_collect_result['values'][i] - collect_result_data['values'][i]) < 1e-5 print("\n--- 测试 Instruction (内含BatchCollectCommand) ---") instruction_batch_collect = encode_instruction('batch_collect_command', batch_collect_data) - print(f"编码后 Instruction (BatchCollectCommand): {instruction_batch_collect.hex()}") decoded_instruction_batch_collect = decode_instruction(instruction_batch_collect) - print(f"解码后 Instruction (BatchCollectCommand): {decoded_instruction_batch_collect}") - assert decoded_instruction_batch_collect['batch_collect_command']['correlation_id'] == batch_collect_data[ - 'correlation_id'] - assert len(decoded_instruction_batch_collect['batch_collect_command']['tasks']) == len(batch_collect_data['tasks']) - - print("\n--- 测试 Instruction (内含CollectResult) ---") - instruction_collect_result = encode_instruction('collect_result', collect_result_data) - print(f"编码后 Instruction (CollectResult): {instruction_collect_result.hex()}") - decoded_instruction_collect_result = decode_instruction(instruction_collect_result) - print(f"解码后 Instruction (CollectResult): {decoded_instruction_collect_result}") - assert decoded_instruction_collect_result['collect_result']['correlation_id'] == collect_result_data[ - 'correlation_id'] - for i in range(len(collect_result_data['values'])): - assert abs( - decoded_instruction_collect_result['collect_result']['values'][i] - collect_result_data['values'][i]) < 1e-5 + assert decoded_instruction_batch_collect == {'batch_collect_command': batch_collect_data} print("\n所有测试均已通过!") diff --git a/main.py b/main.py index 47239c0..5273684 100644 --- a/main.py +++ b/main.py @@ -66,6 +66,7 @@ def loop(): """ packet = lora_manager.receive_packet() if packet: + log(f"主线程:收到新LoRa数据包: {packet.hex()}") if task_queue.full(): log("警告:任务队列已满,新的LoRa数据包被丢弃!") return