Files
heartbeat/build/lib/hbd/proto.py
T
2026-02-04 12:45:35 -05:00

82 lines
2.3 KiB
Python

"""Message encoding/decoding utilities for hbd protocol."""
from typing import Dict, Any
import zlib
def dicttos(ID: str, d: Dict[str, Any], compress: bool = False):
"""Serialize a dict to protocol message bytes.
If compress is True, the payload is zlib-compressed and the message is
prefixed with `!ID:` as the original script did. Otherwise the format is
`ID:key=value;...` (bytes).
"""
s = []
for k in d:
v = d[k]
if isinstance(v, float):
s.append(f"{k}={v:0.5f}")
else:
s.append(f"{k}={v}")
pk = ";".join(s)
if compress:
zpk = zlib.compress(pk.encode(), 6)
hdr = ("!" + ID + ":").encode()
return hdr + zpk
else:
return (ID + ":" + pk).encode()
def stodict(msg: bytes):
"""Deserialize a protocol message into a dict.
Mirrors original behaviour: detects compressed messages starting with
'!' and decodes accordingly. Returns a dict with key 'ID' set to the
message ID and the parsed key/value pairs.
"""
d = {}
if len(msg) > 0 and chr(msg[0]) == "!":
# message is: b'!ID:' + compressed_payload
# original code used msg[1:4].decode() for ID (3 bytes including colon)
try:
pk = zlib.decompress(msg[5:]).decode()
except Exception:
# malformed compressed payload
return {}
d["ID"] = msg[1:4].decode()
else:
try:
r0 = msg.split(b":", 1)
pk = r0[1].decode()
d["ID"] = r0[0].decode()
except Exception:
return {}
if not pk:
return d
parts = pk.split(";")
for v in parts:
if not v:
continue
vr = v.split("=", 1)
k = vr[0].strip()
if len(vr) == 1:
d[k] = None
else:
val = vr[1].strip()
if val and val[0].isdigit():
try:
val_e = eval(val)
except Exception:
val_e = val
d[k] = val_e
else:
d[k] = val
return d
def oldmtodict(msg: bytes):
"""Compatibility wrapper for old-style messages (no ID prefix).
The original implementation prefixed with 'HTB:' and called stodict.
"""
return stodict(b"HTB:" + msg)