"""Message encoding/decoding utilities for hbd protocol. Message Types: HTB: Heartbeat message (client -> server) ACK: Acknowledgment (server -> client) CMD: Command message (server -> client) UPD: Update message (server -> client) PLG: Plugin data message (client -> server) """ from typing import Dict, Any, Union import json import zlib def encode_value(v: Any) -> str: """Encode a value for protocol transmission. Args: v: Value to encode (int, float, str, bool, list, dict, etc.) Returns: String representation suitable for protocol """ if isinstance(v, float): return f"{v:0.5f}" elif isinstance(v, (list, dict)): # Use JSON encoding for complex types, prefixed with @ return "@" + json.dumps(v) elif isinstance(v, bool): return str(int(v)) # True->1, False->0 else: return str(v) def decode_value(val: str) -> Any: """Decode a value from protocol format. Args: val: String value from protocol Returns: Decoded Python object """ if not val: return val # Check for JSON-encoded complex types if val.startswith("@"): try: return json.loads(val[1:]) except Exception: return val[1:] # Return as string without @ # Try numeric conversion (avoid eval to prevent SyntaxWarnings on version strings) if val[0].isdigit() or (val[0] == '-' and len(val) > 1 and val[1].isdigit()): try: return int(val) except ValueError: pass try: return float(val) except ValueError: pass return val return val def dicttos(ID: str, d: Dict[str, Any]): """Serialize a dict to protocol message bytes. If compress is True, the payload is zlib-compressed and the message is prefixed with `!ID:` as the original script did. Otherwise the format is `ID:key=value;...` (bytes). """ s = [] for k in d: v = d[k] encoded_val = encode_value(v) s.append(f"{k}={encoded_val}") pk = ";".join(s) zpk = zlib.compress(pk.encode(), 6) hdr = ("!" + ID + ":").encode() return hdr + zpk def stodict(msg: bytes): """Deserialize a protocol message into a dict. Mirrors original behaviour: detects compressed messages starting with '!' and decodes accordingly. Returns a dict with key 'ID' set to the message ID and the parsed key/value pairs. """ d = {} if len(msg) > 0 and chr(msg[0]) == "!": # message is: b'!ID:' + compressed_payload # original code used msg[1:4].decode() for ID (3 bytes including colon) try: pk = zlib.decompress(msg[5:]).decode() except Exception: # malformed compressed payload return {} d["ID"] = msg[1:4].decode() else: try: r0 = msg.split(b":", 1) pk = r0[1].decode() d["ID"] = r0[0].decode() except Exception: return {} if not pk: return d parts = pk.split(";") for v in parts: if not v: continue vr = v.split("=", 1) k = vr[0].strip() if len(vr) == 1: d[k] = None else: val = vr[1].strip() d[k] = decode_value(val) return d def oldmtodict(msg: bytes): """Compatibility wrapper for old-style messages (no ID prefix). The original implementation prefixed with 'HTB:' and called stodict. """ return stodict(b"HTB:" + msg) def encode_plugin_data(plugin_name: str, data: Dict[str, Any]) -> bytes: """Encode plugin data into a PLG message. Args: plugin_name: Name of the plugin (e.g., "os_info", "cpu_monitor") data: Plugin data dictionary compress: Whether to compress the payload Returns: Encoded message bytes """ # Add plugin name to data full_data = {"plugin": plugin_name, **data} return dicttos("PLG", full_data) def decode_plugin_data(msg: bytes) -> Dict[str, Any]: """Decode a PLG message into plugin data. Args: msg: Raw message bytes Returns: Dictionary with 'ID', 'plugin', and plugin data fields """ return stodict(msg)