Files
heartbeat/hbd/common/proto.py
T
Andreas Wrede 0543266c92 Major refactoring of the codebase, including restructuring of files and directories, renaming of modules and classes, and improvements to the overall organization and readability of the code. This refactoring aims to enhance maintainability, scalability, and clarity of the codebase while preserving existing functionality. The changes include:
- Restructuring of the project directory into client and server components
- Renaming of modules and classes to better reflect their purpose and functionality
- Moving common utilities and configurations to a shared location
- Updating import statements to reflect the new structure
- Adding new documentation files for better clarity on various aspects of the project
- Removing deprecated or unused code to streamline the codebase
- Ensuring that all existing functionality is preserved and that the codebase remains functional after the refactoring.
2026-03-29 11:13:40 -04:00

161 lines
4.3 KiB
Python

"""Message encoding/decoding utilities for hbd protocol.
Message Types:
HTB: Heartbeat message (client -> server)
ACK: Acknowledgment (server -> client)
CMD: Command message (server -> client)
UPD: Update message (server -> client)
PLG: Plugin data message (client -> server)
"""
from typing import Dict, Any, Union
import json
import zlib
def encode_value(v: Any) -> str:
"""Encode a value for protocol transmission.
Args:
v: Value to encode (int, float, str, bool, list, dict, etc.)
Returns:
String representation suitable for protocol
"""
if isinstance(v, float):
return f"{v:0.5f}"
elif isinstance(v, (list, dict)):
# Use JSON encoding for complex types, prefixed with @
return "@" + json.dumps(v)
elif isinstance(v, bool):
return str(int(v)) # True->1, False->0
else:
return str(v)
def decode_value(val: str) -> Any:
"""Decode a value from protocol format.
Args:
val: String value from protocol
Returns:
Decoded Python object
"""
if not val:
return val
# Check for JSON-encoded complex types
if val.startswith("@"):
try:
return json.loads(val[1:])
except Exception:
return val[1:] # Return as string without @
# Try numeric evaluation (original behavior)
if val[0].isdigit() or (val[0] == '-' and len(val) > 1 and val[1].isdigit()):
try:
return eval(val)
except Exception:
return val
return val
def dicttos(ID: str, d: Dict[str, Any], compress: bool = False):
"""Serialize a dict to protocol message bytes.
If compress is True, the payload is zlib-compressed and the message is
prefixed with `!ID:` as the original script did. Otherwise the format is
`ID:key=value;...` (bytes).
"""
s = []
for k in d:
v = d[k]
encoded_val = encode_value(v)
s.append(f"{k}={encoded_val}")
pk = ";".join(s)
if compress:
zpk = zlib.compress(pk.encode(), 6)
hdr = ("!" + ID + ":").encode()
return hdr + zpk
else:
return (ID + ":" + pk).encode()
def stodict(msg: bytes):
"""Deserialize a protocol message into a dict.
Mirrors original behaviour: detects compressed messages starting with
'!' and decodes accordingly. Returns a dict with key 'ID' set to the
message ID and the parsed key/value pairs.
"""
d = {}
if len(msg) > 0 and chr(msg[0]) == "!":
# message is: b'!ID:' + compressed_payload
# original code used msg[1:4].decode() for ID (3 bytes including colon)
try:
pk = zlib.decompress(msg[5:]).decode()
except Exception:
# malformed compressed payload
return {}
d["ID"] = msg[1:4].decode()
else:
try:
r0 = msg.split(b":", 1)
pk = r0[1].decode()
d["ID"] = r0[0].decode()
except Exception:
return {}
if not pk:
return d
parts = pk.split(";")
for v in parts:
if not v:
continue
vr = v.split("=", 1)
k = vr[0].strip()
if len(vr) == 1:
d[k] = None
else:
val = vr[1].strip()
d[k] = decode_value(val)
return d
def oldmtodict(msg: bytes):
"""Compatibility wrapper for old-style messages (no ID prefix).
The original implementation prefixed with 'HTB:' and called stodict.
"""
return stodict(b"HTB:" + msg)
def encode_plugin_data(plugin_name: str, data: Dict[str, Any], compress: bool = False) -> bytes:
"""Encode plugin data into a PLG message.
Args:
plugin_name: Name of the plugin (e.g., "os_info", "cpu_monitor")
data: Plugin data dictionary
compress: Whether to compress the payload
Returns:
Encoded message bytes
"""
# Add plugin name to data
full_data = {"plugin": plugin_name, **data}
return dicttos("PLG", full_data, compress)
def decode_plugin_data(msg: bytes) -> Dict[str, Any]:
"""Decode a PLG message into plugin data.
Args:
msg: Raw message bytes
Returns:
Dictionary with 'ID', 'plugin', and plugin data fields
"""
return stodict(msg)