82 lines
2.3 KiB
Python
82 lines
2.3 KiB
Python
"""Message encoding/decoding utilities for hbd protocol."""
|
|
import ast
import zlib
from typing import Dict, Any
|
|
|
|
|
|
def dicttos(ID: str, d: Dict[str, Any], compress: bool = False):
    """Encode a mapping as a protocol message and return it as bytes.

    Every entry of ``d`` becomes a ``key=value`` field; fields are joined
    with ``;``. Float values are written with exactly five decimal places,
    all other values are written with their default string formatting.

    Without compression the result is ``ID:key=value;...``. With
    ``compress=True`` the joined fields are zlib-compressed (level 6) and the
    result is ``!ID:`` followed by the compressed bytes.
    """
    fields = [
        f"{key}={value:0.5f}" if isinstance(value, float) else f"{key}={value}"
        for key, value in d.items()
    ]
    payload = ";".join(fields)

    if not compress:
        return (ID + ":" + payload).encode()

    # Only the payload is compressed; the "!ID:" header stays readable so the
    # receiver can tell the two formats apart from the first byte.
    header = ("!" + ID + ":").encode()
    return header + zlib.compress(payload.encode(), 6)
|
|
|
|
|
|
def _parse_field_value(text: str) -> Any:
    """Convert one already-stripped field value into a Python object.

    A value whose first character is a digit is parsed as a Python literal
    (``12`` -> int, ``2.50000`` -> float). A value that is not a valid
    literal, or that does not start with a digit, is returned unchanged as a
    string.
    """
    if not text or not text[0].isdigit():
        return text
    try:
        # SECURITY: this used to be eval(), which executes whatever expression
        # the peer sends (e.g. "1 if __import__('os').system(...) else 0") and
        # also mangles harmless text such as "2024-11-15" into 1998.
        # literal_eval only accepts literals, so such input is kept as text.
        return ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return text


def stodict(msg: bytes) -> Dict[str, Any]:
    """Deserialize a protocol message into a dict.

    Two wire formats are accepted:

    * plain:      ``ID:key=value;key=value;...``
    * compressed: ``!ID:`` followed by the zlib-compressed
      ``key=value;...`` payload

    In both formats the ID is everything up to the first ``:``, so IDs of any
    length are decoded.

    Returns a dict whose ``"ID"`` entry is the message ID, plus one entry per
    field. Keys and values are stripped of surrounding whitespace. A field
    without ``=`` maps to ``None``. Values starting with a digit are converted
    to numbers (or other literals) when possible; everything else, including
    values with a leading sign such as ``-2``, stays a string. A field that is
    itself named ``ID`` overwrites the message ID.

    Malformed input (no ``:`` separator, undecodable text, corrupt compressed
    payload) yields an empty dict instead of raising.
    """
    compressed = msg[:1] == b"!"
    head, colon, body = (msg[1:] if compressed else msg).partition(b":")
    if not colon:
        return {}

    try:
        ident = head.decode()
        payload = (zlib.decompress(body) if compressed else body).decode()
    except (zlib.error, UnicodeDecodeError):
        return {}

    d: Dict[str, Any] = {"ID": ident}
    for field in payload.split(";"):
        if not field:
            # Empty payloads and stray/trailing ';' produce empty fields.
            continue
        key, equals, raw = field.partition("=")
        d[key.strip()] = _parse_field_value(raw.strip()) if equals else None
    return d
|
|
|
|
|
|
def oldmtodict(msg: bytes):
    """Decode an old-style message that carries no ID prefix.

    The message is given the fixed ID ``HTB`` by prepending ``b"HTB:"`` and is
    then passed to ``stodict``; that function's result is returned unchanged.
    """
    legacy_prefix = b"HTB:"
    return stodict(legacy_prefix + msg)
|