Files
pig-house-controller/client_pb.py
2025-09-25 20:14:48 +08:00

306 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
根据client.proto生成的解析代码
适用于ESP32 MicroPython环境
"""
import struct
# MethodType enum values: the integer passed as `method` to
# encode_instruction() to tell the receiver how to interpret the payload
# (the example at the bottom of this file pairs SWITCH with a Switch
# message and COLLECT with a Collect message).
METHOD_TYPE_SWITCH = 0
METHOD_TYPE_COLLECT = 1
def encode_varint(value):
    """Encode an integer as a protobuf base-128 varint.

    Args:
        value: Integer to encode. Negative values are encoded as their
            64-bit two's-complement representation (ten bytes), which is
            how protobuf serializes negative int32/int64/enum values.

    Returns:
        bytearray: 7-bit groups, least-significant group first, with the
        high bit set on every byte except the last.
    """
    if value < 0:
        # Without this, the loop below is skipped for negatives and a single
        # meaningless byte would be produced (e.g. -1 -> 0x7F, i.e. 127).
        value &= 0xFFFFFFFFFFFFFFFF

    buf = bytearray()
    while value >= 0x80:
        # Low 7 bits plus the continuation flag.
        buf.append((value & 0x7F) | 0x80)
        value >>= 7
    # Final group: continuation flag clear.
    buf.append(value & 0x7F)
    return buf
def decode_varint(buf, pos=0):
    """Decode one base-128 varint from ``buf`` starting at ``pos``.

    Args:
        buf: Indexable sequence of byte values holding the encoded data.
        pos: Index of the first byte of the varint.

    Returns:
        tuple: ``(value, next_pos)`` where ``next_pos`` is the index just
        past the last byte consumed. If the buffer ends before a byte with
        a clear continuation bit is seen, the bits accumulated so far are
        returned; if ``pos`` is already at or past the end, ``(0, pos)``.
    """
    start = pos
    value = 0
    for index in range(start, len(buf)):
        octet = buf[index]
        pos = index + 1
        # Each byte contributes 7 payload bits, least-significant group first.
        value |= (octet & 0x7F) << (7 * (index - start))
        if octet & 0x80 == 0:
            # Continuation bit clear: this was the last byte.
            break
    return value, pos
def encode_string(value):
    """Encode a string as a length-prefixed UTF-8 byte sequence.

    Args:
        value: Text to encode (str).

    Returns:
        bytearray: Varint byte length followed by the UTF-8 bytes. The
        field tag is NOT included; callers write it themselves.
    """
    raw = value.encode('utf-8')
    # The prefix is the byte length, not the character count.
    prefix = encode_varint(len(raw))
    return prefix + raw
def decode_string(buf, pos=0):
    """Decode a length-prefixed UTF-8 string from ``buf`` at ``pos``.

    Args:
        buf: Buffer holding the encoded data.
        pos: Index of the varint length prefix.

    Returns:
        tuple: ``(text, next_pos)`` where ``next_pos`` is the prefix end
        plus the declared length.
    """
    size, start = decode_varint(buf, pos)
    stop = start + size
    text = buf[start:stop].decode('utf-8')
    return text, stop
def encode_instruction(method, data):
    """Serialize an Instruction message.

    Args:
        method: Method type (int); written as varint field 1.
        data: Payload (bytes); written as length-delimited field 2.

    Returns:
        bytearray: The encoded message.
    """
    method_tag = (1 << 3) | 0  # field 1, wire type 0 (varint)
    data_tag = (2 << 3) | 2    # field 2, wire type 2 (length-delimited)

    pieces = (
        encode_varint(method_tag),
        encode_varint(method),
        encode_varint(data_tag),
        encode_varint(len(data)),
        data,
    )
    encoded = bytearray()
    for piece in pieces:
        encoded.extend(piece)
    return encoded
def decode_instruction(buf):
    """Parse an Instruction message.

    Args:
        buf: The encoded message (bytes or bytearray).

    Returns:
        dict: May contain ``'method'`` (int, field 1) and ``'data'``
        (slice of ``buf``, field 2). Fields missing from ``buf`` are
        missing from the dict.

    Any field that is not one of the expected (field number, wire type)
    pairs is skipped according to its wire type so that the stream stays
    in sync. Parsing stops early, returning what was decoded so far, when
    a wire type is met whose size cannot be determined here.
    """
    result = {}
    pos = 0
    end = len(buf)
    while pos < end:
        # Every field starts with a varint tag: (field_number << 3) | wire_type.
        tag, pos = decode_varint(buf, pos)
        field_number = tag >> 3
        wire_type = tag & 0x07

        if field_number == 1 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['method'] = value
        elif field_number == 2 and wire_type == 2:
            length, pos = decode_varint(buf, pos)
            result['data'] = buf[pos:pos + length]
            pos += length
        # Everything below skips a field we do not store. This includes a
        # known field number carrying an unexpected wire type: its payload
        # must still be consumed or it would be misread as the next tag.
        elif wire_type == 0:    # varint
            _, pos = decode_varint(buf, pos)
        elif wire_type == 1:    # fixed 64-bit
            pos += 8
        elif wire_type == 2:    # length-delimited
            length, pos = decode_varint(buf, pos)
            pos += length
        elif wire_type == 5:    # fixed 32-bit
            pos += 4
        else:
            # Group markers (3/4) and undefined types (6/7): the payload
            # size is unknown, so the stream cannot be resynchronized.
            break
    return result
def encode_switch(device_action, bus_number, bus_address, relay_channel):
    """Serialize a Switch message.

    Args:
        device_action: Device action command (str); field 1, length-delimited.
        bus_number: Bus number (int); field 2, varint.
        bus_address: Bus address (int); field 3, varint.
        relay_channel: Relay channel number (int); field 4, varint.

    Returns:
        bytearray: The encoded message.
    """
    encoded = bytearray()

    # Field 1: tag, then the length-prefixed UTF-8 text.
    encoded.extend(encode_varint((1 << 3) | 2))
    encoded.extend(encode_string(device_action))

    # Fields 2-4 are all plain varints, emitted in field-number order.
    varint_fields = (
        (2, bus_number),
        (3, bus_address),
        (4, relay_channel),
    )
    for field_number, number in varint_fields:
        encoded.extend(encode_varint((field_number << 3) | 0))
        encoded.extend(encode_varint(number))
    return encoded
def decode_switch(buf):
    """Parse a Switch message.

    Args:
        buf: The encoded message (bytes or bytearray).

    Returns:
        dict: May contain ``'device_action'`` (str, field 1),
        ``'bus_number'`` (int, field 2), ``'bus_address'`` (int, field 3)
        and ``'relay_channel'`` (int, field 4). Fields missing from
        ``buf`` are missing from the dict.

    Any field that is not one of the expected (field number, wire type)
    pairs is skipped according to its wire type so that the stream stays
    in sync. Parsing stops early, returning what was decoded so far, when
    a wire type is met whose size cannot be determined here.
    """
    result = {}
    pos = 0
    end = len(buf)
    while pos < end:
        # Every field starts with a varint tag: (field_number << 3) | wire_type.
        tag, pos = decode_varint(buf, pos)
        field_number = tag >> 3
        wire_type = tag & 0x07

        if field_number == 1 and wire_type == 2:
            value, pos = decode_string(buf, pos)
            result['device_action'] = value
        elif field_number == 2 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['bus_number'] = value
        elif field_number == 3 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['bus_address'] = value
        elif field_number == 4 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['relay_channel'] = value
        # Everything below skips a field we do not store. This includes a
        # known field number carrying an unexpected wire type: its payload
        # must still be consumed or it would be misread as the next tag.
        elif wire_type == 0:    # varint
            _, pos = decode_varint(buf, pos)
        elif wire_type == 1:    # fixed 64-bit
            pos += 8
        elif wire_type == 2:    # length-delimited
            length, pos = decode_varint(buf, pos)
            pos += length
        elif wire_type == 5:    # fixed 32-bit
            pos += 4
        else:
            # Group markers (3/4) and undefined types (6/7): the payload
            # size is unknown, so the stream cannot be resynchronized.
            break
    return result
def encode_collect(bus_number, bus_address, value):
    """Serialize a Collect message.

    Args:
        bus_number: Bus number (int); field 1, varint.
        bus_address: Bus address (int); field 2, varint.
        value: Collected reading (float); field 3, 32-bit little-endian float.

    Returns:
        bytearray: The encoded message.
    """
    encoded = bytearray()

    # Fields 1 and 2 are plain varints.
    for field_number, number in ((1, bus_number), (2, bus_address)):
        encoded.extend(encode_varint((field_number << 3) | 0))
        encoded.extend(encode_varint(number))

    # Field 3 uses wire type 5: exactly four bytes, little-endian.
    encoded.extend(encode_varint((3 << 3) | 5))
    encoded.extend(struct.pack('<f', value))
    return encoded
def decode_collect(buf):
    """Parse a Collect message.

    Args:
        buf: The encoded message (bytes or bytearray).

    Returns:
        dict: May contain ``'bus_number'`` (int, field 1),
        ``'bus_address'`` (int, field 2) and ``'value'`` (float, field 3,
        read as a 32-bit little-endian float). Fields missing from
        ``buf`` are missing from the dict.

    Any field that is not one of the expected (field number, wire type)
    pairs is skipped according to its wire type so that the stream stays
    in sync. Parsing stops early, returning what was decoded so far, when
    a wire type is met whose size cannot be determined here.
    """
    result = {}
    pos = 0
    end = len(buf)
    while pos < end:
        # Every field starts with a varint tag: (field_number << 3) | wire_type.
        tag, pos = decode_varint(buf, pos)
        field_number = tag >> 3
        wire_type = tag & 0x07

        if field_number == 1 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['bus_number'] = value
        elif field_number == 2 and wire_type == 0:
            value, pos = decode_varint(buf, pos)
            result['bus_address'] = value
        elif field_number == 3 and wire_type == 5:
            # Four raw bytes, little-endian IEEE-754 single precision.
            result['value'] = struct.unpack('<f', buf[pos:pos + 4])[0]
            pos += 4
        # Everything below skips a field we do not store. This includes a
        # known field number carrying an unexpected wire type: its payload
        # must still be consumed or it would be misread as the next tag.
        elif wire_type == 0:    # varint
            _, pos = decode_varint(buf, pos)
        elif wire_type == 1:    # fixed 64-bit
            pos += 8
        elif wire_type == 2:    # length-delimited
            length, pos = decode_varint(buf, pos)
            pos += length
        elif wire_type == 5:    # fixed 32-bit
            pos += 4
        else:
            # Group markers (3/4) and undefined types (6/7): the payload
            # size is unknown, so the stream cannot be resynchronized.
            break
    return result
# Usage example: round-trip a Switch and a Collect message, each wrapped in
# an Instruction envelope, printing every intermediate result.
if __name__ == "__main__":
    # --- Switch ---
    switch_payload = encode_switch("ON", 1, 10, 2)
    print(f"编码后的Switch消息: {switch_payload.hex()}")

    switch_envelope = encode_instruction(METHOD_TYPE_SWITCH, switch_payload)
    print(f"编码后的Instruction消息: {switch_envelope.hex()}")

    switch_instruction = decode_instruction(switch_envelope)
    print(f"解码后的Instruction消息: {switch_instruction}")

    # The payload is only present when field 2 was found in the envelope.
    if 'data' in switch_instruction:
        switch_fields = decode_switch(switch_instruction['data'])
        print(f"解码后的Switch消息: {switch_fields}")

    # --- Collect ---
    collect_payload = encode_collect(1, 20, 25.6)
    print(f"编码后的Collect消息: {collect_payload.hex()}")

    collect_envelope = encode_instruction(METHOD_TYPE_COLLECT, collect_payload)
    print(f"编码后的Instruction消息(Collect): {collect_envelope.hex()}")

    collect_instruction = decode_instruction(collect_envelope)
    print(f"解码后的Instruction消息: {collect_instruction}")

    if 'data' in collect_instruction:
        collect_fields = decode_collect(collect_instruction['data'])
        print(f"解码后的Collect消息: {collect_fields}")