Files
2026-04-12 16:39:51 -04:00

163 lines
4.3 KiB
Python

"""Message encoding/decoding utilities for hbd protocol.
Message Types:
HTB: Heartbeat message (client -> server)
ACK: Acknowledgment (server -> client)
CMD: Command message (server -> client)
UPD: Update message (server -> client)
PLG: Plugin data message (client -> server)
"""
from typing import Dict, Any, Union
import json
import zlib
def encode_value(v: Any) -> str:
    """Render a Python value as the string used on the wire.

    Args:
        v: Value to encode (int, float, str, bool, list, dict, etc.)

    Returns:
        The protocol string: floats with five decimal places, lists and
        dicts as JSON prefixed with ``@``, booleans as ``1``/``0``, and
        anything else via ``str()``.
    """
    if isinstance(v, float):
        return format(v, "0.5f")
    if isinstance(v, (list, dict)):
        # The leading "@" marks the rest of the value as a JSON document.
        return f"@{json.dumps(v)}"
    if isinstance(v, bool):
        return "1" if v else "0"
    return str(v)
def decode_value(val: str) -> Any:
    """Turn a protocol string back into a Python object.

    Args:
        val: String value from protocol

    Returns:
        ``val`` itself when it is empty; the parsed JSON document for
        ``@``-prefixed values (or the text after ``@`` if parsing fails);
        an ``int`` or ``float`` for values that start with a digit or a
        minus sign followed by a digit and convert cleanly; otherwise the
        unchanged string.
    """
    if not val:
        return val

    # "@" introduces a JSON-encoded list/dict.
    if val.startswith("@"):
        body = val[1:]
        try:
            return json.loads(body)
        except Exception:
            # Not valid JSON: hand back the text without the marker.
            return body

    # Only attempt numeric parsing when the text looks like a number, so
    # arbitrary strings are returned untouched.
    first = val[0]
    looks_numeric = first.isdigit() or (first == "-" and val[1:2].isdigit())
    if looks_numeric:
        # int first so whole numbers stay integers; float as the fallback.
        for convert in (int, float):
            try:
                return convert(val)
            except ValueError:
                continue
    return val
def dicttos(ID: str, d: Dict[str, Any], compress: bool = True) -> bytes:
    """Serialize a dict to protocol message bytes.

    Each item becomes ``key=value`` (values rendered by ``encode_value``)
    and the items are joined with ``;``. Keys and values are not escaped,
    so a ``;`` inside either one will split the field when the message is
    parsed.

    Args:
        ID: Message identifier placed in the header.
        d: Fields to serialize, in iteration order.
        compress: When True (the default) the payload is zlib-compressed
            at level 6 and the message is ``!ID:`` followed by the
            compressed bytes. When False the message is the plain text
            ``ID:key=value;...`` encoded to bytes.

    Returns:
        The encoded message bytes.
    """
    payload = ";".join(f"{key}={encode_value(value)}" for key, value in d.items())
    if compress:
        # The leading "!" tells the decoder that the payload is compressed.
        header = ("!" + ID + ":").encode()
        return header + zlib.compress(payload.encode(), 6)
    return (ID + ":" + payload).encode()
def stodict(msg: bytes) -> Dict[str, Any]:
    """Deserialize a protocol message into a dict.

    Two layouts are accepted:

    * ``!ID:`` followed by a zlib-compressed payload
    * ``ID:`` followed by the plain-text payload

    The payload is a ``;``-separated list of ``key=value`` fields. A field
    without ``=`` is stored with the value ``None``; other values go
    through ``decode_value``.

    Args:
        msg: Raw message bytes.

    Returns:
        A dict whose ``'ID'`` key holds the message ID, plus the parsed
        fields. An empty dict is returned when the message has no ``:``
        header delimiter, cannot be decompressed, or is not valid text.
    """
    d: Dict[str, Any] = {}

    compressed = msg[:1] == b"!"
    # The header ends at the first colon, so IDs of any length are handled
    # rather than assuming a fixed three-character ID.
    raw_id, colon, body = (msg[1:] if compressed else msg).partition(b":")
    if not colon:
        return {}

    try:
        pk = (zlib.decompress(body) if compressed else body).decode()
        d["ID"] = raw_id.decode()
    except (zlib.error, ValueError):
        # Malformed compressed payload, or bytes that are not valid text
        # (UnicodeDecodeError is a ValueError).
        return {}

    # NOTE: a payload field literally named "ID" overwrites the message ID
    # stored above, because fields are assigned after it.
    for field in pk.split(";"):
        if not field:
            continue
        key, equals, raw_value = field.partition("=")
        d[key.strip()] = decode_value(raw_value.strip()) if equals else None
    return d
def oldmtodict(msg: bytes):
    """Parse an old-style message that carries no ID prefix.

    The payload is given an ``HTB:`` header and then handed to
    ``stodict``, whose result is returned unchanged.
    """
    prefixed = b"HTB:" + msg
    return stodict(prefixed)
def encode_plugin_data(plugin_name: str, data: Dict[str, Any]) -> bytes:
    """Build a PLG message from a plugin's data.

    Args:
        plugin_name: Name of the plugin (e.g., "os_info", "cpu_monitor")
        data: Plugin data dictionary. A ``"plugin"`` key in ``data``
            replaces ``plugin_name`` in the message.

    Returns:
        Encoded message bytes as produced by ``dicttos`` with ID ``PLG``.
    """
    # The plugin name goes first, followed by the plugin's own fields.
    message_fields = {
        "plugin": plugin_name,
        **data,
    }
    return dicttos("PLG", message_fields)
def decode_plugin_data(msg: bytes) -> Dict[str, Any]:
    """Parse a PLG message back into its fields.

    Args:
        msg: Raw message bytes

    Returns:
        Whatever ``stodict`` yields for ``msg``: the message ``'ID'`` and
        the decoded payload fields.
    """
    decoded = stodict(msg)
    return decoded